# Customs data project - T0 cleansing: polling scheduler entry point.
# The job is scheduled every ten minutes to check whether a new task needs to run.
# Within each run it sleeps 5 minutes between two source counts to decide whether
# the data source is still being updated.
import sys
import re
import os

abspath = os.path.abspath(__file__)
# Resolve the repository root (".../tendata-warehouse") from this file's location
# so the project packages (dw_base) become importable.
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.utils.log_utils import pretty_print
from configparser import ConfigParser
# The *module* is needed (strftime/localtime). The previous
# `from datetime import time` bound the datetime.time class, which has no localtime().
import time
from time import sleep
from pymongo import MongoClient
from dw_base import *  # provides the NORM_* color constants used in log lines
from dw_base.utils.config_utils import parse_args
from dw_base.utils.datetime_utils import date_to_timestamp
from bson.objectid import ObjectId

# Mongo connection configs, relative to root_path.
_SOURCE_CONF = '/../datasource/mongo/mongo-cts-prod-old.ini'
_SINK_CONF = '/../datasource/mongo/mongo-cts-prod-new.ini'
# The 'america' database is read through the cluster config instead of _SINK_CONF.
_SINK_CONF_AMERICA = '/../datasource/mongo/mongo-cluster-cts-prod.ini'


def _now():
    """Return the current local time as 'YYYY-MM-DD HH:MM:SS' for log lines."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


def _log(message):
    """Pretty-print ``message`` prefixed with a cyan timestamp and a space."""
    pretty_print(f'{NORM_CYN}{_now()} {message}')


def get_mongo_client(conf_path):
    """Create a MongoClient from the ``[base] address`` entry of an ini file.

    :param conf_path: ini path relative to ``root_path``. It is appended verbatim,
        so it is expected to start with a path separator.
    :return: a new ``MongoClient``; the caller is responsible for closing it.
    :raises FileNotFoundError: if the ini file could not be read.
    """
    config_file = root_path + conf_path
    config_parser = ConfigParser()
    # ConfigParser.read() silently skips unreadable files. Fail here with a clear
    # message instead of a confusing NoSectionError on the next line.
    if not config_parser.read(config_file):
        raise FileNotFoundError(f'mongo config file not found: {config_file}')
    url = config_parser.get('base', 'address')
    return MongoClient(url)


def _object_id_at(date_value):
    """Return the smallest ObjectId whose embedded creation time is ``date_value``.

    An ObjectId is 12 bytes (24 hex digits). The first 4 bytes are the creation
    timestamp; the remaining 16 hex digits are zeroed here so the id can serve as
    an ``_id`` range bound. Assumes ``date_to_timestamp`` returns epoch seconds,
    which is how the value is used here -- TODO confirm.
    """
    seconds = int(date_to_timestamp(date_value))
    # Zero-pad to 8 hex digits: hex(...)[2:] yields a too-short (invalid) id for
    # timestamps below 0x10000000.
    return ObjectId('%08x' % seconds + '0' * 16)


def get_count(client, mgdb, mgtbl, start_date, stop_date):
    """Count documents in ``mgdb.mgtbl`` whose ``_id`` was generated in [start_date, stop_date).

    :param client: an open ``MongoClient``; it is not closed here.
    :return: the number of matching documents.
    """
    collection = client[mgdb][mgtbl]
    query = {'_id': {'$gte': _object_id_at(start_date), '$lt': _object_id_at(stop_date)}}
    # Collection.count() was deprecated in PyMongo 3.7 and removed in 4.0.
    return collection.count_documents(query)


def _count_with_config(conf_path, mgdb, mgtbl, start_date, stop_date):
    """Open a client from ``conf_path``, run :func:`get_count`, and always close the client."""
    client = get_mongo_client(conf_path)
    try:
        return get_count(client, mgdb, mgtbl, start_date, stop_date)
    finally:
        client.close()


def get_source_count(mgdb, mgtbl, start_date, stop_date):
    """Count and log the documents in the source Mongo for [start_date, stop_date)."""
    result = _count_with_config(_SOURCE_CONF, mgdb, mgtbl, start_date, stop_date)
    _log(f'{NORM_MGT} source mongo: {NORM_GRN}{mgdb}.{mgtbl} '
         f'{NORM_MGT}{start_date}-{stop_date} data count: {NORM_GRN}{result}')
    return result


def get_sink_count(mgdb, mgtbl, start_date, stop_date):
    """Count and log the documents in the sink Mongo for [start_date, stop_date).

    The 'america' database is read via the cluster config; every other database
    is read via the default sink config. Previously these assignments were
    commented out, leaving ``client`` undefined (NameError).
    """
    conf_path = _SINK_CONF_AMERICA if mgdb == 'america' else _SINK_CONF
    result = _count_with_config(conf_path, mgdb, mgtbl, start_date, stop_date)
    _log(f'{NORM_MGT} sink mongo: {NORM_GRN}{mgdb}.{mgtbl} '
         f'{NORM_MGT}{start_date}-{stop_date} data count: {NORM_GRN}{result}')
    return result


def is_run(mgdb, mgtbl, start_date, stop_date, wait_seconds=5 * 60):
    """Decide whether the T0 cleansing job should run for [start_date, stop_date).

    The job runs only when all of the following hold:
    1. the source collection has data in the window;
    2. the source count is unchanged after waiting ``wait_seconds``, i.e. the
       source is no longer being written to; and
    3. the sink count differs from the source count.

    :param wait_seconds: pause between the two source counts. Defaults to 5 minutes,
        matching the file header and log message. The old code slept 10 minutes,
        which would overlap the next 10-minute scheduled run.
    :return: True if the job should run, False otherwise.
    """
    first_source_count = get_source_count(mgdb, mgtbl, start_date, stop_date)
    if first_source_count == 0:
        _log(f'{NORM_MGT} source mongo data count is zero, exit! ')
        return False

    # Wait, then count again to detect a source that is still being updated.
    _log(f'{NORM_MGT} now sleep {wait_seconds / 60:g} minutes to check source mongo data count again! ')
    sleep(wait_seconds)
    second_source_count = get_source_count(mgdb, mgtbl, start_date, stop_date)
    if first_source_count != second_source_count:
        _log(f'{NORM_MGT} source mongo data count is increasing, exit! ')
        return False

    sink_count = get_sink_count(mgdb, mgtbl, start_date, stop_date)
    if sink_count == second_source_count:
        _log(f'{NORM_MGT} source mongo data count is equal to sink mongo data count, exit! ')
        return False

    _log(f'{NORM_MGT} source mongo data count is not equal to sink mongo data count, start! ')
    return True


if __name__ == '__main__':
    CONFIG, _ = parse_args(sys.argv[1:])
    start_date = CONFIG.get('start-date')
    stop_date = CONFIG.get('stop-date')
    mgdb = CONFIG.get('mgdb')
    mgtbl = CONFIG.get('mgtbl')

    flag = '1' if is_run(mgdb, mgtbl, start_date, stop_date) else '0'
    _log(f'{NORM_MGT}向后传递参数: {NORM_GRN}is_run => {flag} ')
    # Marker line that hands is_run to downstream tasks; presumably parsed by the
    # scheduler -- keep the format exact (verify against the scheduler).
    print('${setValue(is_run=%s)}' % flag)