| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- # 海关数据项目t0清洗 - 轮询调度入口
- # 每十分钟调度一次,检查是否有新的任务需要执行
- # 每次中间会休眠5分钟来判断数据源是否还在持续更新中
- import sys
- import re
- import os
- abspath = os.path.abspath(__file__)
- root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
- sys.path.append(root_path)
- from dw_base.utils.log_utils import pretty_print
- from configparser import ConfigParser
- from datetime import time
- from time import sleep
- from pymongo import MongoClient
- from dw_base import *
- from dw_base.utils.config_utils import parse_args
- from dw_base.utils.datetime_utils import date_to_timestamp
- from bson.objectid import ObjectId
def get_mongo_client(conf_path):
    """Create a MongoClient from an ini file addressed relative to ``root_path``.

    :param conf_path: path fragment appended verbatim to ``root_path``
    :return: ``MongoClient`` built from the ``address`` option of the ``[base]`` section
    """
    ini_file = root_path + conf_path
    parser = ConfigParser()
    parser.read(ini_file)
    return MongoClient(parser.get('base', 'address'))
def get_count(client, mgdb, mgtbl, start_date, stop_date):
    """Count documents whose ``_id`` lies in the range ``[start_date, stop_date)``.

    The date range is translated into ObjectId boundaries: each boundary id is
    the date's timestamp (as returned by ``date_to_timestamp``) written as
    8 hex digits, followed by 16 zero digits.

    :param client: connected ``MongoClient``
    :param mgdb: database name
    :param mgtbl: collection name
    :param start_date: inclusive lower bound, in a form ``date_to_timestamp`` accepts
    :param stop_date: exclusive upper bound, same form as ``start_date``
    :return: number of documents matching the ``_id`` range
    """
    collection = client[mgdb][mgtbl]

    def boundary_id(date):
        # Zero-pad to 8 hex digits so that a small timestamp still produces a
        # valid 24-character ObjectId string.
        return ObjectId(format(int(date_to_timestamp(date)), '08x') + '0' * 16)

    query = {'_id': {'$gte': boundary_id(start_date), '$lt': boundary_id(stop_date)}}

    # Collection.count() was deprecated in PyMongo 3.7 and removed in 4.0.
    # Check on the class, not the instance: attribute access on a Collection
    # instance returns a sub-collection for unknown names, so hasattr() on the
    # instance would always be true.
    if hasattr(type(collection), 'count_documents'):
        return collection.count_documents(query)
    return collection.count(query)
def get_source_count(mgdb, mgtbl, start_date, stop_date):
    """Count and log the documents in the source MongoDB for a date range.

    Connects using ``mongo-cts-prod-old.ini``, delegates the counting to
    ``get_count`` and always closes the connection afterwards.

    :param mgdb: database name
    :param mgtbl: collection name
    :param start_date: inclusive lower bound passed through to ``get_count``
    :param stop_date: exclusive upper bound passed through to ``get_count``
    :return: the document count reported by ``get_count``
    """
    # Imported locally because the module-level name ``time`` is bound by
    # ``from datetime import time``, which provides no ``localtime``.
    from time import localtime, strftime

    client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
    try:
        result = get_count(client, mgdb, mgtbl, start_date, stop_date)
    finally:
        # Release the connection pool; this function is called repeatedly per run.
        client.close()
    pretty_print(f'{NORM_CYN}{strftime("%Y-%m-%d %H:%M:%S", localtime())} '
                 f'{NORM_MGT} source mongo: {NORM_GRN}{mgdb}.{mgtbl} '
                 f'{NORM_MGT}{start_date}-{stop_date} data count: {NORM_GRN}{result}')
    return result
def get_sink_count(mgdb, mgtbl, start_date, stop_date):
    """Count and log the documents in the sink MongoDB for a date range.

    Connects using ``mongo-cluster-cts-prod.ini``, delegates the counting to
    ``get_count`` and always closes the connection afterwards.

    :param mgdb: database name
    :param mgtbl: collection name
    :param start_date: inclusive lower bound passed through to ``get_count``
    :param stop_date: exclusive upper bound passed through to ``get_count``
    :return: the document count reported by ``get_count``
    """
    # Imported locally because the module-level name ``time`` is bound by
    # ``from datetime import time``, which provides no ``localtime``.
    from time import localtime, strftime

    # Every database is read from the cluster config. An earlier per-database
    # selection (mongo-cts-prod-new.ini for mgdb != 'america') was disabled.
    client = get_mongo_client('/../datasource/mongo/mongo-cluster-cts-prod.ini')
    try:
        result = get_count(client, mgdb, mgtbl, start_date, stop_date)
    finally:
        # Release the connection pool once the count has been taken.
        client.close()
    pretty_print(f'{NORM_CYN}{strftime("%Y-%m-%d %H:%M:%S", localtime())} '
                 f'{NORM_MGT} sink mongo: {NORM_GRN}{mgdb}.{mgtbl} '
                 f'{NORM_MGT}{start_date}-{stop_date} data count: {NORM_GRN}{result}')
    return result
def is_run(mgdb, mgtbl, start_date, stop_date, wait_seconds=5 * 60):
    """Decide whether the downstream job should run for the given date range.

    The decision is made in three steps:

    1. If the source holds no documents for the range, do not run.
    2. Wait ``wait_seconds`` and count the source again; if the count changed,
       the source is still being written to, so do not run.
    3. If the sink count already equals the (stable) source count, do not run;
       otherwise run.

    :param mgdb: database name
    :param mgtbl: collection name
    :param start_date: inclusive lower bound of the range
    :param stop_date: exclusive upper bound of the range
    :param wait_seconds: pause between the two source counts; defaults to the
        5 minutes stated in the file header and in the log message (the code
        previously slept for 10 minutes)
    :return: ``True`` when the source is stable and differs from the sink,
        ``False`` otherwise
    """
    # Imported locally because the module-level name ``time`` is bound by
    # ``from datetime import time``, which provides no ``localtime``.
    from time import localtime, sleep, strftime

    def log(message):
        # Shared prefix: coloured timestamp followed by the message colour.
        pretty_print(f'{NORM_CYN}{strftime("%Y-%m-%d %H:%M:%S", localtime())} '
                     f'{NORM_MGT} {message}')

    first_source_count = get_source_count(mgdb, mgtbl, start_date, stop_date)
    if first_source_count == 0:
        log('source mongo data count is zero, exit! ')
        return False

    # Pause, then re-count to detect a source that is still receiving data.
    log(f'now sleep {wait_seconds / 60:g} minutes to check source mongo data count again! ')
    sleep(wait_seconds)
    second_source_count = get_source_count(mgdb, mgtbl, start_date, stop_date)
    if first_source_count != second_source_count:
        log('source mongo data count is increasing, exit! ')
        return False

    sink_count = get_sink_count(mgdb, mgtbl, start_date, stop_date)
    if sink_count == second_source_count:
        log('source mongo data count is equal to sink mongo data count, exit! ')
        return False

    log('source mongo data count is not equal to sink mongo data count, start! ')
    return True
if __name__ == '__main__':
    # Read the date range and the target collection from the command line.
    CONFIG, _ = parse_args(sys.argv[1:])
    start_date = CONFIG.get('start-date')
    stop_date = CONFIG.get('stop-date')
    mgdb = CONFIG.get('mgdb')
    mgtbl = CONFIG.get('mgtbl')

    # '1' means the downstream task should run, '0' means it should not.
    flag = '1' if is_run(mgdb, mgtbl, start_date, stop_date) else '0'
    pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
                 f'{NORM_MGT}向后传递参数: {NORM_GRN}is_run => {flag} ')
    # Print the flag to stdout in the ${setValue(...)} form.
    print('${setValue(is_run=%s)}' % flag)
|