| 123456789101112131415161718192021222324252627282930313233343536373839404142434445 |
# Retains only the last-N-days and the 19700101 snapshot data in dwd tables.
# Driven by: daily_full_snapshot_tbls
#            task.daily_full_snapshot_tbls
# Test argument:       -monitor_db test
# Production argument: -monitor_db task (may be omitted; 'task' is the default)
import datetime
import sys
import re
import os

# Make the repository root ("tendata-warehouse") importable regardless of the
# directory this script is launched from: trim everything after the repo-root
# folder name off this file's absolute path and append it to sys.path.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args
def main():
    """Drop stale ``dt`` partitions from daily full-snapshot tables.

    Reads the monitored-table list from
    ``<monitor_db>.daily_full_snapshot_tbls`` (rows where ``is_deleted = '0'``);
    for each ``(db, tbl, days)`` record, drops every ``dt`` partition whose
    date is strictly older than ``days`` days ago, except the sentinel dates
    19700101 and 20200101, which are always retained.

    CLI (via ``parse_args``): ``-monitor_db test`` for testing; defaults to
    ``'task'`` for production when omitted.
    """
    config, _ = parse_args(sys.argv[1:])
    monitor_db = config.get('monitor_db', 'task')
    spark = SparkSQL()

    # Partitions with these dt values are never dropped (sentinel snapshots).
    keep_dates = ('19700101', '20200101')

    # Snapshot "now" once so every table uses the same cutoff even if the
    # loop happens to run across midnight.
    now = datetime.datetime.now()

    list_sql = (f"select db, tbl,days from {monitor_db}.daily_full_snapshot_tbls "
                f"where is_deleted = '0'")
    for record in spark.query(list_sql)[0].collect():
        db = record['db']
        tbl = record['tbl']
        # Cutoff in YYYYMMDD form; lexicographic '<' matches chronological
        # order for this fixed-width date format.
        cutoff = (now - datetime.timedelta(days=record['days'])).strftime('%Y%m%d')

        partitions = spark.query(f"SHOW PARTITIONS {db}.{tbl}")[0].collect()
        # Partition rows look like 'dt=YYYYMMDD...'; keep only the 8-digit
        # date part, extracted once per row (the original split the string
        # twice per partition).
        stale = set()
        for row in partitions:
            dt_val = row['partition'].split('=')[1][:8]
            if dt_val < cutoff and dt_val not in keep_dates:
                stale.add(dt_val)

        for p in stale:
            spark.query(f" alter TABLE {db}.{tbl} DROP PARTITION ( dt='{p}') ")[0].collect()
- if __name__ == "__main__":
- main()
|