| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647 |
- import sys
- import os
- import re
- # Example arguments: -catalog=imports -database_name=venezuela_bol -year=2023
# Make the repository root importable so that project packages (such as
# dw_base) resolve when this file is executed directly as a script.
abspath = os.path.abspath(__file__)
# Cut the path right after the first "tendata-warehouse" component; when the
# path does not contain it, the substitution is a no-op and root_path is just
# the path of this file.
root_path = re.sub(
    pattern=r"tendata-warehouse.*",
    repl="tendata-warehouse",
    string=abspath,
)
sys.path.append(root_path)
- import sys
- from time import sleep
- from dw_base.scheduler.mg2es.conf_reader import ConfReader
- from dw_base.scheduler.mg2es.es_operator import ESOperator
- from dw_base.utils.config_utils import parse_args
# Seconds to wait between two polls of the reindex task status.
POLL_INTERVAL_SECONDS = 60

# Arguments that have no default; each one is interpolated into the index name.
REQUIRED_ARGS = ('catalog', 'database_name', 'year')


def missing_required_args(config):
    """Return the names from REQUIRED_ARGS that are absent or empty in *config*.

    Args:
        config: mapping-like object exposing ``get(name)``.

    Returns:
        A list of argument names, in REQUIRED_ARGS order; empty when all of
        them are present.
    """
    return [name for name in REQUIRED_ARGS if config.get(name) in (None, '')]


def resolve_es_endpoint(env):
    """Return the ``(host, port)`` pair passed to ESOperator for *env*.

    Only ``'prod'`` selects the second endpoint; every other value falls back
    to the default one.
    """
    if env == 'prod':
        return '192.168.11.100', '9003'
    return '192.168.0.200', '9201'


def reindex_failure(task_info):
    """Return the failure details carried by a finished task, or ``None``.

    NOTE(review): this assumes ``get_task_status`` hands back the body of the
    Elasticsearch task API, where a failed task carries a top-level ``error``
    and a partially failed reindex lists items under ``response.failures`` --
    confirm against ESOperator. If the shape differs, nothing is detected and
    the task is treated as successful.
    """
    if not isinstance(task_info, dict):
        return None
    error = task_info.get('error')
    if error:
        return error
    response = task_info.get('response')
    if isinstance(response, dict) and response.get('failures'):
        return response['failures']
    return None


def main():
    """Copy one customs index into a backup index and wait for the copy to end.

    Command-line arguments (parsed by ``parse_args``):
        -catalog, -database_name, -year: required; they form the source index
            name ``customs_{catalog}_{database_name}-{year}``.
        -env: optional, defaults to ``'test'``; ``'prod'`` switches endpoint.
        -bak_suffix: optional, defaults to ``'bak'``; appended to the source
            index name to form the backup index name.

    Exits with a non-zero status when a required argument is missing or when
    the finished reindex task reports an error.
    """
    config, _ = parse_args(sys.argv[1:])

    # Fail before touching ES: a missing value would otherwise be formatted
    # into the index name as the literal text "None".
    missing = missing_required_args(config)
    if missing:
        sys.exit('missing required argument(s): '
                 + ', '.join(f'-{name}' for name in missing))

    catalog = config.get('catalog')
    database_name = config.get('database_name')
    year = config.get('year')
    bak_suffix = config.get('bak_suffix', 'bak')
    host, port = resolve_es_endpoint(config.get('env', 'test'))

    es_operator = ESOperator(host, port)
    index_name = f'customs_{catalog}_{database_name}-{year}'
    bak_index_name = f'{index_name}-{bak_suffix}'

    es_operator.create_index(bak_index_name)
    task_id = es_operator.reindex(index_name, bak_index_name)['task']

    # total_time is the sum of the sleep intervals, i.e. an approximation of
    # the elapsed time with POLL_INTERVAL_SECONDS granularity.
    total_time = 0
    while True:
        sleep(POLL_INTERVAL_SECONDS)
        total_time += POLL_INTERVAL_SECONDS
        task_info = es_operator.get_task_status(task_id)

        if not task_info['completed']:
            print('迁移中----------------------------')
            print(task_info)
            continue

        # A finished task is not necessarily a successful one.
        failure = reindex_failure(task_info)
        if failure:
            print(f'reindex task {task_id} finished with errors:')
            print(failure)
            sys.exit(1)

        print('迁移完成--------------------------')
        print(f'迁移耗时:{total_time}秒')
        cnt = es_operator.get_index_document_count(bak_index_name)
        print(f'迁移文档数:{cnt}')
        break

    # Deleting the source index is left disabled:
    # es_operator.delete_index(index_name)


if __name__ == '__main__':
    main()
|