| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- import argparse
- import sys
- import re
- import os
- from pyhive import hive
- import pandas as pd
- abspath = os.path.abspath(__file__)
- root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
- sys.path.append(root_path)
- from dw_base.utils.log_utils import pretty_print
- from configparser import ConfigParser
- from pymongo import MongoClient
- from dw_base import *
- from dw_base.scheduler.polling_scheduler import get_mongo_client
# Names of the MongoDB databases that main() iterates over.
my_array = ['japan']
def parse_arguments():
    """Parse this script's command-line options.

    Returns:
        argparse.Namespace: Parsed options; the required ``-mgdb`` value is
        available as the ``mgdb`` attribute.
    """
    cli = argparse.ArgumentParser(description='Process some parameters.')
    # The only option; argparse exits with an error when it is missing.
    cli.add_argument('-mgdb', dest='mgdb', required=True, help='Parameter 1')
    namespace = cli.parse_args()
    return namespace
def get_mongo_client(conf_path):
    """Create a MongoClient from an INI file located relative to ``root_path``.

    NOTE(review): this definition shadows the ``get_mongo_client`` imported
    from ``dw_base.scheduler.polling_scheduler`` at the top of the file.

    Args:
        conf_path: Path fragment appended verbatim to the module-level
            ``root_path`` to locate the INI file.

    Returns:
        MongoClient: Client built from the ``address`` option of the
        ``[base]`` section of that file.
    """
    ini_file = root_path + conf_path
    settings = ConfigParser()
    settings.read(ini_file)
    return MongoClient(settings.get('base', 'address'))
def get_count(client, mgdb):
    """Group the ``shipments_exports`` collection by ``sldw`` and count each group.

    Args:
        client: MongoDB client; indexed with ``mgdb`` to select the database.
        mgdb: Name of the database to query.

    Returns:
        list: One document per distinct ``sldw`` value, with keys ``_id``
        (the ``sldw`` value), ``count`` (documents in the group) and
        ``maxid`` (largest ``_id`` in the group).
    """
    # Only the exports collection is queried.
    exports = client[mgdb]['shipments_exports']

    # Single-stage aggregation: group by sldw, count rows, track the max _id.
    pipeline = [
        {
            "$group": {
                "_id": "$sldw",
                "count": {"$sum": 1},
                "maxid": {"$max": "$_id"},
            }
        }
    ]

    # Materialize the cursor so the rows can be both logged and returned.
    group_rows = list(exports.aggregate(pipeline))
    pretty_print('开始合并结果-------------------------------------------------------------------------')
    pretty_print(f'结果-------------------------------------------------------------------------{group_rows}')

    # Previously the rows were computed and logged but never returned, so
    # callers always received None.
    return group_rows
def get_old_count(client, mgdb):
    """Run ``get_count`` for ``mgdb`` and log a banner naming the database.

    Args:
        client: MongoDB client forwarded to ``get_count``.
        mgdb: Name of the database to query.

    Returns:
        Whatever ``get_count(client, mgdb)`` returns.
    """
    counts = get_count(client, mgdb)
    # The banner names the database only; the count itself is not part of
    # the logged message.
    banner = (
        f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb} '
        f'{NORM_MGT} old data count: {NORM_GRN}'
    )
    pretty_print(banner)
    return counts
def main():
    """Query every database listed in ``my_array`` on the old Mongo source.

    Returns:
        int: Always 0.
    """
    client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
    try:
        pretty_print('开始循环调用-------------------------------------------------------------------------')
        pretty_print(f'{my_array}')
        # Run the per-database count for each configured database name.
        for item in my_array:
            pretty_print(f'开始执行:{item}')
            get_old_count(client, item)
    finally:
        # Release the connection even when one of the queries raises.
        client.close()
    return 0
# Script entry point: propagate main()'s return value as the process exit
# status instead of discarding it.
if __name__ == '__main__':
    sys.exit(main())
|