"""Product-warehouse utility: check whether tables contain new data for the day.

Arg 1: table name(s)   --tables (one or more)
Arg 2: partition date  --dt     (optional)
Arg 3: country         --country (optional)

Output: prints a DolphinScheduler ``${setValue(is_run=...)}`` flag so the
scheduler can decide whether downstream tasks should run.
"""
import argparse
import os
import re
import sys

# Make the project root ("tendata-warehouse") importable regardless of where
# this script lives inside the repo.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from pyspark.sql import SparkSession


def check_data(tables, date, country):
    """Return True if any table in *tables* has at least one matching row.

    Args:
        tables: iterable of Hive table names to probe.
        date: partition date (``dt`` column) to filter on, or None for no
            date filter.
        country: ``country`` column value to filter on, or None for no
            country filter.

    Returns:
        bool: True as soon as one table yields a non-zero COUNT; False if
        every table is empty for the given filters.
    """
    # Create the SparkSession (small driver/executor footprint — this job
    # only runs COUNT queries).
    spark = SparkSession.builder \
        .appName(f"check_{tables}_data") \
        .config("spark.driver.memory", "1g") \
        .config("spark.executor.memory", "2g") \
        .config("spark.executor.instances", "2") \
        .config("spark.executor.cores", "2") \
        .config("spark.executor.memoryOverhead", "512") \
        .config("hive.exec.dynamic.partition", "true") \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .config("spark.yarn.queue", "pdt") \
        .enableHiveSupport() \
        .getOrCreate()

    found = False
    try:
        for table in tables:
            # Build the optional WHERE clause from whichever filters were
            # supplied. NOTE(review): values are interpolated directly into
            # the SQL string; they come from scheduler-controlled CLI args,
            # not end users, but parameterization would still be safer.
            filters = []
            if date is not None:
                filters.append(f"dt = '{date}'")
            if country is not None:
                filters.append(f"country = '{country}'")
            query = f"SELECT COUNT(1) AS count FROM {table}"
            if filters:
                query += " WHERE " + " and ".join(filters)
            print(query)
            if spark.sql(query).collect()[0]['count'] > 0:
                # One non-empty table is enough for the scheduler check.
                found = True
                break
    finally:
        # Always release the SparkSession, even if a query fails.
        spark.stop()
    return found


def parse_arguments():
    """Parse command-line arguments for the table-check job."""
    parser = argparse.ArgumentParser(description="Check Table Multiple")
    parser.add_argument('--tables', type=str, nargs='+', required=True, help='table names')
    parser.add_argument('--dt', type=str, required=False, help='table partition')
    parser.add_argument('--country', type=str, required=False, help='table country')
    # Parse and return the command-line arguments.
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_arguments()
    print(f"任务参数:{args}")
    table_names = args.tables
    date = args.dt
    country = args.country
    # Emit the DolphinScheduler flag consumed by downstream task conditions.
    if check_data(table_names, date, country):
        print('${setValue(is_run=%s)}' % 'true')
    else:
        print('${setValue(is_run=%s)}' % 'false')