# get_oldmongo_stat.py (3.0 KB)
  1. # 用于钉钉监控T+1任务是否需要重跑
  2. import argparse
  3. import sys
  4. import re
  5. import os
  6. import requests
  7. import json
  8. abspath = os.path.abspath(__file__)
  9. root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
  10. sys.path.append(root_path)
  11. from dw_base.spark.spark_sql import SparkSQL
  12. from dw_base.utils.log_utils import pretty_print
  13. from configparser import ConfigParser
  14. from datetime import time
  15. from pymongo import MongoClient
  16. from dw_base import *
  17. from dw_base.scheduler.polling_scheduler import get_mongo_client
  18. from dw_base.utils.config_utils import parse_args
  19. from dw_base.scheduler.mg2es.conf_reader import ConfReader
  20. from dw_base.scheduler.mg2es.es_operator import ESOperator
  21. from elasticsearch.exceptions import NotFoundError
  22. # sql = "SELECT mgdb, mgtbl_name FROM tmp.tmp_zjh_1011"
  23. # spark = SparkSQL()
  24. # res = spark.query(sql)[0].collect()
  25. def parse_arguments():
  26. # 创建 ArgumentParser 对象
  27. parser = argparse.ArgumentParser(description='Process some parameters.')
  28. # 添加参数
  29. parser.add_argument('-mgdb', dest='mgdb', required=True, help='Parameter 1')
  30. # 解析参数
  31. return parser.parse_args()
  32. def get_mongo_client(conf_path):
  33. config_parser = ConfigParser()
  34. config_parser.read(root_path + conf_path)
  35. url = config_parser.get('base', 'address')
  36. return MongoClient(url)
  37. def get_count(client, mgdb):
  38. # 选择数据库
  39. db = client[mgdb]
  40. # 选择集合
  41. collection1 = db['shipments_imports']
  42. collection2 = db['shipments_exports']
  43. # 根据 mgtbl 确定字段名
  44. fields_name1 = 'jksmc'
  45. fields_name2 = 'cksmc'
  46. # # 获取字段名
  47. # field_name = fields_name.get(mgtbl)
  48. # if field_name is None:
  49. # # 如果集合名不存在,抛出 ValueError 异常
  50. # raise ValueError(f"No field name found for mgtbl: {mgtbl}")
  51. # 使用 distinct 方法获取字段的去重后个数
  52. data1 = collection1.distinct(fields_name1)
  53. data2 = collection2.distinct(fields_name2)
  54. stat1 = len(data1)
  55. stat2 = len(data2)
  56. cnt1 = collection1.count()
  57. cnt2 = collection2.count()
  58. combined_data = len(set(data1 + data2))
  59. jgj = ('----------------------'+
  60. '\n进口总条数-->'+str(cnt1)+
  61. ',\n出口总条数-->'+str(cnt2)+
  62. ',\n进口去重企业数-->'+str(stat1)+
  63. ',\n出口去重企业数-->'+str(stat2)+
  64. ',\n进出口去重企业数-->'+str(combined_data)+
  65. '\n----------------------'
  66. )
  67. pretty_print(f'{jgj}')
  68. return jgj
  69. def get_old_count(mgdb):
  70. client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
  71. result = get_count(client, mgdb)
  72. pretty_print(f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb} '
  73. f'{NORM_MGT} old data count: {NORM_GRN}')
  74. return result
  75. def main():
  76. # CONFIG, _ = parse_args(sys.argv[1:])
  77. # for record in res:
  78. # mgtbl = record['mgtbl_name']
  79. # 解析命令行参数
  80. args = parse_arguments()
  81. mgdb = args.mgdb
  82. old_cnt = get_old_count(mgdb)
  83. return 0
  84. if __name__ == '__main__':
  85. main()