""" 批量匹配tid """ import hashlib import json from functools import lru_cache from dw_base.spark.udf.enterprise.unique.ent_offline_udf_america import generate_tid_usa, clean_company_name_usa from dw_base.spark.udf.enterprise.unique.ent_offline_udf_india import clean_company_name_ind, generate_tid_ind from dw_base.spark.udf.enterprise.unique.ent_offline_udf_indonesia import clean_company_name_idn, generate_tid_idn from dw_base.spark.udf.enterprise.unique.ent_offline_udf_russia import clean_company_name_rus, generate_tid_rus from dw_base.spark.udf.enterprise.unique.ent_offline_udf_turkey import clean_company_name_tur, generate_tid_tur from dw_base.utils.tid_utils import TidGeneratorFactory mapping = {} tid_generator = TidGeneratorFactory().createTidGenerator('Enterprise') def generate_tid(website, name, country_code3): if not name: return None if country_code3 in ['IDN']: cleaned_name = clean_company_name_idn(name) return match_tid(name, cleaned_name, country_code3) elif country_code3 in ['USA']: cleaned_name = clean_company_name_usa(name) return match_tid(name, cleaned_name, country_code3) elif country_code3 in ['TUR']: cleaned_name = clean_company_name_tur(name) return match_tid(name, cleaned_name, country_code3) elif country_code3 in ['IND']: cleaned_name = clean_company_name_ind(name) return match_tid(name, cleaned_name, country_code3) elif country_code3 in ['RUS']: cleaned_name = clean_company_name_rus(name) return match_tid(name, cleaned_name, country_code3) else: return old_generate_tid(website, name, country_code3) def generate_md5_hash(input_str: str): md5_hash = hashlib.md5(input_str.encode('utf-8')) return md5_hash.hexdigest() def old_generate_tid(website, name, country_code3): if not name: return None input_str = website if website else f"{name}-{country_code3 if country_code3 else ''}" return generate_md5_hash(input_str) def match_tid(name: str, cleaned_name: str, country: str): tid = cache_tid(name, cleaned_name, country) if not tid: if country == 'IDN': return generate_tid_idn(cleaned_name, None, None) elif country == 'USA': return generate_tid_usa(cleaned_name, None, None) elif country == 'TUR': return generate_tid_tur(cleaned_name, None) elif country == 'IND': return generate_tid_ind(cleaned_name, None) elif country == 'RUS': return generate_tid_rus(cleaned_name, None, None) return tid @lru_cache(maxsize=1000000) def cache_tid(name: str, cleaned_name: str, country: str): key = '%s--%s' % ( name if name else "", country if country else "" ) cleaned_key = '%s--%s' % (cleaned_name if cleaned_name else "", country if country else "") tid = mapping.get(key) or mapping.get(cleaned_key) if tid is None: # 如果mapping里没有该tid,进行匹配 tid = tid_generator.match_tid(name, country) if tid is None: # 如果匹配结果为null,则向mapping写入一个空字符串 tid = tid_generator.match_tid(cleaned_name, country) if tid is None: mapping[key] = '' mapping[cleaned_key] = '' else: mapping[cleaned_key] = tid else: mapping[key] = tid elif tid == '': # 对于第一次没有匹配到tid的公司,第二次进入该方法会得到一个空字符串,此时应返回null return None return tid if __name__ == '__main__': print(generate_tid('', 'KENCANA LINTASINDO INTERNASIONAL', 'IDN'))