# Used by the DingTalk monitor to decide whether the T+1 job needs a re-run.
import argparse
import sys
import re
import os
import requests
import json

# Locate the repository root so that the ``dw_base`` package is importable
# regardless of the directory the script is launched from.  This must run
# before any ``dw_base`` import below.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.log_utils import pretty_print
from configparser import ConfigParser
from datetime import time
from pymongo import MongoClient
from dw_base import *
# NOTE(review): this imported get_mongo_client is shadowed by the local
# definition further down; the import is kept only to avoid changing the
# module's import side effects.
from dw_base.scheduler.polling_scheduler import get_mongo_client
from dw_base.utils.config_utils import parse_args
from dw_base.scheduler.mg2es.conf_reader import ConfReader
from dw_base.scheduler.mg2es.es_operator import ESOperator
from elasticsearch.exceptions import NotFoundError

# Earlier variant that read the database list from a Spark table:
# sql = "SELECT mgdb, mgtbl_name FROM tmp.tmp_zjh_1011"
# spark = SparkSQL()
# res = spark.query(sql)[0].collect()


def parse_arguments() -> argparse.Namespace:
    """Parse the command line.

    Returns:
        Namespace with a single attribute ``mgdb`` (required ``-mgdb`` option),
        the name of the Mongo database to inspect.
    """
    parser = argparse.ArgumentParser(description='Process some parameters.')
    parser.add_argument('-mgdb', dest='mgdb', required=True, help='Parameter 1')
    return parser.parse_args()


def get_mongo_client(conf_path: str) -> MongoClient:
    """Build a ``MongoClient`` from an ini file.

    Args:
        conf_path: Path of the ini file, appended verbatim to ``root_path``
            (so it is expected to start with ``/``).

    Returns:
        A client connected to the URL stored under ``[base] address``.

    Raises:
        configparser.NoSectionError / NoOptionError: if the file is missing
            (``ConfigParser.read`` silently skips unreadable files) or does
            not define ``[base] address``.
    """
    config_parser = ConfigParser()
    config_parser.read(root_path + conf_path)
    url = config_parser.get('base', 'address')
    return MongoClient(url)


def _total_documents(collection) -> int:
    """Return the unfiltered document count of *collection*.

    ``Collection.count()`` was deprecated in PyMongo 3.7 and removed in 4.0;
    ``estimated_document_count()`` is its replacement for the no-filter case.
    The check is done on the class because PyMongo collections resolve
    unknown *instance* attributes to sub-collections, which would make a
    plain ``hasattr(collection, ...)`` always succeed.
    """
    if hasattr(type(collection), 'estimated_document_count'):
        return collection.estimated_document_count()
    return collection.count()


def get_count(client, mgdb: str) -> str:
    """Summarise import/export volumes of one Mongo database.

    Reads the ``shipments_imports`` and ``shipments_exports`` collections of
    database *mgdb*, and reports their total document counts plus the number
    of distinct values of ``jksmc`` (imports), ``cksmc`` (exports) and of
    both combined.

    Args:
        client: Connected Mongo client.
        mgdb: Database name.

    Returns:
        The multi-line report text; it is also emitted through
        ``pretty_print``.
    """
    db = client[mgdb]
    imports = db['shipments_imports']
    exports = db['shipments_exports']

    # Distinct company names per direction; the field name is fixed per
    # collection.
    importers = imports.distinct('jksmc')
    exporters = exports.distinct('cksmc')

    import_total = _total_documents(imports)
    export_total = _total_documents(exports)

    # Companies appearing in either direction, counted once.
    combined = len(set(importers).union(exporters))

    report = (
        '----------------------'
        f'\n进口总条数-->{import_total}'
        f',\n出口总条数-->{export_total}'
        f',\n进口去重企业数-->{len(importers)}'
        f',\n出口去重企业数-->{len(exporters)}'
        f',\n进出口去重企业数-->{combined}'
        '\n----------------------'
    )
    pretty_print(report)
    return report


def get_old_count(mgdb: str) -> str:
    """Run :func:`get_count` against the old production Mongo source.

    Args:
        mgdb: Database name.

    Returns:
        The report text produced by :func:`get_count`.
    """
    client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
    try:
        result = get_count(client, mgdb)
    finally:
        # Release the connection pool even if the queries fail.
        client.close()
    # NOTE(review): the message ends after "old data count:" without a value;
    # the figures themselves were already printed by get_count. Left
    # unchanged in case downstream tooling matches on this output.
    pretty_print(f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb} '
                 f'{NORM_MGT} old data count: {NORM_GRN}')
    return result


def main() -> int:
    """Script entry point: print the statistics for the ``-mgdb`` database.

    Returns:
        Process exit code (always ``0``; failures propagate as exceptions).
    """
    # Earlier variant iterated over the Spark result instead:
    # CONFIG, _ = parse_args(sys.argv[1:])
    # for record in res:
    #     mgtbl = record['mgtbl_name']
    args = parse_arguments()
    get_old_count(args.mgdb)
    return 0


if __name__ == '__main__':
    sys.exit(main())