
refactor: remove mongodb_utils + archive kb/90 §5.1

tianyu.chu, 2 weeks ago
parent commit cc2886f738
3 files changed, 15 insertions(+), 187 deletions(-)
  1. dw_base/database/mongodb_utils.py  (+0 −184)
  2. kb/90-重构路线.md  (+14 −3)
  3. kb/92-重构进度.md  (+1 −0)

+ 0 - 184
dw_base/database/mongodb_utils.py

@@ -1,184 +0,0 @@
-#!/usr/bin/env /usr/bin/python3
-# -*- coding:utf-8 -*-
-
-from typing import List
-
-from pymongo import MongoClient
-
-
-class MongoDBHandler:
-    def __init__(self, url: str, port: int = None, username: str = None, password: str = None, check_db: str = None,
-                 extra_config: List[str] = None):
-        if not port or not username or not password or not check_db:
-            uri = url
-        else:
-            uri = f'mongodb://{username}:{password}@{url}:{port}/{check_db}'
-            if extra_config:
-                uri = f'{uri}?{"&".join(extra_config)}'
-        self.mongo_client = MongoClient(uri)
-
-
-# ES_CLIENT = Elasticsearch('es-cn-nif1oiv5w0009di0f.public.elasticsearch.aliyuncs.com:9200',
-#                               http_auth=('datafix', 'Aa123456'))
-#
-# INCLUDES = ['id', 'appInfo', 'appName', 'b2bClient', 'b2bInfo', 'b2bOpScope', 'b2bProduct', 'baikeInfo', 'blogName',
-#                 'buyingInfo', 'certL2Name', 'contactAddress', 'entName', 'historyName', 'jobName', 'licenseContent',
-#                 'licenseFileName', 'licenseOffice', 'miniAppName', 'opScope', 'patentName', 'semKeyword', 'semTitle',
-#                 'seoInfo', 'seoKeyword', 'seoTitle', 'siteName', 'softwareProductName', 'tenderInfo', 'tenderName',
-#                 'trademarkName', 'wechatName']
-#
-# def get_pid_by_name(company_name):
-#     ent_col = MONGO_CLIENT.get_database('enterprise').get_collection('EnterpriseBaseInfo')
-#     query = {'ENTNAME': company_name}
-#     projection = {'PID': 1}
-#     res_doc = ent_col.find_one(query, projection)
-#     if res_doc and 'PID' in res_doc:
-#         return res_doc['PID']
-#
-#     return None
-#
-#
-# def search_by_name(company_name):
-#     pid = get_pid_by_name(company_name)
-#     print('pid: %s' % pid)
-#     if pid:
-#         company_details = search_by_pid(pid)
-#         return company_details
-#     return None
-#
-#
-# def get_detail_from_es(index, pid, includes=['id']):
-#     try:
-#         return ES_CLIENT.get(index=index, id=pid, _source_includes=includes)
-#     except NotFoundError as not_found_e:
-#         pass
-#     except Exception as e:
-#         print('Unknown error when get detail from es. error:%s' % str(e))
-#
-#     return None
-#
-#
-# def search_by_pid(pid):
-#     return get_detail_from_es('company_info_prod', pid, INCLUDES)
-#
-#
-# def main(local_save_path):
-#     names = fp.readlines()
-#     result_docs = []
-#     for name in names:
-#         print('company name: %s' % name)
-#         detail = search_by_name(name.replace('\n', '').strip())
-#         if detail is None:
-#             print('can not found detail for name:%s' % name)
-#             continue
-#         source = detail['_source']
-#         current_doc = []
-#         # To preserve field order, iterate over INCLUDES when fetching values
-#         for field in INCLUDES:
-#             if field in source:
-#                 if isinstance(source[field], str) and source[field] != '':
-#                     current_doc.append(source[field].replace('\t', ';').replace('\n', ';').replace('\r', ';'))
-#                 elif isinstance(source[field], list) and source[field] != []:
-#                     current_doc.append(
-#                         ','.join(source[field]).replace('\t', ';').replace('\n', ';').replace('\r', ';'))
-#                 else:
-#                     current_doc.append('')
-#             else:
-#                 current_doc.append('')
-#         result_docs.append(current_doc)
-#     res_df = pd.DataFrame(result_docs, mysql_column_list=INCLUDES)
-#     res_df.to_csv(local_save_path, sep='\t', index=False, encoding='utf-8', header=True)
-#     # os.system("source /etc/profile;hadoop fs -put %s %s" % (local_save_path, hdfs_save_path))
-#
-#
-# FLAGS = set('n ng nrfg nrt nt vn un'.split())
-#
-#
-# def lcut(text):
-#     if isinstance(text, list):
-#         print(text)
-#     return [
-#         w.word for sentence in split('[^a-zA-Z0-9\u4e00-\u9fa5]+', text.strip())
-#         for w in dt.cut(sentence) if len(w.word) > 2 and w.flag in FLAGS]
-#
-#
-# class TFIDF:
-#     def __init__(self, dictionary, model):
-#         self.model = model
-#         self.doc2bow = dictionary.doc2bow
-#         self.id2word = {i: w for w, i in dictionary.token2id.items()}
-#
-#     @classmethod
-#     def train(cls, texts):
-#         texts1 = [lcut(text) for text in texts]
-#         dictionary = Dictionary(texts1)
-#         corpus = [dictionary.doc2bow(text) for text in texts1]
-#         model = TfidfModel(corpus)
-#         return cls(dictionary, model)
-#
-#     def extract(self, text, top_n=10):
-#         vector = self.doc2bow(lcut(text))
-#         key_words = sorted(self.model[vector], key=lambda x: x[1], reverse=True)
-#         return [self.id2word[i] for i, j in key_words][:top_n]
-#
-#
-# def extract_keywords(mysql_column_list):
-#     columns_list = raw_data_df[mysql_column_list].values.tolist()
-#     combined = [';'.join(row) for row in columns_list]
-#     keywords = tf_idf_model.extract(';'.join(combined), top_n=50)
-#     print(','.join(keywords))
-#     return keywords
-#
-#
-# if __name__ == '__main__':
-#     fp = open(get_abs_path('data/seed_company_name_yidong.csv'))
-#     local_save_path = '/root/wwj-hive-warehouse/data/zhong_qi_customer_info_yidong.csv'
-#     hdfs_save_path = ''
-#
-#     main(local_save_path)
-#     raw_data_df = pd.read_csv(local_save_path, sep='\t', encoding='utf-8')
-#     raw_data_df = raw_data_df.fillna('')
-#     raw_data_df['combined'] = raw_data_df[INCLUDES].apply(lambda row: ';'.join(row.values.astype(str)), axis=1)
-#     all_text_info_list = raw_data_df['combined'].values.tolist()
-#     tf_idf_model = TFIDF.train(all_text_info_list)
-#
-#     op_scope_keywords = extract_keywords(['opScope', 'b2bOpScope'])
-#     product_keywords = extract_keywords(['appInfo', 'appName', 'b2bProduct'])
-#     jobName_keywords = extract_keywords(['jobName'])
-#     semKeyword_keywords = extract_keywords(['semKeyword'])
-#     baike_keywords = extract_keywords(['baikeInfo'])
-#     b2bInfo_keywords = extract_keywords(['b2bInfo'])
-#     # opScope = raw_data_df[['opScope']].values.tolist()
-#     # opScope_keywords = tf_idf_model.extract(opScope)
-#     #
-#     # opScope = raw_data_df[['opScope']].values.tolist()
-#     # opScope_keywords = tf_idf_model.extract(opScope)
-#     #
-#     # opScope = raw_data_df[['opScope']].values.tolist()
-#     # opScope_keywords = tf_idf_model.extract(opScope)
-#     # pid = get_pid_by_name('杭州德玛瑞户外用品有限公司')
-#     # print(pid)
-
-# mongo_url_a42 = 'mongodb://dw_all_ro:Dt#R30ES@' \
-#                 'dds-m5e44df0967967a41.mongodb.rds.aliyuncs.com:3717,' \
-#                 'dds-m5e44df0967967a42.mongodb.rds.aliyuncs.com:3717,' \
-#                 'dds-m5e44df0967967a43.mongodb.rds.aliyuncs.com:3717' \
-#                 '/admin?replicaSet=mgset-12596773&readReference=secondaryPreferred'
-# a42_handler = MongoDBHandler(mongo_url_a42)
-# mongo_url_b41 = 'mongodb://dw_all_rw:W#ioQseT@' \
-#                 'dds-m5ed9ea9d9a653b41.mongodb.rds.aliyuncs.com:3717,' \
-#                 'dds-m5ed9ea9d9a653b42.mongodb.rds.aliyuncs.com:3717,' \
-#                 'dds-m5ed9ea9d9a653b43.mongodb.rds.aliyuncs.com:3717' \
-#                 '/admin?replicaSet=mgset-45687639&readReference=secondaryPreferred'
-# b41_handler = MongoDBHandler(mongo_url_b41)
-# mongo_url_dev = 'mongodb://dev_dw_ro:Dt#R30ES@dds-m5e686962c7b71641431-pub.mongodb.rds.aliyuncs.com:3717/admin'
-# dev_handler = MongoDBHandler(mongo_url_dev)
-#
-# if __name__ == '__main__':
-#     mongo_handler = MongoDBHandler(url='dds-m5e44df0967967a42.mongodb.rds.aliyuncs.com',
-#                                    port=3717,
-#                                    username='dw_all_ro',
-#                                    password='Dt#R30ES',
-#                                    check_db='admin')
-#     for db in mongo_handler.mongo_client.list_database_names():
-#         print(db)
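
The kb notes in the diffs below record that the deleted wrapper needs no successor: any future Mongo access is a one-line `MongoClient(uri)`. A minimal sketch of that replacement, with a placeholder URI (host, credentials, and replica-set name are illustrative; real connection strings belong in deployment config, not in code):

```python
from pymongo import MongoClient

# Placeholder URI -- all connection details here are illustrative only.
uri = "mongodb://user:pass@host1:3717,host2:3717/admin?replicaSet=rs0"
client = MongoClient(uri)

# Same smoke test the deleted __main__ block ran: list the database names.
for db in client.list_database_names():
    print(db)
```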

+ 14 - 3
kb/90-重构路线.md

@@ -485,12 +485,23 @@ sql = "... WHERE TABLE_SCHEMA='%s' ..." % (database, table_name)
 
 ## 5. Clean up deprecated code (medium priority)
 
+As of 2026-04-20 this section has no pending cleanup items. If new deprecated code turns up, register it in the table below; completed items stay on record in the "historical archive" table.
+
+### 5.1 Pending cleanup (currently empty)
+
 | Module/file | Status | Recommendation |
 |-------------|--------|-----------------|
-| `dw_base/database/mongodb_utils.py` | ~80% commented-out legacy code | Clean up the comments |
-| Everything under `conf/datax/` | Deprecated legacy configuration | Keep a few samples, delete the rest |
+| _(none)_ |  |  |
+
+### 5.2 Historical archive (completed)
+
+| Module/file | Action taken | Completed / source |
+|-------------|--------------|---------------------|
+| `dw_base/{validation,ml,flink,elasticsearch}/` (four empty-shell modules) | Entire directories deleted (each held only a 56-byte empty `__init__.py`, zero references) | 2026-04-20, see `92-重构进度.md` |
+| `dw_base/database/mongodb_utils.py` | Entire file deleted (184 lines = 19-line thin MongoClient wrapper + 165 lines of comments; zero external references; a one-line `MongoClient(uri)` covers any future need) | 2026-04-20, see `92-重构进度.md` |
+| Legacy ini / datasource samples under `conf/datax/` | Moved wholesale into `conf/bak/datax/{config,datasource}/`; kept out of the repo by `.gitignore:6 conf/bak` | Project bootstrap `8d2ade5` (2026-04-17) |
 
-> The four empty-shell modules `dw_base/{validation,ml,flink,elasticsearch}/` were deleted on 2026-04-20; see `92-重构进度.md` for details.
+> The dead `conf/datax/config/` replace logic and the `conf/datax/generated` default value still left in the code fall under the §2.x hard-coded-path cleanup (rename to `conf/datax-json/` + drop the replace) and are out of scope for this section.
 
 ## 6. Build out the test system (medium priority)
 

+ 1 - 0
kb/92-重构进度.md

@@ -163,3 +163,4 @@
 | 2026-04-20 | **Fixed the §7.1 pyspark misrecord**: earlier docs listed pyspark in the strong-dependency KEEP row and included a "pyspark 2.4.0 pinned" sentence, both inconsistent with the actual `requirements.txt`. Real mechanism: `findspark==2.0.1` locates the PySpark already installed on the CDH cluster at runtime, so the version follows the cluster and is neither pinned on the client nor listed in `requirements.txt` (see the sketch after this table). In the kb/90 §7.1 table, dropped pyspark from the KEEP column + rewrote the last line of "follow-up items" as a note on the findspark mechanism | — |
 | 2026-04-20 | **UDF module reorganization (outside the refactoring plan)**: restructured the standalone `dw_base/spark/udf/` directory into two categories: `common/` (generic UDFs, auto-registered by the SparkSQL entry point via `ADD FILE`) + `business/` (business-specific UDFs, loaded on demand in SQL via `ADD FILE`). (a) Six source files (root `spark_common_udf.py`, 24 functions + `spark_json_array_udf.py`, 23 functions + `spark_mmq_udf.py`, 3 functions + `customs/cts_common.py` + `product/escape_udf.py` + `enterprise/spark_eng_ent_json_array_append_udf.py`) were read through, deduplicated, and stripped of business coupling, then merged into the single file `common/spark_common_udf.py` (500 lines, 40 functions, in 5 sections: JSON / Array / String / Numeric-Date-Hash / cross-type converters). A single file rather than a split by type, because the cross-type conversion functions (`json2str` / `arr2json` / `str2map` etc., ~9 functions, 20%+) have no clear home, and forcing a split would only manufacture boundary disputes. (b) Cleaned out all legacy business UDF subdirectories and root-level business files under `dw_base/spark/udf/`, 60 files in total: deleted the `contacts/` / `customs/` / `enterprise/` / `product/` / `productApplication/` / `test/` directories wholesale; at the root, deleted `spark_eng_ent_name_clean.py` / `spark_india_format_phone_udf.py` / `solr_similar_match_udf.py` / `main_test.py` plus the 3 source UDF files. (c) `dw_base/__init__.py:27`: the `COMMON_SPARK_UDF_FILE` constant path changed from `dw_base/spark/udf/spark_common_udf.py` to `dw_base/spark/udf/common/spark_common_udf.py` (the two usages at `bin/spark-sql-starter.py:172-173` pick this up automatically through the constant). (d) Business-coupled UDF files that the earlier `dingtalk_*` / `mg2es` cascade cleanups missed were all zeroed out in this batch. `business/` stays a skeleton for now; new business UDFs will be added as they actually appear | — |
 | 2026-04-20 | **Deleted the empty-shell modules `ml/` / `elasticsearch/` / `flink/` / `validation/` (reversing the earlier 2026-04-20 "keep for now" record)**: each of the 4 directories held only a 56-byte empty `__init__.py`, zero imports / zero content, so keeping them was pointless; the "keep for now" sentence at the end of the 2026-04-20 UDF-reorganization changelog entry was a misrecord. Cleared in one `git rm -r` batch. `kb/90-重构路线.md §5.1` updated in sync: removed these 4 rows from the deprecated-code table and added a footnote pointing at this changelog entry. `dw_base/common/` is kept because `alerter_constants.py` / `config_constants.py` / `container.py` / `template_constants.py` are non-empty; not part of this batch | — |
+| 2026-04-20 | **Deleted `dw_base/database/mongodb_utils.py` + converted kb/90 §5.1 to an archive layout**: (a) of the 184 lines in `mongodb_utils.py`, the only live content was the 19-line thin `MongoDBHandler` class (takes url/port/user/pwd, assembles a URI, instantiates a MongoClient); the other 165 lines were all comments: company-name→Mongo/ES lookups + TF-IDF keyword extraction + three blocks of old-cluster `dds-m5e*` connection strings. grep found zero external references. A new project that needs Mongo can do it in one line, `MongoClient(uri)`; the thin wrapper has no value worth keeping. (b) Actual status of the two original §5.1 rows: `mongodb_utils.py` = deleted in this commit; the legacy ini/datasource files under `conf/datax/` were moved wholesale into `conf/bak/datax/{config,datasource}/` at project bootstrap `8d2ade5` and are kept out of the repo by `.gitignore:6 conf/bak`, long since done but never checked off. (c) §5.1 restructured as a two-table layout: "pending cleanup (currently empty)" + "historical archive (completed items on record)"; the 4 empty-shell-module rows + this mongodb_utils deletion + the conf/datax move-to-bak all filed into the archive. Footnote clarifies that the dead `conf/datax/config/` replace logic + the `conf/datax/generated` default still in the code belong to the §2.x hard-coded-path cleanup (rename to `conf/datax-json/`) and are out of scope for this section | — |
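
For reference, a minimal sketch of the findspark mechanism described in the changelog above (assumes the job runs on a node where the cluster's PySpark is installed and discoverable via `SPARK_HOME`; the app name is illustrative):

```python
import findspark

# Locate the cluster-installed PySpark at runtime; the client pins nothing,
# and pyspark never enters requirements.txt: the version follows the cluster.
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("findspark-demo").getOrCreate()
print(spark.version)  # whatever the cluster ships
```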