get_oldmongo_ysfs.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
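# Aggregates per-`ysfs` document counts from the `shipments_imports` and
# `shipments_exports` collections of an old-source MongoDB database and
# writes them to the Hive table dim.cts_ysfs_global_old.
# NOTE(review): the original header read "used for DingTalk monitoring of
# whether T+1 tasks need to be rerun"; no DingTalk logic is visible in this
# file -- confirm whether that description is stale.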
  2. import argparse
  3. import sys
  4. import re
  5. import os
  6. import requests
  7. import json
  8. from pyhive import hive
  9. import pandas as pd
  10. abspath = os.path.abspath(__file__)
  11. root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
  12. sys.path.append(root_path)
  13. from dw_base.spark.spark_sql import SparkSQL
  14. from dw_base.utils.log_utils import pretty_print
  15. from configparser import ConfigParser
  16. from datetime import time
  17. from pymongo import MongoClient
  18. from dw_base import *
  19. from dw_base.scheduler.polling_scheduler import get_mongo_client
  20. from dw_base.utils.config_utils import parse_args
  21. from dw_base.scheduler.mg2es.conf_reader import ConfReader
  22. from dw_base.scheduler.mg2es.es_operator import ESOperator
  23. from elasticsearch.exceptions import NotFoundError
  24. # sql = "SELECT mgdb, mgtbl_name FROM tmp.tmp_zjh_1011"
  25. # spark = SparkSQL()
  26. # res = spark.query(sql)[0].collect()
  27. def parse_arguments():
  28. # 创建 ArgumentParser 对象
  29. parser = argparse.ArgumentParser(description='Process some parameters.')
  30. # 添加参数
  31. parser.add_argument('-mgdb', dest='mgdb', required=True, help='Parameter 1')
  32. # 解析参数
  33. return parser.parse_args()
  34. def get_mongo_client(conf_path):
  35. config_parser = ConfigParser()
  36. config_parser.read(root_path + conf_path)
  37. url = config_parser.get('base', 'address')
  38. return MongoClient(url)
  39. def get_count(client, mgdb):
  40. # 选择数据库
  41. db = client[mgdb]
  42. # 选择集合
  43. collection1 = db['shipments_imports']
  44. collection2 = db['shipments_exports']
  45. # 使用聚合管道进行分组和计数
  46. pipeline = [
  47. {
  48. "$group": {
  49. "_id": "$ysfs", # 按ysfs字段分组
  50. "count": {"$sum": 1} # 计算每个组的数量
  51. }
  52. }
  53. ]
  54. # 执行聚合查询
  55. results1 = list(collection1.aggregate(pipeline))
  56. results2 = list(collection2.aggregate(pipeline))
  57. pretty_print(f'开始合并结果-------------------------------------------------------------------------')
  58. # 合并结果
  59. combined_results = list(results1) + list(results2)
  60. # 假设 combined_results 是一个字典列表
  61. # 将结果转换为 DataFrame
  62. df = pd.DataFrame(combined_results)
  63. df1 = pd.DataFrame(results1)
  64. df2 = pd.DataFrame(results2)
  65. # 连接到 Hive
  66. hive_conn = hive.Connection(host='192.168.30.3', port=10000, username='hive', database='dim')
  67. # 写入 Hive 表
  68. cursor = hive_conn.cursor()
  69. pretty_print(f'开始插入结果-------------------------------------------------------------------------')
  70. # 插入数据
  71. for index, row in df1.iterrows():
  72. insert_query = f"""
  73. INSERT INTO dim.cts_ysfs_global_old (ysfs, cnt, gj, jck)
  74. VALUES ('{row['_id']}' , '{row['count']}','{mgdb}', 'im')
  75. """
  76. pretty_print(f'{insert_query}')
  77. cursor.execute(insert_query)
  78. # 插入数据
  79. for index, row in df2.iterrows():
  80. insert_query = f"""
  81. INSERT INTO dim.cts_ysfs_global_old (ysfs, cnt, gj, jck)
  82. VALUES ('{row['_id']}' , '{row['count']}','{mgdb}', 'ex')
  83. """
  84. cursor.execute(insert_query)
  85. # 关闭连接
  86. cursor.close()
  87. hive_conn.close()
  88. jgj = ('----------------------'+
  89. '\n结果1-->' + str(results1) +
  90. '结果1end\n结果2-->' + str(results2) +
  91. '结果2end\n合并后结果-->'+str(combined_results)+
  92. '\n----------------------'
  93. )
  94. pretty_print(f'{jgj}')
  95. return jgj
  96. def get_old_count(mgdb):
  97. client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
  98. result = get_count(client, mgdb)
  99. pretty_print(f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb} '
  100. f'{NORM_MGT} old data count: {NORM_GRN}')
  101. return result
  102. def main():
  103. # CONFIG, _ = parse_args(sys.argv[1:])
  104. # for record in res:
  105. # mgtbl = record['mgtbl_name']
  106. # 解析命令行参数
  107. args = parse_arguments()
  108. mgdb = args.mgdb
  109. old_cnt = get_old_count(mgdb)
  110. return 0
  111. if __name__ == '__main__':
  112. main()