"""es_index_backup.py: copy one customs Elasticsearch index into a backup index.

Example arguments: -catalog=imports -database_name=venezuela_bol -year=2023

Keys read from the parsed arguments:
    catalog, database_name, year -- required; they form the source index name
        ``customs_{catalog}_{database_name}-{year}``.
    env -- optional, defaults to 'test'; 'prod' selects the production endpoint.
    bak_suffix -- optional, defaults to 'bak'; appended to the source index
        name to form the backup index name.
"""
import sys
import os
import re

# Put the "tendata-warehouse" root on sys.path so that the dw_base imports
# below resolve when this file is executed directly as a script. The imports
# must therefore stay below these three statements.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from time import sleep

from dw_base.scheduler.mg2es.conf_reader import ConfReader
from dw_base.scheduler.mg2es.es_operator import ESOperator
from dw_base.utils.config_utils import parse_args

# Seconds to wait between two polls of the reindex task status.
POLL_INTERVAL_SECONDS = 60

# Argument keys that must be present; all of them are formatted into the
# source index name.
REQUIRED_KEYS = ('catalog', 'database_name', 'year')


def main():
    """Create the backup index, start the reindex task and wait for it.

    Exits with an error message when a required argument is missing, so that
    no index containing the literal text "None" in its name is ever created.
    """
    config, _ = parse_args(sys.argv[1:])

    missing = [key for key in REQUIRED_KEYS if config.get(key) in (None, '')]
    if missing:
        sys.exit(f"missing required argument(s): {', '.join(missing)}")

    catalog = config.get('catalog')
    database_name = config.get('database_name')
    year = config.get('year')
    bak_suffix = config.get('bak_suffix', 'bak')

    # Any env value other than 'prod' falls back to the test endpoint.
    if config.get('env', 'test') == 'prod':
        host, port = '192.168.11.100', '9003'
    else:
        host, port = '192.168.0.200', '9201'

    es_operator = ESOperator(host, port)
    index_name = f'customs_{catalog}_{database_name}-{year}'
    bak_index_name = f'{index_name}-{bak_suffix}'

    es_operator.create_index(bak_index_name)
    task_id = es_operator.reindex(index_name, bak_index_name)['task']

    # The elapsed time is counted in whole polling intervals, so the reported
    # duration is rounded up to a multiple of POLL_INTERVAL_SECONDS.
    total_time = 0
    while True:
        sleep(POLL_INTERVAL_SECONDS)
        total_time += POLL_INTERVAL_SECONDS
        task_info = es_operator.get_task_status(task_id)
        # NOTE(review): a task that ended with failures presumably also
        # reports 'completed' -- confirm whether get_task_status exposes
        # errors that should be checked here.
        if task_info['completed']:
            break
        # Still running: print a progress banner and the raw task status.
        print('迁移中----------------------------')
        print(task_info)

    # Finished: print a banner, the elapsed seconds and the number of
    # documents now present in the backup index.
    print('迁移完成--------------------------')
    print(f'迁移耗时:{total_time}秒')
    cnt = es_operator.get_index_document_count(bak_index_name)
    print(f'迁移文档数:{cnt}')

    # NOTE: deleting the source index after the copy is currently disabled.
    # es_operator.delete_index(index_name)


if __name__ == '__main__':
    main()