# dict_redis2hive.py

  1. import sys
  2. import os
  3. import re
  4. abspath = os.path.abspath(__file__)
  5. root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
  6. sys.path.append(root_path)
  7. from dw_base.scheduler.mg2es.conf_reader import ConfReader
  8. from dw_base.scheduler.mg2es.redis_operator import RedisOperator
  9. from dw_base.spark.spark_sql import SparkSQL
  10. from dw_base.utils.config_utils import parse_args
  11. if __name__ == '__main__':
  12. CONFIG, _ = parse_args(sys.argv[1:])
  13. dt = CONFIG.get('dt')
  14. cf = ConfReader()
  15. host, port, db, password = cf.get_redis_conf()
  16. redis_client = RedisOperator(host, port, db, password)
  17. spark = SparkSQL().get_spark_session()
  18. state_dict = redis_client.get_hash_table_all('customs:state:dict')
  19. country_dict = redis_client.get_hash_table_all('customs:country:dict')
  20. state_df_dict = [{"field": k.decode(), "value": v.decode()} for k, v in state_dict.items()]
  21. country_df_dict = [{"field": k.decode(), "value": v.decode()} for k, v in country_dict.items()]
  22. state_df = spark.createDataFrame(state_df_dict)
  23. country_df = spark.createDataFrame(country_df_dict)
  24. # 注册DataFrame为临时视图
  25. state_df.createOrReplaceTempView("redis_state_data")
  26. country_df.createOrReplaceTempView("redis_country_data")
  27. # 将数据写入Hive表
  28. spark.sql("set hive.exec.dynamic.partition=true")
  29. spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
  30. spark.sql("set spark.yarn.queue=cts")
  31. print("开始写入Hive表")
  32. spark.sql(f"INSERT overwrite TABLE dim.redis_cts_state_dict SELECT *,'{dt}' FROM redis_state_data")
  33. spark.sql(f"INSERT overwrite TABLE dim.redis_cts_country_dict SELECT *,'{dt}' FROM redis_country_data")
  34. # 停止SparkSession
  35. spark.stop()