mongodb_utils.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. from typing import List
  4. from pymongo import MongoClient
  5. class MongoDBHandler:
  6. def __init__(self, url: str, port: int = None, username: str = None, password: str = None, check_db: str = None,
  7. extra_config: List[str] = None):
  8. if not port or not username or not password or not check_db:
  9. uri = url
  10. else:
  11. uri = f'mongodb://{username}:{password}@{url}:{port}/{check_db}'
  12. if extra_config:
  13. uri = f'{uri}?{"&".join(extra_config)}'
  14. self.mongo_client = MongoClient(uri)
  15. # ES_CLIENT = Elasticsearch('es-cn-nif1oiv5w0009di0f.public.elasticsearch.aliyuncs.com:9200',
  16. # http_auth=('datafix', 'Aa123456'))
  17. #
  18. # INCLUDES = ['id', 'appInfo', 'appName', 'b2bClient', 'b2bInfo', 'b2bOpScope', 'b2bProduct', 'baikeInfo', 'blogName',
  19. # 'buyingInfo', 'certL2Name', 'contactAddress', 'entName', 'historyName', 'jobName', 'licenseContent',
  20. # 'licenseFileName', 'licenseOffice', 'miniAppName', 'opScope', 'patentName', 'semKeyword', 'semTitle',
  21. # 'seoInfo', 'seoKeyword', 'seoTitle', 'siteName', 'softwareProductName', 'tenderInfo', 'tenderName',
  22. # 'trademarkName', 'wechatName']
  23. #
  24. # def get_pid_by_name(company_name):
  25. # ent_col = MONGO_CLIENT.get_database('enterprise').get_collection('EnterpriseBaseInfo')
  26. # query = {'ENTNAME': company_name}
  27. # projection = {'PID': 1}
  28. # res_doc = ent_col.find_one(query, projection)
  29. # if res_doc and 'PID' in res_doc:
  30. # return res_doc['PID']
  31. #
  32. # return None
  33. #
  34. #
  35. # def search_by_name(company_name):
  36. # pid = get_pid_by_name(company_name)
  37. # print('pid: %s' % pid)
  38. # if pid:
  39. # company_details = search_by_pid(pid)
  40. # return company_details
  41. # return None
  42. #
  43. #
  44. # def get_detail_from_es(index, pid, includes=['id']):
  45. # try:
  46. # return ES_CLIENT.get(index=index, id=pid, _source_includes=includes)
  47. # except NotFoundError as not_found_e:
  48. # pass
  49. # except Exception as e:
  50. # print('Unknown error when get detail from es. error:%s' % str(e))
  51. #
  52. # return None
  53. #
  54. #
  55. # def search_by_pid(pid):
  56. # return get_detail_from_es('company_info_prod', pid, INCLUDES)
  57. #
  58. #
  59. # def main(local_save_path):
  60. # names = fp.readlines()
  61. # result_docs = []
  62. # for name in names:
  63. # print('company name: %s' % name)
  64. # detail = search_by_name(name.replace('\n', '').strip())
  65. # if detail is None:
  66. # print('can not found detail for name:%s' % name)
  67. # continue
  68. # source = detail['_source']
  69. # current_doc = []
  70. # # 为了保证顺序,需按INCLUDES遍历获取字段值
  71. # for field in INCLUDES:
  72. # if field in source:
  73. # if isinstance(source[field], str) and source[field] != '':
  74. # current_doc.append(source[field].replace('\t', ';').replace('\n', ';').replace('\r', ';'))
  75. # elif isinstance(source[field], list) and source[field] != []:
  76. # current_doc.append(
  77. # ','.join(source[field]).replace('\t', ';').replace('\n', ';').replace('\r', ';'))
  78. # else:
  79. # current_doc.append('')
  80. # else:
  81. # current_doc.append('')
  82. # result_docs.append(current_doc)
  83. # res_df = pd.DataFrame(result_docs, mysql_column_list=INCLUDES)
  84. # res_df.to_csv(local_save_path, sep='\t', index=False, encoding='utf-8', header=True)
  85. # # os.system("source /etc/profile;hadoop fs -put %s %s" % (local_save_path, hdfs_save_path))
  86. #
  87. #
  88. # FLAGS = set('n ng nrfg nrt nt vn un'.split())
  89. #
  90. #
  91. # def lcut(text):
  92. # if isinstance(text, list):
  93. # print(text)
  94. # return [
  95. # w.word for sentence in split('[^a-zA-Z0-9\u4e00-\u9fa5]+', text.strip())
  96. # for w in dt.cut(sentence) if len(w.word) > 2 and w.flag in FLAGS]
  97. #
  98. #
  99. # class TFIDF:
  100. # def __init__(self, dictionary, model):
  101. # self.model = model
  102. # self.doc2bow = dictionary.doc2bow
  103. # self.id2word = {i: w for w, i in dictionary.token2id.items()}
  104. #
  105. # @classmethod
  106. # def train(cls, texts):
  107. # texts1 = [lcut(text) for text in texts]
  108. # dictionary = Dictionary(texts1)
  109. # corpus = [dictionary.doc2bow(text) for text in texts1]
  110. # model = TfidfModel(corpus)
  111. # return cls(dictionary, model)
  112. #
  113. # def extract(self, text, top_n=10):
  114. # vector = self.doc2bow(lcut(text))
  115. # key_words = sorted(self.model[vector], key=lambda x: x[1], reverse=True)
  116. # return [self.id2word[i] for i, j in key_words][:top_n]
  117. #
  118. #
  119. # def extract_keywords(mysql_column_list):
  120. # columns_list = raw_data_df[mysql_column_list].values.tolist()
  121. # combined = [';'.join(row) for row in columns_list]
  122. # keywords = tf_idf_model.extract(';'.join(combined), top_n=50)
  123. # print(','.join(keywords))
  124. # return keywords
  125. #
  126. #
  127. # if __name__ == '__main__':
  128. # fp = open(get_abs_path('data/seed_company_name_yidong.csv'))
  129. # local_save_path = '/root/wwj-hive-warehouse/data/zhong_qi_customer_info_yidong.csv'
  130. # hdfs_save_path = ''
  131. #
  132. # main(local_save_path)
  133. # raw_data_df = pd.read_csv(local_save_path, sep='\t', encoding='utf-8')
  134. # raw_data_df = raw_data_df.fillna('')
  135. # raw_data_df['combined'] = raw_data_df[INCLUDES].apply(lambda row: ';'.join(row.values.astype(str)), axis=1)
  136. # all_text_info_list = raw_data_df['combined'].values.tolist()
  137. # tf_idf_model = TFIDF.train(all_text_info_list)
  138. #
  139. # op_scope_keywords = extract_keywords(['opScope', 'b2bOpScope'])
  140. # product_keywords = extract_keywords(['appInfo', 'appName', 'b2bProduct'])
  141. # jobName_keywords = extract_keywords(['jobName'])
  142. # semKeyword_keywords = extract_keywords(['semKeyword'])
  143. # baike_keywords = extract_keywords(['baikeInfo'])
  144. # b2bInfo_keywords = extract_keywords(['b2bInfo'])
  145. # # opScope = raw_data_df[['opScope']].values.tolist()
  146. # # opScope_keywords = tf_idf_model.extract(opScope)
  147. # #
  148. # # opScope = raw_data_df[['opScope']].values.tolist()
  149. # # opScope_keywords = tf_idf_model.extract(opScope)
  150. # #
  151. # # opScope = raw_data_df[['opScope']].values.tolist()
  152. # # opScope_keywords = tf_idf_model.extract(opScope)
  153. # # pid = get_pid_by_name('杭州德玛瑞户外用品有限公司')
  154. # # print(pid)
  155. # mongo_url_a42 = 'mongodb://dw_all_ro:Dt#R30ES@' \
  156. # 'dds-m5e44df0967967a41.mongodb.rds.aliyuncs.com:3717,' \
  157. # 'dds-m5e44df0967967a42.mongodb.rds.aliyuncs.com:3717,' \
  158. # 'dds-m5e44df0967967a43.mongodb.rds.aliyuncs.com:3717' \
  159. # '/admin?replicaSet=mgset-12596773&readReference=secondaryPreferred'
  160. # a42_handler = MongoDBHandler(mongo_url_a42)
  161. # mongo_url_b41 = 'mongodb://dw_all_rw:W#ioQseT@' \
  162. # 'dds-m5ed9ea9d9a653b41.mongodb.rds.aliyuncs.com:3717,' \
  163. # 'dds-m5ed9ea9d9a653b42.mongodb.rds.aliyuncs.com:3717,' \
  164. # 'dds-m5ed9ea9d9a653b43.mongodb.rds.aliyuncs.com:3717' \
  165. # '/admin?replicaSet=mgset-45687639&readReference=secondaryPreferred'
  166. # b41_handler = MongoDBHandler(mongo_url_b41)
  167. # mongo_url_dev = 'mongodb://dev_dw_ro:Dt#R30ES@dds-m5e686962c7b71641431-pub.mongodb.rds.aliyuncs.com:3717/admin'
  168. # dev_handler = MongoDBHandler(mongo_url_dev)
  169. #
  170. # if __name__ == '__main__':
  171. # mongo_handler = MongoDBHandler(url='dds-m5e44df0967967a42.mongodb.rds.aliyuncs.com',
  172. # port=3717,
  173. # username='dw_all_ro',
  174. # password='Dt#R30ES',
  175. # check_db='admin')
  176. # for db in mongo_handler.mongo_client.list_database_names():
  177. # print(db)