```python
"""
Product-warehouse utility: check whether a table has new data for the given day.
Arg 1: table name (table_name)
Arg 2: partition date (dt)
Output: consumed by the DS (DolphinScheduler) scheduling check
"""
import os
import re
import sys

abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from pyspark.sql import SparkSession


def check_data(table_name, date):
    # Create the SparkSession
    spark = SparkSession.builder \
        .appName(f"check_{table_name}_{date}_data") \
        .config("spark.driver.memory", "1g") \
        .config("spark.executor.memory", "2g") \
        .config("spark.executor.instances", "2") \
        .config("spark.executor.cores", "2") \
        .config("spark.executor.memoryOverhead", "512") \
        .config("hive.exec.dynamic.partition", "true") \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .config("spark.yarn.queue", "pdt") \
        .enableHiveSupport() \
        .getOrCreate()
    # Count the rows in the requested partition
    query = f"SELECT COUNT(1) AS count FROM {table_name} WHERE dt = '{date}'"
    result = spark.sql(query).collect()[0]['count']
    # Stop the SparkSession
    spark.stop()
    return result > 0


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: python check_hive_data.py <table_name> <date>")
        sys.exit(1)
    table_name = sys.argv[1]
    date = sys.argv[2]
    if check_data(table_name, date):
        print('${setValue(is_run=%s)}' % 'true')
    else:
        print('${setValue(is_run=%s)}' % 'false')
```
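Note that `COUNT(1)` scans the entire partition even though the script only needs to know whether at least one row exists. A minimal sketch of a cheaper existence check, assuming the same SparkSession setup as above (the `has_rows` name and the `LIMIT 1` variant are an alternative for illustration, not the original script's method):

```python
def has_rows(spark, table_name, date):
    # Hypothetical alternative to COUNT(1): fetch at most one row.
    # With LIMIT 1, Spark can stop scanning the partition as soon as a
    # match is found instead of counting every row.
    rows = spark.sql(
        f"SELECT 1 FROM {table_name} WHERE dt = '{date}' LIMIT 1"
    ).take(1)
    return len(rows) > 0
```

Either way, the result reaches the scheduler through stdout: a DolphinScheduler shell task captures printed `${setValue(is_run=true)}` / `${setValue(is_run=false)}` lines as output parameters, so a downstream task can branch on `is_run` to decide whether the rest of the workflow runs.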