"""Report per-``sldw`` document counts for the ``shipments_exports`` collection.

For every database name listed in ``my_array`` the script runs a ``$group``
aggregation on ``shipments_exports`` and logs, for each distinct ``sldw``
value, the number of documents and the largest ``_id``.
"""
import argparse
import sys
import re
import os
from pyhive import hive
import pandas as pd

# Make the repository root importable before pulling in ``dw_base``.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.utils.log_utils import pretty_print
from configparser import ConfigParser
from pymongo import MongoClient
from dw_base import *
# NOTE: this imported name is shadowed by the local ``get_mongo_client``
# defined below; the import is kept so the module's import set is unchanged.
from dw_base.scheduler.polling_scheduler import get_mongo_client

# Database names to process.
my_array = [
    'japan',
]


def parse_arguments():
    """Parse the command line and return the resulting namespace.

    The only option is the required ``-mgdb`` (stored as ``mgdb``).

    NOTE(review): nothing in this file calls this function; ``main`` iterates
    over ``my_array`` instead. Confirm whether it is still needed.
    """
    parser = argparse.ArgumentParser(description='Process some parameters.')
    parser.add_argument('-mgdb', dest='mgdb', required=True, help='Parameter 1')
    return parser.parse_args()


def get_mongo_client(conf_path):
    """Create a ``MongoClient`` from an INI file located under ``root_path``.

    Args:
        conf_path: Path fragment appended verbatim to ``root_path``; the file
            must contain a ``[base]`` section with an ``address`` option.

    Returns:
        A ``MongoClient`` built from the configured address.

    Raises:
        FileNotFoundError: If the configuration file cannot be read.
    """
    config_file = root_path + conf_path
    config_parser = ConfigParser()
    # ``read`` silently skips unreadable files and returns the list of files
    # it actually parsed, so an empty result means the config is missing.
    if not config_parser.read(config_file):
        raise FileNotFoundError(f'Mongo config file not found: {config_file}')
    url = config_parser.get('base', 'address')
    return MongoClient(url)


def get_count(client, mgdb):
    """Aggregate ``shipments_exports`` of database ``mgdb`` by ``sldw``.

    Args:
        client: Connected ``MongoClient``.
        mgdb: Name of the database to query.

    Returns:
        A list with one dict per distinct ``sldw`` value, each holding
        ``_id`` (the ``sldw`` value), ``count`` (documents in the group) and
        ``maxid`` (largest ``_id`` in the group).
    """
    db = client[mgdb]
    # Only the exports collection is aggregated; the imports aggregation was
    # already disabled (commented out) in the original script.
    exports = db['shipments_exports']

    pipeline = [
        {
            "$group": {
                "_id": "$sldw",               # group by the sldw field
                "count": {"$sum": 1},         # number of documents per group
                "maxid": {"$max": "$_id"},    # largest _id per group
            }
        }
    ]

    combined_results = list(exports.aggregate(pipeline))
    pretty_print(f'开始合并结果-------------------------------------------------------------------------')
    pretty_print(f'结果-------------------------------------------------------------------------{combined_results}')
    # The original built this list but never returned it, so callers got None.
    return combined_results


def get_old_count(client, mgdb):
    """Fetch the grouped counts for ``mgdb`` and log the overall total.

    Args:
        client: Connected ``MongoClient``.
        mgdb: Name of the database to query.

    Returns:
        The list of group dicts produced by ``get_count``.
    """
    result = get_count(client, mgdb)
    # Sum of the per-group counts, i.e. every document that was grouped.
    total = sum(group['count'] for group in result)
    pretty_print(f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb} '
                 f'{NORM_MGT} old data count: {NORM_GRN}{total}')
    return result


def main():
    """Run the count report for every database in ``my_array``.

    Returns:
        ``0`` on success, used as the process exit status.
    """
    client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
    try:
        pretty_print(f'开始循环调用-------------------------------------------------------------------------')
        pretty_print(f'{my_array}')
        for item in my_array:
            pretty_print(f'开始执行:{item}')
            get_old_count(client, item)
    finally:
        # Release the connection even when one of the queries fails.
        client.close()
    return 0


if __name__ == '__main__':
    sys.exit(main())