"""Sync one day's incremental customs data (imports/exports) from the
to_mongo staging tables into per-year Elasticsearch mapping tables.

Pipeline, in order:
  1. Pull the latest etlconfig from git and render the four SQL templates
     (backup DDL/DML, ES-mapping DDL/DML) for the requested data source.
  2. Depending on whether the git template changed since the active row in
     task.es_template: insert a fresh template row, or rotate (rename) the
     backup table and re-create it, or reuse the existing one.
  3. Run the backup DML via Spark, then the per-year ES-mapping DDL/DML via
     Hive, refreshing the matching ES index after each year.

Example invocations (kept from the original author):
  test env:  -catalog=imports -database_name=venezuela_bol -run_type=test -dt=20200101 -cdt=20200102
  prod env:  -catalog=imports -database_name=america -run_type=print -dt=20240808 -cdt=20240808
"""

import sys
import os
import re
from datetime import datetime

# Make the repository root ("tendata-warehouse") importable no matter where
# this file sits inside the tree.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.scheduler.mg2es.es_operator import ESOperator
from dw_base.scheduler.mg2es.es_tmpl_gen import EsTmplGen
from dw_base.scheduler.mg2es.git_helper import GitHelper
from dw_base.scheduler.mg2es.hive_sql import HiveSQL
from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args

if __name__ == '__main__':
    git_helper = GitHelper()
    git_helper.git_pull_etlconfig()

    # Short suffix used in table/index names: imports -> "im", exports -> "ex".
    catalog_dict = {'exports': 'ex', 'imports': 'im'}

    CONFIG, _ = parse_args(sys.argv[1:])
    run_type = CONFIG.get('run_type', 'print')
    dt = CONFIG.get('dt')                 # partition date of the incremental data
    cdt = CONFIG.get('cdt')               # "created_dt" stamped on the template row
    ydt = CONFIG.get('ydt')               # NOTE(review): parsed but never used below
    run_year = CONFIG.get('run_year')     # optional: restrict the per-year loop to one year
    catalog = CONFIG.get('catalog')
    database_name = CONFIG.get('database_name')
    data_source = f'{database_name}_{catalog_dict.get(catalog)}'

    # Render the four SQL templates; single quotes are normalized to double
    # quotes so the text can be embedded inside '...'-quoted Hive SQL below.
    es_gen = EsTmplGen(catalog, database_name)
    es_bak_ddl = es_gen.make_2es_ddl().replace("'", '"')
    es_mapping_ddl = es_gen.make_es_mapping_ddl().replace("'", '"')
    es_bak_dml = es_gen.make_2es_dml().replace("'", '"')
    es_mapping_dml = es_gen.make_es_mapping_dml().replace("'", '"')

    # These two sources need source-specific rewrites of the rendered SQL.
    if data_source == 'india_im' or data_source == 'america_im':
        es_bak_ddl, es_mapping_ddl, es_bak_dml = es_gen.replace_sql(
            es_bak_ddl, es_mapping_ddl, es_bak_dml, data_source)

    # Production endpoints by default; swapped below for non-prod runs.
    hive_host = '192.168.30.3'
    es_operator = ESOperator('192.168.11.100', 9003)

    if run_type not in ['print', 'test', 'prod']:
        print('【error】 run_type 参数错误,请检查!')
        # FIX: the original only printed the error and kept going with the
        # invalid run_type; abort instead so nothing runs against a cluster.
        sys.exit(1)

    if run_type != 'prod':
        # Redirect the rendered DDL and the operator handles at the test
        # ES/Hive endpoints.
        es_mapping_ddl = es_mapping_ddl.replace('192.168.11.100', '192.168.0.200')
        es_mapping_ddl = es_mapping_ddl.replace('9003', '9201')
        hive_host = '192.168.15.3'
        es_operator = ESOperator('192.168.0.200', 9201)

    if run_type == 'print':
        # Dry run: show the rendered SQL and stop.
        print(f'\n\n【es_bak_ddl】\n{es_bak_ddl}')
        print(f'\n\n【es_mapping_ddl】\n{es_mapping_ddl}')
        print(f'\n\n【es_bak_dml】\n{es_bak_dml}')
        print(f'\n\n【es_mapping_dml】\n{es_mapping_dml}')
        sys.exit()

    incr_tbl = f'to_mongo.cts_{database_name}_{catalog_dict[catalog]}'
    spark = SparkSQL(spark_driver_memory='4g',
                     spark_executor_memory='6g',
                     spark_executor_memory_overhead='2048',
                     spark_driver_cores=1,
                     spark_executor_cores=2,
                     spark_executor_instances=10,
                     udf_files=['dw_base/spark/udf/customs/cts_common.py'])
    hive = HiveSQL(hive_host, 10000, 'hive', None, 'to_es')
    spark._final_spark_config = {'hive.exec.dynamic.partition': 'true',
                                 'hive.exec.dynamic.partition.mode': 'nonstrict',
                                 'spark.yarn.queue': 'cts',
                                 'spark.sql.crossJoin.enabled': 'true'}

    # Nothing to do if today's partition is empty.
    cnt_sql = f'select count(1) cnt from {incr_tbl} where dt = "{dt}"'
    cnt = spark.query(cnt_sql)[0].collect()[0]['cnt']
    if cnt == 0:
        print('\n\n【info】Today, there is no incremental data,exit')
        sys.exit()
    else:
        print(f'\n\n【info】Today, there are {cnt} incremental data,continue')

    # Compare the git timestamp of the template with the active row in
    # task.es_template to decide whether the template must be (re)installed.
    actived_sql = (f'select git_last_time from task.es_template '
                   f'where `data_source` ="{data_source}" and is_actived = "1"')
    res = spark.query(actived_sql)[0].collect()
    tbl_git_last_time = None
    if len(res) != 0:
        tbl_git_last_time = res[0]['git_last_time']
    git_last_time = git_helper.get_etlconfig_last_uptime(catalog, database_name)

    if tbl_git_last_time is None:
        # First run for this data source: record the template and create the
        # backup table.
        print('\n\n【info】No active template, insert template')
        insert_sql = f'''
            insert overwrite table task.es_template
            select '{es_bak_ddl}' as `es_bak_ddl`
                 , '{es_mapping_ddl}' as `es_mapping_ddl`
                 , '{es_bak_dml}' as `es_bak_dml`
                 , '{es_mapping_dml}' as `es_mapping_dml`
                 , '{git_last_time}' as `git_last_time`
                 , '1' as `is_actived`
                 , date_format(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as `updated_time`
                 , '{data_source}' as `data_source`
                 , '{cdt}' as `created_dt`
        '''
        spark.execute(insert_sql)
        print('\n\n【info】execute es_bak_ddl')
        hive.execute(es_bak_ddl)
    else:
        print(f'\n\n【info】git_last_time: {git_last_time} tbl_git_last_time: {tbl_git_last_time}')
        if git_last_time > tbl_git_last_time:
            # Template changed in git: write the new row as active and flip the
            # previously-active row(s) for this source to inactive.
            print('\n\n【info】Template is updated, update template')
            update_sql = f'''
                insert overwrite table task.es_template
                select '{es_bak_ddl}' as `es_bak_ddl`
                     , '{es_mapping_ddl}' as `es_mapping_ddl`
                     , '{es_bak_dml}' as `es_bak_dml`
                     , '{es_mapping_dml}' as `es_mapping_dml`
                     , '{git_last_time}' as `git_last_time`
                     , '1' as `is_actived`
                     , date_format(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as `updated_time`
                     , '{data_source}' as `data_source`
                     , '{cdt}' as `created_dt`
                UNION ALL
                SELECT i.`es_bak_ddl`
                     , i.`es_mapping_ddl`
                     , i.`es_bak_dml`
                     , i.`es_mapping_dml`
                     , i.`git_last_time`
                     , if(i.`is_actived` = '1' , '0', i.`is_actived`)
                     , if(i.`is_actived` = '1' , date_format(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss'), i.`updated_time`)
                     , i.`data_source`
                     , i.`created_dt`
                FROM task.es_template i
                where data_source = '{data_source}' and created_dt != '{cdt}'
            '''
            spark.execute(update_sql)

            # Keep the old backup table under a dated _bak name, then create a
            # fresh one from the new DDL.
            print('\n\n【info】es_bak is rename')
            rename_sql = f'''
                alter table to_es.cts_{database_name}_{catalog_dict[catalog]}
                rename to to_es.cts_{database_name}_{catalog_dict[catalog]}_{dt}_bak
            '''
            hive.execute(rename_sql)
            print('\n\n【info】execute es_bak_ddl')
            hive.execute(es_bak_ddl)

            # Drop stale per-year ES mapping tables so the new template's DDL
            # can recreate them.
            rs = hive.query('show tables')
            tbl_prefix = f'es_cts_{database_name}_{catalog_dict[catalog]}_'
            for tbl in rs:
                if tbl[0].startswith(tbl_prefix):
                    drop_sql = f'drop table {tbl[0]}'
                    hive.execute(drop_sql)
        else:
            print('\n\n【info】Template is not updated')

    # Load today's increment into the backup table.
    print('\n\n【info】execute es_bak_dml')
    es_bak_dml = es_bak_dml.replace('dtNeedReplace', dt)
    spark.execute(es_bak_dml)

    # Determine which years today's increment touches (newest first).
    years_sql = (f"select year_from_date,count(1) as cnt "
                 f"from to_es.cts_{database_name}_{catalog_dict[catalog]} "
                 f"where dt = '{dt}' group by year_from_date order by year_from_date desc")
    rows = spark.query(years_sql)[0].collect()
    if run_year:
        years = [run_year]
    else:
        years = [row['year_from_date'] for row in rows]
    print(f'\n\n【info】run_year is {years}')
    year_cnt_dict = {row['year_from_date']: row['cnt'] for row in rows}
    print(f'\n\n【info】year and cnt :{year_cnt_dict}')

    # Spark is done; the per-year work below runs through Hive.
    spark._spark_session.stop()

    print('\n\n【info】execute es_mapping_ddl&dml')
    mem_sql = 'SET mapreduce.map.memory.mb=16384'
    hive.execute(mem_sql)
    jvm_sql = 'SET mapreduce.map.java.opts=-Xmx8192m'
    hive.execute(jvm_sql)
    queue_sql = 'SET mapreduce.job.queuename=cts'
    hive.execute(queue_sql)
    print(hive.query('SET mapreduce.job.queuename'))
    print(hive.query('select current_user()'))

    for year in years:
        print(f'\n\n【info】execute year :{year} ')
        start_time = datetime.now()
        # NOTE(review): str.replace requires year to be a str — year_from_date
        # appears to be stored as text; confirm against the table schema.
        year_es_mapping_ddl = es_mapping_ddl.replace('yearNeedReplace', year)
        year_es_mapping_dml = es_mapping_dml.replace('yearNeedReplace', year)
        year_es_mapping_dml = year_es_mapping_dml.replace('dtNeedReplace', dt)
        hive.execute(year_es_mapping_ddl)
        hive.execute(year_es_mapping_dml)
        end_time = datetime.now()
        time_taken = (end_time - start_time).seconds
        # FIX: use .get() — with an explicit run_year not present in today's
        # increment, year_cnt_dict[year] raised KeyError mid-loop.
        print(f'\n\n【info】execute year :{year} cnt : {year_cnt_dict.get(year, 0)} time_taken : {time_taken} seconds-----------------------')
        index_name = f'customs_{catalog}_{database_name}-{year}'
        es_operator.refresh(index_name)
        print(f'\n\n【info】index_name : {index_name} refresh done')