get_oldmongo_sldw.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. import argparse
  2. import sys
  3. import re
  4. import os
  5. from pyhive import hive
  6. import pandas as pd
# Derive the repository root from this file's absolute path: everything after
# the first occurrence of "tendata-warehouse" is cut off. The root is appended
# to sys.path so the `dw_base` imports that follow can be resolved, and
# `root_path` is reused by get_mongo_client() to locate config files.
# NOTE(review): if "tendata-warehouse" does not occur in the path, re.sub
# leaves it unchanged and this file's own path is appended to sys.path.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)
  10. from dw_base.utils.log_utils import pretty_print
  11. from configparser import ConfigParser
  12. from pymongo import MongoClient
  13. from dw_base import *
  14. from dw_base.scheduler.polling_scheduler import get_mongo_client
  15. # 定义一个数组(列表)
  16. my_array = [
  17. 'america_stat',
  18. 'australia',
  19. 'brazil',
  20. 'brazil_stat',
  21. 'canada',
  22. 'canada_stat',
  23. 'china_stat',
  24. 'cis',
  25. 'dominica',
  26. 'england',
  27. 'ethiopia',
  28. 'eurasian_bol',
  29. 'european_union',
  30. 'fiji',
  31. 'guatemala',
  32. 'honduras',
  33. 'honduras_stat',
  34. 'hongkong_stat',
  35. 'indonesia_stat',
  36. 'japan',
  37. 'kyrghyzstan',
  38. 'new_zealand',
  39. 'nicaragua',
  40. 'peru_exp',
  41. 'philippines_stat',
  42. 'russia_rail',
  43. 'salvador',
  44. 'salvador_stat',
  45. 'south_africa_stat',
  46. 'south_korea',
  47. 'south_korea_stat',
  48. 'spain',
  49. 'taiwan',
  50. 'thailand',
  51. 'thailand_stat',
  52. 'turkey_stat',
  53. 'zimbabwe',
  54. 'taiwan_stat',
  55. 'tanzania',
  56. 'tanzania_tboe',
  57. 'bolivia_stat',
  58. 'spain_co',
  59. 'congo_kinshasa',
  60. 'south_korea_co',
  61. 'england_stat',
  62. 'angola_stat',
  63. 'guatemala_stat',
  64. 'brazil_air',
  65. 'egypt_co',
  66. 'uruguay_nboe',
  67. 'panama_exp',
  68. 'bahrain_stat',
  69. 'dominican_republic_stat',
  70. 'qatar_stat'
  71. ]
  72. def parse_arguments():
  73. # 创建 ArgumentParser 对象
  74. parser = argparse.ArgumentParser(description='Process some parameters.')
  75. # 添加参数
  76. parser.add_argument('-mgdb', dest='mgdb', required=True, help='Parameter 1')
  77. # 解析参数
  78. return parser.parse_args()
  79. def get_mongo_client(conf_path):
  80. config_parser = ConfigParser()
  81. config_parser.read(root_path + conf_path)
  82. url = config_parser.get('base', 'address')
  83. return MongoClient(url)
  84. def get_count(client, mgdb):
  85. # 选择数据库
  86. db = client[mgdb]
  87. # 选择集合
  88. collection1 = db['shipments_imports']
  89. collection2 = db['shipments_exports']
  90. # 使用聚合管道进行分组和计数
  91. pipeline = [
  92. {
  93. "$group": {
  94. "_id": "$sldw", # 按sldw字段分组
  95. "count": {"$sum": 1} # 计算每个组的数量
  96. }
  97. }
  98. ]
  99. # 执行聚合查询
  100. results1 = list(collection1.aggregate(pipeline))
  101. results2 = list(collection2.aggregate(pipeline))
  102. pretty_print(f'开始合并结果-------------------------------------------------------------------------')
  103. # 合并结果
  104. combined_results = list(results1) + list(results2)
  105. # 假设 combined_results 是一个字典列表
  106. # 将结果转换为 DataFrame
  107. df = pd.DataFrame(combined_results)
  108. df1 = pd.DataFrame(results1)
  109. df2 = pd.DataFrame(results2)
  110. # 连接到 Hive
  111. hive_conn = hive.Connection(host='192.168.30.3', port=10000, username='hive', database='dim')
  112. # 写入 Hive 表
  113. cursor = hive_conn.cursor()
  114. pretty_print(f'开始插入结果-------------------------------------------------------------------------')
  115. # 插入数据
  116. for index, row in df1.iterrows():
  117. insert_query = f"""
  118. INSERT INTO dim.cts_sldw_global_old (sldw, cnt, gj, jck)
  119. VALUES ('{row['_id']}' , '{row['count']}','{mgdb}', 'im')
  120. """
  121. pretty_print(f'{insert_query}')
  122. cursor.execute(insert_query)
  123. # 插入数据
  124. for index, row in df2.iterrows():
  125. insert_query = f"""
  126. INSERT INTO dim.cts_sldw_global_old (sldw, cnt, gj, jck)
  127. VALUES ('{row['_id']}' , '{row['count']}','{mgdb}', 'ex')
  128. """
  129. cursor.execute(insert_query)
  130. # 关闭连接
  131. cursor.close()
  132. hive_conn.close()
  133. jgj = ('----------------------'+
  134. '\n结果1-->' + str(results1) +
  135. '结果1end\n结果2-->' + str(results2) +
  136. '结果2end\n合并后结果-->'+str(combined_results)+
  137. '\n----------------------'
  138. )
  139. pretty_print(f'{jgj}')
  140. return jgj
  141. def get_old_count(client,mgdb):
  142. result = get_count(client, mgdb)
  143. pretty_print(f'{NORM_MGT} old source mongo: {NORM_GRN}{mgdb} '
  144. f'{NORM_MGT} old data count: {NORM_GRN}')
  145. return result
  146. def main():
  147. client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
  148. pretty_print(f'开始循环调用-------------------------------------------------------------------------')
  149. pretty_print(f'{my_array}')
  150. # 使用for循环遍历数组,并调用函数
  151. for item in my_array:
  152. pretty_print(f'开始执行:{item}')
  153. get_old_count(client,item)
  154. client.close()
  155. return 0
  156. if __name__ == '__main__':
  157. main()