# pdt_check_table.py
  1. import os
  2. import re
  3. import sys
  4. abspath = os.path.abspath(__file__)
  5. root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
  6. sys.path.append(root_path)
  7. from pyspark.sql import SparkSession
  8. """
  9. 产品库工具:检测表当日是否存在新增数据
  10. 参数1:表名table_name
  11. 参数2:分区日期dt
  12. 输出:用于DS调度检测
  13. """
  14. def check_data(table_name, date):
  15. # 创建 SparkSession
  16. spark = SparkSession.builder \
  17. .appName(f"check_{table_name}_{date}_data") \
  18. .config("spark.driver.memory", "1g") \
  19. .config("spark.executor.memory", "2g") \
  20. .config("spark.executor.instances", "2") \
  21. .config("spark.executor.cores", "2") \
  22. .config("spark.executor.memoryOverhead", "512") \
  23. .config("hive.exec.dynamic.partition", "true") \
  24. .config("hive.exec.dynamic.partition.mode", "nonstrict") \
  25. .config("spark.yarn.queue", "pdt") \
  26. .enableHiveSupport() \
  27. .getOrCreate()
  28. # 执行查询
  29. query = f"SELECT COUNT(1) AS count FROM {table_name} WHERE dt = '{date}'"
  30. result = spark.sql(query).collect()[0]['count']
  31. # 关闭 SparkSession
  32. spark.stop()
  33. return result > 0
  34. if __name__ == "__main__":
  35. if len(sys.argv) != 3:
  36. print("Usage: python check_hive_data.py <table_name> <date>")
  37. sys.exit(1)
  38. table_name = sys.argv[1]
  39. date = sys.argv[2]
  40. if check_data(table_name, date):
  41. print('${setValue(is_run=%s)}' % 'true')
  42. else:
  43. print('${setValue(is_run=%s)}' % 'false')