| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- # 用于钉钉监控T+1任务是否需要重跑
- import argparse
- import sys
- import re
- import os
- import requests
- import json
# Derive the repository root (the path up to and including the first
# "tendata-warehouse" component) from this file's absolute location and put it
# on sys.path so the dw_base imports that follow can be resolved.
# When the marker is absent, partition() yields (abspath, "", ""), so
# root_path falls back to the file's own absolute path.
abspath = os.path.abspath(__file__)
root_path = "".join(abspath.partition("tendata-warehouse")[:2])
sys.path.append(root_path)
- from dw_base.spark.spark_sql import SparkSQL
- from dw_base.utils.log_utils import pretty_print
- from configparser import ConfigParser
- from datetime import time
- from pymongo import MongoClient
- from dw_base import *
- from dw_base.scheduler.polling_scheduler import get_mongo_client
- from dw_base.utils.config_utils import parse_args
- from dw_base.scheduler.mg2es.conf_reader import ConfReader
- from dw_base.scheduler.mg2es.es_operator import ESOperator
- from elasticsearch.exceptions import NotFoundError
- # sql = "SELECT mgdb, mgtbl_name FROM tmp.tmp_zjh_1011"
- # spark = SparkSQL()
- # res = spark.query(sql)[0].collect()
def parse_arguments(argv=None):
    """Parse the command-line options of this check script.

    Args:
        argv: List of argument strings to parse, without the program name.
            ``None`` (the default) lets argparse use ``sys.argv[1:]``, which
            is what the original zero-argument call did.

    Returns:
        argparse.Namespace with one attribute, ``mgdb``: the MongoDB
        database name to inspect.

    Raises:
        SystemExit: raised by argparse when ``-mgdb`` is missing or the
            arguments are otherwise invalid.
    """
    parser = argparse.ArgumentParser(
        description='Print record and distinct-company counts for a MongoDB database.')
    # -mgdb is mandatory; argparse prints usage and exits if it is absent.
    parser.add_argument('-mgdb', dest='mgdb', required=True,
                        help='MongoDB database name to inspect')
    return parser.parse_args(argv)
def get_mongo_client(conf_path):
    """Create a MongoClient from the ``[base] address`` entry of an INI file.

    NOTE(review): this definition shadows the ``get_mongo_client`` imported
    from ``dw_base.scheduler.polling_scheduler`` at the top of the file;
    callers in this module use this version.

    Args:
        conf_path: Path of the INI file relative to ``root_path``. It is
            concatenated directly onto ``root_path``, so it should begin with
            a path separator (e.g. ``'/../datasource/mongo/x.ini'``).

    Returns:
        pymongo.MongoClient for the configured address.

    Raises:
        FileNotFoundError: the config file does not exist or could not be read.
        configparser.NoSectionError / configparser.NoOptionError: the file
            has no ``[base]`` section or no ``address`` option.
    """
    full_path = root_path + conf_path
    config_parser = ConfigParser()
    # ConfigParser.read() silently skips files it cannot open and returns the
    # list of files it did parse; an empty list means nothing was loaded, which
    # would otherwise surface later as a misleading NoSectionError.
    if not config_parser.read(full_path):
        raise FileNotFoundError(f'Mongo config file not found or unreadable: {full_path}')
    url = config_parser.get('base', 'address')
    return MongoClient(url)
def get_count(client, mgdb):
    """Summarise import/export record counts and distinct-company counts.

    Reads collection ``shipments_imports`` (company field ``jksmc``) and
    collection ``shipments_exports`` (company field ``cksmc``) from database
    ``mgdb``, prints the summary with ``pretty_print`` and returns it.

    Args:
        client: A pymongo MongoClient (indexed by database name).
        mgdb: Name of the database to inspect.

    Returns:
        str: The multi-line summary text (the same text that is printed).
    """
    db = client[mgdb]
    imports_coll = db['shipments_imports']
    exports_coll = db['shipments_exports']

    # Unique company names on each side.
    # NOTE(review): the server limits a distinct() result to 16MB; very large
    # collections may need an aggregation pipeline instead.
    import_companies = imports_coll.distinct('jksmc')
    export_companies = exports_coll.distinct('cksmc')

    # Collection.count() was deprecated in PyMongo 3.7 and removed in 4.0;
    # count_documents({}) is the supported replacement and returns an exact count.
    import_total = imports_coll.count_documents({})
    export_total = exports_coll.count_documents({})

    # Companies that appear on both the import and export side are counted once.
    combined_companies = len(set(import_companies) | set(export_companies))

    summary = ('----------------------'
               f'\n进口总条数-->{import_total}'
               f',\n出口总条数-->{export_total}'
               f',\n进口去重企业数-->{len(import_companies)}'
               f',\n出口去重企业数-->{len(export_companies)}'
               f',\n进出口去重企业数-->{combined_companies}'
               '\n----------------------')
    pretty_print(summary)
    return summary
def get_old_count(mgdb, conf_path='/../datasource/mongo/mongo-cts-prod-old.ini'):
    """Compute and log the count summary for ``mgdb`` on the old Mongo source.

    Args:
        mgdb: Name of the database to inspect.
        conf_path: INI file, relative to ``root_path``, holding the Mongo
            address. Defaults to the old production source config.

    Returns:
        str: The summary text produced by ``get_count``.
    """
    client = get_mongo_client(conf_path)
    try:
        result = get_count(client, mgdb)
    finally:
        # Release the client's connections even if counting raises.
        client.close()
    # The "old data count" label is followed by the summary it refers to.
    pretty_print(f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb} '
                 f'{NORM_MGT} old data count: {NORM_GRN}{result}')
    return result
def main():
    """Script entry point: read ``-mgdb`` and report its counts on the old source.

    Returns:
        int: Process exit status, 0 on success. Errors propagate as
        exceptions (argparse raises SystemExit for bad arguments).
    """
    args = parse_arguments()
    # get_old_count prints the summary itself; its return value is not needed here.
    get_old_count(args.mgdb)
    return 0
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    sys.exit(main())
|