#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""Thin wrapper that builds a MongoDB connection URI and opens a ``MongoClient``."""
import warnings
from typing import List, Optional
from urllib.parse import quote_plus

from pymongo import MongoClient


class MongoDBHandler:
    """Own a :class:`pymongo.MongoClient` built from a full URI or from discrete parts.

    Examples::

        handler = MongoDBHandler('mongodb://host:27017/admin')
        handler = MongoDBHandler('host', port=27017, username='u', password='p',
                                 check_db='admin')

    The client is exposed as ``self.mongo_client``. The handler may also be used
    as a context manager, in which case the client is closed on exit.
    """

    def __init__(self, url: str, port: Optional[int] = None, username: Optional[str] = None,
                 password: Optional[str] = None, check_db: Optional[str] = None,
                 extra_config: Optional[List[str]] = None):
        """Build the connection URI and create the client.

        :param url: a complete ``mongodb://`` URI, or only the host name when
            ``port``, ``username``, ``password`` and ``check_db`` are all supplied.
        :param port: server port, used only when composing the URI from parts.
        :param username: user name; percent-escaped before insertion into the URI.
        :param password: password; percent-escaped before insertion into the URI.
        :param check_db: authentication database placed in the URI path.
        :param extra_config: ``key=value`` URI options; joined with ``&`` and
            appended to the URI's query string.
        """
        parts = (port, username, password, check_db)
        if all(parts):
            # RFC 3986 requires credentials to be percent-escaped; a raw '@', ':', '/',
            # '%' or '+' would otherwise break parsing or decode to a different value.
            uri = (f'mongodb://{quote_plus(username)}:{quote_plus(password)}'
                   f'@{url}:{port}/{check_db}')
        else:
            if any(parts):
                # Partial connection parts used to be dropped silently; make that visible.
                warnings.warn('MongoDBHandler: port, username, password and check_db must '
                              'all be given to compose a URI; ignoring them and using url '
                              'as a complete URI', stacklevel=2)
            uri = url
        if extra_config:
            # A full URI may already carry options (e.g. '?replicaSet=...').
            separator = '&' if '?' in uri else '?'
            uri = f'{uri}{separator}{"&".join(extra_config)}'
        self.mongo_client = MongoClient(uri)

    def close(self) -> None:
        """Close the underlying client and release its connections."""
        self.mongo_client.close()

    def __enter__(self) -> 'MongoDBHandler':
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()
if __name__ == '__main__':
    # Smoke test: list the databases visible to the configured account.
    # Credentials come from the environment so that no secret lives in source control.
    # NOTE(review): passwords previously committed to this file should be rotated.
    import os
    import sys

    password = os.environ.get('MONGO_PASSWORD')
    if not password:
        sys.exit('MONGO_PASSWORD environment variable is not set')

    mongo_handler = MongoDBHandler(
        url=os.environ.get('MONGO_HOST', 'dds-m5e44df0967967a42.mongodb.rds.aliyuncs.com'),
        port=int(os.environ.get('MONGO_PORT', '3717')),
        username=os.environ.get('MONGO_USER', 'dw_all_ro'),
        password=password,
        check_db=os.environ.get('MONGO_AUTH_DB', 'admin'),
    )
    try:
        for db in mongo_handler.mongo_client.list_database_names():
            print(db)
    finally:
        # Release the client's connections even if listing fails.
        mongo_handler.mongo_client.close()