pdt_check_table_multis.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. import argparse
  2. import os
  3. import re
  4. import sys
  5. abspath = os.path.abspath(__file__)
  6. root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
  7. sys.path.append(root_path)
  8. from pyspark.sql import SparkSession
  9. """
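Product-library tool: check whether the given tables contain data (new data for the day when a partition date is supplied).
Argument --tables: one or more table names
Argument --dt: partition date (optional); argument --country: country filter (optional)
Output: prints ${setValue(is_run=true)} or ${setValue(is_run=false)}, used by the DS scheduler for its check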
  14. """
  15. def check_data(tables, date, country):
  16. # 创建 SparkSession
  17. spark = SparkSession.builder \
  18. .appName(f"check_{tables}_data") \
  19. .config("spark.driver.memory", "1g") \
  20. .config("spark.executor.memory", "2g") \
  21. .config("spark.executor.instances", "2") \
  22. .config("spark.executor.cores", "2") \
  23. .config("spark.executor.memoryOverhead", "512") \
  24. .config("hive.exec.dynamic.partition", "true") \
  25. .config("hive.exec.dynamic.partition.mode", "nonstrict") \
  26. .config("spark.yarn.queue", "pdt") \
  27. .enableHiveSupport() \
  28. .getOrCreate()
  29. # 执行查询
  30. result = 0
  31. for table in tables:
  32. if date is not None and country is not None:
  33. query = f"SELECT COUNT(1) AS count FROM {table} WHERE dt = '{date}' and country = '{country}'"
  34. elif date is not None and country is None:
  35. query = f"SELECT COUNT(1) AS count FROM {table} WHERE dt = '{date}'"
  36. elif date is None and country is not None:
  37. query = f"SELECT COUNT(1) AS count FROM {table} WHERE country = '{country}'"
  38. else:
  39. query = f"SELECT COUNT(1) AS count FROM {table}"
  40. print(query)
  41. result = spark.sql(query).collect()[0]['count']
  42. if result > 0:
  43. break
  44. # 关闭 SparkSession
  45. spark.stop()
  46. return result > 0
  47. def parse_arguments():
  48. parser = argparse.ArgumentParser(description="Check Table Multiple")
  49. parser.add_argument('--tables', type=str, nargs='+', required=True, help='table names')
  50. parser.add_argument('--dt', type=str, required=False, help='table partition')
  51. parser.add_argument('--country', type=str, required=False, help='table country')
  52. # 解析命令行参数
  53. return parser.parse_args()
  54. if __name__ == "__main__":
  55. args = parse_arguments()
  56. print(f"任务参数:{args}")
  57. table_names = args.tables
  58. date = args.dt
  59. country = args.country
  60. if check_data(table_names, date, country):
  61. print('${setValue(is_run=%s)}' % 'true')
  62. else:
  63. print('${setValue(is_run=%s)}' % 'false')