hdfs_merge_small_file.py

import os
import re
import subprocess
import sys

abspath = os.path.abspath(__file__)
root_path = re.sub(r"poyee-data-warehouse.*", "poyee-data-warehouse", abspath)
sys.path.append(root_path)

from pyspark.sql import SparkSession
from dw_base import NORM_MGT, NORM_GRN
from dw_base.utils.log_utils import pretty_print

"""
author: HQL
create_time: 2024-05-13
update_time: 2024-05-14
remarks: This script merges small HDFS files. Two merge strategies are provided,
plus a scan helper:
1. merge_hdfs_file(db_name, table_name, partition_name, file_merge_size)
   Recommended for managed Hive tables (external tables are not supported; for
   those, use the absolute-path variant below).
   db_name: database name
   table_name: table name
   partition_name: partition directory, multi-level supported, e.g. dt=20240202/topic=aaaa
   file_merge_size: target merged file size, default 128 MB
2. merge_hdfs_file_absolute_path(file_path, file_type, file_merge_size, compression_type)
   Merges the data under an absolute HDFS path directly
   (e.g. /user/dev005/workspace/flume_test/dt=20240418).
   file_path: absolute file path
   file_merge_size: target split size
   file_type: file format, e.g. orc, parquet, text; default text
3. scan_hdfs_in_hive_tbl(table_name)
   Pass a qualified table name (db_name.table_name); returns tuning advice plus
   the table's small-file count and space usage.
   For a whole-database scan, run SHOW TABLES yourself and feed each table name
   into scan_hdfs_in_hive_tbl (it shells out to dfs commands, so it is fairly
   slow; for large scans it is better to have ops query the Hive metastore in
   MySQL directly).
"""

def get_spark_session():
    """
    Returns: a configured SparkSession.Builder (callers invoke getOrCreate()).
    """
    session = SparkSession.builder \
        .appName("collect_hdfs_file") \
        .master("yarn") \
        .config('spark.default.parallelism', 8) \
        .config('spark.driver.cores', 1) \
        .config('spark.driver.memory', '1g') \
        .config('spark.executor.cores', 2) \
        .config('spark.executor.instances', 4) \
        .config('spark.executor.memory', '6g') \
        .config('spark.sql.shuffle.partitions', '8') \
        .config('spark.yarn.queue', 'spark') \
        .config('spark.sql.hive.convertMetastoreOrc', 'false')
    return session

def get_hive_meta():
    """Preview the Hive metastore TBLS table over JDBC (ops helper)."""
    df = spark.read.format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", "jdbc:mysql://m3:3306/hive") \
        .option("dbtable", "TBLS") \
        .option("user", "hive") \
        .option("password", "Tendata_hive2024") \
        .load()
    df.show(10)

def get_hive_context():
    """
    Returns: a Hive-enabled SparkSession.
    """
    session = get_spark_session()
    hive = session.enableHiveSupport() \
        .config('hive.input.format', 'org.apache.hadoop.hive.ql.io.CombineHiveInputFormat') \
        .config('hive.exec.dynamic.partition.mode', 'nonstrict') \
        .config('hive.exec.dynamic.partition', 'true') \
        .getOrCreate()
    return hive

spark = get_spark_session().getOrCreate()
hive = get_hive_context()

def hdfs_estimate_num_partitions(db_name, table_name, partition_name, file_merge_size):
    """
    Args:
        db_name: database name
        table_name: table name
        partition_name: partition directory, e.g. dt=20240202/topic=aaaa
        file_merge_size: target merged file size in MB, default 128
    Returns: the number of file slices the data should be split into
    """
    if file_merge_size is None or file_merge_size == '':
        file_merge_size = 128
    if db_name is None or table_name is None or partition_name == '':
        pretty_print(
            f'{NORM_MGT}Please pass valid arguments: db_name:{db_name}, table_name:{table_name}, partition_name:{partition_name}\n{NORM_GRN}')
        sys.exit()
    if partition_name is None:
        pretty_print(f'{NORM_MGT}Non-partitioned table\n{NORM_GRN}')
    # Default Hive warehouse root
    default_warehouse_path = "/user/hive/warehouse/"
    # Absolute path of the table (or partition) directory
    if partition_name is not None:
        file_path = default_warehouse_path + db_name + ".db/" + table_name + "/" + partition_name
    else:
        file_path = default_warehouse_path + db_name + ".db/" + table_name
    # Build the Hadoop command
    command = ["hadoop", "fs", "-du", "-s", file_path]
    # Run it
    result = subprocess.check_output(command).decode("utf-8")
    # Parse the output and extract the total size in bytes
    file_size = int(result.strip().split()[0])
    pretty_print(f'{NORM_MGT}{file_path}: {file_size} bytes under this path before compaction\n{NORM_GRN}')
    # Divide by file_merge_size to get the number of slices
    if file_size == 0:
        return 0
    return int(file_size / file_merge_size / 1024 / 1024) + 1
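
# Worked example of the slice formula above (figures illustrative): a 1 GiB
# partition with the default 128 MB target gives
#   int(1073741824 / 128 / 1024 / 1024) + 1 = 8 + 1 = 9 output files.
# The "+ 1" rounds up so the tail bytes always get a slice, at the cost of one
# final file that may come out under the target size.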

def hdfs_estimate_num_partitions_absolute_path(file_path, file_merge_size=128):
    """
    Args:
        file_path: absolute file path
        file_merge_size: target merged file size in MB, default 128
    Returns: the number of file slices the data should be split into
    """
    # Build the Hadoop command
    command = ["hadoop", "fs", "-du", "-s", file_path]
    # Run it
    result = subprocess.check_output(command).decode("utf-8")
    # Parse the output and extract the total size in bytes
    file_size = int(result.strip().split()[0])
    pretty_print(f'{NORM_MGT}{file_path}: {file_size} bytes under this path before compaction\n{NORM_GRN}')
    # Divide by file_merge_size to get the number of slices
    if file_size == 0:
        return 0
    return int(file_size / file_merge_size / 1024 / 1024) + 1

def merge_hdfs_file(db_name, table_name, partitions, file_merge_size):
    """
    Args:
        db_name: database name
        table_name: table name
        partitions: partition directory, e.g. dt=20240202/topic=aaaa
        file_merge_size: target merged file size in MB, default 128
    Returns: the number of files after merging (num_partitions)
    """
    if db_name is None or table_name is None or partitions == '':
        pretty_print(
            f'{NORM_MGT}Please pass valid arguments: db_name:{db_name}, table_name:{table_name}, partition_name:{partitions}\n{NORM_GRN}')
        sys.exit()
    # Estimate how many slices the data should be compacted into
    num_partitions = hdfs_estimate_num_partitions(db_name, table_name, partitions, file_merge_size)
    # If the partition resolves to zero bytes, skip the merge
    if num_partitions == 0:
        print(f"Table {table_name} has no data under partition {partitions}, or the files are corrupted")
        sys.exit()
    # Partitioned and non-partitioned tables are handled separately
    if partitions is not None:
        # Handle multi-level partitions
        partition_list = partitions.split('/')
        # Extract the partition key/value pairs
        conditions = [segment.split('=') for segment in partition_list]
        # Extract the partition keys
        keys = ','.join([segment.split('=')[0] for segment in partition_list])
        # Build the filter expression
        condition_str = ' and '.join([f"{key}='{value}'" for key, value in conditions])
        # Assemble the WHERE clause
        where_clause = f"WHERE {condition_str}"
        pretty_print(
            f'{NORM_MGT}Partitioned table merge started: db_name:{db_name}, table_name:{table_name}, partition={partitions}\n{NORM_GRN}')
        hive.sql(f"select * from {db_name}.{table_name} {where_clause} ").coalesce(
            num_partitions).createOrReplaceTempView("mid_table")
        hive.sql(f"INSERT OVERWRITE TABLE {db_name}.{table_name} PARTITION({keys}) select * from mid_table ")
        # hive.stop()
        pretty_print(
            f'{NORM_MGT}Partitioned table merge finished: db_name:{db_name}, table_name:{table_name}, partition={partitions} compacted into {num_partitions} files\n{NORM_GRN}')
        return num_partitions
    else:
        pretty_print(f'{NORM_MGT}Non-partitioned table merge started: db_name:{db_name}, table_name:{table_name}\n{NORM_GRN}')
        hive.sql(f"select * from {db_name}.{table_name}").coalesce(num_partitions).createOrReplaceTempView("mid_table")
        hive.sql(f"INSERT OVERWRITE TABLE {db_name}.{table_name} select * from mid_table ")
        # hive.stop()
        pretty_print(
            f'{NORM_MGT}Non-partitioned table merge finished: db_name:{db_name}, table_name:{table_name} compacted into {num_partitions} files\n{NORM_GRN}')
        return num_partitions
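
# Illustrative walk-through of the partition handling above, using the spec
# "dt=20240202/topic=aaaa" from the module docstring:
#   keys          -> "dt,topic"
#   where_clause  -> "WHERE dt='20240202' and topic='aaaa'"
# so the rewrite amounts to, roughly:
#   INSERT OVERWRITE TABLE db.tbl PARTITION(dt,topic) SELECT * FROM mid_table
# where mid_table is the coalesced SELECT over that same partition.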

def hadoop_fs_test(hdfs_file):
    """
    Check whether a given directory or file exists under HDFS.
    Args:
        hdfs_file: HDFS path
    Returns:
        bool: True if the path exists, False otherwise
    """
    # `hadoop fs -test -e` exits with code 0 when the path exists
    result = subprocess.run(f"hadoop fs -test -e {hdfs_file}", shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return result.returncode == 0

def hadoop_fs_count(location_hdfs_path):
    """
    Return (file_count, byte_size) for an HDFS path via `hadoop fs -count`.
    A _SUCCESS marker file, if present, is excluded from the count.
    """
    try:
        output = subprocess.check_output(f"hadoop fs -count -q -v {location_hdfs_path}", shell=True).decode("utf-8")
        lines = output.strip().split("\n")
        if len(lines) >= 2:
            parts = lines[1].split()
            num_files = int(parts[5])
            data_size = int(parts[6])
            if num_files >= 1:
                # Exclude the _SUCCESS marker file if one exists
                if hadoop_fs_test(f"{location_hdfs_path}/_SUCCESS"):
                    num_files -= 1
            return num_files, data_size
    except subprocess.CalledProcessError:
        # The path is missing or unreadable
        print(f"Path is empty or unreadable: {location_hdfs_path}")
    return 0, 0
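
# For reference, `hadoop fs -count -q -v` emits a header row followed by one
# data row (column layout as of Hadoop 2.x/3.x; verify on your cluster):
#   QUOTA REM_QUOTA SPACE_QUOTA REM_SPACE_QUOTA DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME
# which is why the parser above reads parts[5] (FILE_COUNT) and
# parts[6] (CONTENT_SIZE) from the second line.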

def merge_hdfs_file_absolute_path(file_path, file_type="text", file_merge_size=128, compression_type="none"):
    """
    Args:
        compression_type: compression codec
        file_path: absolute file path
        file_merge_size: target split size in MB
        file_type: file format, e.g. orc, parquet, text
    Returns: the number of files after merging (num_partitions)
    """
    if file_type not in ['orc', 'parquet', 'text']:
        pretty_print(
            f'{NORM_MGT}Unsupported file type {file_type}; orc, parquet and text are supported\n{NORM_GRN}')
        return
    # Get the HDFS file count and total size, and work out how many slices are needed
    file_count, file_size = hadoop_fs_count(file_path)
    # If the path resolves to zero files, skip the merge
    if file_count == 0:
        print(f"No data under path {file_path}, or the files are corrupted")
        sys.exit()
    num_partitions = int(file_size / file_merge_size / 1024 / 1024) + 1
    if file_count <= num_partitions:
        print(f"File count under {file_path} is already reasonable; no small-file merge needed")
        return
    # Rewrite the small files into a sibling directory
    new_file_path = file_path + "merge"
    # Read the original HDFS path
    dataset = spark.read.format(file_type).load(file_path)
    pretty_print(f'{NORM_MGT}Merge started for {file_path}, planned as {num_partitions} files\n{NORM_GRN}')
    # Spark cannot overwrite the directory it is reading from, so write to a new
    # directory and delete the old one afterwards
    if num_partitions >= 3:
        dataset = dataset.repartition(num_partitions)
    else:
        dataset = dataset.coalesce(num_partitions)
    dataset.write \
        .option("compression", compression_type) \
        .save(path=new_file_path, format=file_type, mode="overwrite")
    # Delete the original directory
    subprocess.run(f"hdfs dfs -rm -r {file_path}", shell=True)
    # Move the merged directory into place
    subprocess.run(f"hdfs dfs -mv {new_file_path} {file_path}", shell=True)
    pretty_print(f'{NORM_MGT}Merge finished for {file_path}, compacted into {num_partitions} files\n{NORM_GRN}')
    # Close the session
    # session.stop()
    return num_partitions
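
# Example invocation (path taken from the module docstring; adjust to your data):
#   merge_hdfs_file_absolute_path("/user/dev005/workspace/flume_test/dt=20240418",
#                                 file_type="text", file_merge_size=128)
# Note that the rm/mv swap above is not atomic: a reader that hits file_path
# between the delete and the move will see the path missing, so run this only
# when nothing else is consuming the directory.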

def scan_dir_merge_file(scan_dir, file_type="text", file_merge_size=128, compression_type="gzip"):
    _dirs, _files = list_hdfs_directory_contents(scan_dir)
    print(f"Directories under {scan_dir}: {_dirs}")
    print(f"Files under {scan_dir}: {_files}")
    if len(_dirs) != 0:
        for _dir in _dirs:
            print(f"{'*' * 20} Merging small files under directory {_dir} {'*' * 20}")
            merge_hdfs_file_absolute_path(file_path=_dir, file_type=file_type, file_merge_size=file_merge_size,
                                          compression_type=compression_type)
    elif len(_files) != 0:
        print(f"{scan_dir} has no subdirectories; merging the files in the directory itself")
        merge_hdfs_file_absolute_path(scan_dir, file_type=file_type, file_merge_size=file_merge_size,
                                      compression_type=compression_type)
    else:
        print(f"{scan_dir} contains no files; skipping the merge")

def scan_hdfs_in_hive_tbl(tbl_name):
    """
    Args:
        tbl_name: qualified table name (db_name.table_name)
    Scans the files under the given table and reports file counts, total size
    and a suggested file count, one entry per partition.
    For a whole-database scan, run SHOW TABLES yourself and loop over this
    method; it is quite resource-hungry.
    Returns: a list of per-partition (or per-table) stats dicts
    """
    # Fetch the table details
    table_info = hive.sql(f"DESCRIBE FORMATTED {tbl_name}")
    # Table root directory (the parent of all partitions); strip the
    # cluster-specific scheme/authority prefix such as hdfs://nameservice1
    location = str(table_info.filter("col_name = 'Location'").select("data_type").collect()[0][0])
    table_location = re.sub(r"^hdfs://[^/]+", "", location)
    # Check whether the table is partitioned
    is_partitioned = any(row["col_name"] == "# Partition Information" for row in table_info.collect())
    results = []
    if is_partitioned:
        # Collect space usage and small-file counts per partition
        partitions = hive.sql(f"SHOW PARTITIONS {tbl_name}").select("partition").collect()
        # Walk the partition list
        for partition_row in partitions:
            partition = partition_row["partition"]
            location_name = table_location + "/" + partition
            # Fetch HDFS metadata for the partition
            num_files, data_size = hadoop_fs_count(location_name)
            if num_files != 0 and data_size != 0:
                # Count files smaller than 20 MB
                small_file_cnt = count_small_files(get_hdfs_file_sizes(location_name), 20 * 1024 * 1024)
                # Record table name, partition, space usage and small-file count
                results.append({
                    "table": tbl_name,
                    "partition": partition,
                    "size": f"{data_size} bytes ≈ {round(data_size / 1024 / 1024, 3)} MB",
                    "file_count": num_files,
                    "files_under_20mb": small_file_cnt,
                    "suggested_file_count": int(data_size / 128 / 1024 / 1024) + 1
                })
            else:
                results.append({
                    "table": tbl_name,
                    "partition": partition,
                    "size": 0,
                    "file_count": 0,
                    "files_under_20mb": 0,
                    "suggested_file_count": "no suggestion, partition is empty"
                })
    else:
        # Fetch HDFS metadata for the table directory
        num_files, data_size = hadoop_fs_count(table_location)
        if num_files != 0 and data_size != 0:
            # Count files smaller than 20 MB
            small_file_cnt = count_small_files(get_hdfs_file_sizes(table_location), 20 * 1024 * 1024)
            results.append({
                "table": tbl_name,
                "partition": "not partitioned",
                "size": f"{data_size} bytes ≈ {round(data_size / 1024 / 1024, 3)} MB",
                "file_count": num_files,
                "files_under_20mb": small_file_cnt,
                "suggested_file_count": int(data_size / 128 / 1024 / 1024) + 1
            })
        else:
            results.append({
                "table": tbl_name,
                "partition": "not partitioned",
                "size": 0,
                "file_count": 0,
                "files_under_20mb": 0,
                "suggested_file_count": "no suggestion, table is empty"
            })
    return results
# spark.stop()
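
# Example return shape (values purely illustrative):
#   [{"table": "tmp.t1", "partition": "dt=20240513",
#     "size": "73400320 bytes ≈ 70.0 MB", "file_count": 412,
#     "files_under_20mb": 410, "suggested_file_count": 1}]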

def get_hdfs_file_sizes(path):
    """
    Args:
        path: HDFS path
    Returns: a dict mapping file path -> size in bytes
    """
    # Run `hadoop fs -du` to get per-file sizes
    command = f'hadoop fs -du {path}'
    result = subprocess.check_output(command, shell=True).decode("utf-8")
    # Parse the output into a {path: size} dict
    file_sizes = {}
    lines = result.strip().split("\n")
    if not any(lines):
        return file_sizes
    for line in lines:
        parts = line.split()
        # First column is the size in bytes, last column is the path
        file_sizes[parts[-1]] = parts[0]
    return file_sizes
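
# For reference, `hadoop fs -du` prints one line per child; depending on the
# Hadoop version the line is either
#   <size> <path>                          (Hadoop 2.x)
# or
#   <size> <disk-space-consumed> <path>    (Hadoop 3.x)
# so the parser above takes the first field as the size and the last as the
# path, which works for both layouts.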

def count_small_files(file_sizes, threshold):
    """
    Args:
        file_sizes: dict mapping file path -> size in bytes
        threshold: small-file size threshold in bytes
    Returns: the number of files smaller than the threshold
    """
    # Count the files below the given threshold
    if not file_sizes:
        return 0
    count = 0
    for file_path, size in file_sizes.items():
        if int(size) < threshold:
            count += 1
    return count

def scan_hdfs_in_db(db_name):
    # List every table in the given database
    db_tables = hive.sql(f"SHOW TABLES IN {db_name}").collect()
    # Scan each table in turn
    for table in db_tables:
        print(scan_hdfs_in_hive_tbl(f"{db_name}.{table.tableName}"))
    hive.stop()
    spark.stop()

def list_hdfs_directory_contents(hdfs_path):
    """
    List all directory names and file names directly under an HDFS path
    (non-recursive).
    Args:
        hdfs_path (str): HDFS directory path
    Returns:
        tuple: (dirs, files) where dirs is the directory list and files is the file list
    """
    try:
        # Run hadoop fs -ls
        output = subprocess.check_output(f"hadoop fs -ls {hdfs_path}", shell=True).decode("utf-8")
        lines = output.strip().split("\n")
        dirs = []
        files = []
        for line in lines:
            parts = line.split()
            if len(parts) >= 8:
                path = parts[-1]
                if parts[0].startswith("d"):  # directory
                    dirs.append(path)
                else:  # file
                    files.append(path)
        return dirs, files
    except subprocess.CalledProcessError as e:
        print(f"Error executing HDFS command: {e}")
        return [], []
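
# For reference, `hadoop fs -ls` output looks roughly like (sample lines):
#   Found 2 items
#   drwxr-xr-x   - hive hive          0 2024-05-13 10:00 /warehouse/tmp.db/t/dt=20240513
#   -rw-r--r--   3 hive hive    1048576 2024-05-13 10:01 /warehouse/tmp.db/t/part-00000
# The "Found N items" header has fewer than 8 fields, which is what the
# len(parts) >= 8 guard above filters out; a leading "d" in the permission
# string marks a directory.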

if __name__ == '__main__':
    # nums = hdfs_estimate_num_partitions('ent_ods', 'ent_globiz_mg_companies', '19700101', 128)
    # print(nums)
    # collect_hdfs_file_test('20240512', 'ent_raw', 'ent_crawler_base', 'ent_bing_crawler')
    # merge_hdfs_file('tmp', 'ent_shh_api_company_logs_tmp2', None, 128)
    # merge_hdfs_file_absolute_path("/user/hive/warehouse/tmp.db/cts_peru_cpdl_fill", file_merge_size=128, file_type='text')
    # print(scan_hdfs_in_hive_tbl('tmp.country_mapping_514'))
    scan_hdfs_in_db('tmp')
    # merge_hdfs_file('ent_dwd', 'ent_venezuela_biz_basic', 'dt=19700101', 128)
    # get_hive_meta()