# 用于仅保留dwd 近7日和19700101 数据 # daily_full_snapshot_tbls # task.daily_full_snapshot_tbls # 测试参数 -monitor_db test # 生产参数 -monitor_db task 可以不赋值 import datetime import sys import re import os abspath = os.path.abspath(__file__) root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath) sys.path.append(root_path) from dw_base.spark.spark_sql import SparkSQL from dw_base.utils.config_utils import parse_args def main(): CONFIG, _ = parse_args(sys.argv[1:]) monitor_db = CONFIG.get('monitor_db', 'task') spark = SparkSQL() sql1 = (f"select db, tbl,days from {monitor_db}.daily_full_snapshot_tbls " f"where is_deleted = '0'") res = spark.query(sql1)[0].collect() for record in res: db = record['db'] tbl = record['tbl'] ds = record['days'] days_ago = datetime.datetime.now() - datetime.timedelta(days = ds) format_date = days_ago.strftime('%Y%m%d') sql2 = (f"SHOW PARTITIONS {db}.{tbl}") partitions = spark.query(sql2)[0].collect() dts = set() for dt in partitions: a1 = dt['partition'].split('=')[1][:8] if a1 < format_date and a1 != '19700101' and a1 != '20200101': dts.add(dt['partition'].split('=')[1][:8]) for p in dts: sql3 = f" alter TABLE {db}.{tbl} DROP PARTITION ( dt='{p}') " spark.query(sql3)[0].collect() if __name__ == "__main__": main()