# Used by the DingTalk monitor to decide whether the T+1 job needs to be rerun.
import argparse
import sys
import re
import os

from pyhive import hive
import pandas as pd

# Make the repository root importable regardless of where this script lives.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from configparser import ConfigParser
from pymongo import MongoClient

from dw_base import *  # star import provides the NORM_MGT / NORM_GRN color constants used below
from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args
from dw_base.utils.log_utils import pretty_print

# sql = "SELECT mgdb, mgtbl_name FROM tmp.tmp_zjh_1011"
# spark = SparkSQL()
# res = spark.query(sql)[0].collect()


def parse_arguments():
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(
        description='Count shipment records per transport mode and write them to Hive.')
    parser.add_argument('-mgdb', dest='mgdb', required=True,
                        help='name of the source MongoDB database')
    return parser.parse_args()


def get_mongo_client(conf_path):
    """Build a MongoClient from the address stored in an .ini file under the repo root."""
    config_parser = ConfigParser()
    config_parser.read(root_path + conf_path)
    url = config_parser.get('base', 'address')
    return MongoClient(url)


def get_count(client, mgdb):
    """Group imports/exports by the ysfs field and insert the counts into Hive."""
    db = client[mgdb]
    collection1 = db['shipments_imports']
    collection2 = db['shipments_exports']

    # Aggregation pipeline: group by the ysfs field and count each group.
    pipeline = [
        {
            "$group": {
                "_id": "$ysfs",
                "count": {"$sum": 1},
            }
        }
    ]
    results1 = list(collection1.aggregate(pipeline))
    results2 = list(collection2.aggregate(pipeline))

    pretty_print('Merging results ----------------------------------------------------')
    combined_results = results1 + results2

    df1 = pd.DataFrame(results1)
    df2 = pd.DataFrame(results2)

    # Connect to Hive and insert one row per (transport mode, direction).
    hive_conn = hive.Connection(host='192.168.30.3', port=10000,
                                username='hive', database='dim')
    cursor = hive_conn.cursor()
    pretty_print('Inserting results --------------------------------------------------')
    for df, jck in ((df1, 'im'), (df2, 'ex')):
        for _, row in df.iterrows():
            # NOTE: values are interpolated straight into the SQL string; they come
            # from our own aggregation output, not from user input.
            insert_query = f"""
            INSERT INTO dim.cts_ysfs_global_old (ysfs, cnt, gj, jck)
            VALUES ('{row['_id']}', '{row['count']}', '{mgdb}', '{jck}')
            """
            pretty_print(insert_query)
            cursor.execute(insert_query)
    cursor.close()
    hive_conn.close()

    jgj = ('----------------------'
           + '\nresults1 --> ' + str(results1) + ' <-- results1 end'
           + '\nresults2 --> ' + str(results2) + ' <-- results2 end'
           + '\ncombined --> ' + str(combined_results)
           + '\n----------------------')
    pretty_print(jgj)
    return jgj


def get_old_count(mgdb):
    client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
    result = get_count(client, mgdb)
    pretty_print(f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb} '
                 f'{NORM_MGT} old data count: {NORM_GRN}')
    return result


def main():
    # CONFIG, _ = parse_args(sys.argv[1:])
    # for record in res:
    #     mgtbl = record['mgtbl_name']
    args = parse_arguments()
    old_cnt = get_old_count(args.mgdb)
    return 0


if __name__ == '__main__':
    main()
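
# ---------------------------------------------------------------------------
# Usage sketch: the script file name and the database name below are
# placeholders for illustration, not values taken from the source.
#
#   python count_old_ysfs.py -mgdb <mongo_db_name>
#
# Note the single-dash option spelling `-mgdb`, matching the argparse
# definition above.
# ---------------------------------------------------------------------------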