"""Sync the customs state and country dictionaries from Redis into the Hive
dimension tables dim.redis_cts_state_dict and dim.redis_cts_country_dict,
partitioned by the date passed via the `dt` argument."""
import sys
import os
import re

# Make the project root (the tendata-warehouse directory) importable,
# regardless of where this script lives inside the repo.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.scheduler.mg2es.conf_reader import ConfReader
from dw_base.scheduler.mg2es.redis_operator import RedisOperator
from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args

if __name__ == '__main__':
    CONFIG, _ = parse_args(sys.argv[1:])
    dt = CONFIG.get('dt')

    # Build the Redis client from the shared configuration.
    cf = ConfReader()
    host, port, db, password = cf.get_redis_conf()
    redis_client = RedisOperator(host, port, db, password)

    spark = SparkSQL().get_spark_session()

    # Read the full state and country dictionaries from their Redis hashes.
    state_dict = redis_client.get_hash_table_all('customs:state:dict')
    country_dict = redis_client.get_hash_table_all('customs:country:dict')

    # Redis returns bytes; decode keys and values into row dictionaries.
    state_rows = [{"field": k.decode(), "value": v.decode()} for k, v in state_dict.items()]
    country_rows = [{"field": k.decode(), "value": v.decode()} for k, v in country_dict.items()]

    state_df = spark.createDataFrame(state_rows)
    country_df = spark.createDataFrame(country_rows)

    # Register the DataFrames as temporary views.
    state_df.createOrReplaceTempView("redis_state_data")
    country_df.createOrReplaceTempView("redis_country_data")

    # Write the data into the Hive tables, using dynamic partitioning so the
    # `dt` column selected below determines the target partition.
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    # Note: spark.yarn.queue normally only takes effect when set before the
    # application starts; setting it here is kept from the original script.
    spark.sql("set spark.yarn.queue=cts")
    print("Writing to Hive tables")
    spark.sql(f"INSERT OVERWRITE TABLE dim.redis_cts_state_dict SELECT *, '{dt}' FROM redis_state_data")
    spark.sql(f"INSERT OVERWRITE TABLE dim.redis_cts_country_dict SELECT *, '{dt}' FROM redis_country_data")

    # Stop the SparkSession.
    spark.stop()