| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- #!/usr/bin/env /usr/bin/python3
- # -*- coding:utf-8 -*-
- import hashlib
- # 企业库唯一性调整,离线数据udf
- from datetime import datetime
- from dw_base.spark.udf.customs.common_clean import clean_company_name
def generate_md5_hash(input_str: str):
    """Return the hexadecimal MD5 digest of *input_str* encoded as UTF-8."""
    return hashlib.md5(input_str.encode('utf-8')).hexdigest()
def generate_tid_idn(company_name: str,
                     business_number: str,
                     city: str) -> str or None:
    """Build the 'IDN'-prefixed MD5 identifier for a company record.

    The string that gets hashed depends on which fields are present:

    * business number and city: ``"<business_number>-<city>AAA"``
    * business number only:     ``"<business_number>BBB"``
    * no business number:       ``"<company_name>CCC"``

    Returns None when ``company_name`` is empty or None; otherwise
    ``'IDN'`` followed by the hex MD5 digest of the chosen string.
    """
    if not company_name:
        return None

    if not business_number:
        seed = company_name + 'CCC'
    elif city:
        seed = f"{business_number}-{city}AAA"
    else:
        seed = business_number + 'BBB'

    return 'IDN' + generate_md5_hash(seed)
def clean_company_name_extra(s: str) -> str or None:
    """Strip Indonesian legal-form markers and location tails from a name.

    Processing order (all matching is case-sensitive):

    1. Remove one leading 'PT.', 'CV.', 'PT' or 'CV'.
    2. Remove one trailing ',PT', ',CV', 'PT' or 'CV'.
    3. If ' DI ' (surrounded by spaces) occurs, cut the name at its first
       occurrence, dropping ' DI ' and everything after it.
    4. Strip surrounding whitespace.

    Falsy input (None or '') is returned unchanged.
    """
    if not s:
        return s

    # Longest affix first: only the first match is removed, so testing the
    # bare 'PT' / 'CV' before 'PT.' / ',PT' would shadow the longer forms
    # and leave a stray '.' or ',' behind.
    prefixes = ('PT.', 'CV.', 'PT', 'CV')
    suffixes = (',PT', ',CV', 'PT', 'CV')

    # Remove the leading legal-form marker, if any.
    for prefix in prefixes:
        if s.startswith(prefix):
            s = s[len(prefix):]
            break

    # Remove the trailing legal-form marker, if any.
    for suffix in suffixes:
        if s.endswith(suffix):
            s = s[:-len(suffix)]
            break

    # Truncate at the first ' DI ': partition() yields the text before the
    # separator, or the whole string when the separator is absent.
    s = s.partition(' DI ')[0]

    return s.strip()
def clean_company_name_idn(company_name: str) -> str or None:
    """Clean an Indonesian company name in two passes.

    The name first goes through the shared ``clean_company_name`` routine;
    a non-empty result is then refined by ``clean_company_name_extra``.

    Returns None for empty/None input. When the first pass yields a falsy
    value, that value is returned as-is without the second pass.
    """
    if not company_name:
        return None

    cleaned = clean_company_name(company_name)
    if not cleaned:
        return cleaned
    return clean_company_name_extra(cleaned)
def get_standard_company_name(s: str) -> str:
    """Normalise a company name so it starts with 'PT.' or 'CV.'.

    Steps:

    1. Strip surrounding whitespace.
    2. Rewrite the leading legal-form marker (matched case-insensitively):
       'PT ', 'PT.', 'P.T.' and 'P T ' become 'PT.'; 'CV ' and 'CV.' become
       'CV.'. A name with none of these markers gets 'PT.' prepended.
    3. Remove at most one trailing marker (case-insensitive), trying
       '., PT', ', PT', ',PT', 'PT', ',CV', 'CV' in that order, then strip
       trailing whitespace.

    Returns None when ``s`` is None, so a missing value does not raise.
    """
    if s is None:
        return None

    s = s.strip()

    # Uppercase once for the case-insensitive prefix checks.
    head = s.upper()
    if head.startswith(('PT ', 'PT.')):
        s = 'PT.' + s[3:].strip()
    elif head.startswith(('P.T.', 'P T ')):
        s = 'PT.' + s[4:].strip()
    elif head.startswith(('CV ', 'CV.')):
        s = 'CV.' + s[3:].strip()
    else:
        s = 'PT.' + s

    # Check for and remove a trailing legal-form suffix. Longer variants are
    # listed before the bare 'PT' / 'CV' so separators are removed with them.
    # NOTE(review): the bare 'PT' / 'CV' entries match any name that merely
    # ends in those letters (e.g. '... EGYPT'); confirm this is intended.
    tail = s.upper()
    for suffix in ('., PT', ', PT', ',PT', 'PT', ',CV', 'CV'):
        if tail.endswith(suffix):
            s = s[:-len(suffix)].strip()
            break
    return s
if __name__ == '__main__':
    # Manual smoke check: print the standardised form of one sample name.
    sample_name = 'P.T.ACEH KIAT BEUTARI JL INDONESI'
    print(get_standard_company_name(sample_name))
|