es_operator.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. import sys
  2. import os
  3. import re
  4. abspath = os.path.abspath(__file__)
  5. root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
  6. sys.path.append(root_path)
  7. import json
  8. from elasticsearch import Elasticsearch
  9. from elasticsearch.exceptions import NotFoundError
  10. from dw_base import NORM_CYN, NORM_RED, NORM_BLU, NORM_MGT
  11. class ESOperator:
  12. def __init__(self, host, port, timeout=30):
  13. self.es = Elasticsearch([{'host': host, 'port': port}], timeout=timeout)
  14. def get_all_indices(self):
  15. try:
  16. indices = self.es.indices.get_alias("*").keys()
  17. return list(indices)
  18. except Exception as e:
  19. print("Error:", e)
  20. return []
  21. def get_cts_indices(self, catalog, database_name):
  22. indices = self.get_all_indices()
  23. return [index for index in indices if catalog in index and database_name in index]
  24. def get_aliases_for_index(self, index_name):
  25. aliases = self.es.indices.get_alias(index=index_name)
  26. return list(aliases[index_name]['aliases'].keys())
  27. def add_alias_to_index(self, index_name, alias_name):
  28. self.es.indices.put_alias(index=index_name, name=alias_name)
  29. def get_indices_by_alias(self, alias_name):
  30. result = self.es.indices.get_alias(name=alias_name)
  31. return list(result.keys())
  32. def get_random_documents(self, index_name, size=10):
  33. try:
  34. # 构造随机排序查询
  35. query_body = {
  36. "size": size,
  37. "query": {
  38. "function_score": {
  39. "query": {"match_all": {}},
  40. "random_score": {}
  41. }
  42. }
  43. }
  44. result = self.es.search(index=index_name, body=query_body)
  45. return result['hits']['hits']
  46. except Exception as e:
  47. print("Error:", e)
  48. return []
  49. def get_random_doc_with_id(self, index_name, size=10):
  50. doc_list = self.get_random_documents(index_name, size)
  51. return {d['_id']: d['_source'] for d in doc_list}
  52. # 注意!此方法为异步方法,要想获取任务执行结果需配合get_task_status方法使用
  53. def reindex(self, source_index, target_index):
  54. body = {
  55. "source": {
  56. "index": source_index
  57. },
  58. "dest": {
  59. "index": target_index
  60. }
  61. }
  62. response = self.es.reindex(body=body, wait_for_completion=False)
  63. return response
  64. def get_task_status(self, task_id):
  65. try:
  66. task_info = self.es.tasks.get(task_id)
  67. return task_info
  68. except NotFoundError:
  69. return None
  70. def get_data_from_ids(self, index_name, id_list):
  71. doc_list = [self.es.get(index=index_name, id=id) for id in id_list]
  72. return {d['_id']: d['_source'] for d in doc_list}
  73. def delete_index(self, index_name):
  74. try:
  75. response = self.es.indices.delete(index=index_name, ignore=[400, 404])
  76. if response['acknowledged']:
  77. print(f"Index '{index_name}' deleted successfully.")
  78. else:
  79. print(f"Failed to delete index '{index_name}'.")
  80. except Exception as e:
  81. print("Error:", e)
  82. def dict_diff(self, old_dict, new_dict):
  83. old_keys = set(old_dict.keys())
  84. new_keys = set(new_dict.keys())
  85. old_only_keys = old_keys - new_keys
  86. new_only_keys = new_keys - old_keys
  87. common_keys = old_keys & new_keys
  88. if old_only_keys:
  89. print(f"{NORM_CYN} old_only_keys:")
  90. for key in old_only_keys:
  91. print(f"{NORM_BLU} {key} :{new_dict[key]}")
  92. if new_only_keys:
  93. print(f"{NORM_CYN} new_only_keys:")
  94. for key in new_only_keys:
  95. print(f"{NORM_BLU} {key} :{new_dict[key]}")
  96. diff_data = {}
  97. for key in common_keys:
  98. if old_dict[key] != new_dict[key] or type(old_dict[key]) != type(new_dict[key]):
  99. diff_data[key] = (old_dict[key], new_dict[key])
  100. if diff_data:
  101. print(f"{NORM_CYN} diff_data:")
  102. for key in diff_data:
  103. print(f"{NORM_BLU} {key}: ")
  104. print(
  105. f"{NORM_RED} value:{NORM_MGT} {diff_data[key][0]}{NORM_RED} -> {NORM_MGT}{diff_data[key][1]}")
  106. print(
  107. f"{NORM_RED} type:{NORM_MGT} {type(old_dict[key])}{NORM_RED} -> {NORM_MGT}{type(new_dict[key])}")
  108. def get_data_from_id(self, index_name, id):
  109. response = self.es.get(index=index_name, id=id)
  110. return response['_source']
  111. def create_index_from_json(self, index_name, settings_and_mappings):
  112. try:
  113. self.es.indices.create(index=index_name, body=settings_and_mappings)
  114. print(f"Index '{index_name}' created successfully.")
  115. except Exception as e:
  116. print(f"Error creating index '{index_name}':", e)
  117. def create_index(self, index_name):
  118. try:
  119. if self.es.indices.exists(index=index_name):
  120. print(f"Index '{index_name}' already exists.")
  121. return False
  122. self.es.indices.create(index=index_name)
  123. print(f"Index '{index_name}' created successfully.")
  124. return True
  125. except Exception as e:
  126. print(f"Error creating index '{index_name}': {e}")
  127. return False
  128. def get_index_document_count(self, index_name):
  129. try:
  130. result = self.es.count(index=index_name)
  131. return result['count']
  132. except Exception as e:
  133. print("Error:", e)
  134. return None
  135. def random_diff(self, new_index, old_index):
  136. new_dicts = self.get_random_doc_with_id(new_index)
  137. id_list = [id for id in new_dicts.keys()]
  138. old_dicts = self.get_data_from_ids(old_index, id_list)
  139. for id in id_list:
  140. print(f'【id:{id}】------------------------------------------------------')
  141. self.dict_diff(old_dicts[id], new_dicts[id])
  142. def refresh(self, index):
  143. if not index:
  144. raise ValueError("Index name must be specified.")
  145. try:
  146. self.es.indices.refresh(index=index)
  147. print(f"Index {index} refreshed.")
  148. except Exception as e:
  149. print("Error during refresh:", e)
  150. # 此方法很重,不建议在生产环境中使用(可能导致线上性能下降)
  151. def flush(self, index):
  152. if not index:
  153. raise ValueError("Index name must be specified.")
  154. try:
  155. self.es.indices.flush(index=index)
  156. print(f"Index {index} flushed.")
  157. except Exception as e:
  158. print("Error during flush:", e)
  159. if __name__ == '__main__':
  160. # es_operator = ESOperator('192.168.0.200', 9201)
  161. es_operator = ESOperator('192.168.11.99', 9005)
  162. es_operator.get_data_from_id('corp','b7730f7f75f47296e9261eb5934b140a')
  163. # es_operator.refresh('customs_imports_venezuela-2020test')
  164. # es_operator.refresh('customs_exports_pakistan-2020test')
  165. es_operator.random_diff('customs_exports_mexico-2020test', 'customs_exports_mexico-2020')
  166. # print(es_operator.get_cts_indices('exports', 'kazakhstan'))
  167. # es_operator.add_alias_to_index('customs_exports_kazakhstan-2023-ctytest', 'cts_kazakhstan_ex-2023-ctytest')
  168. # print(es_operator.get_aliases_for_index('customs_exports_kazakhstan-2023-ctytest'))
  169. # print(es_operator.get_indices_by_alias('cts_kazakhstan_ex-2023-ctytest'))
  170. # es_operator.reindex('customs_exports_kazakhstan-2023','customs_exports_kazakhstan-2023-bak')
  171. # old_dict = es_operator.get_random_doc_with_id('customs_exports_kazakhstan-2023-bak')
  172. # id_list = [id for id in old_dict.keys()]
  173. # new_dict = es_operator.get_data_from_ids('customs_exports_kazakhstan-2023-ctytest', id_list)
  174. # for id in id_list:
  175. # print(f'【id:{id}】------------------------------------------------------')
  176. # print(old_dict[id])
  177. # print(new_dict[id])
  178. # es_operator.dict_diff( old_dict[id],new_dict[id])
  179. # old = es_operator.get_data_from_id('customs_exports_kazakhstan-2023-benchmark', '656d8f637e0d39686b8206e2')
  180. # new = es_operator.get_data_from_id('customs_exports_kazakhstan-2023-bak', '656d8f637e0d39686b8206e2')
  181. # print(old['exporterOrig'])
  182. # print(new['exporterOrig'])
  183. # rp = es_operator.reindex('customs_exports_kazakhstan-2023-ctytest','customs_exports_kazakhstan-2023-bak')
  184. # print(rp)
  185. # es_operator.delete_index('customs_exports_kazakhstan-2023-bak')