to_es.py

import sys
import os
import re
from datetime import datetime

# Example args (test env): -catalog=imports -database_name=venezuela_bol -run_type=test -dt=20200101 -cdt=20200102
# Example args (prod env): -catalog=imports -database_name=america -run_type=print -dt=20240808 -cdt=20240808
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.scheduler.mg2es.es_operator import ESOperator
from dw_base.scheduler.mg2es.es_tmpl_gen import EsTmplGen
from dw_base.scheduler.mg2es.git_helper import GitHelper
from dw_base.scheduler.mg2es.hive_sql import HiveSQL
from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args
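
# High-level flow (a summary inferred from the code below):
#   1. Pull the latest etlconfig from git and generate the four ES SQL templates.
#   2. Compare the git timestamp against the active row in task.es_template and
#      insert or rotate the template version when it has changed.
#   3. Load the day's incremental rows from to_mongo into the to_es backup table via Spark.
#   4. For each shipment year, build the per-year Hive-to-ES mapping table and
#      refresh the corresponding Elasticsearch index.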
if __name__ == '__main__':
    git_helper = GitHelper()
    git_helper.git_pull_etlconfig()
    catalog_dict = {
        'exports': 'ex',
        'imports': 'im'}
    CONFIG, _ = parse_args(sys.argv[1:])
    run_type = CONFIG.get('run_type', 'print')
    dt = CONFIG.get('dt')
    cdt = CONFIG.get('cdt')
    ydt = CONFIG.get('ydt')
    run_year = CONFIG.get('run_year')
    catalog = CONFIG.get('catalog')
    database_name = CONFIG.get('database_name')
    data_source = f'{database_name}_{catalog_dict.get(catalog)}'
    es_gen = EsTmplGen(catalog, database_name)
    es_bak_ddl = es_gen.make_2es_ddl().replace("'", '"')
    es_mapping_ddl = es_gen.make_es_mapping_ddl().replace("'", '"')
    es_bak_dml = es_gen.make_2es_dml().replace("'", '"')
    es_mapping_dml = es_gen.make_es_mapping_dml().replace("'", '"')
    if data_source in ('india_im', 'america_im'):
        es_bak_ddl, es_mapping_ddl, es_bak_dml = es_gen.replace_sql(es_bak_ddl, es_mapping_ddl, es_bak_dml, data_source)
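
    # Environment selection: the defaults below target production (Hive at
    # 192.168.30.3, ES at 192.168.11.100:9003); any non-prod run_type is
    # rewritten to the test cluster (Hive 192.168.15.3, ES 192.168.0.200:9201).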
    hive_host = '192.168.30.3'
    es_operator = ESOperator('192.168.11.100', 9003)
    if run_type not in ['print', 'test', 'prod']:
        print('【error】invalid run_type, please check!')
        exit(1)  # abort here instead of falling through with a bad run_type
    if run_type != 'prod':
        es_mapping_ddl = es_mapping_ddl.replace('192.168.11.100', '192.168.0.200')
        es_mapping_ddl = es_mapping_ddl.replace('9003', '9201')
        hive_host = '192.168.15.3'
        es_operator = ESOperator('192.168.0.200', 9201)
    if run_type == 'print':
        print(f'\n\n【es_bak_ddl】\n{es_bak_ddl}')
        print(f'\n\n【es_mapping_ddl】\n{es_mapping_ddl}')
        print(f'\n\n【es_bak_dml】\n{es_bak_dml}')
        print(f'\n\n【es_mapping_dml】\n{es_mapping_dml}')
        exit()
    incr_tbl = f'to_mongo.cts_{database_name}_{catalog_dict[catalog]}'
    spark = SparkSQL(spark_driver_memory='4g',
                     spark_executor_memory='6g',
                     spark_executor_memory_overhead='2048',
                     spark_driver_cores=1,
                     spark_executor_cores=2,
                     spark_executor_instances=10,
                     udf_files=['dw_base/spark/udf/customs/cts_common.py'])
    hive = HiveSQL(hive_host, 10000, 'hive', None, 'to_es')
    spark._final_spark_config = {'hive.exec.dynamic.partition': 'true', 'hive.exec.dynamic.partition.mode': 'nonstrict',
                                 'spark.yarn.queue': 'cts', 'spark.sql.crossJoin.enabled': 'true'}
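
    # Spark writes the incremental partition with dynamic partitioning enabled
    # and runs on the 'cts' YARN queue; the Hive connection is used for DDL and
    # the per-year ES writes later on.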
    cnt_sql = f'select count(1) cnt from {incr_tbl} where dt = "{dt}"'
    cnt = spark.query(cnt_sql)[0].collect()[0]['cnt']
    if cnt == 0:
        print('\n\n【info】No incremental data today, exiting')
        exit()
    else:
        print(f'\n\n【info】{cnt} incremental rows today, continuing')
    actived_sql = f'select git_last_time from task.es_template where `data_source` = "{data_source}" and is_actived = "1"'
    res = spark.query(actived_sql)[0].collect()
    tbl_git_last_time = None
    if len(res) != 0:
        tbl_git_last_time = res[0]['git_last_time']
    git_last_time = git_helper.get_etlconfig_last_uptime(catalog, database_name)
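
    # Template versioning: task.es_template keeps at most one active row per
    # data_source. If no active row exists, the fresh template is inserted; if
    # the git config is newer than the stored git_last_time, the new template
    # is written with is_actived='1' and every previously active row for this
    # data_source is flipped to '0' in the same insert-overwrite (see the
    # UNION ALL below).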
    if tbl_git_last_time is None:
        print('\n\n【info】No active template, inserting template')
        insert_sql = f'''
        insert overwrite table task.es_template
        select '{es_bak_ddl}' as `es_bak_ddl`
             , '{es_mapping_ddl}' as `es_mapping_ddl`
             , '{es_bak_dml}' as `es_bak_dml`
             , '{es_mapping_dml}' as `es_mapping_dml`
             , '{git_last_time}' as `git_last_time`
             , '1' as `is_actived`
             , date_format(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as `updated_time`
             , '{data_source}' as `data_source`
             , '{cdt}' as `created_dt`
        '''
        spark.execute(insert_sql)
        print('\n\n【info】execute es_bak_ddl')
        hive.execute(es_bak_ddl)
    else:
        print(f'\n\n【info】git_last_time: {git_last_time} tbl_git_last_time: {tbl_git_last_time}')
        if git_last_time > tbl_git_last_time:
            print('\n\n【info】Template has been updated, updating template')
            update_sql = f'''
            insert overwrite table task.es_template
            select '{es_bak_ddl}' as `es_bak_ddl`
                 , '{es_mapping_ddl}' as `es_mapping_ddl`
                 , '{es_bak_dml}' as `es_bak_dml`
                 , '{es_mapping_dml}' as `es_mapping_dml`
                 , '{git_last_time}' as `git_last_time`
                 , '1' as `is_actived`
                 , date_format(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as `updated_time`
                 , '{data_source}' as `data_source`
                 , '{cdt}' as `created_dt`
            UNION ALL
            SELECT i.`es_bak_ddl`
                 , i.`es_mapping_ddl`
                 , i.`es_bak_dml`
                 , i.`es_mapping_dml`
                 , i.`git_last_time`
                 , if(i.`is_actived` = '1', '0', i.`is_actived`)
                 , if(i.`is_actived` = '1',
                      date_format(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss'), i.`updated_time`)
                 , i.`data_source`
                 , i.`created_dt`
            FROM task.es_template i
            where data_source = '{data_source}' and created_dt != '{cdt}'
            '''
            spark.execute(update_sql)
            print('\n\n【info】renaming es_bak table')
            rename_sql = f'''
            alter table to_es.cts_{database_name}_{catalog_dict[catalog]} rename to to_es.cts_{database_name}_{catalog_dict[catalog]}_{dt}_bak
            '''
            hive.execute(rename_sql)
            print('\n\n【info】execute es_bak_ddl')
            hive.execute(es_bak_ddl)
            # drop the stale per-year ES mapping tables so they are rebuilt
            # from the new template
            rs = hive.query('show tables')
            tbl_prefix = f'es_cts_{database_name}_{catalog_dict[catalog]}_'
            for tbl in rs:
                if tbl[0].startswith(tbl_prefix):
                    drop_sql = f'drop table {tbl[0]}'
                    hive.execute(drop_sql)
        else:
            print('\n\n【info】Template is not updated')
    print('\n\n【info】execute es_bak_dml')
    es_bak_dml = es_bak_dml.replace('dtNeedReplace', dt)
    spark.execute(es_bak_dml)
    years_sql = f"select year_from_date, count(1) as cnt from to_es.cts_{database_name}_{catalog_dict[catalog]} where dt = '{dt}' group by year_from_date order by year_from_date desc"
    rows = spark.query(years_sql)[0].collect()
    if run_year:
        years = [run_year]
    else:
        years = [row['year_from_date'] for row in rows]
    print(f'\n\n【info】years to run: {years}')
    year_cnt_dict = {row['year_from_date']: row['cnt'] for row in rows}
    print(f'\n\n【info】year and cnt: {year_cnt_dict}')
    spark._spark_session.stop()
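
    # The Spark session is stopped here; the per-year ES writes below run as
    # Hive MapReduce jobs (hence the mapreduce.* settings that follow), so bump
    # the mapper memory and route them to the same 'cts' queue first.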
    print('\n\n【info】execute es_mapping_ddl&dml')
    mem_sql = 'SET mapreduce.map.memory.mb=16384'
    hive.execute(mem_sql)
    jvm_sql = 'SET mapreduce.map.java.opts=-Xmx8192m'
    hive.execute(jvm_sql)
    queue_sql = 'SET mapreduce.job.queuename=cts'
    hive.execute(queue_sql)
    print(hive.query('SET mapreduce.job.queuename'))
    print(hive.query('select current_user()'))
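
    # For each year: substitute the year (and dt) into the mapping templates,
    # create the per-year Hive-to-ES mapping table, load the day's rows into
    # it, then refresh the target index so the new documents become searchable.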
    for year in years:
        print(f'\n\n【info】execute year: {year}')
        start_time = datetime.now()
        year_es_mapping_ddl = es_mapping_ddl.replace('yearNeedReplace', year)
        year_es_mapping_dml = es_mapping_dml.replace('yearNeedReplace', year)
        year_es_mapping_dml = year_es_mapping_dml.replace('dtNeedReplace', dt)
        hive.execute(year_es_mapping_ddl)
        hive.execute(year_es_mapping_dml)
        end_time = datetime.now()
        time_taken = (end_time - start_time).seconds
        # .get avoids a KeyError when run_year is not present in today's increment
        print(
            f'\n\n【info】execute year: {year} cnt: {year_cnt_dict.get(year, 0)} time_taken: {time_taken} seconds-----------------------')
        index_name = f'customs_{catalog}_{database_name}-{year}'
        es_operator.refresh(index_name)
        print(f'\n\n【info】index_name: {index_name} refresh done')