# Retains only recent partitions (plus the 19700101/20200101 sentinel partitions)
# in dwd cts_* tables; all older dt partitions are dropped.
# NOTE(review): the original header said "keep last 7 days" but the cutoff below
# uses a 4-day window — confirm which is intended before changing either.
# Test args:       -slect_db tmp  -drop_db dwd_smp
# Production args: -slect_db task -drop_db dwd
import datetime
import sys
import re
import os

abspath = os.path.abspath(__file__)
# Truncate this file's absolute path at the project root so dw_base is importable.
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args

# Sentinel partitions that must never be dropped, regardless of age.
PROTECTED_PARTITIONS = ('19700101', '20200101')
# Source mongo databases whose tables are exempt from partition cleanup.
EXEMPT_DBS = ('global_bol',)


def main():
    """Drop stale dt partitions from {drop_db}.cts_<mgdb>_<catalog> tables.

    Reads the monitored (mgdb, catalog) pairs from {slect_db}.mg_count_monitor
    (rows with is_deleted = '0'), then for every non-exempt table drops each
    partition whose dt value is older than the cutoff date, except the
    protected sentinel partitions.

    CLI args (via parse_args): -slect_db <db>  -drop_db <db>
    """
    config, _ = parse_args(sys.argv[1:])
    # 'slect_db' is the historical (misspelled) CLI key — kept for compatibility.
    slect_db = config.get('slect_db')
    drop_db = config.get('drop_db')
    spark = SparkSQL()

    # Cutoff in the same YYYYMMDD format the dt partition values use, so a
    # plain string comparison below orders dates correctly.
    cutoff_date = (datetime.datetime.now()
                   - datetime.timedelta(days=4)).strftime('%Y%m%d')

    monitor_sql = (f"select mgdb, catalog from {slect_db}.mg_count_monitor "
                   f"where is_deleted = '0'")
    for record in spark.query(monitor_sql)[0].collect():
        mgdb = record['mgdb']
        catalog = record['catalog']
        if mgdb in EXEMPT_DBS:
            continue

        partitions = spark.query(
            f"SHOW PARTITIONS {drop_db}.cts_{mgdb}_{catalog}"
        )[0].collect()

        # Partition rows look like "dt=YYYYMMDD"; collect stale, unprotected dates.
        stale_dates = []
        for row in partitions:
            dt_value = row['partition'].split('=')[1]
            if dt_value < cutoff_date and dt_value not in PROTECTED_PARTITIONS:
                stale_dates.append(dt_value)

        for dt_value in stale_dates:
            spark.query(
                f" alter TABLE {drop_db}.cts_{mgdb}_{catalog} DROP PARTITION ( dt='{dt_value}') "
            )[0].collect()


if __name__ == "__main__":
    main()