# hive_utils.py
  1. # -*- coding:utf-8 -*-
  2. from typing import List, Dict
  3. from dw_base.hive.hive_constants import COLUMN_NAME_TYPE_DICT, COLUMN_NAME_COMMENT_DICT
  4. def get_hive_database_name(project: str, layer: str, env: str) -> str:
  5. """
  6. 获取Hive数据库名称
  7. Args:
  8. project: 所属项目
  9. layer: 所属层次
  10. env: 所属环境
  11. Returns: Hive数据库名称
  12. """
  13. if project and project != '':
  14. if layer and layer != '':
  15. database = f'{project}_{layer}'
  16. else:
  17. database = f'{project}_ods'
  18. elif layer and layer != '':
  19. database = layer
  20. else:
  21. database = 'tmp'
  22. if database != 'tmp' and env and env != '':
  23. database = f'{database}_{env}'
  24. return database
  25. def get_hive_table_prefix(project: str, layer: str, version: str) -> str:
  26. """
  27. 获取表名前缀
  28. Args:
  29. project: 所属项目
  30. layer: 所属层次
  31. version: 所属版本
  32. Returns: 表名前缀
  33. """
  34. if layer and layer != '':
  35. if project and project != '':
  36. prefix = f'{layer}_{project}'
  37. else:
  38. prefix = layer
  39. if version and version != '':
  40. prefix = f'{prefix}_{version}'
  41. elif project and project != '':
  42. prefix = project
  43. else:
  44. prefix = 'tmp'
  45. return prefix
  46. def get_hive_create_table_ddl(database: str,
  47. table: str,
  48. columns: List[str],
  49. columns_with_types: Dict,
  50. comment: str = '',
  51. is_external: bool = False,
  52. is_partitioned: bool = False):
  53. """
  54. 生成Hive建表语句
  55. Args:
  56. database: 数据库名称
  57. table: 表名称
  58. columns: 字段
  59. comment: 表注释
  60. is_external: 是否是外部表
  61. is_partitioned: 是否是分区表
  62. Returns:
  63. """
  64. ddl = "DROP TABLE IF EXISTS {0};\nCREATE {1}TABLE IF NOT EXISTS {2}\n(\n{3}\n)\n\tCOMMENT '{4}'\n{5}\tSTORED AS ORC\n;"
  65. if is_external:
  66. argument1 = 'EXTERNAL '
  67. else:
  68. argument1 = ''
  69. if database is None:
  70. table_with_database = table
  71. else:
  72. table_with_database = f'{database}.{table}'
  73. max_column_length = max(map(len, columns))
  74. column_defs = []
  75. for col in columns:
  76. padded_col = col.ljust(max_column_length, ' ')
  77. if columns_with_types.__contains__(col):
  78. col_type = columns_with_types.get(col)
  79. else:
  80. col_type = COLUMN_NAME_TYPE_DICT.get(col, 'STRING')
  81. col_comment = COLUMN_NAME_COMMENT_DICT.get(col, '')
  82. column_defs.append(f" {padded_col} {col_type} COMMENT '{col_comment}'")
  83. argument3 = ',\n'.join(column_defs)
  84. if is_partitioned:
  85. argument5 = '\tPARTITIONED BY (dt STRING)\n'
  86. else:
  87. argument5 = ''
  88. return ddl.format(table_with_database, argument1, table_with_database, argument3, comment, argument5)
  89. def get_hive_create_table_ddl_sop(database: str,
  90. table: str,
  91. columns: List[str],
  92. columns_with_types: Dict,
  93. comment: str = '',
  94. is_external: bool = False,
  95. is_partitioned: bool = False):
  96. """
  97. 生成Hive建表语句
  98. Args:
  99. database: 数据库名称
  100. table: 表名称
  101. columns: 字段
  102. comment: 表注释
  103. is_external: 是否是外部表
  104. is_partitioned: 是否是分区表
  105. Returns:
  106. """
  107. ddl = "DROP TABLE IF EXISTS {0};\nCREATE {1}TABLE IF NOT EXISTS {2}\n(\n{3}\n)\n COMMENT '{4}'\n{5} STORED AS ORC\n;"
  108. if is_external:
  109. argument1 = 'EXTERNAL '
  110. else:
  111. argument1 = ''
  112. if database is None:
  113. table_with_database = table
  114. else:
  115. table_with_database = f'{database}.{table}'
  116. max_column_length = max(map(len, columns))
  117. column_defs = []
  118. for col in columns:
  119. padded_col = col.ljust(max_column_length, ' ')
  120. if columns_with_types.__contains__(col):
  121. col_type = columns_with_types.get(col)
  122. else:
  123. col_type = COLUMN_NAME_TYPE_DICT.get(col, 'STRING')
  124. col_comment = COLUMN_NAME_COMMENT_DICT.get(col, '')
  125. column_defs.append(f" {padded_col} {col_type} COMMENT '{col_comment}'")
  126. argument3 = ',\n'.join(column_defs)
  127. if is_partitioned:
  128. argument5 = ' PARTITIONED BY (dt STRING)\n'
  129. else:
  130. argument5 = ''
  131. return ddl.format(table_with_database, argument1, table_with_database, argument3, comment, argument5)