| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- import argparse
- import sys
- import re
- import os
- from pyhive import hive
- import pandas as pd
# Make the repository root importable before the dw_base imports below:
# cut this script's absolute path right after the "tendata-warehouse"
# directory and append the result to sys.path. If the path does not contain
# "tendata-warehouse", re.sub leaves it unchanged and the file path itself is
# appended.
abspath = os.path.abspath(__file__)
root_path = re.sub(
    pattern=r"tendata-warehouse.*",
    repl="tendata-warehouse",
    string=abspath,
)
sys.path.append(root_path)
- from dw_base.utils.log_utils import pretty_print
- from configparser import ConfigParser
- from pymongo import MongoClient
- from dw_base import *
- from dw_base.scheduler.polling_scheduler import get_mongo_client
# MongoDB database names that main() passes, one at a time and in this
# order, to get_old_count().
my_array = [
    "america_stat", "australia", "brazil", "brazil_stat", "canada",
    "canada_stat", "china_stat", "cis", "dominica", "england",
    "ethiopia", "eurasian_bol", "european_union", "fiji", "guatemala",
    "honduras", "honduras_stat", "hongkong_stat", "indonesia_stat", "japan",
    "kyrghyzstan", "new_zealand", "nicaragua", "peru_exp", "philippines_stat",
    "russia_rail", "salvador", "salvador_stat", "south_africa_stat",
    "south_korea", "south_korea_stat", "spain", "taiwan", "thailand",
    "thailand_stat", "turkey_stat", "zimbabwe", "taiwan_stat", "tanzania",
    "tanzania_tboe", "bolivia_stat", "spain_co", "congo_kinshasa",
    "south_korea_co", "england_stat", "angola_stat", "guatemala_stat",
    "brazil_air", "egypt_co", "uruguay_nboe", "panama_exp", "bahrain_stat",
    "dominican_republic_stat", "qatar_stat",
]
def parse_arguments(argv=None):
    """Parse this script's command-line options.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the parameterless version
            of this function did.

    Returns:
        argparse.Namespace with a single attribute, ``mgdb``.

    Raises:
        SystemExit: if ``-mgdb`` is missing (argparse prints usage first).

    NOTE(review): main() in this file does not call this function; it
    iterates the hard-coded ``my_array`` instead.
    """
    parser = argparse.ArgumentParser(
        description='Count documents per cjfs value in MongoDB and load the counts into Hive.')
    # "mgdb" is the name this file uses for a MongoDB database name
    # (see get_count's ``client[mgdb]``).
    parser.add_argument('-mgdb', dest='mgdb', required=True,
                        help='MongoDB database name')
    return parser.parse_args(argv)
def get_mongo_client(conf_path):
    """Create a MongoClient from the ``address`` option of an INI file's ``[base]`` section.

    This module-level definition shadows the ``get_mongo_client`` imported
    from ``dw_base.scheduler.polling_scheduler`` earlier in the file.

    Args:
        conf_path: Path of the INI file relative to ``root_path``. It is
            concatenated verbatim, so it should start with a path separator
            (e.g. ``'/../datasource/mongo/mongo-cts-prod-old.ini'``).

    Returns:
        A ``MongoClient`` for the configured address.

    Raises:
        FileNotFoundError: if the INI file could not be opened.
        configparser.NoSectionError / configparser.NoOptionError: if the
            ``[base]`` section or its ``address`` option is missing.
    """
    full_path = root_path + conf_path
    config_parser = ConfigParser()
    # ConfigParser.read() silently skips files it cannot open and returns the
    # list of files it did read; without this check a missing file only shows
    # up later as a misleading "No section: 'base'" error.
    if not config_parser.read(full_path):
        raise FileNotFoundError(f'Mongo config file not found or unreadable: {full_path}')
    url = config_parser.get('base', 'address')
    return MongoClient(url)
def _count_by_cjfs(collection):
    """Group *collection* by its ``cjfs`` field and count the documents per value.

    Returns:
        The aggregation output as a list of ``{'_id': <cjfs>, 'count': <n>}``
        dicts. Documents that have no ``cjfs`` field land in the group whose
        ``_id`` is None.
    """
    pipeline = [
        {
            "$group": {
                "_id": "$cjfs",        # group by the cjfs field
                "count": {"$sum": 1},  # number of documents in each group
            }
        }
    ]
    return list(collection.aggregate(pipeline))


def _insert_counts(cursor, rows, mgdb, jck):
    """Write the aggregated *rows* into ``dim.cts_cjfs_global_old`` with one INSERT.

    Args:
        cursor: Open pyhive cursor.
        rows: Output of _count_by_cjfs().
        mgdb: Value written to the ``gj`` column.
        jck: Value written to the ``jck`` column ('im' or 'ex').

    Values are bound as parameters, so pyhive escapes them; a cjfs value that
    contains a quote can no longer break or alter the statement. Every value
    is bound as a string, so Hive sees quoted literals such as '12' and
    applies its usual implicit casts.

    All rows go into one multi-row INSERT instead of one statement per row,
    because each Hive INSERT is a separate query. Nothing is executed when
    *rows* is empty, since an empty VALUES list is invalid SQL.
    """
    if not rows:
        return
    placeholders = ', '.join(['(%s, %s, %s, %s)'] * len(rows))
    insert_query = ('INSERT INTO dim.cts_cjfs_global_old (cjfs, cnt, gj, jck) '
                    f'VALUES {placeholders}')
    params = []
    for row in rows:
        # str(None) == 'None': a group without cjfs is stored as the string 'None'.
        params.extend((str(row['_id']), str(row['count']), str(mgdb), jck))
    pretty_print(f'{insert_query}\n{params}')
    cursor.execute(insert_query, params)


def get_count(client, mgdb, *, hive_host='192.168.30.3', hive_port=10000):
    """Count documents per ``cjfs`` value in one Mongo database and load the counts into Hive.

    The ``shipments_imports`` and ``shipments_exports`` collections of
    database *mgdb* are aggregated separately. Their counts are inserted
    into ``dim.cts_cjfs_global_old`` with ``gj`` = *mgdb* and ``jck`` =
    'im' or 'ex' respectively.

    Args:
        client: MongoClient (any object indexable by database name).
        mgdb: Name of the database to read.
        hive_host: Hive server host (keyword-only).
        hive_port: Hive server port (keyword-only).

    Returns:
        A report string with the import results, the export results and
        their concatenation; the report is also logged via pretty_print.
    """
    db = client[mgdb]
    results1 = _count_by_cjfs(db['shipments_imports'])
    results2 = _count_by_cjfs(db['shipments_exports'])
    pretty_print('开始合并结果-------------------------------------------------------------------------')
    combined_results = results1 + results2

    hive_conn = hive.Connection(host=hive_host, port=hive_port, username='hive', database='dim')
    try:
        cursor = hive_conn.cursor()
        try:
            pretty_print('开始插入结果-------------------------------------------------------------------------')
            _insert_counts(cursor, results1, mgdb, 'im')
            _insert_counts(cursor, results2, mgdb, 'ex')
        finally:
            cursor.close()
    finally:
        # Release the Hive connection even when an INSERT fails.
        hive_conn.close()

    jgj = ('----------------------' +
           '\n结果1-->' + str(results1) +
           '结果1end\n结果2-->' + str(results2) +
           '结果2end\n合并后结果-->' + str(combined_results) +
           '\n----------------------')
    pretty_print(jgj)
    return jgj
def get_old_count(client, mgdb):
    """Run get_count() for one database of the old Mongo source and log the outcome.

    Args:
        client: MongoClient passed straight through to get_count().
        mgdb: Name of the database to process.

    Returns:
        The report string returned by get_count().
    """
    result = get_count(client, mgdb)
    # Append the report so the "old data count:" label is followed by a value.
    pretty_print(f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb} '
                 f'{NORM_MGT} old data count: {NORM_GRN}{result}')
    return result
def main():
    """Run get_old_count() for every database name in ``my_array``.

    One MongoClient is created from ``mongo-cts-prod-old.ini`` and shared by
    all databases. The client is always closed, even when a database fails
    part-way through the loop; the exception still propagates.

    Returns:
        0 when every database was processed.
    """
    client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
    try:
        pretty_print('开始循环调用-------------------------------------------------------------------------')
        pretty_print(f'{my_array}')
        for item in my_array:
            pretty_print(f'开始执行:{item}')
            get_old_count(client, item)
    finally:
        client.close()
    return 0
if __name__ == '__main__':
    # Use main()'s return value as the process exit status instead of discarding it.
    sys.exit(main())
|