| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- import sys
- import os
- import re
- abspath = os.path.abspath(__file__)
- root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
- sys.path.append(root_path)
- import json
- from elasticsearch import Elasticsearch
- from elasticsearch.exceptions import NotFoundError
- from dw_base import NORM_CYN, NORM_RED, NORM_BLU, NORM_MGT
class ESOperator:
    """Convenience wrapper around an Elasticsearch client for index
    administration, random-sample retrieval, and document diffing.

    All network errors in the "get"/"delete" helpers are caught and printed,
    returning an empty/None fallback, so ad-hoc scripts keep running.
    """

    def __init__(self, host, port, timeout=30):
        """Connect to a single Elasticsearch node.

        Args:
            host: hostname or IP address of the node.
            port: HTTP port of the node.
            timeout: request timeout in seconds (default 30).
        """
        self.es = Elasticsearch([{'host': host, 'port': port}], timeout=timeout)

    def get_all_indices(self):
        """Return every index name in the cluster, or [] on error."""
        try:
            return list(self.es.indices.get_alias("*").keys())
        except Exception as e:
            print("Error:", e)
            return []

    def get_cts_indices(self, catalog, database_name):
        """Return index names containing both *catalog* and *database_name* as substrings."""
        indices = self.get_all_indices()
        return [index for index in indices if catalog in index and database_name in index]

    def get_aliases_for_index(self, index_name):
        """Return the list of alias names attached to *index_name*."""
        aliases = self.es.indices.get_alias(index=index_name)
        return list(aliases[index_name]['aliases'].keys())

    def add_alias_to_index(self, index_name, alias_name):
        """Attach *alias_name* to *index_name*."""
        self.es.indices.put_alias(index=index_name, name=alias_name)

    def get_indices_by_alias(self, alias_name):
        """Return the index names that *alias_name* currently points to."""
        result = self.es.indices.get_alias(name=alias_name)
        return list(result.keys())

    def get_random_documents(self, index_name, size=10):
        """Return up to *size* randomly ordered hits from *index_name*, or [] on error."""
        try:
            # function_score + random_score yields a different sample each call
            query_body = {
                "size": size,
                "query": {
                    "function_score": {
                        "query": {"match_all": {}},
                        "random_score": {}
                    }
                }
            }
            result = self.es.search(index=index_name, body=query_body)
            return result['hits']['hits']
        except Exception as e:
            print("Error:", e)
            return []

    def get_random_doc_with_id(self, index_name, size=10):
        """Return {_id: _source} for a random sample of documents from *index_name*."""
        doc_list = self.get_random_documents(index_name, size)
        return {d['_id']: d['_source'] for d in doc_list}

    # NOTE: reindex is submitted asynchronously (wait_for_completion=False);
    # poll get_task_status() with the returned task id to observe completion.
    def reindex(self, source_index, target_index):
        """Start a server-side reindex from *source_index* into *target_index*.

        Returns:
            The task-submission response (contains the task id).
        """
        body = {
            "source": {"index": source_index},
            "dest": {"index": target_index}
        }
        return self.es.reindex(body=body, wait_for_completion=False)

    def get_task_status(self, task_id):
        """Return task info for *task_id*, or None if the task is unknown."""
        try:
            return self.es.tasks.get(task_id)
        except NotFoundError:
            return None

    def get_data_from_ids(self, index_name, id_list):
        """Fetch documents by id from *index_name*; return {_id: _source}.

        Propagates NotFoundError if any id is missing — callers rely on this
        when diffing (a missing id is itself a meaningful failure).
        """
        # doc_id avoids shadowing the builtin `id`
        doc_list = [self.es.get(index=index_name, id=doc_id) for doc_id in id_list]
        return {d['_id']: d['_source'] for d in doc_list}

    def delete_index(self, index_name):
        """Delete *index_name*; 400/404 are ignored so a missing index is not fatal."""
        try:
            response = self.es.indices.delete(index=index_name, ignore=[400, 404])
            if response['acknowledged']:
                print(f"Index '{index_name}' deleted successfully.")
            else:
                print(f"Failed to delete index '{index_name}'.")
        except Exception as e:
            print("Error:", e)

    def dict_diff(self, old_dict, new_dict):
        """Print a colorized report of the differences between two dicts.

        Reports keys present only in *old_dict*, keys present only in
        *new_dict*, and common keys whose value or value type differs
        (the type check catches e.g. 1 vs 1.0, which compare equal).
        Prints nothing when the dicts are identical. Returns None.
        """
        old_keys = set(old_dict.keys())
        new_keys = set(new_dict.keys())
        old_only_keys = old_keys - new_keys
        new_only_keys = new_keys - old_keys
        common_keys = old_keys & new_keys
        if old_only_keys:
            print(f"{NORM_CYN} old_only_keys:")
            for key in old_only_keys:
                # BUGFIX: these keys exist only in old_dict; the original read
                # new_dict[key] here, which raised KeyError whenever this
                # branch was taken.
                print(f"{NORM_BLU} {key} :{old_dict[key]}")
        if new_only_keys:
            print(f"{NORM_CYN} new_only_keys:")
            for key in new_only_keys:
                print(f"{NORM_BLU} {key} :{new_dict[key]}")
        diff_data = {}
        for key in common_keys:
            if old_dict[key] != new_dict[key] or type(old_dict[key]) != type(new_dict[key]):
                diff_data[key] = (old_dict[key], new_dict[key])
        if diff_data:
            print(f"{NORM_CYN} diff_data:")
            for key in diff_data:
                print(f"{NORM_BLU} {key}: ")
                print(
                    f"{NORM_RED} value:{NORM_MGT} {diff_data[key][0]}{NORM_RED} -> {NORM_MGT}{diff_data[key][1]}")
                print(
                    f"{NORM_RED} type:{NORM_MGT} {type(old_dict[key])}{NORM_RED} -> {NORM_MGT}{type(new_dict[key])}")

    def get_data_from_id(self, index_name, id):
        """Return the _source of the document with *id* in *index_name*."""
        response = self.es.get(index=index_name, id=id)
        return response['_source']

    def create_index_from_json(self, index_name, settings_and_mappings):
        """Create *index_name* with the given settings/mappings body; errors are printed."""
        try:
            self.es.indices.create(index=index_name, body=settings_and_mappings)
            print(f"Index '{index_name}' created successfully.")
        except Exception as e:
            print(f"Error creating index '{index_name}':", e)

    def create_index(self, index_name):
        """Create an empty index. Returns True on creation, False if it
        already exists or creation failed."""
        try:
            if self.es.indices.exists(index=index_name):
                print(f"Index '{index_name}' already exists.")
                return False
            self.es.indices.create(index=index_name)
            print(f"Index '{index_name}' created successfully.")
            return True
        except Exception as e:
            print(f"Error creating index '{index_name}': {e}")
            return False

    def get_index_document_count(self, index_name):
        """Return the document count of *index_name*, or None on error."""
        try:
            result = self.es.count(index=index_name)
            return result['count']
        except Exception as e:
            print("Error:", e)
            return None

    def random_diff(self, new_index, old_index):
        """Sample random docs from *new_index* and diff each against the
        same-id doc in *old_index*, printing one report per id."""
        new_dicts = self.get_random_doc_with_id(new_index)
        id_list = list(new_dicts)
        old_dicts = self.get_data_from_ids(old_index, id_list)
        for doc_id in id_list:
            print(f'【id:{doc_id}】------------------------------------------------------')
            self.dict_diff(old_dicts[doc_id], new_dicts[doc_id])

    def refresh(self, index):
        """Refresh *index* so recent writes become searchable.

        Raises:
            ValueError: if *index* is empty/falsy.
        """
        if not index:
            raise ValueError("Index name must be specified.")
        try:
            self.es.indices.refresh(index=index)
            print(f"Index {index} refreshed.")
        except Exception as e:
            print("Error during refresh:", e)

    # NOTE: flush is a heavyweight operation — avoid in production
    # (it can degrade live cluster performance).
    def flush(self, index):
        """Flush *index* to durable storage.

        Raises:
            ValueError: if *index* is empty/falsy.
        """
        if not index:
            raise ValueError("Index name must be specified.")
        try:
            self.es.indices.flush(index=index)
            print(f"Index {index} flushed.")
        except Exception as e:
            print("Error during flush:", e)
if __name__ == '__main__':
    # Ad-hoc driver: point at the target cluster and spot-check migrated data.
    # Alternate cluster: ESOperator('192.168.0.200', 9201)
    es_op = ESOperator('192.168.11.99', 9005)
    # Smoke-check connectivity by fetching one known document.
    es_op.get_data_from_id('corp', 'b7730f7f75f47296e9261eb5934b140a')
    # Diff a random sample of the rebuilt index against the original.
    es_op.random_diff('customs_exports_mexico-2020test', 'customs_exports_mexico-2020')
|