import sys
import os
import re
from datetime import datetime

# Example arguments, test environment:
#   -catalog=imports -database_name=venezuela_bol -run_type=test -dt=20200101 -cdt=20200102
# Production environment:
#   -catalog=imports -database_name=america -run_type=print -dt=20240808 -cdt=20240808
# Locate the repo root (everything up to "tendata-warehouse") and put it on
# sys.path so the dw_base packages below resolve regardless of the working
# directory the script is launched from.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.scheduler.mg2es.es_operator import ESOperator
from dw_base.scheduler.mg2es.es_tmpl_gen import EsTmplGen
from dw_base.scheduler.mg2es.git_helper import GitHelper
from dw_base.scheduler.mg2es.hive_sql import HiveSQL
from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args
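# Rough flow of this job, as implemented below: pull the latest ETL config
# from git, generate four SQL templates (DDL/DML for the to_es backup table
# and for the per-year ES mapping tables), version the templates in
# task.es_template keyed by the config's git timestamp, load the day's
# incremental rows into the backup table with Spark, then push each
# year_from_date slice into its own ES index through Hive and refresh it.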
if __name__ == '__main__':
    # Make sure the ETL config repo is up to date before generating templates
    git_helper = GitHelper()
    git_helper.git_pull_etlconfig()
    catalog_dict = {
        'exports': 'ex',
        'imports': 'im'}
    CONFIG, _ = parse_args(sys.argv[1:])
    run_type = CONFIG.get('run_type', 'print')
    dt = CONFIG.get('dt')
    cdt = CONFIG.get('cdt')
    ydt = CONFIG.get('ydt')
    run_year = CONFIG.get('run_year')
    catalog = CONFIG.get('catalog')
    database_name = CONFIG.get('database_name')
    data_source = f'{database_name}_{catalog_dict.get(catalog)}'
    # Generate the four SQL templates; single quotes are swapped to double
    # quotes so the templates can later be embedded in the single-quoted
    # string literals written into task.es_template.
    es_gen = EsTmplGen(catalog, database_name)
    es_bak_ddl = es_gen.make_2es_ddl().replace("'", '"')
    es_mapping_ddl = es_gen.make_es_mapping_ddl().replace("'", '"')
    es_bak_dml = es_gen.make_2es_dml().replace("'", '"')
    es_mapping_dml = es_gen.make_es_mapping_dml().replace("'", '"')
    if data_source in ('india_im', 'america_im'):
        es_bak_ddl, es_mapping_ddl, es_bak_dml = es_gen.replace_sql(
            es_bak_ddl, es_mapping_ddl, es_bak_dml, data_source)
    hive_host = '192.168.30.3'
    es_operator = ESOperator('192.168.11.100', 9003)
    if run_type not in ['print', 'test', 'prod']:
        print('【error】invalid run_type argument, please check!')
        sys.exit(1)
    if run_type != 'prod':
        # Non-prod runs are redirected to the test ES cluster and test Hive host
        es_mapping_ddl = es_mapping_ddl.replace('192.168.11.100', '192.168.0.200')
        es_mapping_ddl = es_mapping_ddl.replace('9003', '9201')
        hive_host = '192.168.15.3'
        es_operator = ESOperator('192.168.0.200', 9201)
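    # run_type semantics: 'print' only previews the generated SQL and exits,
    # 'test' runs the full pipeline against the test cluster, 'prod' against
    # production.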
    if run_type == 'print':
        print(f'\n\n【es_bak_ddl】\n{es_bak_ddl}')
        print(f'\n\n【es_mapping_ddl】\n{es_mapping_ddl}')
        print(f'\n\n【es_bak_dml】\n{es_bak_dml}')
        print(f'\n\n【es_mapping_dml】\n{es_mapping_dml}')
        sys.exit()
    incr_tbl = f'to_mongo.cts_{database_name}_{catalog_dict[catalog]}'
    spark = SparkSQL(spark_driver_memory='4g',
                     spark_executor_memory='6g',
                     spark_executor_memory_overhead='2048',
                     spark_driver_cores=1,
                     spark_executor_cores=2,
                     spark_executor_instances=10,
                     udf_files=['dw_base/spark/udf/customs/cts_common.py'])
    hive = HiveSQL(hive_host, 10000, 'hive', None, 'to_es')
    # Dynamic partitioning is needed for the partitioned insert into the
    # backup table; cross joins are presumably relied on by the generated DML
    spark._final_spark_config = {'hive.exec.dynamic.partition': 'true',
                                 'hive.exec.dynamic.partition.mode': 'nonstrict',
                                 'spark.yarn.queue': 'cts',
                                 'spark.sql.crossJoin.enabled': 'true'}
    # Bail out early when the day's partition has no incremental data
    cnt_sql = f'select count(1) cnt from {incr_tbl} where dt = "{dt}"'
    cnt = spark.query(cnt_sql)[0].collect()[0]['cnt']
    if cnt == 0:
        print('\n\n【info】Today there is no incremental data, exit')
        sys.exit()
    else:
        print(f'\n\n【info】Today there are {cnt} incremental rows, continue')
    actived_sql = (f'select git_last_time from task.es_template '
                   f'where `data_source` = "{data_source}" and is_actived = "1"')
    res = spark.query(actived_sql)[0].collect()
    tbl_git_last_time = None
    if len(res) != 0:
        tbl_git_last_time = res[0]['git_last_time']
    git_last_time = git_helper.get_etlconfig_last_uptime(catalog, database_name)
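    # task.es_template keeps one active row per data_source. The git
    # last-update time of the ETL config is compared against the time recorded
    # on the active row: no row means first run (insert a fresh template), a
    # newer git time means the config changed (activate a new template and
    # deactivate the old one), otherwise the stored template is still current.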
    if tbl_git_last_time is None:
        print('\n\n【info】No active template, insert template')
        insert_sql = f'''
            insert overwrite table task.es_template
            select '{es_bak_ddl}' as `es_bak_ddl`
                 , '{es_mapping_ddl}' as `es_mapping_ddl`
                 , '{es_bak_dml}' as `es_bak_dml`
                 , '{es_mapping_dml}' as `es_mapping_dml`
                 , '{git_last_time}' as `git_last_time`
                 , '1' as `is_actived`
                 , date_format(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as `updated_time`
                 , '{data_source}' as `data_source`
                 , '{cdt}' as `created_dt`
        '''
        spark.execute(insert_sql)
        print('\n\n【info】execute es_bak_ddl')
        hive.execute(es_bak_ddl)
    else:
        print(f'\n\n【info】git_last_time: {git_last_time} tbl_git_last_time: {tbl_git_last_time}')
        if git_last_time > tbl_git_last_time:
            print('\n\n【info】Template changed, updating template')
            update_sql = f'''
                insert overwrite table task.es_template
                select '{es_bak_ddl}' as `es_bak_ddl`
                     , '{es_mapping_ddl}' as `es_mapping_ddl`
                     , '{es_bak_dml}' as `es_bak_dml`
                     , '{es_mapping_dml}' as `es_mapping_dml`
                     , '{git_last_time}' as `git_last_time`
                     , '1' as `is_actived`
                     , date_format(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as `updated_time`
                     , '{data_source}' as `data_source`
                     , '{cdt}' as `created_dt`
                union all
                select i.`es_bak_ddl`
                     , i.`es_mapping_ddl`
                     , i.`es_bak_dml`
                     , i.`es_mapping_dml`
                     , i.`git_last_time`
                     , if(i.`is_actived` = '1', '0', i.`is_actived`)
                     , if(i.`is_actived` = '1',
                          date_format(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss'), i.`updated_time`)
                     , i.`data_source`
                     , i.`created_dt`
                from task.es_template i
                where data_source = '{data_source}' and created_dt != '{cdt}'
            '''
            spark.execute(update_sql)
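            # Rename the existing to_es table to a dated _bak copy instead of
            # dropping it: the new DDL may carry a changed schema, and the old
            # data stays recoverable. The per-year es_* mapping tables are
            # dropped so they get recreated from the new template.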
            print('\n\n【info】renaming es_bak table')
            rename_sql = f'''
                alter table to_es.cts_{database_name}_{catalog_dict[catalog]}
                rename to to_es.cts_{database_name}_{catalog_dict[catalog]}_{dt}_bak
            '''
            hive.execute(rename_sql)
            print('\n\n【info】execute es_bak_ddl')
            hive.execute(es_bak_ddl)
            rs = hive.query('show tables')
            tbl_prefix = f'es_cts_{database_name}_{catalog_dict[catalog]}_'
            for tbl in rs:
                if tbl[0].startswith(tbl_prefix):
                    drop_sql = f'drop table {tbl[0]}'
                    hive.execute(drop_sql)
        else:
            print('\n\n【info】Template is not updated')
    # The DML template carries a dtNeedReplace placeholder for the run date
    print('\n\n【info】execute es_bak_dml')
    es_bak_dml = es_bak_dml.replace('dtNeedReplace', dt)
    spark.execute(es_bak_dml)
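    # Each year_from_date slice goes to its own ES index
    # (customs_{catalog}_{database_name}-{year}), so the day's rows are
    # grouped by year to drive the per-year loads below; -run_year can pin
    # the run to a single year.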
    years_sql = (f"select year_from_date, count(1) as cnt "
                 f"from to_es.cts_{database_name}_{catalog_dict[catalog]} "
                 f"where dt = '{dt}' group by year_from_date order by year_from_date desc")
    rows = spark.query(years_sql)[0].collect()
    if run_year:
        years = [run_year]
    else:
        years = [row['year_from_date'] for row in rows]
    print(f'\n\n【info】run_year is {years}')
    year_cnt_dict = {row['year_from_date']: row['cnt'] for row in rows}
    print(f'\n\n【info】year and cnt: {year_cnt_dict}')
    # All Spark work is done; release the session before the Hive jobs start
    spark._spark_session.stop()
    print('\n\n【info】execute es_mapping_ddl&dml')
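    # The mapping DDL/DML runs through Hive, presumably on MapReduce given the
    # mapreduce.* settings below, so raise the mapper memory/JVM heap and
    # route the jobs to the cts queue before executing.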
    hive.execute('SET mapreduce.map.memory.mb=16384')
    hive.execute('SET mapreduce.map.java.opts=-Xmx8192m')
    hive.execute('SET mapreduce.job.queuename=cts')
    print(hive.query('SET mapreduce.job.queuename'))
    print(hive.query('select current_user()'))
    for year in years:
        print(f'\n\n【info】execute year: {year}')
        start_time = datetime.now()
        # year_from_date may come back as a non-string type, so cast before
        # substituting into the templates; keep the original value for the
        # count lookup, and use .get() since a pinned run_year may not be in
        # the day's data
        year_str = str(year)
        year_es_mapping_ddl = es_mapping_ddl.replace('yearNeedReplace', year_str)
        year_es_mapping_dml = es_mapping_dml.replace('yearNeedReplace', year_str)
        year_es_mapping_dml = year_es_mapping_dml.replace('dtNeedReplace', dt)
        hive.execute(year_es_mapping_ddl)
        hive.execute(year_es_mapping_dml)
        end_time = datetime.now()
        time_taken = (end_time - start_time).seconds
        print(f'\n\n【info】execute year: {year} cnt: {year_cnt_dict.get(year)} '
              f'time_taken: {time_taken} seconds-----------------------')
        index_name = f'customs_{catalog}_{database_name}-{year_str}'
        es_operator.refresh(index_name)
        print(f'\n\n【info】index_name: {index_name} refresh done')