# Retain only the last 7 days of dwd data plus the 19700101 and 20200101
# sentinel partitions; every older partition is dropped.
# Test arguments:       -slect_db tmp  -drop_db dwd_smp
# Production arguments: -slect_db task -drop_db dwd
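#
# Example invocation (the script filename here is an assumption for
# illustration; the flags match the parameters documented above):
#   python prune_dwd_partitions.py -slect_db tmp -drop_db dwd_smp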
import datetime
import os
import re
import sys

# Resolve the repository root ("tendata-warehouse") from this file's absolute
# path and add it to sys.path so dw_base imports work from any working directory.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args


def main():
    # Parse -slect_db / -drop_db command-line arguments (see header comments).
    CONFIG, _ = parse_args(sys.argv[1:])
    slect_db = CONFIG.get('slect_db')  # database that holds mg_count_monitor
    drop_db = CONFIG.get('drop_db')    # database whose cts_* partitions are pruned
    spark = SparkSQL()

    # Drop partitions strictly older than 7 days, per the documented retention
    # window. Zero-padded YYYYMMDD strings compare correctly lexicographically.
    seven_days_ago = datetime.datetime.now() - datetime.timedelta(days=7)
    cutoff_date = seven_days_ago.strftime('%Y%m%d')

    # Enumerate the (mgdb, catalog) pairs whose tables should be pruned.
    sql1 = (f"select mgdb, catalog from {slect_db}.mg_count_monitor "
            f"where is_deleted = '0'")
    res = spark.query(sql1)[0].collect()

    # Databases exempt from partition pruning.
    excluded_dbs = ["global_bol"]
    for record in res:
        mgdb = record['mgdb']
        catalog = record['catalog']
        if mgdb in excluded_dbs:
            continue

        # Each SHOW PARTITIONS row looks like 'dt=20240101'; split out the date.
        sql2 = f"SHOW PARTITIONS {drop_db}.cts_{mgdb}_{catalog}"
        partitions = spark.query(sql2)[0].collect()

        # Collect partition dates outside the retention window, keeping the
        # 19700101 and 20200101 sentinel partitions.
        dts = []
        for dt in partitions:
            partition_date = dt['partition'].split('=')[1]
            if partition_date < cutoff_date and partition_date not in ('19700101', '20200101'):
                dts.append(partition_date)

        for p in dts:
            sql3 = f"ALTER TABLE {drop_db}.cts_{mgdb}_{catalog} DROP PARTITION (dt='{p}')"
            spark.query(sql3)[0].collect()
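

# A minimal sketch of how the retention filter above behaves (the cutoff and
# dates below are illustrative assumptions, not production data):
#
#   >>> cutoff = '20240401'
#   >>> dates = ['19700101', '20200101', '20240325', '20240402']
#   >>> [d for d in dates if d < cutoff and d not in ('19700101', '20200101')]
#   ['20240325']
#
# The two sentinel partitions survive, 20240402 is within the retention
# window, and only 20240325 is old enough to be dropped.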

if __name__ == "__main__":
    main()