
refactor(dw_base): remove the entire oss/scheduler/hive directories plus 7 legacy-business-coupled files under utils/

tianyu.chu 2 weeks ago
parent
commit
96b8099ffe

+ 0 - 3
dw_base/hive/__init__.py

@@ -1,3 +0,0 @@
-#!/usr/bin/env /usr/bin/python3
-# -*- coding:utf-8 -*-
-

+ 0 - 30
dw_base/hive/hive_constants.py

@@ -1,30 +0,0 @@
-#!/usr/bin/env /usr/bin/python3
-# -*- coding:utf-8 -*-
-COLUMN_NAME_COMMENT_DICT = {
-    'valid': '是否校验通过, bool类型',
-    'validate_desc': '校验结果详情',
-    'crawler_time': '抓取时间',
-    'create_date': '创建日期',
-    'update_date': '更新日期',
-    'record_status': '记录状态',
-    'pid': 'PID',
-    'company_name': '企业名称',
-    'db_name': '来源站(拼音)',
-    'source_name': '来源站(中文)',
-    'web_name': '来源站',
-    'url': '可用外链链接',
-    'url_desc': '下载数据链接',
-    'json_cache_id': 'json页面OSS id',
-    'html_cache_id': 'HTML页面OSS id',
-    'use_status': '删除标记',
-    'is_final': '能否被爬虫更新',
-    'id': 'pg库自增主键'
-}
-COLUMN_NAME_TYPE_DICT = {
-    'valid': 'BOOLEAN',
-    'crawler_time': 'BIGINT',
-    'create_date': 'BIGINT',
-    'update_date': 'BIGINT',
-    'use_status': 'INT',
-    'id': 'BIGINT'
-}

+ 0 - 141
dw_base/hive/hive_utils.py

@@ -1,141 +0,0 @@
-# -*- coding:utf-8 -*-
-
-from typing import List, Dict
-
-from dw_base.hive.hive_constants import COLUMN_NAME_TYPE_DICT, COLUMN_NAME_COMMENT_DICT
-
-
-def get_hive_database_name(project: str, layer: str, env: str) -> str:
-    """
-    获取Hive数据库名称
-    Args:
-        project: 所属项目
-        layer: 所属层次
-        env: 所属环境
-    Returns: Hive数据库名称
-    """
-    if project and project != '':
-        if layer and layer != '':
-            database = f'{project}_{layer}'
-        else:
-            database = f'{project}_ods'
-    elif layer and layer != '':
-        database = layer
-    else:
-        database = 'tmp'
-    if database != 'tmp' and env and env != '':
-        database = f'{database}_{env}'
-    return database
-
-
-def get_hive_table_prefix(project: str, layer: str, version: str) -> str:
-    """
-    获取表名前缀
-    Args:
-        project: 所属项目
-        layer: 所属层次
-        version: 所属版本
-    Returns: 表名前缀
-    """
-    if layer and layer != '':
-        if project and project != '':
-            prefix = f'{layer}_{project}'
-        else:
-            prefix = layer
-        if version and version != '':
-            prefix = f'{prefix}_{version}'
-    elif project and project != '':
-        prefix = project
-    else:
-        prefix = 'tmp'
-    return prefix
-
-
-def get_hive_create_table_ddl(database: str,
-                              table: str,
-                              columns: List[str],
-                              columns_with_types: Dict,
-                              comment: str = '',
-                              is_external: bool = False,
-                              is_partitioned: bool = False):
-    """
-    生成Hive建表语句
-    Args:
-        database: 数据库名称
-        table: 表名称
-        columns: 字段
-        comment: 表注释
-        is_external: 是否是外部表
-        is_partitioned: 是否是分区表
-    Returns:
-    """
-    ddl = "DROP TABLE IF EXISTS {0};\nCREATE {1}TABLE IF NOT EXISTS {2}\n(\n{3}\n)\n\tCOMMENT '{4}'\n{5}\tSTORED AS ORC\n;"
-    if is_external:
-        argument1 = 'EXTERNAL '
-    else:
-        argument1 = ''
-    if database is None:
-        table_with_database = table
-    else:
-        table_with_database = f'{database}.{table}'
-    max_column_length = max(map(len, columns))
-    column_defs = []
-    for col in columns:
-        padded_col = col.ljust(max_column_length, ' ')
-        if columns_with_types.__contains__(col):
-            col_type = columns_with_types.get(col)
-        else:
-            col_type = COLUMN_NAME_TYPE_DICT.get(col, 'STRING')
-        col_comment = COLUMN_NAME_COMMENT_DICT.get(col, '')
-        column_defs.append(f"    {padded_col} {col_type} COMMENT '{col_comment}'")
-    argument3 = ',\n'.join(column_defs)
-    if is_partitioned:
-        argument5 = '\tPARTITIONED BY (dt STRING)\n'
-    else:
-        argument5 = ''
-    return ddl.format(table_with_database, argument1, table_with_database, argument3, comment, argument5)
-
-
-def get_hive_create_table_ddl_sop(database: str,
-                                  table: str,
-                                  columns: List[str],
-                                  columns_with_types: Dict,
-                                  comment: str = '',
-                                  is_external: bool = False,
-                                  is_partitioned: bool = False):
-    """
-    生成Hive建表语句
-    Args:
-        database: 数据库名称
-        table: 表名称
-        columns: 字段
-        comment: 表注释
-        is_external: 是否是外部表
-        is_partitioned: 是否是分区表
-    Returns:
-    """
-    ddl = "DROP TABLE IF EXISTS {0};\nCREATE {1}TABLE IF NOT EXISTS {2}\n(\n{3}\n)\n    COMMENT '{4}'\n{5}    STORED AS ORC\n;"
-    if is_external:
-        argument1 = 'EXTERNAL '
-    else:
-        argument1 = ''
-    if database is None:
-        table_with_database = table
-    else:
-        table_with_database = f'{database}.{table}'
-    max_column_length = max(map(len, columns))
-    column_defs = []
-    for col in columns:
-        padded_col = col.ljust(max_column_length, ' ')
-        if columns_with_types.__contains__(col):
-            col_type = columns_with_types.get(col)
-        else:
-            col_type = COLUMN_NAME_TYPE_DICT.get(col, 'STRING')
-        col_comment = COLUMN_NAME_COMMENT_DICT.get(col, '')
-        column_defs.append(f"    {padded_col} {col_type} COMMENT '{col_comment}'")
-    argument3 = ',\n'.join(column_defs)
-    if is_partitioned:
-        argument5 = '    PARTITIONED BY (dt STRING)\n'
-    else:
-        argument5 = ''
-    return ddl.format(table_with_database, argument1, table_with_database, argument3, comment, argument5)
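
The kb/92 change-log entry at the bottom of this commit notes that the two naming helpers above (`get_hive_database_name` / `get_hive_table_prefix`) are not being rebuilt, since the convention now lives in `kb/21-命名规范.md` and `bin/datax-gc-generator.py` will re-implement it when rewritten. As a reference while the rules are code-less, a minimal sketch reproducing the behaviour of the deleted helpers (the `cts`/`dwd`/`prod` values in the asserts are examples only):

```python
# Reference-only sketch, not part of this commit: condensed behaviour of the deleted
# get_hive_database_name / get_hive_table_prefix helpers. Authoritative rules live in
# kb/21-命名规范.md.

def hive_database_name(project: str, layer: str, env: str) -> str:
    # project+layer -> "<project>_<layer>"; project only -> "<project>_ods";
    # layer only -> "<layer>"; neither -> "tmp"; the env suffix is never applied to "tmp".
    if project:
        database = f"{project}_{layer}" if layer else f"{project}_ods"
    elif layer:
        database = layer
    else:
        database = "tmp"
    return f"{database}_{env}" if database != "tmp" and env else database


def hive_table_prefix(project: str, layer: str, version: str) -> str:
    # layer first, then project, then optional version; bare project if no layer; else "tmp".
    if layer:
        prefix = f"{layer}_{project}" if project else layer
        return f"{prefix}_{version}" if version else prefix
    return project or "tmp"


if __name__ == "__main__":
    assert hive_database_name("cts", "dwd", "prod") == "cts_dwd_prod"
    assert hive_database_name("", "", "prod") == "tmp"
    assert hive_table_prefix("cts", "dwd", "v1") == "dwd_cts_v1"
```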

+ 0 - 3
dw_base/oss/__init__.py

@@ -1,3 +0,0 @@
-#!/usr/bin/env /usr/bin/python3
-# -*- coding:utf-8 -*-
-

+ 0 - 235
dw_base/oss/oss2_util.py

@@ -1,235 +0,0 @@
-import os
-
-import oss2
-
-from dw_base.utils.file_utils import get_abs_path
-
-
-class Readable:
-    def read(self):
-        pass
-
-
-class OssClient:
-    """
-    OSS术语
-    English                  | 中文
-    Bucket                   | 存储空间
-    Object                   | 对象或者文件
-    Endpoint                 | OSS 访问域名
-    Region                   | 地域或者数据中心
-    AccessKey                | AccessKeyId 和 AccessKeySecret 的统称,访问密钥
-    Put Object               | 简单上传
-    Post Object              | 表单上传
-    Multipart Upload         | 分片上传
-    Append Object            | 追加上传
-    Get Object               | 简单下载
-                             | 回调
-    Object Meta              | 文件元信息。用来描述文件信息,例如长度,类型等
-    Data                     | 文件数据
-    Key                      | 文件名
-    ACL (Access Control List)| 存储空间或者文件的权限
-    """
-
-    def __init__(self, access_key_id, access_key_secret, endpoint, bucket_name=None):
-        """
-        提供操作OSS各种方法
-        :param str access_key_id:
-        :param str access_key_secret:
-        :param str endpoint:
-        :rtype: OssClient
-        """
-        self._access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', access_key_id)
-        self._access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', access_key_secret)
-        self._endpoint = os.getenv('OSS_TEST_ENDPOINT', endpoint)
-        self._auth = oss2.Auth(self._access_key_id, self._access_key_secret)
-        self._bucket_name = bucket_name
-        if bucket_name:
-            self._bucket = oss2.Bucket(self._auth, self._endpoint, bucket_name)
-        # 创建一个Service对象
-        self._service = oss2.Service(self._auth, self._endpoint)
-
-    def list_buckets(self, prefix='', marker='', max_keys=100, params=None):
-        """
-        根据前缀罗列用户的Bucket
-        :param str prefix: 只罗列Bucket名为该前缀的Bucket,空串表示罗列所有的Bucket
-        :param str marker: 分页标志。首次调用传空串,后续使用返回值中的next_marker
-        :param int max_keys: 每次调用最多返回的Bucket数目
-        :param dict params: list操作参数,传入'tag-key','tag-value'对结果进行过滤
-        :return: 罗列的结果
-        :rtype: oss2.models.ListBucketsResult
-        """
-        list_buckets_result = self._service.list_buckets(prefix, marker, max_keys, params)
-        return list_buckets_result
-
-    def get_bucket(self, bucket_name=None):
-        """
-        获取bucket
-        :param str bucket_name: bucket名称
-        :return:
-        :rtype: oss2.Bucket
-        """
-        assert bucket_name or self._bucket_name, 'need bucket name since default bucket is not provided'
-        if bucket_name:
-            return oss2.Bucket(self._auth, self._endpoint, bucket_name)
-        return self._bucket
-
-    def get_bucket_info(self, bucket_name=None):
-        """
-        获取bucket相关信息,如创建时间,访问Endpoint,Owner与ACL等。
-        :param str bucket_name: bucket名称
-        :return:
-        :rtype: oss2.models.GetBucketInfoResult
-        """
-        return self.get_bucket(bucket_name).get_bucket_info()
-
-    def get_bucket_status(self, bucket_name=None):
-        """
-        查看Bucket的状态,目前包括bucket大小,bucket的object数量,bucket正在上传的Multipart Upload事件个数等。
-        :param str bucket_name: bucket名称
-        :return:
-        :rtype: oss2.models.GetBucketStatResult
-        """
-        return self.get_bucket(bucket_name).get_bucket_stat()
-
-    def set_bucket_lifecycle(self, bucket_name=None, lifecycle_rule=None):
-        """
-        设置bucket生命周期
-        例:
-        # 设置bucket生命周期, 有'中文/'前缀的对象在最后修改时间之后357天失效
-        rule = oss2.models.LifecycleRule('lc_for_chinese_prefix', '中文/', status=oss2.models.LifecycleRule.ENABLED,
-                                         expiration=oss2.models.LifecycleExpiration(days=357))
-        # 删除相对最后修改时间365天之后的parts
-        rule.abort_multipart_upload = oss2.models.AbortMultipartUpload(days=356)
-        # 对象最后修改时间超过180天后转为IA
-        rule.storage_transitions = [oss2.models.StorageTransition(days=180, storage_class=oss2.BUCKET_STORAGE_CLASS_IA)]
-        # 对象最后修改时间超过356天后转为ARCHIVE
-        rule.storage_transitions.append(oss2.models.StorageTransition(days=356,
-                                                                      storage_class=oss2.BUCKET_STORAGE_CLASS_ARCHIVE))
-        lifecycle = oss2.models.BucketLifecycle([rule])
-        :param str bucket_name: bucket名称
-        :param oss2.models.BucketLifecycle lifecycle_rule: 生命周期
-        :return:
-        :rtype: oss2.models.RequestResult
-        """
-        assert not lifecycle_rule
-        return self.get_bucket(bucket_name).put_bucket_lifecycle(lifecycle_rule)
-
-    def upload_object(self, object_name, data, bucket_name=None, headers=None, progress_callback=None):
-        """
-        上传一个普通文件
-        :param str bucket_name: bucket名称
-        :param str object_name: 要上传的对象名称
-        :param bytes or str or Readable data: 要上传的数据(字节数组、字符串或file-like object——即含有read方法的对象)
-        :param dict[str,any] or oss2.CaseInsensitiveDict headers:
-        :param function progress_callback: 进度回调函数
-        :return:
-        :rtype: oss2.models.PutObjectResult
-        """
-        bucket = self.get_bucket(bucket_name)
-        put_object_result = bucket.put_object(object_name, data, headers, progress_callback)
-        return put_object_result
-
-    def upload_object_from_file(self, filename, object_name=None, bucket_name=None, headers=None,
-                                progress_callback=None):
-        """
-        上传一个普通文件
-        例:
-        1.  upload('my-bucket','my-file1.txt','content of my-file1')
-        2.  upload('my-bucket','my-file2.txt',b'content of my-file2')
-        3.  with open(oss2.to_unicode('my-file3.txt'), 'rb') as f:
-                upload('my-file3.txt', f)
-        :param str bucket_name: bucket名称
-        :param str filename: 要上传的文件名称
-        :param str object_name: 上传到OSS后新的对象名称
-        :param dict[str,any] or oss2.CaseInsensitiveDict headers:
-        :param function progress_callback: 进度回调函数
-        :return:
-        :rtype: oss2.models.PutObjectResult
-        """
-        if not object_name:
-            object_name = os.path.basename(filename)
-        bucket = self.get_bucket(bucket_name)
-        put_object_result = bucket.put_object_from_file(object_name, filename, headers, progress_callback)
-        return put_object_result
-
-    def download_object(self, bucket_object_name, local_object_name=None, bucket_name=None):
-        """
-        下载一个文件到本地文件
-        :param str bucket_name: bucket名称
-        :param str bucket_object_name: bucket上的对象名称
-        :param str local_object_name: 下载到本地后保存的对象名称
-        :return:
-        :rtype: oss2.models.GetObjectResult
-        """
-        bucket = self.get_bucket(bucket_name)
-        if local_object_name and bucket_object_name != local_object_name:
-            # 下载并修改文件名
-            get_object_result = bucket.get_object_to_file(bucket_object_name, local_object_name)
-        else:
-            get_object_result = bucket.get_object_to_file(bucket_object_name, bucket_object_name)
-        return get_object_result
-
-    def delete_object(self, object_name, bucket_name=None):
-        """
-        删除单个对象
-        :param str bucket_name: bucket名称
-        :param str object_name: 对象名称
-        :return:
-        :rtype: oss2.models.RequestResult
-        """
-        bucket = self.get_bucket(bucket_name)
-        return bucket.delete_object(object_name)
-
-    def delete_objects(self, objects_name, bucket_name=None):
-        """
-        批量删除对象
-        :param str bucket_name: bucket名称
-        :param list[str] objects_name: 对象名称list
-        :return:
-        :rtype: oss2.models.BatchDeleteObjectsResult
-        """
-        bucket = self.get_bucket(bucket_name)
-        return bucket.batch_delete_objects(objects_name)
-
-    def get_object(self, object_name, bucket_name=None):
-        """
-        获取一个对象
-        :param str object_name: 对象名称
-        :param str bucket_name: bucket名称
-        :return:
-        :rtype: oss2.models.GetObjectResult
-        """
-        bucket = self.get_bucket(bucket_name)
-        return bucket.get_object(object_name)
-
-    def get_object_meta(self, object_name, bucket_name=None):
-        """
-        获取一个对象的详细信息
-        :param str object_name: 对象名称
-        :param str bucket_name: bucket名称
-        :return:
-        :rtype: oss2.models.GetObjectMetaResult
-        """
-        bucket = self.get_bucket(bucket_name)
-        return bucket.get_object_meta(object_name)
-
-    def list_objects(self, bucket_name=None):
-        bucket = self.get_bucket(bucket_name)
-        return bucket.list_objects()
-
-
-DEFAULT_ACCESS_KEY_ID = 'LTAI5t9oGbXWacakS4PJyQsR'
-DEFAULT_ACCESS_KEY_SECRET = 'BeuOP5zsavBtsR8fQ5QmrNyczdCW1Q'
-DEFAULT_ENDPOINT = 'oss-cn-qingdao-internal.aliyuncs.com'
-DEFAULT_BUCKET_NAME = 'skb-applogo'
-DEFAULT_DOWNLOAD_ADDRESS = f'https://skb-applogo.oss-cn-qingdao.aliyuncs.com'
-DEFAULT_OSS_CLIENT = OssClient(
-    DEFAULT_ACCESS_KEY_ID,
-    DEFAULT_ACCESS_KEY_SECRET,
-    DEFAULT_ENDPOINT,
-    DEFAULT_BUCKET_NAME
-)
-
-if __name__ == '__main__':
-    DEFAULT_OSS_CLIENT.upload_object_from_file(get_abs_path('lib/gzrt-0.8.tar.gz'))
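
The module above hardcodes an AccessKey pair at module level; per the kb note below, the whole `oss/` directory goes because the new business does not need object storage. Should OSS access ever be needed again, a minimal sketch of the credential wiring without hardcoded secrets (the env-var names and helper are hypothetical, not a committed design; the endpoint and bucket values are taken from the deleted code):

```python
# Hypothetical sketch only: building an OSS bucket handle from environment variables
# instead of the DEFAULT_ACCESS_KEY_* constants that the deleted module hardcoded.
import os

import oss2


def make_bucket(bucket_name: str = "skb-applogo",
                endpoint: str = "oss-cn-qingdao-internal.aliyuncs.com") -> oss2.Bucket:
    auth = oss2.Auth(os.environ["OSS_ACCESS_KEY_ID"], os.environ["OSS_ACCESS_KEY_SECRET"])
    return oss2.Bucket(auth, endpoint, bucket_name)


# Usage, mirroring the deleted __main__ block (object key first, local path second):
# make_bucket().put_object_from_file("lib/gzrt-0.8.tar.gz", "/path/to/gzrt-0.8.tar.gz")
```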

+ 0 - 0
dw_base/scheduler/__init__.py


+ 0 - 45
dw_base/scheduler/drop_daily_full_snapshot_tbls.py

@@ -1,45 +0,0 @@
-# 用于仅保留dwd 近7日和19700101 数据
-# daily_full_snapshot_tbls
-# task.daily_full_snapshot_tbls
-# 测试参数  -monitor_db test
-# 生产参数  -monitor_db task  可以不赋值
-import datetime
-import sys
-import re
-import os
-
-abspath = os.path.abspath(__file__)
-root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
-sys.path.append(root_path)
-from dw_base.spark.spark_sql import SparkSQL
-from dw_base.utils.config_utils import parse_args
-
-
-def main():
-    CONFIG, _ = parse_args(sys.argv[1:])
-    monitor_db = CONFIG.get('monitor_db', 'task')
-    spark = SparkSQL()
-
-    sql1 = (f"select db, tbl,days from {monitor_db}.daily_full_snapshot_tbls "
-            f"where is_deleted = '0'")
-    res = spark.query(sql1)[0].collect()
-    for record in res:
-        db = record['db']
-        tbl = record['tbl']
-        ds = record['days']
-        days_ago = datetime.datetime.now() - datetime.timedelta(days = ds)
-        format_date = days_ago.strftime('%Y%m%d')
-        sql2 = (f"SHOW PARTITIONS {db}.{tbl}")
-        partitions = spark.query(sql2)[0].collect()
-        dts = set()
-        for dt in partitions:
-            a1 = dt['partition'].split('=')[1][:8]
-            if a1 < format_date and a1 != '19700101' and a1 != '20200101':
-                dts.add(dt['partition'].split('=')[1][:8])
-        for p in dts:
-            sql3 = f" alter TABLE {db}.{tbl} DROP PARTITION ( dt='{p}') "
-            spark.query(sql3)[0].collect()
-
-
-if __name__ == "__main__":
-    main()

+ 0 - 45
dw_base/scheduler/drop_partitions.py

@@ -1,45 +0,0 @@
-# 用于仅保留dwd 近7日和19700101 数据
-# 测试参数  -slect_db tmp -drop_db dwd_smp
-# 生产参数  -slect_db task -drop_db dwd
-import datetime
-import sys
-import re
-import os
-
-abspath = os.path.abspath(__file__)
-root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
-sys.path.append(root_path)
-from dw_base.spark.spark_sql import SparkSQL
-from dw_base.utils.config_utils import parse_args
-
-def main():
-    CONFIG, _ = parse_args(sys.argv[1:])
-    slect_db = CONFIG.get('slect_db')
-    drop_db = CONFIG.get('drop_db')
-    spark=SparkSQL()
-
-    seven_days_ago = datetime.datetime.now() - datetime.timedelta(days=4)
-    format_date = seven_days_ago.strftime('%Y%m%d')
-
-    sql1 = (f"select mgdb, catalog from {slect_db}.mg_count_monitor "
-            f"where is_deleted = '0'")
-    res = spark.query(sql1)[0].collect()
-    extends_db=["global_bol"]
-    for record in res:
-        mgdb = record['mgdb']
-        catalog = record['catalog']
-        if mgdb not in extends_db:
-            sql2 = (f"SHOW PARTITIONS {drop_db}.cts_{mgdb}_{catalog}")
-            partitions = spark.query(sql2)[0].collect()
-            dts = []
-            for dt in partitions:
-                a1 = dt['partition'].split('=')[1]
-                if a1 < format_date and a1 != '19700101' and a1 != '20200101':
-                    dts.append(dt['partition'].split('=')[1])
-            for p in dts:
-                sql3 = f" alter TABLE {drop_db}.cts_{mgdb}_{catalog} DROP PARTITION ( dt='{p}') "
-                spark.query(sql3)[0].collect()
-
-
-if __name__ == "__main__":
-    main()
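
The two scheduler scripts above implement the same retention loop — keep only the most recent N days of `dt` partitions plus a small whitelist (`19700101`, `20200101`) — one hardcoding the customs `cts_{mgdb}_{catalog}` table pattern, the other driven by a `daily_full_snapshot_tbls` meta table. The kb/92 entry in this commit records a stage-4 task to rebuild this generically (meta-table driven, parameterised retention days, exception-dt whitelist). A minimal sketch of that shape; the meta-table and column names are placeholders, not decided yet:

```python
# Sketch of the planned generic partition-retention tool (stage-4 task in kb/92).
# Meta table name/columns are placeholders; the real tool takes them as parameters.
import datetime

from dw_base.spark.spark_sql import SparkSQL

KEEP_ALWAYS = {"19700101", "20200101"}  # exception-dt whitelist, to be parameterised


def drop_expired_partitions(meta_db: str, meta_tbl: str = "partition_retention") -> None:
    spark = SparkSQL()
    rows = spark.query(
        f"SELECT db, tbl, keep_days FROM {meta_db}.{meta_tbl} WHERE is_deleted = '0'"
    )[0].collect()
    for row in rows:
        cutoff = (datetime.datetime.now()
                  - datetime.timedelta(days=int(row["keep_days"]))).strftime("%Y%m%d")
        partitions = spark.query(f"SHOW PARTITIONS {row['db']}.{row['tbl']}")[0].collect()
        dts = {p["partition"].split("=")[1][:8] for p in partitions}
        for dt in sorted(dts):
            if dt < cutoff and dt not in KEEP_ALWAYS:
                spark.query(
                    f"ALTER TABLE {row['db']}.{row['tbl']} DROP PARTITION (dt='{dt}')"
                )[0].collect()
```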

+ 0 - 104
dw_base/scheduler/polling_scheduler.py

@@ -1,104 +0,0 @@
-# 海关数据项目t0清洗 - 轮询调度入口
-# 每十分钟调度一次,检查是否有新的任务需要执行
-# 每次中间会休眠5分钟来判断数据源是否还在持续更新中
-import sys
-import re
-import os
-
-abspath = os.path.abspath(__file__)
-root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
-sys.path.append(root_path)
-from dw_base.utils.log_utils import pretty_print
-from configparser import ConfigParser
-from datetime import time
-from time import sleep
-from pymongo import MongoClient
-from dw_base import *
-from dw_base.utils.config_utils import parse_args
-from dw_base.utils.datetime_utils import date_to_timestamp
-from bson.objectid import ObjectId
-
-
-def get_mongo_client(conf_path):
-    config_parser = ConfigParser()
-    config_parser.read(root_path + conf_path)
-    url = config_parser.get('base', 'address')
-    return MongoClient(url)
-
-
-def get_count(client, mgdb, mgtbl, start_date, stop_date):
-    db = client[mgdb]
-    # 连接集合
-    collection = db[mgtbl]
-    # 根据查询条件查询数据条数
-    start_dt_str = hex(int(date_to_timestamp(start_date)))[2:] + '0000000000000000'
-    stop_dt_str = hex(int(date_to_timestamp(stop_date)))[2:] + '0000000000000000'
-    query = {'_id': {'$gte': ObjectId(start_dt_str), '$lt': ObjectId(stop_dt_str)}}
-    return collection.count(query)
-
-
-def get_source_count(mgdb, mgtbl, start_date, stop_date):
-    client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-old.ini')
-    result = get_count(client, mgdb, mgtbl, start_date, stop_date)
-    pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
-                 f'{NORM_MGT} source mongo: {NORM_GRN}{mgdb}.{mgtbl} '
-                 f'{NORM_MGT}{start_date}-{stop_date} data count: {NORM_GRN}{result}')
-    return result
-
-
-def get_sink_count(mgdb, mgtbl, start_date, stop_date):
-    # if mgdb != 'america':
-    #     client = get_mongo_client('/../datasource/mongo/mongo-cts-prod-new.ini')
-    # if mgdb == 'america':
-    #
-    client = get_mongo_client('/../datasource/mongo/mongo-cluster-cts-prod.ini')
-    result = get_count(client, mgdb, mgtbl, start_date, stop_date)
-    pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
-                 f'{NORM_MGT} sink mongo: {NORM_GRN}{mgdb}.{mgtbl} '
-                 f'{NORM_MGT}{start_date}-{stop_date} data count: {NORM_GRN}{result}')
-    return result
-
-
-def is_run(mgdb, mgtbl, start_date, stop_date):
-    first_source_count = get_source_count(mgdb, mgtbl, start_date, stop_date)
-    if first_source_count == 0:
-        pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
-                     f'{NORM_MGT} source mongo data count is zero, exit! ')
-        return False
-    else:
-        # 等待五分钟
-        pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
-                     f'{NORM_MGT} now sleep 5 minutes to check source mongo data count again! ')
-        sleep(60 * 10)
-        second_source_count = get_source_count(mgdb, mgtbl, start_date, stop_date)
-        if first_source_count != second_source_count:
-            pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
-                         f'{NORM_MGT} source mongo data count is increasing, exit! ')
-            return False
-        else:
-            sink_count = get_sink_count(mgdb, mgtbl, start_date, stop_date)
-            if sink_count == second_source_count:
-                pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
-                             f'{NORM_MGT} source mongo data count is equal to sink mongo data count, exit! ')
-                return False
-            else:
-                pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
-                             f'{NORM_MGT} source mongo data count is not equal to sink mongo data count, start! ')
-                return True
-
-
-if __name__ == '__main__':
-    CONFIG, _ = parse_args(sys.argv[1:])
-    start_date = CONFIG.get('start-date')
-    stop_date = CONFIG.get('stop-date')
-    mgdb = CONFIG.get('mgdb')
-    mgtbl = CONFIG.get('mgtbl')
-
-    if is_run(mgdb, mgtbl, start_date, stop_date):
-        pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
-                     f'{NORM_MGT}向后传递参数: {NORM_GRN}is_run => 1 ')
-        print('${setValue(is_run=%s)}' % '1')
-    else:
-        pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
-                     f'{NORM_MGT}向后传递参数: {NORM_GRN}is_run => 0 ')
-        print('${setValue(is_run=%s)}' % '0')
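
The count check above relies on the fact that a MongoDB ObjectId embeds its creation time in its first four bytes, so a date window can be counted on `_id` alone; the deleted code builds the window boundaries by hex-encoding a unix timestamp and padding it to 24 hex chars. A standalone sketch of the same probe, using `ObjectId.from_datetime` (which produces the same zero-padded boundary ids) and the non-deprecated `count_documents`:

```python
# Standalone sketch of the _id time-window count used by the deleted poller.
from datetime import datetime

from bson.objectid import ObjectId
from pymongo import MongoClient


def count_between(mongo_url: str, db: str, coll: str,
                  start: datetime, stop: datetime) -> int:
    collection = MongoClient(mongo_url)[db][coll]
    query = {"_id": {"$gte": ObjectId.from_datetime(start),
                     "$lt": ObjectId.from_datetime(stop)}}
    return collection.count_documents(query)  # collection.count() is deprecated in pymongo
```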

+ 0 - 100
dw_base/utils/data_distinct.py

@@ -1,100 +0,0 @@
-# 用来解析字段
-#
-# -tbl cts_turkey_im
-
-import sys
-import re
-import os
-
-abspath = os.path.abspath(__file__)
-root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
-sys.path.append(root_path)
-from dw_base.spark.spark_sql import SparkSQL
-from dw_base.utils.log_utils import pretty_print
-from datetime import time
-from dw_base import *
-from dw_base.utils.config_utils import parse_args
-
-NORM_RED: str = '\033[0;31m'
-NORM_GRN: str = '\033[0;32m'
-NORM_YEL: str = '\033[0;33m'
-NORM_BLU: str = '\033[0;34m'
-NORM_MGT: str = '\033[0;35m'
-NORM_CYN: str = '\033[0;36m'
-
-
-def get_cols_with_type(dt: str):
-    global_output = {}
-    spark = SparkSQL()
-    spark._final_spark_config = {'hive.exec.dynamic.partition': 'true',
-                                 'hive.exec.dynamic.partition.mode': 'nonstrict',
-                                 'spark.yarn.queue': 'cts',
-                                 'spark.sql.crossJoin.enabled': 'true',
-                                 'spark.executor.memory': '6g',
-                                 'spark.executor.memoryOverhead': '2024',
-                                 'spark.driver.memory': '4g',
-                                 'spark.executor.instances': "10",
-                                 'spark.executor.cores': '4'
-                                 }
-    # sql = (f"select concat('cts_',mgdb,'_',catalog) as  `tbl` from task.mg_count_monitor "
-    #        f"where is_deleted = '0'")
-    # tbls = spark.query(sql)[0].collect()
-    # for tbl2 in tbls:
-    #     tbl = tbl2['tbl']
-    tbls = [
-        'cts_argentina_im',
-        ]
-    for tbl in tbls:
-        sql = f'SHOW CREATE TABLE from_mongo.{tbl}_incr'
-        if tbl == "cts_sao_tome_and_principe_im":
-            sql = f"SHOW CREATE TABLE from_mongo.cts_stp_im_incr"
-        if tbl == "cts_sao_tome_and_principe_ex":
-            sql = f"SHOW CREATE TABLE from_mongo.cts_stp_ex_incr"
-        ctbl = spark.query(sql)[0].collect()[0]['createtab_stmt']
-        cols_with_type = re.findall(r'`([^`]+)` ([A-Z]+)', ctbl)
-        if cols_with_type[0][0] != 'id':
-            exit('请检查表结构,id列必须是第一个字段')
-
-        fields = {tup[0] for tup in cols_with_type if tup[0] != 'id'}
-        # 对每个字段名加上反引号
-        quoted_fields = [f"`{field}`" for field in fields]
-        group_str = ', '.join(quoted_fields)
-        query0 = (f" select count(1) as `all_cnt`,'all_cnt2' from dwd.{tbl} where dt in('19700101','{dt}') "
-                  f"union all select count(1) as `unique_cnt`,'dwd.{tbl}' as `tbl` "
-                  f"from ( select count(1) as `cnt` FROM (select {group_str} FROM dwd.{tbl} where dt in('19700101','{dt}')) t2  "
-                  f"GROUP BY {group_str} ) b ")
-        cnt = spark.query(query0)[0].collect()
-        all_cnt = cnt[0].all_cnt
-        unique_cnt = cnt[1].all_cnt
-        duplicate_cnt = all_cnt - unique_cnt
-        #
-        # print(all_cnt )
-        # print( unique_cnt)
-        # print(unique_cnt/all_cnt)
-        cts_dict = {
-            "country": tbl,
-            "all_cnt": all_cnt,
-            "group_cnt": unique_cnt,
-            "duplicate_cnt": duplicate_cnt,
-            "duplicate_ratio": (all_cnt - unique_cnt) / all_cnt
-        }
-
-        duplicate_ratio = (all_cnt - unique_cnt) / all_cnt
-        # queryend = (f" insert into table tmp.cts_distinct"
-        #             f" values('{tbl}','{all_cnt}','{unique_cnt}','{duplicate_cnt}','{duplicate_ratio}') ")
-        # spark.query(queryend)[0].collect()
-        # print(cts_dict)
-        global_output[tbl] = cts_dict
-        pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
-                     f'{NORM_MGT} 大数据dwd表名: {NORM_GRN}  {tbl} '
-                     f'{NORM_MGT} 本表的指标  {cts_dict}'
-                     )
-
-    pretty_print(f'{NORM_CYN}{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} '
-                 f'{NORM_MGT} 大数据dwd表名: {NORM_GRN}  {tbl} '
-                 f'{NORM_MGT} 这是最终的  {global_output}'
-                 )
-
-
-if __name__ == '__main__':
-    get_cols_with_type('20240812')
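
For reference only: the UNION ALL + GROUP BY counting built by the script above (total rows versus rows that are distinct over every column except the surrogate `id`) can be expressed more directly with the DataFrame API. A sketch using the table and dt values from the deleted `__main__` call; whether `dt` is excluded from the grouping is an assumption here:

```python
# Reference-only sketch: duplicate-ratio metrics for one dwd table via dropDuplicates,
# roughly equivalent to the UNION ALL counting query assembled by the deleted script.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cts_duplicate_ratio").enableHiveSupport().getOrCreate()

df = spark.sql("SELECT * FROM dwd.cts_argentina_im WHERE dt IN ('19700101', '20240812')")
group_cols = [c for c in df.columns if c not in ("id", "dt")]  # drop surrogate id and partition col

all_cnt = df.count()
unique_cnt = df.dropDuplicates(group_cols).count()
print({
    "country": "cts_argentina_im",
    "all_cnt": all_cnt,
    "group_cnt": unique_cnt,
    "duplicate_cnt": all_cnt - unique_cnt,
    "duplicate_ratio": (all_cnt - unique_cnt) / all_cnt if all_cnt else 0.0,
})
```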

+ 0 - 118
dw_base/utils/diff_utils.py

@@ -1,118 +0,0 @@
-import json
-import re
-import os
-
-
-def ensure_json_values(bson_str):
-    def replace_special_types(match):
-        prefix = match.group(1)
-        key = match.group(2)
-        value = match.group(3).replace('"', "")
-        suffix = match.group(4)
-        comma_or_newline = match.group(5)
-        return f'{prefix}{key}{value}{suffix}{comma_or_newline}'
-
-    patterns = {
-        'normal': re.compile(r'([ \t]*)(\"[^\"]*\"\s*:\s*)([^\",\{\}\[\]\s].*?)([,\n])'),
-        'special_types': re.compile(
-            r'([ \t]*)(\"(?:ObjectId|ISODate|NumberInt|NumberLong|NumberDecimal|Binary|Boolean|Timestamp|RegExp|DBRef|JavaScript code|Symbol|MinKey|MaxKey)\()\"([^\"\)]*)\"(\).+?)([,\n])')
-    }
-
-    # 使用正则表达式替换非特殊类型的值
-    bson_str = re.sub(patterns['normal'],
-                      lambda match: f'{match.group(1)}{match.group(2)}"{match.group(3)}"{match.group(4)}', bson_str)
-    # 使用定义的函数替换特殊类型的值
-    bson_str = re.sub(patterns['special_types'], replace_special_types, bson_str)
-    return bson_str
-
-
-def analyze_json_objects(file_path):
-    json_objects = []
-    with open(file_path, 'r') as file:
-        content = ensure_json_values(file.read())
-        json_strings = re.split(r'\n\s*\n', content.strip())
-        for js in json_strings:
-            if js:
-                try:
-                    json_obj = json.loads(js)
-                    json_objects.append(json_obj)
-                except json.JSONDecodeError:
-                    print(f'Error decoding JSON from string: {js}')
-
-    id_to_json_map = {}
-    for jsonObj in json_objects:
-        if isinstance(jsonObj, dict):
-            json_id = jsonObj.get('_id', None)
-            if json_id is not None:
-                if json_id in id_to_json_map:
-                    id_to_json_map[json_id].append(jsonObj)
-                else:
-                    id_to_json_map[json_id] = [jsonObj]
-
-    return id_to_json_map
-
-
-def compare_json_objects(id_to_json_map):
-    output_lines = []
-    for json_id, json_list in id_to_json_map.items():
-        if len(json_list) != 2:
-            continue
-
-        first, second = json_list
-        first_set = set(first.keys())
-        second_set = set(second.keys())
-        intersect_keys = first_set & second_set
-        unique_first = first_set - second_set
-        unique_second = second_set - first_set
-        output_lines.append(f"比较的MongoId: {json_id}")
-
-        if unique_first:
-            output_lines.append("新表中独有的:")
-            for key in unique_first:
-                output_lines.append(f"                          {key}: {first.get(key)}")
-
-        if unique_second:
-            output_lines.append("旧表中独有的:")
-            for key in unique_second:
-                output_lines.append(f"                          {key}: {second.get(key)}")
-
-        differing_values = []
-        for key in intersect_keys:
-            if first.get(key) != second.get(key):
-                differing_values.append(f"                          {key}: {first.get(key)} vs {second.get(key)}")
-        if differing_values:
-            output_lines.append("两表value不同的:")
-            output_lines.extend(differing_values)
-
-        output_lines.append("")
-
-        current_path = os.getcwd()
-        target_folder = 'tendata-warehouse'
-        base_path = get_base_path(current_path, target_folder)
-        # 可以指定自己的输出路径
-        file_path = os.path.join(base_path, "workspace", "output.txt")
-
-    with open(file_path, 'w') as output_file:
-        output_file.write('\n'.join(output_lines))
-        print("可在服务器查看{}".format(file_path))
-
-
-def get_base_path(current_path, target_folder):
-    path_parts = current_path.split(os.sep)
-    target_index = path_parts.index(target_folder)
-    base_path = os.sep.join(path_parts[:target_index + 1])
-    return base_path
-
-
-def main():
-    current_path = os.getcwd()
-    target_folder = 'tendata-warehouse'
-    base_path = get_base_path(current_path, target_folder)
-    relative_path = input("请输入比较文件的相对路径 Path From Content Root: ")
-    file_path = os.path.join(base_path, relative_path)
-    id_to_json_map = analyze_json_objects(file_path)
-    compare_json_objects(id_to_json_map)
-
-
-if __name__ == "__main__":
-    main()

+ 0 - 132
dw_base/utils/excel_to_hive_utils.py

@@ -1,132 +0,0 @@
-import os
-
-import pandas as pd
-import re
-
-from pypinyin import lazy_pinyin
-from pyspark.sql import SparkSession
-
-# 对列类型进行特殊转换
-# dtype={'注册号': str}
-dtype=None
-# dtype={'sygj': str,'hgbm_source': str}
-
-# 对hive表结构进行自定义指定
-# hive_table_schema = "fj:string,hgbm:string,sygj:string,hgbm_source:string,hgbmms_en:string,hgbmms_cn:string,cphy_en:string,cphy_en1:string,cphy_cn:string,cpdl_cn:string,cpgg_en:string,cpgg_cn:string"
-hive_table_schema = None
-
-class Excel2HiveUtil:
-    def __init__(self):
-        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-        self.BASE_DIR = base_dir
-
-    def create_hive_table(self,spark_session,db:str,table_name:str,sheet_name:str,columns:list):
-        table_name = f"{self.chinese_to_pinyin(table_name,True)}_{self.chinese_to_pinyin(str(sheet_name),False).lower()}"
-        # columns = [chinese_to_pinyin(col) for col in columns]
-        table_name = re.sub("[^a-zA-Z0-9_]","",table_name)
-        columns = [f"`{re.sub('[^a-zA-Z0-9_]', '',self.chinese_to_pinyin(col,False))}` string COMMENT '{col}'" for col in columns]
-        col_str=",".join(columns)
-        create_tbl_sql = f"CREATE TABLE IF NOT EXISTS {db}.{table_name} ({col_str}) STORED AS ORC"
-        print("建表语句为:",create_tbl_sql)
-        is_right = input("请检查建表语句,确认是否继续执行(y or n):")
-        if is_right.strip().upper() == 'Y':
-            pass
-        elif is_right.strip().upper() == 'N':
-            exit(1)
-        else:
-            print("输入不满足要求,结束任务")
-            exit(1)
-        # spark_session_name="task_"+db+"_"+table_name
-        spark_session.sql(create_tbl_sql)
-        return f"{db}.{table_name}"
-
-    def chinese_to_pinyin(self,text,is_filter_digit:bool):
-        if is_filter_digit:
-            # 调用lazy_pinyin函数获取每个汉字对应的拼音列表
-            pinyins = [py for py in filter(str.isalpha, lazy_pinyin(text))]
-        else:
-            # 调用lazy_pinyin函数获取每个汉字对应的拼音列表
-            pinyins = [re.sub(r'(?<=^)[^a-zA-Z0-9]+|[^a-zA-Z0-9]+(?=$)', '', py) for py in lazy_pinyin(text)]
-        return '_'.join(pinyins)
-
-
-
-    def run(self,excel_position:str,hive_db:str='tmp',sheet_name_list=[0]):
-        spark = SparkSession.builder \
-            .appName("ExcelToHive") \
-            .master("yarn") \
-            .config('hive.exec.orc.default.block.size', 134217728) \
-            .config('spark.debug.maxToStringFields', 5000) \
-            .config('spark.dynamicAllocation.enabled', False) \
-            .config('spark.files.ignoreCorruptFiles', True) \
-            .config('spark.sql.adaptive.enabled', 'true') \
-            .config('spark.sql.broadcastTimeout', -1) \
-            .config('spark.sql.codegen.wholeStage', 'false') \
-            .config('spark.sql.execution.arrow.enabled', True) \
-            .config('spark.sql.execution.arrow.fallback.enabled', True) \
-            .config('spark.sql.files.ignoreCorruptFiles', True) \
-            .config('spark.sql.statistics.fallBackToHdfs', True) \
-            .config('spark.yarn.queue', "default") \
-            .enableHiveSupport().getOrCreate()
-        if excel_position.startswith('/'):
-            full_path = excel_position
-        else:
-            full_path = os.path.join(self.BASE_DIR, excel_position)
-
-        file_name = os.path.basename(full_path).replace('_', '').split('.')[0]
-        if sheet_name_list is None:
-            # 读取Excel文件
-            excel_data_dict = pd.read_excel(full_path, sheet_name=None,dtype=dtype)
-            try:
-                for sheet,excel_data in excel_data_dict.items():
-                    excel_data.fillna(value='', inplace=True)
-                    db_tbl = self.create_hive_table(spark, hive_db, file_name, sheet, excel_data.columns)
-                    # 将pandas DataFrame转换为Spark DataFrame
-                    spark_df = spark.createDataFrame(excel_data, schema=hive_table_schema)
-                    # print(spark_df.schema)
-                    # spark_df.select("*").show()
-                    spark_df.write.insertInto(db_tbl, True)
-            except Exception as e:
-                print(e)
-                exit(1)
-
-        else:
-            for sheet in sheet_name_list:
-                try:
-                    # 读取Excel文件
-                    excel_data = pd.read_excel(full_path, sheet_name=sheet,dtype=dtype)
-                except Exception as e:
-                    print(e)
-                else:
-                    excel_data.fillna(value='',inplace=True)
-                    # print(excel_data)
-                    db_tbl = self.create_hive_table(spark, hive_db, file_name, sheet,excel_data.columns)
-                    # 将pandas DataFrame转换为Spark DataFrame
-                    spark_df = spark.createDataFrame(excel_data,schema=hive_table_schema)
-                    # print(spark_df.schema)
-                    # spark_df.select("*").show()
-                    spark_df.write.insertInto(db_tbl, True)
-        spark.stop()
-
-
-
-if __name__ == '__main__':
-    excel_position_input = input("请输入你要转换的表相对路径或绝对路径:")
-    excel_position = excel_position_input.strip()
-    hive_db_input = input("请输入你要插入的hive库名,默认为[tmp]库:")
-    hive_db = hive_db_input.strip()
-    if not hive_db:
-        hive_db = 'tmp'
-    sheet_list_input = input("请输入你要转换的sheet列表(以英文逗号分隔),如果转换所有sheet请输入None,不输入默认只转换第一张sheet: ")
-    if sheet_list_input.strip() == 'None':
-        Excel2HiveUtil().run(excel_position,hive_db, None)
-    elif sheet_list_input.strip() == '':
-        Excel2HiveUtil().run(excel_position,hive_db)
-    else:
-        sheet_list = [i.strip() for i in sheet_list_input.split(",")]
-        Excel2HiveUtil().run(excel_position,hive_db, sheet_list)
-
-
-
-
-

+ 0 - 231
dw_base/utils/hive_diff_database.py

@@ -1,231 +0,0 @@
-"""
-Hive 表比较与同步脚本
-功能:
-1. 比较两个 Hive 集群中指定数据库的表结构,找出新集群中缺失的表。
-2. 支持排除指定前缀或后缀的表。
-3. 支持自动在新集群中创建缺失的表。
-4. 支持在新集群中执行 `MSCK REPAIR TABLE` 修复分区元数据。
-使用方法:
-1.运行脚本:
-   python script.py -d <database> [-p <exclude_prefixes>] [-s <exclude_suffixes>] [-a <auto_create>]
-参数说明:
-   -d, --database: 数据库名称(必填)。
-   -p, --exclude_prefixes: 需要排除的表名前缀列表,用逗号分隔(例如:tmp_,bak_)。
-   -s, --exclude_suffixes: 需要排除的表名后缀列表,用逗号分隔(例如:_old,_backup)。
-   -a, --auto_create: 是否自动创建缺失的表(True/False,默认为 False)。
-示例:
-   1. 比较数据库 ods 的表结构:
-      python script.py -d ods
-   2. 排除前缀 tmp_,bak_ 和后缀 _old,_backup:
-      python script.py -d ods -p tmp_,bak_ -s _old,_backup
-   3. 自动创建缺失的表:
-      python script.py -d ods -a True
-   4. 排除前缀和后缀并自动创建缺失的表:
-      python script.py -d ods -p tmp_,bak_ -s _old,_backup -a True
-"""
-
-from pyhive import hive
-import argparse
-
-# 定义连接Hive的函数
-def get_hive_tables(host, port, database):
-    """
-    连接Hive数据库并获取指定数据库中的所有表名
-    :param host: Hive服务器地址
-    :param port: Hive服务器端口
-    :param database: 数据库名称
-    :return: 表名列表
-    """
-    connection = hive.Connection(host=host, port=port, database=database)
-    connection = hive.Connection(host=host, port=port, database=database, username="hive")
-    cursor = connection.cursor()
-    cursor.execute('SHOW TABLES')
-    tables = cursor.fetchall()
-    cursor.close()
-    connection.close()
-    return [table[0] for table in tables]
-
-
-def filter_tables(tables, exclude_prefixes=None, exclude_suffixes=None):
-    """
-    过滤表名,排除指定的前缀和后缀
-    :param tables: 表名列表
-    :param exclude_prefixes: 需要排除的前缀列表
-    :param exclude_suffixes: 需要排除的后缀列表
-    :return: 过滤后的表名列表
-    """
-    if exclude_prefixes is None:
-        exclude_prefixes = []
-    if exclude_suffixes is None:
-        exclude_suffixes = []
-
-    filtered_tables = []
-    for table in tables:
-        # 检查表名是否以指定的前缀开头
-        if any(table.startswith(prefix) for prefix in exclude_prefixes):
-            continue
-        # 检查表名是否以指定的后缀结尾
-        if any(table.endswith(suffix) for suffix in exclude_suffixes):
-            continue
-        filtered_tables.append(table)
-    return filtered_tables
-
-
-def get_create_table_sql(host, port, database, table_name):
-    """
-    获取指定表的完整建表语句
-    :param host: Hive服务器地址
-    :param port: Hive服务器端口
-    :param database: 数据库名称
-    :param table_name: 表名
-    :return: 完整的建表语句
-    """
-    connection = hive.Connection(host=host, port=port, database=database)
-    connection = hive.Connection(host=host, port=port, database=database, username="hive")
-    cursor = connection.cursor()
-    cursor.execute(f'SHOW CREATE TABLE {database}.{table_name}')
-
-    # 逐行读取建表语句
-    create_table_sql = ""
-    for row in cursor:
-        create_table_sql += row[0] + "\n"
-
-    cursor.close()
-    connection.close()
-    return create_table_sql.strip()  # 去除末尾的换行符
-
-
-def execute_sql(host, port, database, sql):
-    """
-    在指定的Hive服务器上执行SQL语句
-    :param host: Hive服务器地址
-    :param port: Hive服务器端口
-    :param database: 数据库名称
-    :param sql: 要执行的SQL语句
-    """
-    connection = hive.Connection(host=host, port=port, database=database)
-    connection = hive.Connection(host=host, port=port, database=database, username="hive")
-    cursor = connection.cursor()
-    try:
-        cursor.execute(sql)
-        print(f"执行成功: {sql}")
-    except Exception as e:
-        print(f"执行失败: {sql}\n错误信息: {e}")
-    finally:
-        cursor.close()
-        connection.close()
-
-
-def compare_hive_tables(database, exclude_prefixes=None, exclude_suffixes=None, auto_create=False):
-    """
-    比较两个Hive数据库中的表名,找出缺失的表,并输出缺失表的完整建表语句
-    :param database: 数据库名称
-    :param exclude_prefixes: 需要排除的前缀列表
-    :param exclude_suffixes: 需要排除的后缀列表
-    :param auto_create: 是否自动在new_host上创建缺失的表
-    """
-    # Hive连接信息
-    old_host = '192.168.30.3'
-    new_host = '192.168.30.23'
-    port = 10000
-
-    # 获取两个数据库的表名
-    old_tables = get_hive_tables(old_host, port, database)
-    new_tables = get_hive_tables(new_host, port, database)
-
-    # 过滤表名
-    old_tables_filtered = filter_tables(old_tables, exclude_prefixes, exclude_suffixes)
-    new_tables_filtered = filter_tables(new_tables, exclude_prefixes, exclude_suffixes)
-
-    # 将表名转换为集合
-    set_tables_old = set(old_tables_filtered)
-    set_tables_new = set(new_tables_filtered)
-
-    # 比较表结构并重新建表
-    # common_tables = set_tables_old.intersection(set_tables_new)
-    # if common_tables:
-    #     print("\n检查表结构并重新建表(如果结构不同):")
-    #     for table_name in common_tables:
-    #         try:
-    #             old_table_sql = get_create_table_sql(old_host, port, database, table_name)
-    #             new_table_sql = get_create_table_sql(new_host, port, database, table_name)
-    #             old_table_sql_c = old_table_sql.split("TBLPROPERTIES")[0]
-    #             new_table_sql_c = new_table_sql.split("TBLPROPERTIES")[0]
-    #             if old_table_sql_c != new_table_sql_c:
-    #                 print(f"表结构不同,需要重新建表:{table_name}")
-    #                 if auto_create:
-    #                     # 删除旧表
-    #                     drop_table_sql = f"DROP TABLE IF EXISTS {database}.{table_name}"
-    #                     execute_sql(new_host, port, database, drop_table_sql)
-    #                     # 创建新表
-    #                     execute_sql(new_host, port, database, old_table_sql)
-    #                     print(f"表 {table_name} 重新创建完成。")
-    #             else:
-    #                 print(f"表结构相同,无需重新建表:{table_name}")
-    #         except Exception as e:
-    #             print(f"无法比较表 {table_name} 的结构: {e}")
-    #
-    # 找出缺失的表名
-    missing_in_new = set_tables_old - set_tables_new
-
-    # 打印缺失的表名
-    print(f"新集群hive-({database})缺失的表: {missing_in_new}")
-
-    # 获取并打印缺失表的完整建表语句
-    if missing_in_new:
-        print("\n缺失表的建表语句:")
-        for table_name in missing_in_new:
-            try:
-                create_table_sql = get_create_table_sql(old_host, port, database, table_name)
-                print(f"缺失的表:{table_name}=========================================")
-                print(create_table_sql)
-
-                # 如果启用自动建表功能,则在new_host上执行建表语句
-                if auto_create:
-                    execute_sql(new_host, port, database, create_table_sql)
-            except Exception as e:
-                print(f"无法获取表 {table_name} 的建表语句: {e}")
-
-    # 在new_host上执行MSCK REPAIR TABLE
-    if auto_create:
-        print("\n在new_host上执行MSCK REPAIR TABLE:")
-        for table_name in old_tables:
-            try:
-                msck_sql = f"MSCK REPAIR TABLE {database}.{table_name}"
-                print("执行MSCK REPAIR TABLE修复表:",msck_sql)
-                execute_sql(new_host, port, database, msck_sql)
-            except Exception as e:
-                print(f"执行MSCK REPAIR TABLE失败: {database}.{table_name}\n错误信息: {e}")
-
-
-# 主程序
-if __name__ == "__main__":
-    # 使用 argparse 解析命令行参数
-    parser = argparse.ArgumentParser(
-        description="比较两个Hive数据库中的表名,找出缺失的表,并输出缺失表的完整建表语句, 最后开始修复数据")
-    parser.add_argument('-database', '-d', type=str, required=True, help="数据库名称")
-    parser.add_argument('-exclude_prefixes', '-p', type=str, default="", help="需要排除的前缀列表,用逗号分隔")
-    parser.add_argument('-exclude_suffixes', '-s', type=str, default="", help="需要排除的后缀列表,用逗号分隔")
-    parser.add_argument('-auto_create', '-a', type=str, default="False",
-                        help="是否自动在new_host上创建缺失的表 (True/False)")
-
-    # 解析参数
-    args = parser.parse_args()
-
-    # 处理 exclude_prefixes 和 exclude_suffixes
-    exclude_prefixes = args.exclude_prefixes.split(",") if args.exclude_prefixes else []
-    exclude_suffixes = args.exclude_suffixes.split(",") if args.exclude_suffixes else []
-    print(f"exclude_prefixes: {exclude_prefixes}, exclude_suffixes: {exclude_suffixes}")
-
-    # 处理 auto_create
-    auto_create = args.auto_create.lower() == "true"
-
-    # 调用比较函数
-    compare_hive_tables(
-        database=args.database,
-        exclude_prefixes=exclude_prefixes,
-        exclude_suffixes=exclude_suffixes,
-        auto_create=auto_create
-    )
-    print("程序执行完成!")
-

+ 0 - 116
dw_base/utils/hive_to_excel_utils.py

@@ -1,116 +0,0 @@
-import inspect
-import string
-
-import os
-import re
-import sys
-
-abspath = os.path.abspath(__file__)
-root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
-sys.path.append(root_path)
-
-from importlib import import_module
-from pyspark.sql import SparkSession
-from dw_base.spark.udf.spark_common_udf import *
-
-def read_file(file_path:str)->(list,str):
-    module_list = []
-    sql_list = []
-    with open(file_path, 'r') as file:
-        for line in file:
-            if line.strip().startswith("ADD FILE"):
-                module = line.strip()[len("ADD FILE"):].split('.')[0].replace("/",".")
-                print(f'将要导入的模块为: {module}')
-                module_list.append(module.strip())
-            elif line.strip().startswith("--"):
-                continue
-            else:
-                sql_list.append(line)
-    sql = " ".join(sql_list)
-    return module_list,sql
-
-def register_udf(spark:SparkSession,udf_files:list):
-    for f in udf_files:
-        module = import_module(f)
-        for name,value in inspect.getmembers(module):
-            if inspect.isfunction(value):
-                print(f'registing udf --> name is :{name} , value is :{value}')
-                # spark.sparkContext.addPyFile("dw_base/spark/udf/spark_json_array_udf.py")
-                spark.udf.register(name,value)
-
-
-def generate_random_string(length):
-    # 生成包含大小写字母和数字的字符集
-    characters = string.ascii_letters + string.digits
-    # 生成指定长度的随机字符串
-    random_string = ''.join(random.choice(characters) for i in range(length))
-    return random_string
-
-
-class Hive2ExcelUtil:
-    def __init__(self):
-        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-        self.BASE_DIR = os.path.dirname(base_dir)
-        self.spark = SparkSession.builder \
-        .appName("HiveToExcel") \
-        .master("local[4]") \
-        .config('hive.exec.orc.default.block.size', 134217728) \
-        .config('spark.debug.maxToStringFields', 5000) \
-        .config('spark.dynamicAllocation.enabled', False) \
-        .config('spark.files.ignoreCorruptFiles', True) \
-        .config('spark.sql.adaptive.enabled', 'true') \
-        .config('spark.sql.broadcastTimeout', -1) \
-        .config('spark.sql.codegen.wholeStage', 'false') \
-        .config('spark.sql.execution.arrow.enabled', True) \
-        .config('spark.sql.execution.arrow.fallback.enabled', True) \
-        .config('spark.sql.files.ignoreCorruptFiles', True) \
-        .config('spark.sql.statistics.fallBackToHdfs', True) \
-        .config('spark.sql.execution.arrow.enabled', False) \
-        .config('spark.yarn.queue', "default") \
-        .enableHiveSupport().getOrCreate()
-
-    def transfer(self,excel_position: str, sql_file_position: str):
-        excel_full_path = excel_position
-        sql_full_path = sql_file_position
-        if not excel_position.startswith("/"):
-            excel_full_path = os.path.join(self.BASE_DIR, excel_position)
-        print(f'生成的excel文件位置在: {excel_full_path}')
-        if not sql_file_position.startswith("/"):
-            sql_full_path = os.path.join(self.BASE_DIR, sql_file_position)
-        print(f'生成的excel文件位置在: {sql_full_path}')
-        module_list,sql = read_file(sql_full_path)
-        print(f"正在执行的sql为: {sql}")
-        register_udf(self.spark,module_list)
-        pd_df = self.spark.sql(sql).toPandas()
-        pd_df.to_excel(excel_full_path, index=False)
-
-
-if __name__ == '__main__':
-    sql_file_position_input = input("请输入你的sql文件相对路径或绝对路径:")
-    sql_file_position = sql_file_position_input.strip()
-
-
-    excel_position_input = input("请输入你要生成的excel文件的相对路径或绝对路径:")
-    excel_position = excel_position_input.strip()
-    if excel_position.endswith("/"):
-        excel_position = f'{excel_position}{generate_random_string(10)}_output.xlsx'
-    else:
-        excel_path_name = excel_position.split(".")[0]
-        excel_position = f'{excel_path_name}.xlsx'
-
-    Hive2ExcelUtil().transfer(excel_position,sql_file_position)
-    print("=================transfer completed!=======================")
-
-
-
-
-
-
-
-
-
-
-
-
-
-

+ 0 - 53
dw_base/utils/pdt_check_table.py

@@ -1,53 +0,0 @@
-import os
-import re
-import sys
-
-abspath = os.path.abspath(__file__)
-root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
-sys.path.append(root_path)
-from pyspark.sql import SparkSession
-
-"""
-产品库工具:检测表当日是否存在新增数据
-参数1:表名table_name
-参数2:分区日期dt
-输出:用于DS调度检测
-"""
-
-
-def check_data(table_name, date):
-    # 创建 SparkSession
-    spark = SparkSession.builder \
-        .appName(f"check_{table_name}_{date}_data") \
-        .config("spark.driver.memory", "1g") \
-        .config("spark.executor.memory", "2g") \
-        .config("spark.executor.instances", "2") \
-        .config("spark.executor.cores", "2") \
-        .config("spark.executor.memoryOverhead", "512") \
-        .config("hive.exec.dynamic.partition", "true") \
-        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
-        .config("spark.yarn.queue", "pdt") \
-        .enableHiveSupport() \
-        .getOrCreate()
-
-    # 执行查询
-    query = f"SELECT COUNT(1) AS count FROM {table_name} WHERE dt = '{date}'"
-    result = spark.sql(query).collect()[0]['count']
-
-    # 关闭 SparkSession
-    spark.stop()
-    return result > 0
-
-
-if __name__ == "__main__":
-    if len(sys.argv) != 3:
-        print("Usage: python check_hive_data.py <table_name> <date>")
-        sys.exit(1)
-
-    table_name = sys.argv[1]
-    date = sys.argv[2]
-
-    if check_data(table_name, date):
-        print('${setValue(is_run=%s)}' % 'true')
-    else:
-        print('${setValue(is_run=%s)}' % 'false')

+ 0 - 76
dw_base/utils/pdt_check_table_multis.py

@@ -1,76 +0,0 @@
-import argparse
-import os
-import re
-import sys
-
-abspath = os.path.abspath(__file__)
-root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
-sys.path.append(root_path)
-from pyspark.sql import SparkSession
-
-"""
-产品库工具:检测表当日是否存在新增数据
-参数1:表名table_name
-参数2:分区日期dt
-输出:用于DS调度检测
-"""
-
-
-def check_data(tables, date, country):
-    # 创建 SparkSession
-    spark = SparkSession.builder \
-        .appName(f"check_{tables}_data") \
-        .config("spark.driver.memory", "1g") \
-        .config("spark.executor.memory", "2g") \
-        .config("spark.executor.instances", "2") \
-        .config("spark.executor.cores", "2") \
-        .config("spark.executor.memoryOverhead", "512") \
-        .config("hive.exec.dynamic.partition", "true") \
-        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
-        .config("spark.yarn.queue", "pdt") \
-        .enableHiveSupport() \
-        .getOrCreate()
-
-    # 执行查询
-    result = 0
-    for table in tables:
-        if date is not None and country is not None:
-            query = f"SELECT COUNT(1) AS count FROM {table} WHERE dt = '{date}' and country = '{country}'"
-        elif date is not None and country is None:
-            query = f"SELECT COUNT(1) AS count FROM {table} WHERE dt = '{date}'"
-        elif date is None and country is not None:
-            query = f"SELECT COUNT(1) AS count FROM {table} WHERE country = '{country}'"
-        else:
-            query = f"SELECT COUNT(1) AS count FROM {table}"
-        print(query)
-        result = spark.sql(query).collect()[0]['count']
-        if result > 0:
-            break
-
-    # 关闭 SparkSession
-    spark.stop()
-    return result > 0
-
-
-def parse_arguments():
-    parser = argparse.ArgumentParser(description="Check Table Multiple")
-
-    parser.add_argument('--tables', type=str, nargs='+', required=True, help='table names')
-    parser.add_argument('--dt', type=str, required=False, help='table partition')
-    parser.add_argument('--country', type=str, required=False, help='table country')
-
-    # 解析命令行参数
-    return parser.parse_args()
-
-
-if __name__ == "__main__":
-    args = parse_arguments()
-    print(f"任务参数:{args}")
-    table_names = args.tables
-    date = args.dt
-    country = args.country
-
-    if check_data(table_names, date, country):
-        print('${setValue(is_run=%s)}' % 'true')
-    else:
-        print('${setValue(is_run=%s)}' % 'false')
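
Both check scripts above answer a yes/no question ("does this table/partition have any rows?") with a full `COUNT(1)`. Purely as a hypothetical, not a task recorded anywhere in this commit: if such a probe is ever rebuilt, a `LIMIT 1` existence check is usually cheaper on large partitions, while keeping the same `${setValue(is_run=...)}` DolphinScheduler output convention used by the deleted scripts:

```python
# Hypothetical sketch only: existence probe via LIMIT 1 instead of a full COUNT(1),
# emitting the same scheduler output parameter as the deleted check scripts.
import sys

from pyspark.sql import SparkSession


def has_rows(table_name: str, dt: str) -> bool:
    spark = (SparkSession.builder
             .appName(f"check_{table_name}_{dt}")
             .enableHiveSupport()
             .getOrCreate())
    rows = spark.sql(f"SELECT 1 FROM {table_name} WHERE dt = '{dt}' LIMIT 1").take(1)
    spark.stop()
    return len(rows) > 0


if __name__ == "__main__":
    table_name, dt = sys.argv[1], sys.argv[2]
    print('${setValue(is_run=%s)}' % ('true' if has_rows(table_name, dt) else 'false'))
```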

+ 2 - 3
kb/90-重构路线.md

@@ -491,7 +491,6 @@ sql = "... WHERE TABLE_SCHEMA='%s' ..." % (database, table_name)
 | `dw_base/ml/a.py` | 空文件 | 删除 |
 | `dw_base/flink/__init__.py` | 空文件 | 删除(除非计划使用 Flink) |
 | `dw_base/elasticsearch/__init__.py` | 空文件 | 删除 |
-| `dw_base/oss/oss2_util.py` | 使用场景不明 | 确认后决定保留或删除 |
 | `dw_base/database/mongodb_utils.py` | 约 80% 是注释掉的旧代码 | 清理注释 |
 | `conf/datax/` 下全部内容 | 已废弃的旧配置 | 保留少量样例,其余删除 |
 
@@ -547,9 +546,9 @@ tests/
 
 **后续事项**:
 
-- LAZY 类依赖关联的老代码:`get_oldmongo_*` / `mg2es/` / `ent_interface_dingtalk*` 已于 2026-04-20 提前清理(见 92-进度 变更记录);剩余 `customs/similarity.py`、`dw_base/oss/oss2_util.py`、`dw_base/utils/excel_to_hive_utils.py` 等在阶段 4 / 阶段 5 一并清理
+- LAZY 类依赖关联的老代码:`get_oldmongo_*` / `mg2es/` / `ent_interface_dingtalk*` 于 2026-04-20 第一批提前清理;同日第二批清理 `dw_base/oss/` 整目录、`dw_base/scheduler/` 整目录(含 polling_scheduler / drop_partitions / drop_daily_full_snapshot_tbls)、`dw_base/hive/` 整目录、`dw_base/utils/` 7 文件(data_distinct / diff_utils / excel_to_hive_utils / hive_diff_database / hive_to_excel_utils / pdt_check_table\*);剩余 `customs/similarity.py` 等在阶段 4 / 阶段 5 一并清理
 - 不需要 `requirements-base.txt` / `requirements-dev.txt` 分文件——当前依赖规模下单文件已经足够
-- pyspark 2.4.0 暂保留(CDH 集群一致),等集群升级再一并上调
+- pyspark 2.4.0 固定(与 CDH 集群 Spark 版本一致,客户端必须对齐)
 
 ### 7.2 日志改进
 

+ 3 - 1
kb/92-重构进度.md

@@ -42,7 +42,7 @@
 - [x] 全局替换 SQL 中的 `ADD FILE tendata/...` → `ADD FILE dw_base/...`(2026-04-15)
 - [x] 全局替换 `zip -qr tendata.zip tendata` → `zip -qr dw_base.zip dw_base`(2026-04-15,spark_sql.py f-string 形式已手工修正)
 - [x] 全局替换 `addPyFile('tendata.zip')` → `addPyFile('dw_base.zip')`(2026-04-15,publish.sh 同步更新)
-- [ ] 全局替换路径正则 `re.sub(r"tendata-warehouse.*", ...)` → 使用新项目名(绑定仓库改名,2026-04-20 老业务文件批量清理后剩余约 15 处:`dw_base/scheduler/polling_scheduler.py` / `drop_*.py`、`dw_base/utils/*`、`dw_base/ds/ds_start_workflow.py`(已删)、`bin/doris-*-starter.py`、`dw_base/spark/udf/customs/company_abbr.py`)——另有 `dw_base/utils/diff_utils.py` 的 `target_folder='tendata-warehouse'` 字符串字面量、`dw_base/spark/td_spark_init.py`(已删)docstring 等非 re.sub 形式的引用一并处理
+- [ ] 全局替换路径正则 `re.sub(r"tendata-warehouse.*", ...)` → 使用新项目名(绑定仓库改名;2026-04-20 两批老业务清理后残留进一步缩小到 `dw_base/utils/` 剩余文件、`bin/doris-*-starter.py`、`dw_base/spark/udf/customs/company_abbr.py` 等;`diff_utils.py` 的字符串字面量、`polling_scheduler.py` / `drop_*.py` 已随删除一并清零)
 - [x] 排查 `tendata_corp` 等数据库名/表名引用,**确认不要误替换**(2026-04-15,已确认保留:`tendata_corp`、`tendata_bigdata256!`、`ent_tendata_interface`、`api.tendata.cn`)
 - [x] 新建 `jobs/` 目录 + `jobs/{raw,ods,dim,dwd,dws,tdm,ads}/` 子目录(2026-04-15,已放 `.gitkeep`,`dim/` 为顶层独立分层)
 - [x] 新建 `manual/` 目录 + 5 个子目录(`ddl/`、`backfill/`、`fix/`、`adhoc/`、`archive/`)(2026-04-15,已放 `.gitkeep`;`manual/ddl/` 是所有 DDL 的唯一来源)
@@ -113,6 +113,7 @@
 - [ ] 删除废弃空模块和注释代码
 - [ ] **重新实现 Hive HDFS 小文件合并工具**:原 `dw_base/utils/hive_file_merge.py`(2026-04-20 随老业务批清理一并删除)提供 `alter table ... partition (...) concatenate` 压实能力,但硬编码了老 HiveServer 连接 / `cts_*_ex/_im` 表名规则 / `mirror_country` 过滤。新版需通用化:HiveServer 连接从 `conf/` 读取、表过滤参数化,剥离业务命名假设
 - [ ] **重写告警模块**:老钉钉告警文件(`dingtalk_*` / `ent_interface_dingtalk*` / `country_count_dingtalk` / `spark_parse_json_to_hive` 里的 `dingtalk()` / `bin/dingtalk-work-alert.sh`)已于 2026-04-20 全部删除;新项目不再使用钉钉,Webhook Key 走 `conf/alerter.ini`(见 `90-重构路线.md §2.1`)
+- [ ] **重新实现分区保留工具**:老 `dw_base/scheduler/drop_partitions.py` + `drop_daily_full_snapshot_tbls.py`(2026-04-20 删除)提供"按表清理超期分区,保留最近 N 天 + 例外 dt"能力。前者硬编码海关 `cts_{mgdb}_{catalog}` 表名 + `mg_count_monitor` 元表,后者元表驱动(`daily_full_snapshot_tbls` 存 `db/tbl/days`)模式更通用。新版采用元表驱动 + 保留天数参数化 + 例外 dt 白名单,不绑业务表名;新目录可能不叫 scheduler(按 N 天清分区不是调度职责,更像 `ops/` 或 `maintenance/`)
 - [ ] Spark / HMS 侧 Ranger Hive 策略验证(低优先级,见 `90-重构路线.md` §7.5)
 - [x] 精简 `requirements.txt`(2026-04-15 提前完成:48 行 → 10 个强依赖,老清单备份到 `requirements.txt.bak` 并逐行打标)
 
@@ -155,4 +156,5 @@
 | 2026-04-18 | **§2.8 改造降级为"条件触发"**(第三轮修正):用户提供老项目真实生产 json 样例显示只写 `defaultFS`(无 `hadoopConfig`)也能跑 HA —— 说明老 worker 节点 `hdfs-site.xml` 配置完整,`hadoopConfig` 是**可选覆盖**而非 HA 必要条件。前两轮论断("必须加 `hadoopConfig`"、"运维把 xml 写死单 NN")都被推翻。§2.8 加"新环境 HDFS HA 自检清单"(`echo $HADOOP_CONF_DIR` / grep xml HA keys / `hadoop fs -ls hdfs://nameservice1/`),三项全过则整节改造不做;仅任何一项失败才启动 ini schema 升级 + `HDFSDataSource` 改造。92 阶段 2 checklist 相应改为"自检前置 + 条件触发"4 条子项 | — |
 | 2026-04-18 | **§2.8 锁定 Path B(第四轮,实测决定)**:新 CDH 环境三连实测(json 含/不含 `hadoopConfig` × `HADOOP_CONF_DIR` 设/不设),结论:对 DataX JVM,仅 json 的 `hadoopConfig` 块有效,`HADOOP_CONF_DIR` 无效(`datax.py` 不把 conf 目录入 classpath,与 `hadoop` 命令行不同)。老项目能纯 `defaultFS` 跑通最可能是老运维把 `hdfs-site.xml` 塞进了 DataX classpath 目录,新环境 `/opt/datax` 没这类预置文件。改造要点:(a) `HDFSDataSource.get_datasource_dict()` 吃 `[hadoop_config]` 整节注入 `hadoopConfig`;(b) 删除 `dw_base/__init__.py:16` `os.environ['HADOOP_CONF_DIR']` 死代码。简化 §2.8 文本:去掉 `ha_enabled` 开关(用 `[hadoop_config]` 节存在性代替)、去掉自检决策树(已决定)、去掉"运维手工改 IP"误记 | — |
 | 2026-04-20 | **§7.2.1 再次反转**:删除 `whoami == RELEASE_USER` 分流,`LOG_ROOT_DIR` 改为单值默认 `${HOME}/log` 并保留在 `conf/env.sh`(外配后期可改)。理由:`$HOME` 天然按用户隔离(bigdata/个人用户家目录不同),代码判断是多余一层;`bigdata` 本身就是专属调度账号,其 `$HOME` 即是生产日志合法归宿,不需要系统级 `/opt/data/log` 那条路。同步更新 `90-重构路线.md §7.2.1`(核心段)+ `§2.1 硬编码表行` + `§2.4 env.sh 草稿` + `00-项目架构.md §6 部署段` + `92 阶段 2 checklist` | — |
+| 2026-04-20 | **老业务耦合代码第二批清理(重构计划外)**:在 UDF/模块独立化讨论中顺带盘点 `dw_base/` 子模块,决定 16 文件批量删除:**整目录删 3 个**——`oss/`(oss2_util.py + __init__,新业务不需要对象存储)、`scheduler/`(polling_scheduler / drop_partitions / drop_daily_full_snapshot_tbls 三业务文件,前者绑死老 Mongo 轮询、后两者按 N 天清分区的能力已在阶段 4 记录重写任务)、`hive/`(hive_utils + hive_constants;hive_utils 中 `get_hive_create_table_ddl*` 零引用 + 依赖 `COLUMN_NAME_COMMENT_DICT` 老业务字段字典、DDL 生成器整体不重建;`get_hive_database_name` / `get_hive_table_prefix` 两个命名约定函数语义已在 `kb/21-命名规范.md` 有规则,不重建代码,后续 `bin/datax-gc-generator.py` 从零重写时按新约定实现);**utils/ 删 7 文件**——data_distinct / diff_utils / excel_to_hive_utils / hive_diff_database / hive_to_excel_utils / pdt_check_table / pdt_check_table_multis,全部零外部引用 + 强业务耦合(硬编码 tendata 路径 / 老集群 IP `192.168.30.3` / 中文表名拼音转换 / 海关 `cts_*` 表名模式)。**连带效应**:`bin/datax-gc-generator.py:26` import hive_utils 成破损 import,由 90-路线 §2.7 "从零重写" 任务覆盖,不单独修复。**阶段 4 新增任务**:重新实现分区保留工具(元表驱动 + 参数化天数,目录可能不叫 scheduler)。**CLAUDE.md 规则追加**:"空模块直接删"原则首次执行延后(elasticsearch/flink/ml/validation/common/ 暂留,后续更细粒度规整) | — |
 | 2026-04-20 | **老业务耦合代码批量清理(重构计划外)**:排查 `tendata` 残留时发现一批与 `tendata_corp` / `ent_tendata_interface` / DolphinScheduler / 钉钉告警强耦合的存量文件,逐项核对后批量删除 40 个文件 + 精简 1 个:**老业务模块 34**(`dw_base/scheduler/` 下 `get_oldmongo_*` ×5、`dingtalk_*` / `ent_interface_dingtalk*` / `country_count_dingtalk` / `mg_company_alias_init` ×8、`mg2es/` 整目录 13 文件;`dw_base/ds/` 整目录 4 文件;`dw_base/spark/udf/spark_read_hive_columns_cnt.py`;`dw_base/utils/tid_utils.py`;`dw_base/spark/td_spark_init.py`(老同事 xunxu 所写未被调用);`bin/hive-exec.sh`),**级联清理 6**(`dw_base/spark/udf/spark_id_generate_udf.py` + `dw_base/spark/udf/enterprise/unique/spark_tid_match_udf.py` 依赖已删 `tid_utils`;`dw_base/utils/hive_file_merge.py` + `dw_base/utils/spark_parse_json_to_hive.py` 依赖已删 `mg2es`/钉钉告警;`bin/hive-exec-job-starter.py` 调用已删 `hive-exec.sh`;`bin/dingtalk-work-alert.sh`),**精简 1**:`dw_base/spark/udf/spark_mmq_udf.py` 从 530 行裁到 4 个数据类型转换函数(phone/domain/website/statname 等场景相关 UDF 与 Mongo 相关逻辑全删)。同步更新:`00-项目架构.md`(移除 `td_spark_init` / DS 相关条目)、`90-重构路线.md`(钉钉 + 企微 Webhook 合并表述、删除 DS API 行、§5.2 依赖清理清单标记提前完成)、`92-进度.md` 阶段 1 第 6 行 `re.sub` checklist 更新残留范围(~15 处)。**阶段 4 新增两项任务**:(1) 重新实现 Hive HDFS 小文件合并工具(通用化连接 / 剥离 `cts_*_ex/_im` 表名假设);(2) 重写告警模块(弃钉钉走 `conf/alerter.ini` Webhook) | — |