get_oldmongo_sldw_detail.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. import argparse
  2. import sys
  3. import re
  4. import os
  5. from pyhive import hive
  6. import pandas as pd
  7. abspath = os.path.abspath(__file__)
  8. root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
  9. sys.path.append(root_path)
  10. from dw_base.utils.log_utils import pretty_print
  11. from configparser import ConfigParser
  12. from pymongo import MongoClient
  13. from dw_base import *
  14. from dw_base.scheduler.polling_scheduler import get_mongo_client
  15. # 定义一个数组(列表)
  16. my_array = [
  17. 'japan'
  18. ]
  19. def parse_arguments():
  20. # 创建 ArgumentParser 对象
  21. parser = argparse.ArgumentParser(description='Process some parameters.')
  22. # 添加参数
  23. parser.add_argument('-mgdb', dest='mgdb', required=True, help='Parameter 1')
  24. # 解析参数
  25. return parser.parse_args()
  26. def get_mongo_client(conf_path):
  27. config_parser = ConfigParser()
  28. config_parser.read(root_path + conf_path)
  29. url = config_parser.get('base', 'address')
  30. return MongoClient(url)
  31. def get_count(client, mgdb):
  32. # 选择数据库
  33. db = client[mgdb]
  34. # 选择集合
  35. # collection1 = db['shipments_imports']
  36. collection2 = db['shipments_exports']
  37. # 使用聚合管道进行分组和计数
  38. pipeline = [
  39. {
  40. "$group": {
  41. "_id": "$sldw", # 按sldw字段分组
  42. "count": {"$sum": 1} , # 计算每个组的数量
  43. "maxid": {"$max": "$_id"} # 计算每个组的最大id值
  44. }
  45. }
  46. ]
  47. # 执行聚合查询
  48. # results1 = list(collection1.aggregate(pipeline))
  49. results2 = list(collection2.aggregate(pipeline))
  50. pretty_print(f'开始合并结果-------------------------------------------------------------------------')
  51. # 合并结果
  52. combined_results = list(results2)
  53. pretty_print(f'结果-------------------------------------------------------------------------{combined_results}')
  54. def get_old_count(client,mgdb):
  55. result = get_count(client, mgdb)
  56. pretty_print(f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb} '
  57. f'{NORM_MGT} old data count: {NORM_GRN}')
  58. return result
  59. def main():
  60. client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
  61. pretty_print(f'开始循环调用-------------------------------------------------------------------------')
  62. pretty_print(f'{my_array}')
  63. # 使用for循环遍历数组,并调用函数
  64. for item in my_array:
  65. pretty_print(f'开始执行:{item}')
  66. get_old_count(client,item)
  67. client.close()
  68. return 0
  69. if __name__ == '__main__':
  70. main()