import sys
import os
import re
import json

# Cut the absolute path of this file right after "tendata-warehouse" and put
# that directory on sys.path (presumably so project packages such as
# ``dw_base`` resolve when the file is run as a script -- confirm with layout).
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError

from dw_base import NORM_CYN, NORM_RED, NORM_BLU, NORM_MGT


class ESOperator:
    """Thin convenience wrapper around an ``Elasticsearch`` client.

    Groups index/alias management, document sampling, reindexing and a
    console diff of documents between two indices.
    """

    def __init__(self, host, port, timeout=30):
        """Create the underlying client for a single ``host``/``port`` node.

        Args:
            host: Host name or IP of the Elasticsearch node.
            port: HTTP port of the node.
            timeout: Timeout passed to the client (default 30).
        """
        self.es = Elasticsearch([{'host': host, 'port': port}], timeout=timeout)

    def get_all_indices(self):
        """Return the names of all indices, or ``[]`` if the lookup fails.

        Errors are printed rather than raised.
        """
        try:
            indices = self.es.indices.get_alias("*").keys()
            return list(indices)
        except Exception as e:
            print("Error:", e)
            return []

    def get_cts_indices(self, catalog, database_name):
        """Return index names containing both ``catalog`` and ``database_name``.

        Matching is a plain substring test on the index name.
        """
        indices = self.get_all_indices()
        return [
            index for index in indices
            if catalog in index and database_name in index
        ]

    def get_aliases_for_index(self, index_name):
        """Return the list of alias names attached to ``index_name``."""
        aliases = self.es.indices.get_alias(index=index_name)
        return list(aliases[index_name]['aliases'].keys())

    def add_alias_to_index(self, index_name, alias_name):
        """Attach alias ``alias_name`` to ``index_name``."""
        self.es.indices.put_alias(index=index_name, name=alias_name)

    def get_indices_by_alias(self, alias_name):
        """Return the names of the indices that ``alias_name`` points to."""
        result = self.es.indices.get_alias(name=alias_name)
        return list(result.keys())

    def get_random_documents(self, index_name, size=10):
        """Return up to ``size`` randomly scored hits from ``index_name``.

        Returns:
            The raw hit dicts (``result['hits']['hits']``), or ``[]`` if the
            search fails; the error is printed rather than raised.
        """
        try:
            # Build a query that scores every document randomly.
            query_body = {
                "size": size,
                "query": {
                    "function_score": {
                        "query": {"match_all": {}},
                        "random_score": {}
                    }
                }
            }
            result = self.es.search(index=index_name, body=query_body)
            return result['hits']['hits']
        except Exception as e:
            print("Error:", e)
            return []

    def get_random_doc_with_id(self, index_name, size=10):
        """Return random documents as a ``{_id: _source}`` mapping."""
        doc_list = self.get_random_documents(index_name, size)
        return {d['_id']: d['_source'] for d in doc_list}

    # NOTE: this call is asynchronous (wait_for_completion=False). To obtain
    # the outcome of the task, use it together with get_task_status.
    def reindex(self, source_index, target_index):
        """Start a reindex from ``source_index`` into ``target_index``.

        Returns:
            The client's response to the reindex request, returned without
            waiting for the reindex to complete.
        """
        body = {
            "source": {
                "index": source_index
            },
            "dest": {
                "index": target_index
            }
        }
        response = self.es.reindex(body=body, wait_for_completion=False)
        return response

    def get_task_status(self, task_id):
        """Return the task info for ``task_id``, or ``None`` if not found."""
        try:
            task_info = self.es.tasks.get(task_id)
            return task_info
        except NotFoundError:
            return None

    def get_data_from_ids(self, index_name, id_list):
        """Fetch each id in ``id_list`` and return ``{_id: _source}``.

        Documents are fetched one by one; an exception raised by the client
        for any id (e.g. a missing document) propagates to the caller.
        """
        doc_list = [
            self.es.get(index=index_name, id=doc_id) for doc_id in id_list
        ]
        return {d['_id']: d['_source'] for d in doc_list}

    def delete_index(self, index_name):
        """Delete ``index_name`` and print whether it was acknowledged.

        400/404 responses are ignored by the client call, in which case the
        returned body carries no ``'acknowledged'`` key; ``.get`` is used so
        that case is reported as a failed delete instead of a ``KeyError``.
        """
        try:
            response = self.es.indices.delete(index=index_name, ignore=[400, 404])
            if response.get('acknowledged'):
                print(f"Index '{index_name}' deleted successfully.")
            else:
                print(f"Failed to delete index '{index_name}'.")
        except Exception as e:
            print("Error:", e)

    def dict_diff(self, old_dict, new_dict):
        """Print a colourised diff between two flat dicts.

        Three sections are printed when non-empty: keys only in ``old_dict``,
        keys only in ``new_dict``, and common keys whose value or type differ.

        Args:
            old_dict: The baseline mapping.
            new_dict: The mapping to compare against the baseline.
        """
        old_keys = set(old_dict.keys())
        new_keys = set(new_dict.keys())
        old_only_keys = old_keys - new_keys
        new_only_keys = new_keys - old_keys
        common_keys = old_keys & new_keys

        if old_only_keys:
            print(f"{NORM_CYN} old_only_keys:")
            for key in old_only_keys:
                # These keys exist only in old_dict, so the value must be
                # read from old_dict (reading new_dict raised KeyError).
                print(f"{NORM_BLU} {key} :{old_dict[key]}")

        if new_only_keys:
            print(f"{NORM_CYN} new_only_keys:")
            for key in new_only_keys:
                print(f"{NORM_BLU} {key} :{new_dict[key]}")

        diff_data = {}
        for key in common_keys:
            old_value = old_dict[key]
            new_value = new_dict[key]
            # Equal values of different types (e.g. 1 vs 1.0) still count.
            if old_value != new_value or type(old_value) is not type(new_value):
                diff_data[key] = (old_value, new_value)

        if diff_data:
            print(f"{NORM_CYN} diff_data:")
            for key in diff_data:
                print(f"{NORM_BLU} {key}: ")
                print(
                    f"{NORM_RED} value:{NORM_MGT} {diff_data[key][0]}{NORM_RED} -> {NORM_MGT}{diff_data[key][1]}")
                print(
                    f"{NORM_RED} type:{NORM_MGT} {type(old_dict[key])}{NORM_RED} -> {NORM_MGT}{type(new_dict[key])}")

    def get_data_from_id(self, index_name, id):
        """Return the ``_source`` of document ``id`` in ``index_name``."""
        response = self.es.get(index=index_name, id=id)
        return response['_source']

    def create_index_from_json(self, index_name, settings_and_mappings):
        """Create ``index_name`` using ``settings_and_mappings`` as the body.

        Success or failure is printed; errors are not raised.
        """
        try:
            self.es.indices.create(index=index_name, body=settings_and_mappings)
            print(f"Index '{index_name}' created successfully.")
        except Exception as e:
            print(f"Error creating index '{index_name}':", e)

    def create_index(self, index_name):
        """Create ``index_name`` if it does not already exist.

        Returns:
            ``True`` if the index was created, ``False`` if it already
            existed or creation failed (the reason is printed).
        """
        try:
            if self.es.indices.exists(index=index_name):
                print(f"Index '{index_name}' already exists.")
                return False
            self.es.indices.create(index=index_name)
            print(f"Index '{index_name}' created successfully.")
            return True
        except Exception as e:
            print(f"Error creating index '{index_name}': {e}")
            return False

    def get_index_document_count(self, index_name):
        """Return the document count of ``index_name``, or ``None`` on error."""
        try:
            result = self.es.count(index=index_name)
            return result['count']
        except Exception as e:
            print("Error:", e)
            return None

    def random_diff(self, new_index, old_index):
        """Sample random documents from ``new_index`` and diff them by id.

        Each sampled id is fetched from ``old_index`` and the pair is printed
        through ``dict_diff`` (old first, new second).
        """
        new_dicts = self.get_random_doc_with_id(new_index)
        id_list = list(new_dicts)
        old_dicts = self.get_data_from_ids(old_index, id_list)
        for doc_id in id_list:
            print(f'【id:{doc_id}】------------------------------------------------------')
            self.dict_diff(old_dicts[doc_id], new_dicts[doc_id])

    def refresh(self, index):
        """Refresh ``index``; client errors are printed, not raised.

        Raises:
            ValueError: If ``index`` is empty or ``None``.
        """
        if not index:
            raise ValueError("Index name must be specified.")
        try:
            self.es.indices.refresh(index=index)
            print(f"Index {index} refreshed.")
        except Exception as e:
            print("Error during refresh:", e)

    # This operation is heavy; it is not recommended in production because
    # it may degrade the performance of the live cluster.
    def flush(self, index):
        """Flush ``index``; client errors are printed, not raised.

        Raises:
            ValueError: If ``index`` is empty or ``None``.
        """
        if not index:
            raise ValueError("Index name must be specified.")
        try:
            self.es.indices.flush(index=index)
            print(f"Index {index} flushed.")
        except Exception as e:
            print("Error during flush:", e)


if __name__ == '__main__':
    # es_operator = ESOperator('192.168.0.200', 9201)
    es_operator = ESOperator('192.168.11.99', 9005)
    es_operator.get_data_from_id('corp', 'b7730f7f75f47296e9261eb5934b140a')
    # es_operator.refresh('customs_imports_venezuela-2020test')
    # es_operator.refresh('customs_exports_pakistan-2020test')
    es_operator.random_diff('customs_exports_mexico-2020test', 'customs_exports_mexico-2020')

    # Disabled usage examples, kept for reference:
    # print(es_operator.get_cts_indices('exports', 'kazakhstan'))
    # es_operator.add_alias_to_index('customs_exports_kazakhstan-2023-ctytest', 'cts_kazakhstan_ex-2023-ctytest')
    # print(es_operator.get_aliases_for_index('customs_exports_kazakhstan-2023-ctytest'))
    # print(es_operator.get_indices_by_alias('cts_kazakhstan_ex-2023-ctytest'))
    # es_operator.reindex('customs_exports_kazakhstan-2023','customs_exports_kazakhstan-2023-bak')
    # old_dict = es_operator.get_random_doc_with_id('customs_exports_kazakhstan-2023-bak')
    # id_list = [id for id in old_dict.keys()]
    # new_dict = es_operator.get_data_from_ids('customs_exports_kazakhstan-2023-ctytest', id_list)
    # for id in id_list:
    #     print(f'【id:{id}】------------------------------------------------------')
    #     print(old_dict[id])
    #     print(new_dict[id])
    #     es_operator.dict_diff( old_dict[id],new_dict[id])
    # old = es_operator.get_data_from_id('customs_exports_kazakhstan-2023-benchmark', '656d8f637e0d39686b8206e2')
    # new = es_operator.get_data_from_id('customs_exports_kazakhstan-2023-bak', '656d8f637e0d39686b8206e2')
    # print(old['exporterOrig'])
    # print(new['exporterOrig'])
    # rp = es_operator.reindex('customs_exports_kazakhstan-2023-ctytest','customs_exports_kazakhstan-2023-bak')
    # print(rp)
    # es_operator.delete_index('customs_exports_kazakhstan-2023-bak')