Explorar o código

feat(datax): postgresql_reader 集成 [mask] 段自动生成 querySql

优先级 手写 querySql > [mask] 自动生成 > table 透传
mask_config 由 configparser.items('mask') 读取;columns 从 ini reader 段
再读一次(子类 load_column 未填 self.columns 的老代码契约问题,不改基类)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu hai 2 semanas
pai
achega
8a6b3349d8
Modificáronse 1 ficheiros con 7 adicións e 0 borrados
  1. 7 0
      dw_base/datax/plugins/reader/postgresql_reader.py

+ 7 - 0
dw_base/datax/plugins/reader/postgresql_reader.py

@@ -54,6 +54,13 @@ class PostgreSQLReader(Reader):
         else:
             jdbc_url = f'{jdbc_url}/{database}'
         query_sql = self.config_parser.get(self.plugin_type, POSTGRE_SQL_READER_PARAMETER_QUERY_SQL)
+        # 优先级:手写 querySql > [mask] 段自动生成 > table 透传
+        if not query_sql and self.config_parser.has_section('mask'):
+            from dw_base.datax.mask import build_query_sql
+            columns_list = [c.strip() for c in self.config_parser.get(
+                self.plugin_type, POSTGRE_SQL_WRITER_PARAMETER_COLUMN).split(',')]
+            mask_config = dict(self.config_parser.items('mask'))
+            query_sql = build_query_sql('postgresql', columns_list, mask_config, table, where)
         query_sql = query_sql.replace('${start_date}', start_date)
         query_sql = query_sql.replace('${start-date}', start_date)
         query_sql = query_sql.replace('${stop_date}', stop_date)