import sys
import os
import re

# Make the repository root ("tendata-warehouse") importable no matter where
# this script is launched from.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

import json

from dw_base.scheduler.mg2es.path_util import PathUtil
from dw_base.scheduler.mg2es.conf_reader import ConfReader


class EsTmplGen:
    """Generate Hive DDL/DML templates for exporting customs data to Elasticsearch.

    Reads the mg2es YAML field mapping and the ES index JSON mapping for a
    (catalog, database) pair, then renders:
      * a Hive staging table DDL (``make_2es_ddl``),
      * an es-hadoop external table DDL (``make_es_mapping_ddl``),
      * the DML filling the staging table (``make_2es_dml``),
      * the DML pushing staged rows into ES (``make_es_mapping_dml``).

    The emitted SQL contains the placeholders ``dtNeedReplace`` and
    ``yearNeedReplace`` which the scheduler substitutes at run time.
    """

    def __init__(self, catalog, database_name):
        """Load the YAML and ES-JSON configs for one catalog/database pair.

        :param catalog: trade direction, ``'exports'`` or ``'imports'``
        :param database_name: data-source name, e.g. ``'america'``
        """
        self.catalog = catalog
        self.database_name = database_name
        es_json_path, mg2es_mapping_path = PathUtil.get_conf_abspath(catalog, database_name)
        conf_reader = ConfReader()
        self.yml_dict = conf_reader.get_yml_data(mg2es_mapping_path)
        self.es_json = conf_reader.get_json_data(es_json_path)
        # ES field type -> Hive column type; anything unlisted falls back to 'string'.
        self.type_dict = {
            'date': 'string',
            'text': 'string',
            'keyword': 'string',
            'scaled_float': 'double',
        }
        # Short catalog suffixes used in table names.
        self.catalog_dict = {'exports': 'ex', 'imports': 'im'}

    def get_clos_with_type(self):
        """Return ``[(column_name, hive_type), ...]`` for every mapped field.

        A column is typed ``'array'`` when its YAML entry uses the
        ``text_split_handler`` or has a list-valued ``source``; otherwise the
        type comes from the ES index mapping via ``self.type_dict``, defaulting
        to ``'string'``.
        """
        yml_fields = self.yml_dict['transformer']['mapping']['fields']
        # BUG FIX: the original removed entries while iterating the same list,
        # which skips the element right after each removal. Filter with slice
        # assignment instead. The filtering is deliberately done IN PLACE:
        # make_2es_dml() re-reads self.yml_dict afterwards and relies on
        # 'productTag' already being stripped to stay aligned with this result.
        yml_fields[:] = [f for f in yml_fields if f.get('name') != 'productTag']
        yml_clos = [c['name'] for c in yml_fields]
        # Replace the Mongo '_id' column with 'id'.
        yml_clos[0] = 'id'
        # Array-typed columns: handler is text_split_handler, or source is a list.
        arr_clos = [c['name'] for c in yml_fields
                    if c.get('handler') == 'text_split_handler' or isinstance(c.get('source'), list)]
        # Column name -> ES type, pulled from the ES index mapping.
        es_types = {key: value['type']
                    for key, value in self.es_json['mappings']['properties'].items()}
        res_list = []
        for c in yml_clos:
            if c in arr_clos:
                res_list.append((c, 'array'))
            elif c in es_types:
                # .get(): an ES type missing from type_dict (e.g. 'long') used
                # to raise KeyError; default it to 'string' like the else branch.
                res_list.append((c, self.type_dict.get(es_types[c], 'string')))
            else:
                res_list.append((c, 'string'))
        return res_list

    def make_ddl_body(self):
        """Render the column list of a CREATE TABLE, one aligned column per line."""
        clos_with_type = self.get_clos_with_type()
        # Pad names so the type column lines up (+2 for the backticks).
        max_len = max(len(c[0]) for c in clos_with_type) + 2
        formatted_clos = ['\t{:<{width}} {}'.format(f'`{c[0]}`', c[1], width=max_len)
                          for c in clos_with_type]
        return ",\n".join(formatted_clos)

    def make_2es_ddl(self):
        """Build the DDL for the Hive staging table ``to_es.cts_<db>_<ex|im>``."""
        clos_str = self.make_ddl_body()
        ddl = (f'create table to_es.cts_{self.database_name}_{self.catalog_dict[self.catalog]}\n'
               f'(\n'
               f'{clos_str}'
               f'\n) PARTITIONED BY ( `dt` string,year_from_date string) \n'
               f'\tSTORED AS ORC'
               )
        return ddl

    def make_es_mapping_ddl(self):
        """Build the DDL for the es-hadoop external table that writes into ES.

        ``yearNeedReplace`` in the table/index names is substituted at run time.
        """
        clos_str = self.make_ddl_body()
        # es.mapping.names entries: hive_col:es_field (identical here); the
        # leading 'id' column is handled by es.mapping.id instead.
        clos = [f'{ct[0]}:{ct[0]}' for ct in self.get_clos_with_type()]
        clos = clos[1:]
        mapping_prop = ','.join(clos)
        ddl = (
            f'create external table if not exists to_es.es_cts_{self.database_name}_{self.catalog_dict[self.catalog]}_yearNeedReplace\n'
            f'(\n'
            f'{clos_str}'
            f"\n) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' \n"
            f'\tTBLPROPERTIES ('
            f'''\n'es.nodes' = '192.168.11.100', 'es.port' = '9003', 'es.http.timeout' = '100m', 'es.input.use.sliced.partitions' = 'false', 'es.input.json' = 'false', 'es.index.auto.create' = 'true', 'es.write.operation' = 'upsert', 'es.mapping.date.rich' = 'false', 'es.batch.write.refresh' = 'false', 'es.batch.size.bytes' = '60mb', 'es.batch.size.entries' = '5000', 'es.batch.write.retry.count' = '10', 'es.batch.write.retry.wait' = '60s', 'es.update.retry.on.conflict' = '5' , 'es.resource' = 'customs_{self.catalog}_{self.database_name}-yearNeedReplace/_doc', 'es.mapping.id' = 'id', 'es.mapping.names' = '{mapping_prop}')'''
        )
        return ddl

    def make_2es_dml(self):
        """Build the INSERT OVERWRITE that fills the staging table from to_mongo.

        ``dtNeedReplace`` is substituted at run time.
        """
        # Skip the first two columns (id and date) -- they are rendered
        # explicitly in the select below.
        ct_list = self.get_clos_with_type()[2:]
        # NOTE: get_clos_with_type() has already stripped 'productTag' from
        # self.yml_dict in place, so this slice stays aligned with ct_list.
        yml_fields = self.yml_dict['transformer']['mapping']['fields'][2:]
        field_list = []
        field_dict = {}   # field name -> raw (un-wrapped) SQL expression
        handler_list = []  # fields needing a dimension-table join
        for ct, yml_field in zip(ct_list, yml_fields):
            raw_expr, field_name = self.get_field(yml_field)
            # String columns get merge_ws() applied; field_dict keeps the raw
            # expression because the dim joins reference the un-wrapped value.
            expr = f"merge_ws({raw_expr})" if ct[1] == 'string' else raw_expr
            field_list.append(f"{expr} as `{field_name}`")
            field_dict[field_name] = raw_expr
            if 'handler' in yml_field and 'dict_handler' in yml_field['handler']:
                handler_list.append(yml_field)
        dim_join_sql = self.get_dim_join_sql(handler_list, field_dict)
        dml_body = '\n\t, '.join(field_list)
        # `date` is epoch millis; shift by -8h to render Beijing time as an
        # ISO-8601 'Z' timestamp, and derive the year_from_date partition.
        dml = (f'insert overwrite table to_es.cts_{self.database_name}_{self.catalog_dict[self.catalog]}'
               f'\nselect i.`id`'
               f"\n\t, concat(replace(from_unixtime((i.`date` / 1000) - 8 * 60 * 60, 'yyyy-MM-dd HH:mm:ss'),' ','T'),'Z') as `date`"
               f'\n\t, {dml_body}'
               f'\n\t, i.`dt`'
               f"\n\t, from_unixtime((i.`date` / 1000) - 8 * 60 * 60, 'yyyy') as `year_from_date`"
               f'\nfrom to_mongo.cts_{self.database_name}_{self.catalog_dict[self.catalog]} i'
               f'\n{dim_join_sql}'
               f'\nwhere i.dt = "dtNeedReplace"'
               )
        return dml

    def make_es_mapping_dml(self):
        """Build the INSERT OVERWRITE that pushes one year's staged rows to ES."""
        clos_with_type = self.get_clos_with_type()
        clos = [f'i.`{c[0]}`' for c in clos_with_type]
        dml_body = '\n , '.join(clos)
        dml = (
            f'insert overwrite table to_es.es_cts_{self.database_name}_{self.catalog_dict[self.catalog]}_yearNeedReplace'
            f'\nselect {dml_body}'
            f'\nfrom to_es.cts_{self.database_name}_{self.catalog_dict[self.catalog]} i'
            '\nwhere dt = "dtNeedReplace" and year_from_date = "yearNeedReplace"')
        return dml

    def get_field(self, fields):
        """Translate one YAML field entry into a Hive SQL expression.

        :param fields: one entry of the YAML ``fields`` list (dict with keys
            like ``name``, ``source``, ``default``, ``handler``, ``delimiter``)
        :return: tuple ``(sql_expression, field_name)``
        """
        name = fields["name"]
        field = ''
        # Constant default with no source column -> literal value.
        if 'default' in fields and 'source' not in fields:
            field = f"'{fields['default']}'"
        if 'handler' in fields:
            if 'dict_handler' in fields['handler']:
                # Value comes from the joined dimension table (see get_dim_join_sql).
                field = f'{fields["name"]}_dim.`value`'
            elif fields['handler'] == 'text_split_handler':
                d = fields['delimiter']
                if len(d) == 1:
                    # Single delimiter: plain split.
                    delimiter = d[0]
                    field = f"array_distinct(split(i.`{fields['source']}`,'{delimiter}'))"
                else:
                    # Multiple delimiters: normalize all of them to the first
                    # one via regexp_replace, then split on it.
                    delimiter = fields['delimiter'][1]
                    delimiters = fields['delimiter'][2:-1]
                    field = f"array_distinct(split(regexp_replace(i.`{fields['source']}`, '[{delimiters}]', '{delimiter}'),'{delimiter}'))"
        if 'source' in fields and 'handler' not in fields:
            source = fields['source']
            if isinstance(source, list):
                # Several source columns -> distinct non-null array.
                source = [f"merge_ws(i.`{s}`)" for s in source]
                field = f"filter(array_distinct(array({','.join(source)})),x -> x is not null)"
            else:
                field = f"i.`{source}`"
        # Source plus default -> coalesce onto the literal default.
        if 'default' in fields and 'source' in fields:
            field = f"coalesce({field}, '{fields['default']}')"
        return (field, name)

    def get_dim_join_sql(self, handler_list, filed_dict):
        """Render the LEFT JOIN clauses for fields using a dict_handler.

        :param handler_list: YAML field entries whose handler contains
            ``dict_handler``
        :param filed_dict: field name -> raw SQL expression (parameter name
            typo kept for caller compatibility)
        :return: newline-joined JOIN clauses (empty string when none apply)
        """
        sql_list = []
        for field in handler_list:
            handler = field['handler']
            dim = f'{field["name"]}_dim'
            source = field['source']
            if '__' in source:
                # 'xx__fieldName' references another mapped field's expression.
                source = filed_dict.get(source.split('__')[1])
            else:
                source = f'i.`{source}`'
            if handler == 'country_dict_handler':
                sql_list.append(
                    f'left join dim.redis_cts_country_dict as {dim} on {dim}.dt = "dtNeedReplace" and lower({source}) = {dim}.`field`')
            elif handler == 'state_dict_handler':
                sql_list.append(
                    f'left join dim.redis_cts_state_dict as {dim} on {dim}.dt = "dtNeedReplace" and lower({source}) = {dim}.`field`')
        return '\n'.join(sql_list)

    def replace_sql(self, es_bak_ddl, es_mapping_ddl, es_bak_dml, data_source):
        """Apply per-data-source overrides: turn address columns into arrays.

        For ``india_im`` and ``america_im`` the importer/exporter (and for
        america, notify-party) address columns are arrays built with
        ``str_to_arr`` instead of merged strings.

        :return: tuple ``(es_bak_ddl, es_mapping_ddl, es_bak_dml)`` with the
            replacements applied (unchanged for other data sources)
        """
        if data_source == 'india_im':
            es_bak_ddl = es_bak_ddl.replace("`importerAddress` string,", "`importerAddress` array,")
            es_bak_ddl = es_bak_ddl.replace("`exporterAddress` string,", "`exporterAddress` array,")
            es_mapping_ddl = es_mapping_ddl.replace("`importerAddress` string,", "`importerAddress` array,")
            es_mapping_ddl = es_mapping_ddl.replace("`exporterAddress` string,", "`exporterAddress` array,")
            es_bak_dml = es_bak_dml.replace("merge_ws(i.`jksdz`) as `importerAddress`",
                                            "str_to_arr(i.`jksdz`) as `importerAddress`")
            es_bak_dml = es_bak_dml.replace("merge_ws(i.`cksdz`) as `exporterAddress`",
                                            "str_to_arr(i.`cksdz`) as `exporterAddress`")
        elif data_source == 'america_im':
            es_bak_ddl = es_bak_ddl.replace("`importerAddress` string,", "`importerAddress` array,")
            es_bak_ddl = es_bak_ddl.replace("`exporterAddress` string,", "`exporterAddress` array,")
            es_bak_ddl = es_bak_ddl.replace("`notifyPartyAddress` string,", "`notifyPartyAddress` array,")
            es_mapping_ddl = es_mapping_ddl.replace("`importerAddress` string,", "`importerAddress` array,")
            es_mapping_ddl = es_mapping_ddl.replace("`exporterAddress` string,", "`exporterAddress` array,")
            es_mapping_ddl = es_mapping_ddl.replace("`notifyPartyAddress` string,", "`notifyPartyAddress` array,")
            es_bak_dml = es_bak_dml.replace("merge_ws(i.`shrdz`) as `importerAddress`",
                                            "str_to_arr(i.`shrdz`) as `importerAddress`")
            es_bak_dml = es_bak_dml.replace("merge_ws(i.`fhrdz`) as `exporterAddress`",
                                            "str_to_arr(i.`fhrdz`) as `exporterAddress`")
            es_bak_dml = es_bak_dml.replace("merge_ws(i.`tzrdz`) as `notifyPartyAddress`",
                                            "str_to_arr(i.`tzrdz`) as `notifyPartyAddress`")
        return es_bak_ddl, es_mapping_ddl, es_bak_dml


if __name__ == '__main__':
    es = EsTmplGen('imports', 'america')
    print('\n\n--2es_ddl-------------------------------------------------------')
    print(es.make_2es_ddl())
    print('\n\n--es_mapping_ddl-------------------------------------------------------')
    print(es.make_es_mapping_ddl())
    print('\n\n--2es_dml-------------------------------------------------------')
    print(es.make_2es_dml())
    print('\n\n--es_mapping_dml-------------------------------------------------------')
    print(es.make_es_mapping_dml())