|
@@ -97,6 +97,18 @@ def query_columns_full(conn, schema, table):
|
|
|
return cur.fetchall()
|
|
return cur.fetchall()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+def _resolve_to_project_root(path):
|
|
|
|
|
+ """相对路径按项目根解析,绝对路径原样返回。
|
|
|
|
|
+
|
|
|
|
|
+ 复用 dw_base.datax.entry._resolve_relative_to_base 的逻辑——
|
|
|
|
|
+ 任何 cwd 跑此脚本都能找到 mask conf 等相对路径资源,
|
|
|
|
|
+ 与项目其他 bin 入口(datax-hive-import-starter 等)行为一致。
|
|
|
|
|
+ """
|
|
|
|
|
+ if os.path.isabs(path):
|
|
|
|
|
+ return path
|
|
|
|
|
+ return os.path.join(project_root, path)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
def load_mask_conf(path):
|
|
def load_mask_conf(path):
|
|
|
"""读 mask 配置 ini,返回 {field: method} dict。
|
|
"""读 mask 配置 ini,返回 {field: method} dict。
|
|
|
|
|
|
|
@@ -232,7 +244,11 @@ def main():
|
|
|
conn.close()
|
|
conn.close()
|
|
|
|
|
|
|
|
# full_rows: [(attnum, attname, comment, pg_type, pk_flag), ...]
|
|
# full_rows: [(attnum, attname, comment, pg_type, pk_flag), ...]
|
|
|
- mask_dict = load_mask_conf(args.mask_conf) if args.mask_conf else {}
|
|
|
|
|
|
|
+ if args.mask_conf:
|
|
|
|
|
+ mask_path = _resolve_to_project_root(args.mask_conf)
|
|
|
|
|
+ mask_dict = load_mask_conf(mask_path)
|
|
|
|
|
+ else:
|
|
|
|
|
+ mask_dict = {}
|
|
|
|
|
|
|
|
# mask 配置含表中不存在字段时 stderr 警告(不阻断)
|
|
# mask 配置含表中不存在字段时 stderr 警告(不阻断)
|
|
|
pg_field_set = {r[1] for r in full_rows}
|
|
pg_field_set = {r[1] for r in full_rows}
|