polling_scheduler.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
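# Customs data project, t0 cleansing - polling scheduler entry point.
# Scheduled every ten minutes to check whether a new task needs to run.
# Each run sleeps for 5 minutes in between to determine whether the data
# source is still being updated.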
  4. import sys
  5. import re
  6. import os
  7. abspath = os.path.abspath(__file__)
  8. root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
  9. sys.path.append(root_path)
from configparser import ConfigParser
from datetime import time
# Rebind `time` to the standard-library module: the log lines in this file call
# time.strftime(fmt, time.localtime()), which the datetime.time class imported
# just above cannot serve.
# NOTE(review): `from dw_base import *` below may rebind `time` again - confirm
# it exports the time module (or nothing named `time`).
import time
from time import sleep

from bson.objectid import ObjectId
from pymongo import MongoClient

from dw_base.utils.log_utils import pretty_print
from dw_base import *
from dw_base.utils.config_utils import parse_args
from dw_base.utils.datetime_utils import date_to_timestamp
  19. def get_mongo_client(conf_path):
  20. config_parser = ConfigParser()
  21. config_parser.read(root_path + conf_path)
  22. url = config_parser.get('base', 'address')
  23. return MongoClient(url)
  24. def get_count(client, mgdb, mgtbl, start_date, stop_date):
  25. db = client[mgdb]
  26. # 连接集合
  27. collection = db[mgtbl]
  28. # 根据查询条件查询数据条数
  29. start_dt_str = hex(int(date_to_timestamp(start_date)))[2:] + '0000000000000000'
  30. stop_dt_str = hex(int(date_to_timestamp(stop_date)))[2:] + '0000000000000000'
  31. query = {'_id': {'$gte': ObjectId(start_dt_str), '$lt': ObjectId(stop_dt_str)}}
  32. return collection.count(query)
  33. def get_source_count(mgdb, mgtbl, start_date, stop_date):
  34. client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
  35. result = get_count(client, mgdb, mgtbl, start_date, stop_date)
  36. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  37. f'{NORM_MGT} source mongo: {NORM_GRN}{mgdb}.{mgtbl} '
  38. f'{NORM_MGT}{start_date}-{stop_date} data count: {NORM_GRN}{result}')
  39. return result
  40. def get_sink_count(mgdb, mgtbl, start_date, stop_date):
  41. # if mgdb != 'america':
  42. # client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-new.ini')
  43. # if mgdb == 'america':
  44. #
  45. client = get_mongo_client('/../datasource/mongo/mongo-cluster-cts-prod.ini')
  46. result = get_count(client, mgdb, mgtbl, start_date, stop_date)
  47. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  48. f'{NORM_MGT} sink mongo: {NORM_GRN}{mgdb}.{mgtbl} '
  49. f'{NORM_MGT}{start_date}-{stop_date} data count: {NORM_GRN}{result}')
  50. return result
  51. def is_run(mgdb, mgtbl, start_date, stop_date):
  52. first_source_count = get_source_count(mgdb, mgtbl, start_date, stop_date)
  53. if first_source_count == 0:
  54. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  55. f'{NORM_MGT} source mongo data count is zero, exit! ')
  56. return False
  57. else:
  58. # 等待五分钟
  59. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  60. f'{NORM_MGT} now sleep 5 minutes to check source mongo data count again! ')
  61. sleep(60 * 10)
  62. second_source_count = get_source_count(mgdb, mgtbl, start_date, stop_date)
  63. if first_source_count != second_source_count:
  64. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  65. f'{NORM_MGT} source mongo data count is increasing, exit! ')
  66. return False
  67. else:
  68. sink_count = get_sink_count(mgdb, mgtbl, start_date, stop_date)
  69. if sink_count == second_source_count:
  70. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  71. f'{NORM_MGT} source mongo data count is equal to sink mongo data count, exit! ')
  72. return False
  73. else:
  74. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  75. f'{NORM_MGT} source mongo data count is not equal to sink mongo data count, start! ')
  76. return True
  77. if __name__ == '__main__':
  78. CONFIG, _ = parse_args(sys.argv[1:])
  79. start_date = CONFIG.get('start-date')
  80. stop_date = CONFIG.get('stop-date')
  81. mgdb = CONFIG.get('mgdb')
  82. mgtbl = CONFIG.get('mgtbl')
  83. if is_run(mgdb, mgtbl, start_date, stop_date):
  84. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  85. f'{NORM_MGT}向后传递参数: {NORM_GRN}is_run => 1 ')
  86. print('${setValue(is_run=%s)}' % '1')
  87. else:
  88. pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
  89. f'{NORM_MGT}向后传递参数: {NORM_GRN}is_run => 0 ')
  90. print('${setValue(is_run=%s)}' % '0')