| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- import argparse
- import os
- import re
- import sys
# Make the repository root (the "tendata-warehouse" directory that contains
# this script) importable.  re.sub keeps everything up to and including the
# first "tendata-warehouse" in the script's absolute path.  If the marker is
# not in the path, re.sub returns the file path unchanged.  A file path is
# not a usable import root, so it is not added in that case.  The entry is
# also not duplicated if it is already on sys.path.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
if root_path != abspath and root_path not in sys.path:
    sys.path.append(root_path)
- from pyspark.sql import SparkSession
# Product-library utility: check whether any of the given tables has data
# matching the requested filters.
#
# Command-line arguments (see parse_arguments):
#   --tables   one or more table names (required)
#   --dt       partition date, compared with the ``dt`` column (optional)
#   --country  country value, compared with the ``country`` column (optional)
#
# Output: a ``${setValue(is_run=true)}`` / ``${setValue(is_run=false)}``
# line on stdout, used as the check result for DS scheduling.


def _build_count_query(table, date, country):
    """Return the ``SELECT COUNT(1) AS count`` query for *table*.

    A ``dt = '<date>'`` and/or ``country = '<country>'`` predicate is added
    for each filter that is not None, joined with ``and``.  With no filters
    the whole table is counted.

    NOTE(review): the table name and filter values are interpolated into
    the SQL verbatim (no escaping), so they must come from a trusted caller.
    """
    conditions = []
    if date is not None:
        conditions.append(f"dt = '{date}'")
    if country is not None:
        conditions.append(f"country = '{country}'")
    query = f"SELECT COUNT(1) AS count FROM {table}"
    if conditions:
        query += " WHERE " + " and ".join(conditions)
    return query


def check_data(tables, date, country):
    """Return True if at least one of *tables* has rows matching the filters.

    Tables are checked in order, and checking stops at the first table whose
    ``COUNT(1)`` is greater than zero.  Each query is printed before it runs.

    Args:
        tables: iterable of table names.
        date: value for the ``dt`` filter, or None to skip that filter.
        country: value for the ``country`` filter, or None to skip it.

    Returns:
        True if some table has a non-zero count, otherwise False (also False
        when *tables* is empty).
    """
    tables = list(tables)
    # Join the names so the application name does not contain a Python list
    # repr such as "check_['a', 'b']_data".
    joined_names = "_".join(map(str, tables))
    spark = (
        SparkSession.builder
        .appName(f"check_{joined_names}_data")
        .config("spark.driver.memory", "1g")
        .config("spark.executor.memory", "2g")
        .config("spark.executor.instances", "2")
        .config("spark.executor.cores", "2")
        .config("spark.executor.memoryOverhead", "512")
        .config("hive.exec.dynamic.partition", "true")
        .config("hive.exec.dynamic.partition.mode", "nonstrict")
        .config("spark.yarn.queue", "pdt")
        .enableHiveSupport()
        .getOrCreate()
    )
    try:
        for table in tables:
            query = _build_count_query(table, date, country)
            print(query)
            if spark.sql(query).collect()[0]['count'] > 0:
                return True
        return False
    finally:
        # Stop the session even when a query fails (e.g. a missing table),
        # so the cluster resources are released instead of leaking.
        spark.stop()
def parse_arguments():
    """Parse this script's command-line options from ``sys.argv``.

    Returns:
        argparse.Namespace with attributes:
          * ``tables``: list of one or more table names (``--tables``, required)
          * ``dt``: partition value string, or None if ``--dt`` is omitted
          * ``country``: country value string, or None if ``--country`` is omitted
    """
    cli = argparse.ArgumentParser(description="Check Table Multiple")
    cli.add_argument('--tables', nargs='+', type=str, required=True,
                     help='table names')
    # The remaining options are optional single-value filters (default None).
    for flag, text in (('--dt', 'table partition'),
                       ('--country', 'table country')):
        cli.add_argument(flag, type=str, help=text)
    return cli.parse_args()
if __name__ == "__main__":
    args = parse_arguments()
    print(f"任务参数:{args}")
    # The script's result for DS scheduling: a ${setValue(is_run=...)} line
    # whose value is 'true' when any table has matching data, else 'false'.
    is_run = 'true' if check_data(args.tables, args.dt, args.country) else 'false'
    print('${setValue(is_run=%s)}' % is_run)
|