"""Aggregate shipment counts per ``cjfs`` from a list of old-cluster Mongo
databases and persist one row per group into the Hive table
``dim.cts_cjfs_global_old``."""
import argparse
import os
import re
import sys

import pandas as pd
from pyhive import hive

# Make the repository root importable regardless of where this file lives
# inside the "tendata-warehouse" checkout.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from configparser import ConfigParser

from pymongo import MongoClient

from dw_base import *
# NOTE(review): this imported name is immediately shadowed by the local
# ``get_mongo_client`` defined below; the import is kept so the module's
# import surface is unchanged.
from dw_base.scheduler.polling_scheduler import get_mongo_client
from dw_base.utils.log_utils import pretty_print

# Mongo databases (one per country / statistics source) whose shipment
# collections are counted by ``main``.
my_array = [
    'america_stat', 'australia', 'brazil', 'brazil_stat', 'canada',
    'canada_stat', 'china_stat', 'cis', 'dominica', 'england', 'ethiopia',
    'eurasian_bol', 'european_union', 'fiji', 'guatemala', 'honduras',
    'honduras_stat', 'hongkong_stat', 'indonesia_stat', 'japan',
    'kyrghyzstan', 'new_zealand', 'nicaragua', 'peru_exp',
    'philippines_stat', 'russia_rail', 'salvador', 'salvador_stat',
    'south_africa_stat', 'south_korea', 'south_korea_stat', 'spain',
    'taiwan', 'thailand', 'thailand_stat', 'turkey_stat', 'zimbabwe',
    'taiwan_stat', 'tanzania', 'tanzania_tboe', 'bolivia_stat', 'spain_co',
    'congo_kinshasa', 'south_korea_co', 'england_stat', 'angola_stat',
    'guatemala_stat', 'brazil_air', 'egypt_co', 'uruguay_nboe',
    'panama_exp', 'bahrain_stat', 'dominican_republic_stat', 'qatar_stat'
]


def parse_arguments():
    """Parse command-line arguments (``-mgdb``).

    NOTE(review): currently unused -- ``main()`` iterates ``my_array``
    instead of honouring ``-mgdb``; kept for backward compatibility.
    """
    parser = argparse.ArgumentParser(description='Process some parameters.')
    parser.add_argument('-mgdb', dest='mgdb', required=True, help='Parameter 1')
    return parser.parse_args()


def get_mongo_client(conf_path):
    """Build a ``MongoClient`` from the ``[base] address`` entry of the ini
    file located at ``root_path + conf_path``."""
    config_parser = ConfigParser()
    config_parser.read(root_path + conf_path)
    url = config_parser.get('base', 'address')
    return MongoClient(url)


def _insert_counts(cursor, frame, mgdb, jck, verbose=False):
    """Insert one Hive row per aggregated ``cjfs`` group in *frame*.

    Uses pyhive's parameter binding instead of f-string interpolation so
    values containing quotes cannot break (or inject into) the SQL.
    """
    insert_query = (
        "INSERT INTO dim.cts_cjfs_global_old (cjfs, cnt, gj, jck) "
        "VALUES (%s, %s, %s, %s)"
    )
    for _, row in frame.iterrows():
        params = (str(row['_id']), str(row['count']), mgdb, jck)
        if verbose:
            pretty_print(f'{insert_query} -- {params}')
        cursor.execute(insert_query, params)


def get_count(client, mgdb):
    """Group ``shipments_imports`` / ``shipments_exports`` of Mongo database
    *mgdb* by ``cjfs``, write the per-group counts to Hive and return a
    human-readable summary string of the raw aggregation results."""
    db = client[mgdb]
    collection1 = db['shipments_imports']
    collection2 = db['shipments_exports']
    # Group documents by their ``cjfs`` field and count each group.
    pipeline = [
        {
            "$group": {
                "_id": "$cjfs",
                "count": {"$sum": 1}
            }
        }
    ]
    results1 = list(collection1.aggregate(pipeline))
    results2 = list(collection2.aggregate(pipeline))
    pretty_print(f'开始合并结果-------------------------------------------------------------------------')
    combined_results = list(results1) + list(results2)
    df1 = pd.DataFrame(results1)
    df2 = pd.DataFrame(results2)

    # NOTE(review): Hive endpoint is hard-coded; presumably an internal
    # cluster -- consider moving to configuration.
    hive_conn = hive.Connection(host='192.168.30.3', port=10000,
                                username='hive', database='dim')
    try:
        cursor = hive_conn.cursor()
        try:
            pretty_print(f'开始插入结果-------------------------------------------------------------------------')
            # Imports are logged per-statement (as the original did),
            # exports are inserted silently.
            _insert_counts(cursor, df1, mgdb, 'im', verbose=True)
            _insert_counts(cursor, df2, mgdb, 'ex')
        finally:
            cursor.close()
    finally:
        hive_conn.close()

    jgj = ('----------------------' + '\n结果1-->' + str(results1) +
           '结果1end\n结果2-->' + str(results2) + '结果2end\n合并后结果-->' +
           str(combined_results) + '\n----------------------')
    pretty_print(f'{jgj}')
    return jgj


def get_old_count(client, mgdb):
    """Run :func:`get_count` for *mgdb* and log which old source was counted."""
    result = get_count(client, mgdb)
    pretty_print(f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb} '
                 f'{NORM_MGT} old data count: {NORM_GRN}')
    return result


def main():
    """Count every database listed in ``my_array`` against the old Mongo
    cluster; always close the client, even on failure."""
    client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
    try:
        pretty_print(f'开始循环调用-------------------------------------------------------------------------')
        pretty_print(f'{my_array}')
        for item in my_array:
            pretty_print(f'开始执行:{item}')
            get_old_count(client, item)
    finally:
        client.close()
    return 0


if __name__ == '__main__':
    main()