import sys
import os
import re

# Make the repository root importable before pulling in project modules.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.scheduler.mg2es.path_util import PathUtil
from dw_base.scheduler.mg2es.conf_reader import ConfReader

class EsTmplGen:
    """Generates the Hive DDL/DML templates that sync customs data from Hive into Elasticsearch."""

    def __init__(self, catalog, database_name):
        self.catalog = catalog
        self.database_name = database_name
        es_json_path, mg2es_mapping_path = PathUtil.get_conf_abspath(catalog, database_name)
        conf_reader = ConfReader()
        self.yml_dict = conf_reader.get_yml_data(mg2es_mapping_path)
        self.es_json = conf_reader.get_json_data(es_json_path)
        # ES field types and the Hive column types they map to.
        self.type_dict = {
            'date': 'string',
            'text': 'string',
            'keyword': 'string',
            'scaled_float': 'double',
        }
        self.catalog_dict = {
            'exports': 'ex',
            'imports': 'im',
        }
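
    # Illustrative shape of the two configs this class consumes (the key paths are
    # taken from the code below; the field names themselves are hypothetical):
    #
    #   mg2es mapping YAML                   ES index JSON
    #   ------------------                   -------------
    #   transformer:                         {"mappings": {"properties": {
    #     mapping:                             "importer": {"type": "text"},
    #       fields:                            "date":     {"type": "date"}}}}
    #         - name: _id
    #         - name: date
    #         - name: importer
    #           source: jks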

    def get_clos_with_type(self):
        """Returns (column_name, hive_type) pairs for every mapped field except productTag."""
        # Filter rather than calling remove() while iterating, which can skip elements.
        yml_fields = [f for f in self.yml_dict['transformer']['mapping']['fields']
                      if f.get('name') != 'productTag']
        yml_clos = [c['name'] for c in yml_fields]
        # Replace _id with id.
        yml_clos[0] = 'id'
        # Array-typed columns: the field's handler is text_split_handler, or its source is a list.
        arr_clos = [c['name'] for c in yml_fields if
                    c.get('handler') == 'text_split_handler' or isinstance(c.get('source'), list)]
        # Map each ES field name to its ES type.
        es_types = {key: value['type'] for key, value in self.es_json['mappings']['properties'].items()}
        res_list = []
        for c in yml_clos:
            if c in arr_clos:
                res_list.append((c, 'array<string>'))
            elif c in es_types:
                # ES types missing from type_dict will raise a KeyError here.
                res_list.append((c, self.type_dict[es_types[c]]))
            else:
                # Fields absent from the ES mapping default to string.
                res_list.append((c, 'string'))
        return res_list
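
    # e.g. get_clos_with_type() might return
    #   [('id', 'string'), ('date', 'string'), ('productDesc', 'array<string>'), ...]
    # ('productDesc' is only an illustrative field name.)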

    def make_ddl_body(self):
        clos_with_type = self.get_clos_with_type()
        # Pad the backticked names so the types line up in the generated DDL.
        max_len = max(len(c[0]) for c in clos_with_type) + 2
        formatted_clos = ['\t{:<{width}} {}'.format(f'`{c[0]}`', c[1], width=max_len) for c in clos_with_type]
        clos_str = ",\n".join(formatted_clos)
        return clos_str

    def make_2es_ddl(self):
        clos_str = self.make_ddl_body()
        ddl = (f'create table to_es.cts_{self.database_name}_{self.catalog_dict[self.catalog]}\n'
               f'(\n'
               f'{clos_str}'
               f'\n) PARTITIONED BY (`dt` string, `year_from_date` string)\n'
               f'\tSTORED AS ORC')
        return ddl
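
    # For EsTmplGen('imports', 'america') this renders roughly:
    #   create table to_es.cts_america_im
    #   (
    #       `id`   string,
    #       ...
    #   ) PARTITIONED BY (`dt` string, `year_from_date` string)
    #       STORED AS ORC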

    def make_es_mapping_ddl(self):
        clos_str = self.make_ddl_body()
        # Identity column mapping (hive column -> es field); id itself is wired up via es.mapping.id.
        clos = [f'{ct[0]}:{ct[0]}' for ct in self.get_clos_with_type()]
        clos = clos[1:]
        mapping_prop = ','.join(clos)
        ddl = (
            f'create external table if not exists to_es.es_cts_{self.database_name}_{self.catalog_dict[self.catalog]}_yearNeedReplace\n'
            f'(\n'
            f'{clos_str}'
            f"\n) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'\n"
            f'\tTBLPROPERTIES ('
            f'''\n'es.nodes' = '192.168.11.100',
'es.port' = '9003',
'es.http.timeout' = '100m',
'es.input.use.sliced.partitions' = 'false',
'es.input.json' = 'false',
'es.index.auto.create' = 'true',
'es.write.operation' = 'upsert',
'es.mapping.date.rich' = 'false',
'es.batch.write.refresh' = 'false',
'es.batch.size.bytes' = '60mb',
'es.batch.size.entries' = '5000',
'es.batch.write.retry.count' = '10',
'es.batch.write.retry.wait' = '60s',
'es.update.retry.on.conflict' = '5',
'es.resource' = 'customs_{self.catalog}_{self.database_name}-yearNeedReplace/_doc',
'es.mapping.id' = 'id',
'es.mapping.names' =
'{mapping_prop}')'''
        )
        return ddl
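
    # 'yearNeedReplace' and 'dtNeedReplace' are left in the SQL as literal
    # placeholders; the calling job is presumably expected to substitute the
    # real year and partition date before executing the statements.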

    def make_2es_dml(self):
        # Skip the first two fields (id and date): both are emitted explicitly below.
        # Filter productTag the same way get_clos_with_type does so the lists stay aligned.
        ct_list = self.get_clos_with_type()[2:]
        yml_fields = [f for f in self.yml_dict['transformer']['mapping']['fields']
                      if f.get('name') != 'productTag'][2:]
        field_list = []
        field_dict = {}
        handler_list = []
        for (_, col_type), yml_field in zip(ct_list, yml_fields):
            raw_expr, name = self.get_field(yml_field)
            # String columns are normalised through merge_ws.
            expr = f"merge_ws({raw_expr})" if col_type == 'string' else raw_expr
            field_list.append(f"{expr} as `{name}`")
            field_dict[name] = raw_expr
            if 'dict_handler' in yml_field.get('handler', ''):
                handler_list.append(yml_field)
        dim_join_sql = self.get_dim_join_sql(handler_list, field_dict)
        dml_body = '\n\t, '.join(field_list)
        dml = (f'insert overwrite table to_es.cts_{self.database_name}_{self.catalog_dict[self.catalog]}'
               f'\nselect i.`id`'
               f"\n\t, concat(replace(from_unixtime((i.`date` / 1000) - 8 * 60 * 60, 'yyyy-MM-dd HH:mm:ss'),' ','T'),'Z') as `date`"
               f'\n\t, {dml_body}'
               f'\n\t, i.`dt`'
               f"\n\t, from_unixtime((i.`date` / 1000) - 8 * 60 * 60, 'yyyy') as `year_from_date`"
               f'\nfrom to_mongo.cts_{self.database_name}_{self.catalog_dict[self.catalog]} i'
               f'\n{dim_join_sql}'
               f'\nwhere i.dt = "dtNeedReplace"')
        return dml
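
    # Note on the date expressions above: i.`date` is a millisecond epoch, so it
    # is divided by 1000 and shifted back 8 hours (UTC+8 -> UTC) before being
    # formatted as 'yyyy-MM-ddTHH:mm:ssZ' and as the year_from_date partition.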

    def make_es_mapping_dml(self):
        clos_with_type = self.get_clos_with_type()
        clos = [f'i.`{c[0]}`' for c in clos_with_type]
        dml_body = '\n , '.join(clos)
        dml = (
            f'insert overwrite table to_es.es_cts_{self.database_name}_{self.catalog_dict[self.catalog]}_yearNeedReplace'
            f'\nselect {dml_body}'
            f'\nfrom to_es.cts_{self.database_name}_{self.catalog_dict[self.catalog]} i'
            '\nwhere dt = "dtNeedReplace" and year_from_date = "yearNeedReplace"')
        return dml

    def get_field(self, field_conf):
        """Builds the Hive select expression for one mapped field; returns (expr, name)."""
        name = field_conf["name"]
        expr = ''
        if 'default' in field_conf and 'source' not in field_conf:
            # No source column: emit the default as a string literal.
            expr = f"'{field_conf['default']}'"
        if 'handler' in field_conf:
            if 'dict_handler' in field_conf['handler']:
                # Dictionary lookups read from the joined dim table (see get_dim_join_sql).
                expr = f'{name}_dim.`value`'
            elif field_conf['handler'] == 'text_split_handler':
                d = field_conf['delimiter']
                if len(d) == 1:
                    # A single delimiter character.
                    expr = f"array_distinct(split(i.`{field_conf['source']}`,'{d[0]}'))"
                else:
                    # A bracketed character class such as '[;,]': rewrite the remaining
                    # delimiters to the first one, then split on it.
                    delimiter = d[1]
                    delimiters = d[2:-1]
                    expr = f"array_distinct(split(regexp_replace(i.`{field_conf['source']}`, '[{delimiters}]', '{delimiter}'),'{delimiter}'))"
        if 'source' in field_conf and 'handler' not in field_conf:
            source = field_conf['source']
            if isinstance(source, list):
                # Multiple source columns collapse into a de-duplicated, null-free array.
                source = [f"merge_ws(i.`{s}`)" for s in source]
                expr = f"filter(array_distinct(array({','.join(source)})),x -> x is not null)"
            else:
                expr = f"i.`{source}`"
        if 'default' in field_conf and 'source' in field_conf:
            expr = f"coalesce({expr}, '{field_conf['default']}')"
        return (expr, name)
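
    # Examples of the expressions get_field() produces (source names hypothetical):
    #   {'name': 'flag', 'default': 'N'}                    -> 'N'
    #   {'name': 'importer', 'source': 'jks'}               -> i.`jks`
    #   {'name': 'port', 'source': 'gang', 'default': 'x'}  -> coalesce(i.`gang`, 'x')
    #   {'name': 'hsCode', 'source': 'bm', 'delimiter': ';',
    #    'handler': 'text_split_handler'}                   -> array_distinct(split(i.`bm`,';'))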

    def get_dim_join_sql(self, handler_list, field_dict):
        sql_list = []
        for field in handler_list:
            handler = field['handler']
            dim = f'{field["name"]}_dim'
            source = field['source']
            if '__' in source:
                # A source like 'xxx__name' refers to another mapped field's expression.
                source = field_dict.get(source.split('__')[1])
            else:
                source = f'i.`{source}`'
            if handler == 'country_dict_handler':
                sql_list.append(
                    f'left join dim.redis_cts_country_dict as {dim} on {dim}.dt = "dtNeedReplace" and lower({source}) = {dim}.`field`')
            elif handler == 'state_dict_handler':
                sql_list.append(
                    f'left join dim.redis_cts_state_dict as {dim} on {dim}.dt = "dtNeedReplace" and lower({source}) = {dim}.`field`')
        return '\n'.join(sql_list)
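
    # e.g. a field {'name': 'country', 'source': 'gj', 'handler': 'country_dict_handler'}
    # (hypothetical names) yields:
    #   left join dim.redis_cts_country_dict as country_dim
    #     on country_dim.dt = "dtNeedReplace" and lower(i.`gj`) = country_dim.`field`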

    def replace_sql(self, es_bak_ddl, es_mapping_ddl, es_bak_dml, data_source):
        # Per-source address columns that must become array<string>, paired with the
        # raw column whose DML expression switches from merge_ws to str_to_arr.
        overrides = {
            'india_im': [('importerAddress', 'jksdz'), ('exporterAddress', 'cksdz')],
            'america_im': [('importerAddress', 'shrdz'), ('exporterAddress', 'fhrdz'),
                           ('notifyPartyAddress', 'tzrdz')],
        }
        for col, src in overrides.get(data_source, []):
            old_col = f'`{col}` string,'
            new_col = f'`{col}` array<string>,'
            es_bak_ddl = es_bak_ddl.replace(old_col, new_col)
            es_mapping_ddl = es_mapping_ddl.replace(old_col, new_col)
            es_bak_dml = es_bak_dml.replace(f'merge_ws(i.`{src}`) as `{col}`',
                                            f'str_to_arr(i.`{src}`) as `{col}`')
        return es_bak_ddl, es_mapping_ddl, es_bak_dml
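
    # Usage sketch (a minimal example, not from a real caller; `gen` is an EsTmplGen):
    #   ddl, mapping_ddl, dml = gen.make_2es_ddl(), gen.make_es_mapping_ddl(), gen.make_2es_dml()
    #   ddl, mapping_ddl, dml = gen.replace_sql(ddl, mapping_ddl, dml, 'america_im')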


if __name__ == '__main__':
    # es = EsTmplGen('exports', 'america')
    es = EsTmplGen('imports', 'america')
    print('\n\n--2es_ddl-------------------------------------------------------')
    print(es.make_2es_ddl())
    print('\n\n--es_mapping_ddl-------------------------------------------------------')
    print(es.make_es_mapping_ddl())
    print('\n\n--2es_dml-------------------------------------------------------')
    print(es.make_2es_dml())
    print('\n\n--es_mapping_dml-------------------------------------------------------')
    print(es.make_es_mapping_dml())