drop_partitions.py

# Keep only the last 7 days of data in dwd, plus the 19700101 and 20200101
# sentinel partitions.
# Test arguments:       -slect_db tmp  -drop_db dwd_smp
# Production arguments: -slect_db task -drop_db dwd
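# Assumed layout of {slect_db}.mg_count_monitor, inferred from the query in
# main() rather than verified against the warehouse: string columns mgdb and
# catalog identifying each source table, plus an is_deleted flag ('0' = active).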
import datetime
import sys
import re
import os

# Make the repository root importable regardless of where the script is launched.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args


def main():
    CONFIG, _ = parse_args(sys.argv[1:])
    slect_db = CONFIG.get('slect_db')
    drop_db = CONFIG.get('drop_db')
    spark = SparkSQL()

    # Retention cutoff: partitions strictly older than this date are dropped.
    seven_days_ago = datetime.datetime.now() - datetime.timedelta(days=7)
    format_date = seven_days_ago.strftime('%Y%m%d')

    # List the active (not soft-deleted) tables whose partitions get cleaned up.
    sql1 = (f"select mgdb, catalog from {slect_db}.mg_count_monitor "
            f"where is_deleted = '0'")
    res = spark.query(sql1)[0].collect()

    # Databases exempt from partition cleanup.
    extends_db = ["global_bol"]
    for record in res:
        mgdb = record['mgdb']
        catalog = record['catalog']
        if mgdb not in extends_db:
            # Each partition value comes back as a string like "dt=20240101".
            sql2 = f"SHOW PARTITIONS {drop_db}.cts_{mgdb}_{catalog}"
            partitions = spark.query(sql2)[0].collect()
            dts = []
            for dt in partitions:
                part_date = dt['partition'].split('=')[1]
                # Collect partitions older than the cutoff, keeping the
                # 19700101 and 20200101 sentinel partitions.
                if part_date < format_date and part_date not in ('19700101', '20200101'):
                    dts.append(part_date)
            for p in dts:
                sql3 = (f"ALTER TABLE {drop_db}.cts_{mgdb}_{catalog} "
                        f"DROP PARTITION (dt='{p}')")
                spark.query(sql3)[0].collect()


if __name__ == "__main__":
    main()
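
# Illustrative trace of the statements the script issues for one monitored
# table. The values mgdb='trade', catalog='export', drop_db='dwd', and the
# cutoff 20240108 are made up for the example, not real data:
#
#   SHOW PARTITIONS dwd.cts_trade_export
#   ALTER TABLE dwd.cts_trade_export DROP PARTITION (dt='20240101')
#   ALTER TABLE dwd.cts_trade_export DROP PARTITION (dt='20240105')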