drop_daily_full_snapshot_tbls.py

# Keeps, for each dwd table registered in the monitor table, only the most
# recent `days` partitions (typically 7) plus the 19700101 / 20200101
# sentinel partitions.
# Monitor table: daily_full_snapshot_tbls (production: task.daily_full_snapshot_tbls)
# Test argument:       -monitor_db test
# Production argument: -monitor_db task (the default, so it may be omitted)
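#
# Example invocation (the launcher and path are illustrative; adapt to your
# deployment):
#
#   spark-submit drop_daily_full_snapshot_tbls.py -monitor_db test
#
# Assumed shape of the monitor table, inferred from the columns this script
# queries (db, tbl, days, is_deleted); the real DDL may differ:
#
#   CREATE TABLE task.daily_full_snapshot_tbls (
#       db         STRING,  -- database holding the snapshot table
#       tbl        STRING,  -- snapshot table name
#       days       INT,     -- retention window in days
#       is_deleted STRING   -- '0' marks the rule as active
#   );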
import datetime
import os
import re
import sys

# Make the repo root importable no matter where the script is launched from.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args


def main():
    config, _ = parse_args(sys.argv[1:])
    monitor_db = config.get('monitor_db', 'task')
    spark = SparkSQL()

    # One active retention rule per table: (db, tbl, days).
    sql1 = (f"select db, tbl, days from {monitor_db}.daily_full_snapshot_tbls "
            f"where is_deleted = '0'")
    rules = spark.query(sql1)[0].collect()

    for record in rules:
        db = record['db']
        tbl = record['tbl']
        # Cast defensively in case `days` is stored as a string.
        days = int(record['days'])
        cutoff = (datetime.datetime.now()
                  - datetime.timedelta(days=days)).strftime('%Y%m%d')

        sql2 = f"SHOW PARTITIONS {db}.{tbl}"
        partitions = spark.query(sql2)[0].collect()

        # Collect dt values older than the cutoff, skipping the 19700101 /
        # 20200101 sentinel partitions. Lexicographic comparison is safe
        # because dt is a fixed-width yyyyMMdd string.
        dts = set()
        for row in partitions:
            dt = row['partition'].split('=')[1][:8]
            if dt < cutoff and dt not in ('19700101', '20200101'):
                dts.add(dt)

        for p in dts:
            sql3 = f"ALTER TABLE {db}.{tbl} DROP PARTITION (dt='{p}')"
            spark.query(sql3)[0].collect()
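
# Worked example (illustrative, not taken from the source): with days = 7 and
# a run on 2024-06-10, the cutoff string is '20240603'; partitions such as
# dt=20240601 and dt=20240602 are dropped, while dt=20240603 and later, plus
# dt=19700101 and dt=20200101, are kept.
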
if __name__ == "__main__":
    main()