import os
import re
import sys

# Bootstrap: make the repo root ("tendata-warehouse") importable no matter
# where this script is launched from.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

# Product-library utility: check whether a table has new data for the given day.
#   arg 1: table name (table_name)
#   arg 2: partition date (dt)
# Output: a ${setValue(is_run=...)} marker on stdout, consumed by the DS
# (DolphinScheduler) scheduler to decide whether downstream tasks run.

# Whitelist of characters allowed in values spliced into the SQL text below.
# Covers qualified table names (db.table) and common date formats
# (yyyy-MM-dd / yyyyMMdd); blocks quotes, semicolons, whitespace, etc.
_SAFE_TOKEN_RE = re.compile(r"[A-Za-z0-9_.\-]+")


def _is_safe_sql_token(value):
    """Return True if *value* is a non-empty string of whitelisted characters."""
    return isinstance(value, str) and _SAFE_TOKEN_RE.fullmatch(value) is not None


def check_data(table_name, date):
    """Return True if ``table_name`` has at least one row in partition dt=``date``.

    Raises:
        ValueError: if either argument contains characters outside the
            whitelist (they are interpolated into SQL text, so this guards
            against injection from the CLI arguments).
    """
    if not _is_safe_sql_token(table_name) or not _is_safe_sql_token(date):
        raise ValueError(
            f"unsafe table_name/date for SQL interpolation: {table_name!r}, {date!r}"
        )

    # Lazy import: pyspark is heavy and only needed when a check actually runs.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName(f"check_{table_name}_{date}_data") \
        .config("spark.driver.memory", "1g") \
        .config("spark.executor.memory", "2g") \
        .config("spark.executor.instances", "2") \
        .config("spark.executor.cores", "2") \
        .config("spark.executor.memoryOverhead", "512") \
        .config("hive.exec.dynamic.partition", "true") \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .config("spark.yarn.queue", "pdt") \
        .enableHiveSupport() \
        .getOrCreate()
    try:
        # Count rows in the requested partition; any row means "has data".
        query = f"SELECT COUNT(1) AS count FROM {table_name} WHERE dt = '{date}'"
        count = spark.sql(query).collect()[0]['count']
    finally:
        # Always release the session, even if the query fails.
        spark.stop()
    return count > 0


if __name__ == "__main__":
    if len(sys.argv) != 3:
        # Original message omitted the argument placeholders entirely.
        print("Usage: python check_hive_data.py <table_name> <dt>")
        sys.exit(1)
    table_name = sys.argv[1]
    date = sys.argv[2]
    # DolphinScheduler parses this exact ${setValue(...)} marker from stdout;
    # do not change its format.
    if check_data(table_name, date):
        print('${setValue(is_run=%s)}' % 'true')
    else:
        print('${setValue(is_run=%s)}' % 'false')