es_tmpl_gen.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
import sys
import os
import re

# Bootstrap: make the repository root importable no matter where this script
# is launched from. Everything after the 'tendata-warehouse' directory in
# this file's absolute path is trimmed off, and the resulting root is
# appended to sys.path so the dw_base package below resolves.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

# NOTE(review): 'json' appears unused in this file — confirm before removing.
import json
from dw_base.scheduler.mg2es.path_util import PathUtil
from dw_base.scheduler.mg2es.conf_reader import ConfReader
  10. class EsTmplGen:
  11. def __init__(self, catalog, database_name):
  12. self.catalog = catalog
  13. self.database_name = database_name
  14. es_json_path, mg2es_mapping_path = PathUtil.get_conf_abspath(catalog, database_name)
  15. conf_reader = ConfReader()
  16. self.yml_dict = conf_reader.get_yml_data(mg2es_mapping_path)
  17. self.es_json = conf_reader.get_json_data(es_json_path)
  18. self.type_dict = {
  19. 'date': 'string',
  20. 'text': 'string',
  21. 'keyword': 'string',
  22. 'scaled_float': 'double',
  23. }
  24. self.catalog_dict = {
  25. 'exports': 'ex',
  26. 'imports': 'im'}
  27. def get_clos_with_type(self):
  28. yml_fields = self.yml_dict['transformer']['mapping']['fields']
  29. for field in yml_fields:
  30. if field.get('name') == 'productTag':
  31. yml_fields.remove(field)
  32. yml_clos = [c['name'] for c in yml_fields]
  33. # 将_id替换成id
  34. yml_clos[0] = 'id'
  35. # 数组类型:字段handler为text_split_handler,或source类型为数组
  36. arr_clos = [c['name'] for c in yml_fields if
  37. c.get('handler') == 'text_split_handler' or isinstance(c.get('source'), list)]
  38. # print(f'全字段:{yml_clos}')
  39. # print(f'数组类型字段:{arr_clos}')
  40. # 创建一个新的列表,用于存放键值对
  41. json_list = {}
  42. # 遍历 JSON 字典的键值对
  43. for key, value in self.es_json['mappings']['properties'].items():
  44. # 提取键和 type 的值,并组成新的键值对
  45. json_list[key] = value['type']
  46. # print(f'es类型:{json_list}')
  47. res_list = []
  48. for c in yml_clos:
  49. if c in arr_clos:
  50. res_list.append((c, 'array<string>'))
  51. elif c in json_list:
  52. res_list.append((c, self.type_dict[json_list[c]]))
  53. else:
  54. res_list.append((c, 'string'))
  55. return res_list
  56. def make_ddl_body(self):
  57. clos_with_type = self.get_clos_with_type()
  58. clos_len = [len(c[0]) for c in clos_with_type]
  59. max_len = max(clos_len) + 2
  60. formatted_clos = ['\t{:<{width}} {}'.format(f'`{c[0]}`', c[1], width=max_len) for c in clos_with_type]
  61. clos_str = ",\n".join(formatted_clos)
  62. return clos_str
  63. def make_2es_ddl(self):
  64. clos_str = self.make_ddl_body()
  65. ddl = (f'create table to_es.cts_{self.database_name}_{self.catalog_dict[self.catalog]}\n'
  66. f'(\n'
  67. f'{clos_str}'
  68. f'\n) PARTITIONED BY ( `dt` string,year_from_date string) \n'
  69. f'\tSTORED AS ORC'
  70. )
  71. return ddl
  72. def make_es_mapping_ddl(self):
  73. clos_str = self.make_ddl_body()
  74. clos = [f'{ct[0]}:{ct[0]}' for ct in self.get_clos_with_type()]
  75. clos = clos[1:]
  76. mapping_prop = ','.join(clos)
  77. ddl = (
  78. f'create external table if not exists to_es.es_cts_{self.database_name}_{self.catalog_dict[self.catalog]}_yearNeedReplace\n'
  79. f'(\n'
  80. f'{clos_str}'
  81. f"\n) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' \n"
  82. f'\tTBLPROPERTIES ('
  83. f'''\n'es.nodes' = '192.168.11.100',
  84. 'es.port' = '9003',
  85. 'es.http.timeout' = '100m',
  86. 'es.input.use.sliced.partitions' = 'false',
  87. 'es.input.json' = 'false',
  88. 'es.index.auto.create' = 'true',
  89. 'es.write.operation' = 'upsert',
  90. 'es.mapping.date.rich' = 'false',
  91. 'es.batch.write.refresh' = 'false',
  92. 'es.batch.size.bytes' = '60mb',
  93. 'es.batch.size.entries' = '5000',
  94. 'es.batch.write.retry.count' = '10',
  95. 'es.batch.write.retry.wait' = '60s',
  96. 'es.update.retry.on.conflict' = '5' ,
  97. 'es.resource' = 'customs_{self.catalog}_{self.database_name}-yearNeedReplace/_doc',
  98. 'es.mapping.id' = 'id',
  99. 'es.mapping.names' =
  100. '{mapping_prop}')'''
  101. )
  102. return ddl
  103. def make_2es_dml(self):
  104. ct_list = self.get_clos_with_type()[2:]
  105. yml_fields = self.yml_dict['transformer']['mapping']['fields'][2:]
  106. field_list = []
  107. filed_dict = {}
  108. handler_list = []
  109. for i in range(len(ct_list)):
  110. yml_field = yml_fields[i]
  111. field_tuple = self.get_field(yml_field)
  112. field = field_tuple[0]
  113. if ct_list[i][1] == 'string':
  114. field = f"merge_ws({field})"
  115. field_sql = f"{field} as `{field_tuple[1]}`"
  116. field_list.append(field_sql)
  117. filed_dict[field_tuple[1]] = field_tuple[0]
  118. if 'handler' in yml_field:
  119. if 'dict_handler' in yml_field['handler']:
  120. handler_list.append(yml_field)
  121. dim_join_sql = self.get_dim_join_sql(handler_list, filed_dict)
  122. dml_body = '\n\t, '.join(field_list)
  123. dml = (f'insert overwrite table to_es.cts_{self.database_name}_{self.catalog_dict[self.catalog]}'
  124. f'\nselect i.`id`'
  125. f"\n\t, concat(replace(from_unixtime((i.`date` / 1000) - 8 * 60 * 60, 'yyyy-MM-dd HH:mm:ss'),' ','T'),'Z') as `date`"
  126. f'\n\t, {dml_body}'
  127. f'\n\t, i.`dt`'
  128. f"\n\t, from_unixtime((i.`date` / 1000) - 8 * 60 * 60, 'yyyy') as `year_from_date`"
  129. f'\nfrom to_mongo.cts_{self.database_name}_{self.catalog_dict[self.catalog]} i'
  130. f'\n{dim_join_sql}'
  131. f'\nwhere i.dt = "dtNeedReplace"'
  132. )
  133. return dml
  134. def make_es_mapping_dml(self):
  135. clos_with_type = self.get_clos_with_type()
  136. clos = [f'i.`{c[0]}`' for c in clos_with_type]
  137. dml_body = '\n , '.join(clos)
  138. dml = (
  139. f'insert overwrite table to_es.es_cts_{self.database_name}_{self.catalog_dict[self.catalog]}_yearNeedReplace'
  140. f'\nselect {dml_body}'
  141. f'\nfrom to_es.cts_{self.database_name}_{self.catalog_dict[self.catalog]} i'
  142. '\nwhere dt = "dtNeedReplace" and year_from_date = "yearNeedReplace"')
  143. return dml
  144. def get_field(self, fields):
  145. name = fields["name"]
  146. field = ''
  147. if 'default' in fields and 'source' not in fields:
  148. field = f"'{fields['default']}'"
  149. if 'handler' in fields:
  150. if 'dict_handler' in fields['handler']:
  151. field = f'{fields["name"]}_dim.`value`'
  152. elif fields['handler'] == 'text_split_handler':
  153. d = fields['delimiter']
  154. if len(d) == 1:
  155. delimiter = d[0]
  156. field = f"array_distinct(split(i.`{fields['source']}`,'{delimiter}'))"
  157. else:
  158. delimiter = fields['delimiter'][1]
  159. delimiters = fields['delimiter'][2:-1]
  160. field = f"array_distinct(split(regexp_replace(i.`{fields['source']}`, '[{delimiters}]', '{delimiter}'),'{delimiter}'))"
  161. if 'source' in fields and 'handler' not in fields:
  162. source = fields['source']
  163. if isinstance(source, list):
  164. # source = [f"`i.{s}`" for s in source]
  165. source = [f"merge_ws(i.`{s}`)" for s in source]
  166. # field = f"coalesce({','.join(source)})"
  167. field = f"filter(array_distinct(array({','.join(source)})),x -> x is not null)"
  168. else:
  169. field = f"i.`{source}`"
  170. if 'default' in fields and 'source' in fields:
  171. field = f"coalesce({field}, '{fields['default']}')"
  172. return (field, name)
  173. def get_dim_join_sql(self, handler_list, filed_dict):
  174. sql_list = []
  175. for field in handler_list:
  176. handler = field['handler']
  177. dim = f'{field["name"]}_dim'
  178. source = field['source']
  179. if '__' in source:
  180. source = filed_dict.get(source.split('__')[1])
  181. else:
  182. source = f'i.`{source}`'
  183. if handler == 'country_dict_handler':
  184. sql_list.append(
  185. f'left join dim.redis_cts_country_dict as {dim} on {dim}.dt = "dtNeedReplace" and lower({source}) = {dim}.`field`')
  186. elif handler == 'state_dict_handler':
  187. sql_list.append(
  188. f'left join dim.redis_cts_state_dict as {dim} on {dim}.dt = "dtNeedReplace" and lower({source}) = {dim}.`field`')
  189. return '\n'.join(sql_list)
  190. def replace_sql(self, es_bak_ddl, es_mapping_ddl, es_bak_dml, data_source):
  191. if data_source == 'india_im':
  192. es_bak_ddl = es_bak_ddl.replace("`importerAddress` string,",
  193. "`importerAddress` array<string>,")
  194. es_bak_ddl = es_bak_ddl.replace("`exporterAddress` string,",
  195. "`exporterAddress` array<string>,")
  196. es_mapping_ddl = es_mapping_ddl.replace("`importerAddress` string,",
  197. "`importerAddress` array<string>,")
  198. es_mapping_ddl = es_mapping_ddl.replace("`exporterAddress` string,",
  199. "`exporterAddress` array<string>,")
  200. es_bak_dml = es_bak_dml.replace("merge_ws(i.`jksdz`) as `importerAddress`",
  201. "str_to_arr(i.`jksdz`) as `importerAddress`")
  202. es_bak_dml = es_bak_dml.replace("merge_ws(i.`cksdz`) as `exporterAddress`",
  203. "str_to_arr(i.`cksdz`) as `exporterAddress`")
  204. if data_source == 'america_im':
  205. es_bak_ddl = es_bak_ddl.replace("`importerAddress` string,",
  206. "`importerAddress` array<string>,")
  207. es_bak_ddl = es_bak_ddl.replace("`exporterAddress` string,",
  208. "`exporterAddress` array<string>,")
  209. es_bak_ddl = es_bak_ddl.replace("`notifyPartyAddress` string,",
  210. "`notifyPartyAddress` array<string>,")
  211. es_mapping_ddl = es_mapping_ddl.replace("`importerAddress` string,",
  212. "`importerAddress` array<string>,")
  213. es_mapping_ddl = es_mapping_ddl.replace("`exporterAddress` string,",
  214. "`exporterAddress` array<string>,")
  215. es_mapping_ddl = es_mapping_ddl.replace("`notifyPartyAddress` string,",
  216. "`notifyPartyAddress` array<string>,")
  217. es_bak_dml = es_bak_dml.replace("merge_ws(i.`shrdz`) as `importerAddress`",
  218. "str_to_arr(i.`shrdz`) as `importerAddress`")
  219. es_bak_dml = es_bak_dml.replace("merge_ws(i.`fhrdz`) as `exporterAddress`",
  220. "str_to_arr(i.`fhrdz`) as `exporterAddress`")
  221. es_bak_dml = es_bak_dml.replace("merge_ws(i.`tzrdz`) as `notifyPartyAddress`",
  222. "str_to_arr(i.`tzrdz`) as `notifyPartyAddress`")
  223. return es_bak_ddl, es_mapping_ddl, es_bak_dml
  224. if __name__ == '__main__':
  225. # es = EsDDLGen('exports', 'america')
  226. es = EsTmplGen('imports', 'america')
  227. print('\n\n--2es_ddl-------------------------------------------------------')
  228. print(es.make_2es_ddl())
  229. print('\n\n--es_mapping_ddl-------------------------------------------------------')
  230. print(es.make_es_mapping_ddl())
  231. print('\n\n--2es_dml-------------------------------------------------------')
  232. print(es.make_2es_dml())
  233. print('\n\n--es_mapping_dml-------------------------------------------------------')
  234. print(es.make_es_mapping_dml())