| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- #!/usr/bin/env /usr/bin/python3
- # -*- coding:utf-8 -*-
- from typing import List
- from pymongo import MongoClient
class MongoDBHandler:
    """Build a MongoDB connection URI and open a client for it.

    The client created by the constructor is exposed as ``mongo_client``.
    """

    def __init__(self, url: str, port: int = None, username: str = None, password: str = None, check_db: str = None,
                 extra_config: List[str] = None):
        """Open a ``MongoClient`` from a full URI or from individual parts.

        Args:
            url: A complete connection URI, or a bare host name when
                ``port``, ``username``, ``password`` and ``check_db`` are
                all supplied.
            port: Port appended to ``url`` when the URI is assembled.
            username: User name embedded in the assembled URI.
            password: Password embedded in the assembled URI.
            check_db: Database name placed in the path of the assembled URI
                (presumably the authentication database; confirm with callers).
            extra_config: Optional ``key=value`` strings appended to the URI
                as query-string options.
        """
        uri = self._build_uri(url, port, username, password, check_db, extra_config)
        self.mongo_client = MongoClient(uri)

    @staticmethod
    def _build_uri(url, port=None, username=None, password=None, check_db=None, extra_config=None):
        """Return the connection URI string for the given constructor arguments.

        If any of ``port``, ``username``, ``password`` or ``check_db`` is
        missing (or falsy), ``url`` is used verbatim as the URI. Otherwise a
        ``mongodb://user:password@host:port/db`` URI is assembled.
        """
        # Imported locally so this block does not depend on edits to the
        # file-level import section.
        from urllib.parse import quote_plus, unquote_plus

        if port and username and password and check_db:
            # Credentials must be percent-escaped, otherwise characters such
            # as '@', ':' or '/' corrupt the URI. Unquoting first keeps
            # already-escaped values unchanged instead of double-escaping them.
            user = quote_plus(unquote_plus(username))
            secret = quote_plus(unquote_plus(password))
            uri = f'mongodb://{user}:{secret}@{url}:{port}/{check_db}'
        else:
            uri = url

        if extra_config:
            # A caller-supplied URI may already carry a query string; in that
            # case further options are chained with '&' rather than a second '?'.
            separator = '&' if '?' in uri else '?'
            uri = f'{uri}{separator}{"&".join(extra_config)}'
        return uri
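- # NOTE(review): the commented-out legacy script below embeds plaintext service credentials; rotate them and load secrets from configuration instead of source.
- # ES_CLIENT = Elasticsearch('es-cn-nif1oiv5w0009di0f.public.elasticsearch.aliyuncs.com:9200',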
- # http_auth=('datafix', 'Aa123456'))
- #
- # INCLUDES = ['id', 'appInfo', 'appName', 'b2bClient', 'b2bInfo', 'b2bOpScope', 'b2bProduct', 'baikeInfo', 'blogName',
- # 'buyingInfo', 'certL2Name', 'contactAddress', 'entName', 'historyName', 'jobName', 'licenseContent',
- # 'licenseFileName', 'licenseOffice', 'miniAppName', 'opScope', 'patentName', 'semKeyword', 'semTitle',
- # 'seoInfo', 'seoKeyword', 'seoTitle', 'siteName', 'softwareProductName', 'tenderInfo', 'tenderName',
- # 'trademarkName', 'wechatName']
- #
- # def get_pid_by_name(company_name):
- # ent_col = MONGO_CLIENT.get_database('enterprise').get_collection('EnterpriseBaseInfo')
- # query = {'ENTNAME': company_name}
- # projection = {'PID': 1}
- # res_doc = ent_col.find_one(query, projection)
- # if res_doc and 'PID' in res_doc:
- # return res_doc['PID']
- #
- # return None
- #
- #
- # def search_by_name(company_name):
- # pid = get_pid_by_name(company_name)
- # print('pid: %s' % pid)
- # if pid:
- # company_details = search_by_pid(pid)
- # return company_details
- # return None
- #
- #
- # def get_detail_from_es(index, pid, includes=['id']):
- # try:
- # return ES_CLIENT.get(index=index, id=pid, _source_includes=includes)
- # except NotFoundError as not_found_e:
- # pass
- # except Exception as e:
- # print('Unknown error when get detail from es. error:%s' % str(e))
- #
- # return None
- #
- #
- # def search_by_pid(pid):
- # return get_detail_from_es('company_info_prod', pid, INCLUDES)
- #
- #
- # def main(local_save_path):
- # names = fp.readlines()
- # result_docs = []
- # for name in names:
- # print('company name: %s' % name)
- # detail = search_by_name(name.replace('\n', '').strip())
- # if detail is None:
- # print('can not found detail for name:%s' % name)
- # continue
- # source = detail['_source']
- # current_doc = []
- # # To keep the output columns in a stable order, read the field values by iterating over INCLUDES
- # for field in INCLUDES:
- # if field in source:
- # if isinstance(source[field], str) and source[field] != '':
- # current_doc.append(source[field].replace('\t', ';').replace('\n', ';').replace('\r', ';'))
- # elif isinstance(source[field], list) and source[field] != []:
- # current_doc.append(
- # ','.join(source[field]).replace('\t', ';').replace('\n', ';').replace('\r', ';'))
- # else:
- # current_doc.append('')
- # else:
- # current_doc.append('')
- # result_docs.append(current_doc)
- # res_df = pd.DataFrame(result_docs, mysql_column_list=INCLUDES)
- # res_df.to_csv(local_save_path, sep='\t', index=False, encoding='utf-8', header=True)
- # # os.system("source /etc/profile;hadoop fs -put %s %s" % (local_save_path, hdfs_save_path))
- #
- #
- # FLAGS = set('n ng nrfg nrt nt vn un'.split())
- #
- #
- # def lcut(text):
- # if isinstance(text, list):
- # print(text)
- # return [
- # w.word for sentence in split('[^a-zA-Z0-9\u4e00-\u9fa5]+', text.strip())
- # for w in dt.cut(sentence) if len(w.word) > 2 and w.flag in FLAGS]
- #
- #
- # class TFIDF:
- # def __init__(self, dictionary, model):
- # self.model = model
- # self.doc2bow = dictionary.doc2bow
- # self.id2word = {i: w for w, i in dictionary.token2id.items()}
- #
- # @classmethod
- # def train(cls, texts):
- # texts1 = [lcut(text) for text in texts]
- # dictionary = Dictionary(texts1)
- # corpus = [dictionary.doc2bow(text) for text in texts1]
- # model = TfidfModel(corpus)
- # return cls(dictionary, model)
- #
- # def extract(self, text, top_n=10):
- # vector = self.doc2bow(lcut(text))
- # key_words = sorted(self.model[vector], key=lambda x: x[1], reverse=True)
- # return [self.id2word[i] for i, j in key_words][:top_n]
- #
- #
- # def extract_keywords(mysql_column_list):
- # columns_list = raw_data_df[mysql_column_list].values.tolist()
- # combined = [';'.join(row) for row in columns_list]
- # keywords = tf_idf_model.extract(';'.join(combined), top_n=50)
- # print(','.join(keywords))
- # return keywords
- #
- #
- # if __name__ == '__main__':
- # fp = open(get_abs_path('data/seed_company_name_yidong.csv'))
- # local_save_path = '/root/wwj-hive-warehouse/data/zhong_qi_customer_info_yidong.csv'
- # hdfs_save_path = ''
- #
- # main(local_save_path)
- # raw_data_df = pd.read_csv(local_save_path, sep='\t', encoding='utf-8')
- # raw_data_df = raw_data_df.fillna('')
- # raw_data_df['combined'] = raw_data_df[INCLUDES].apply(lambda row: ';'.join(row.values.astype(str)), axis=1)
- # all_text_info_list = raw_data_df['combined'].values.tolist()
- # tf_idf_model = TFIDF.train(all_text_info_list)
- #
- # op_scope_keywords = extract_keywords(['opScope', 'b2bOpScope'])
- # product_keywords = extract_keywords(['appInfo', 'appName', 'b2bProduct'])
- # jobName_keywords = extract_keywords(['jobName'])
- # semKeyword_keywords = extract_keywords(['semKeyword'])
- # baike_keywords = extract_keywords(['baikeInfo'])
- # b2bInfo_keywords = extract_keywords(['b2bInfo'])
- # # opScope = raw_data_df[['opScope']].values.tolist()
- # # opScope_keywords = tf_idf_model.extract(opScope)
- # #
- # # opScope = raw_data_df[['opScope']].values.tolist()
- # # opScope_keywords = tf_idf_model.extract(opScope)
- # #
- # # opScope = raw_data_df[['opScope']].values.tolist()
- # # opScope_keywords = tf_idf_model.extract(opScope)
- # # pid = get_pid_by_name('杭州德玛瑞户外用品有限公司')
- # # print(pid)
- # mongo_url_a42 = 'mongodb://dw_all_ro:Dt#R30ES@' \
- # 'dds-m5e44df0967967a41.mongodb.rds.aliyuncs.com:3717,' \
- # 'dds-m5e44df0967967a42.mongodb.rds.aliyuncs.com:3717,' \
- # 'dds-m5e44df0967967a43.mongodb.rds.aliyuncs.com:3717' \
- # '/admin?replicaSet=mgset-12596773&readPreference=secondaryPreferred'
- # a42_handler = MongoDBHandler(mongo_url_a42)
- # mongo_url_b41 = 'mongodb://dw_all_rw:W#ioQseT@' \
- # 'dds-m5ed9ea9d9a653b41.mongodb.rds.aliyuncs.com:3717,' \
- # 'dds-m5ed9ea9d9a653b42.mongodb.rds.aliyuncs.com:3717,' \
- # 'dds-m5ed9ea9d9a653b43.mongodb.rds.aliyuncs.com:3717' \
- # '/admin?replicaSet=mgset-45687639&readPreference=secondaryPreferred'
- # b41_handler = MongoDBHandler(mongo_url_b41)
- # mongo_url_dev = 'mongodb://dev_dw_ro:Dt#R30ES@dds-m5e686962c7b71641431-pub.mongodb.rds.aliyuncs.com:3717/admin'
- # dev_handler = MongoDBHandler(mongo_url_dev)
- #
- # if __name__ == '__main__':
- # mongo_handler = MongoDBHandler(url='dds-m5e44df0967967a42.mongodb.rds.aliyuncs.com',
- # port=3717,
- # username='dw_all_ro',
- # password='Dt#R30ES',
- # check_db='admin')
- # for db in mongo_handler.mongo_client.list_database_names():
- # print(db)
|