```python
import sys
import os
import re

# Make the repository root importable regardless of where the script runs from
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.scheduler.mg2es.conf_reader import ConfReader
from dw_base.scheduler.mg2es.redis_operator import RedisOperator
from dw_base.spark.spark_sql import SparkSQL
from dw_base.utils.config_utils import parse_args

if __name__ == '__main__':
    CONFIG, _ = parse_args(sys.argv[1:])
    dt = CONFIG.get('dt')

    cf = ConfReader()
    host, port, db, password = cf.get_redis_conf()
    redis_client = RedisOperator(host, port, db, password)
    spark = SparkSQL().get_spark_session()

    # Read both dictionaries from their Redis hashes
    state_dict = redis_client.get_hash_table_all('customs:state:dict')
    country_dict = redis_client.get_hash_table_all('customs:country:dict')

    # redis-py returns bytes keys/values, so decode them into row dicts
    state_rows = [{"field": k.decode(), "value": v.decode()} for k, v in state_dict.items()]
    country_rows = [{"field": k.decode(), "value": v.decode()} for k, v in country_dict.items()]
    state_df = spark.createDataFrame(state_rows)
    country_df = spark.createDataFrame(country_rows)

    # Register the DataFrames as temporary views
    state_df.createOrReplaceTempView("redis_state_data")
    country_df.createOrReplaceTempView("redis_country_data")

    # Write the data into the Hive tables via dynamic partitioning
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set spark.yarn.queue=cts")

    print("Starting write to Hive tables")
    spark.sql(f"INSERT OVERWRITE TABLE dim.redis_cts_state_dict SELECT *, '{dt}' FROM redis_state_data")
    spark.sql(f"INSERT OVERWRITE TABLE dim.redis_cts_country_dict SELECT *, '{dt}' FROM redis_country_data")

    # Stop the SparkSession
    spark.stop()
```
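The `.decode()` calls are needed because redis-py returns hash fields and values as raw bytes unless the client is created with `decode_responses=True`. For orientation, `RedisOperator.get_hash_table_all` is presumably a thin wrapper over redis-py's `HGETALL`; the sketch below is an assumption about that in-house helper, and only the redis-py calls in it are standard API:

```python
# Minimal sketch of the assumed RedisOperator internals; the class shape is
# hypothetical -- only redis.Redis(...) and hgetall(...) are real redis-py API.
import redis

class RedisOperator:
    def __init__(self, host, port, db, password):
        # decode_responses stays False here, matching the explicit
        # k.decode()/v.decode() done by the caller above
        self.client = redis.Redis(host=host, port=int(port),
                                  db=int(db), password=password)

    def get_hash_table_all(self, key):
        # HGETALL: returns the whole hash as {b'field': b'value', ...}
        return self.client.hgetall(key)
```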
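Each `INSERT OVERWRITE` appends `'{dt}'` as the trailing column, which the nonstrict dynamic-partition mode maps onto the table's partition key. The target tables are therefore presumably two string columns partitioned by `dt`; the DDL below is inferred from the code above, not taken from the source:

```python
# Hypothetical DDL for the target tables, inferred from the two-column
# temp views plus the appended '{dt}' literal; only the table names are
# taken from the script above.
for table in ("dim.redis_cts_state_dict", "dim.redis_cts_country_dict"):
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table} (
            field STRING,
            value STRING
        )
        PARTITIONED BY (dt STRING)
    """)
```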