# coding=utf-8
"""
udf

Helpers that resolve a company name to its ``tid`` by looking the name up in
an alias collection, plus a small factory that builds the generator for a
given dimension.
"""
import json
import logging
from typing import Optional

from dw_base.database.mongodb_utils import MongoDBHandler


class TidGenerator(object):
    """Base interface for objects that map a company name to an identifier."""

    def match_tid(self, company_name: str, country: str) -> Optional[str]:
        """Return the tid matching ``company_name`` within ``country``.

        Concrete generators in this module implement this method; the base
        version only signals that no implementation is available.

        Raises:
            NotImplementedError: always, in the base class.
        """
        raise NotImplementedError("not implemented yet")

    def match_pid(self, company_name: str, country: str) -> str:
        """Legacy entry point kept for backward compatibility.

        No generator in this module overrides it; use ``match_tid`` instead.

        Raises:
            NotImplementedError: always (a subclass of ``Exception``, so
                callers catching ``Exception`` keep working).
        """
        raise NotImplementedError("not implemented yet")


class MongoTidGenerator(TidGenerator):
    """Tid generator backed by a MongoDB alias collection.

    Subclasses are expected to populate ``tid_field``, ``alias_field``,
    ``country_field`` and ``company_aliases`` before ``match_tid`` is called.
    """

    def __init__(self):
        # Name of the document field that holds the tid.
        self.tid_field = None
        # Name of the document field that holds the company alias.
        self.alias_field = None
        # Name of the country field; not read by the lookup in this class.
        self.country_field = None
        # Collection object queried with ``find``.
        self.company_aliases = None
        # basicConfig does nothing when the root logger already has handlers,
        # so repeated instantiation does not stack handlers.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        )

    def match_tid(self, company_name: str, country: str) -> Optional[str]:
        """Return the tid for ``company_name`` in ``country``.

        Args:
            company_name: Alias to look up; a falsy value short-circuits.
            country: Prefix that the first three characters of a tid must
                equal for the document to be considered.

        Returns:
            The greatest matching tid, or ``None`` when ``company_name`` is
            falsy or no document matches.
        """
        if not company_name:
            return None
        return self.__match_by_company_name(company_name, country)

    def __match_by_company_name(self, company_name: str, country: str) -> Optional[str]:
        """Query the alias collection and pick the greatest matching tid.

        Documents are selected by exact equality on ``alias_field``; a
        document is kept only when its tid is truthy and its first three
        characters equal ``country``.
        """
        if not company_name:
            return None
        cursor = self.company_aliases.find({self.alias_field: {"$eq": company_name}})
        # NOTE(review): slicing assumes tids are strings whose first three
        # characters encode the country - confirm against the collection.
        tids = [
            tid
            for tid in (document.get(self.tid_field) for document in cursor)
            if tid and tid[:3] == country
        ]
        # Taking max over the tid values directly is equivalent to taking the
        # document with the greatest tid and then reading its tid.
        return max(tids) if tids else None


class EnterpriseTidGenerator(MongoTidGenerator):
    """Mongo-backed generator bound to the ``company_aliases`` collection."""

    def __init__(self):
        super().__init__()
        # NOTE(review): credentials are hard-coded in source; consider moving
        # the URI to configuration or a secret store.
        self.uri = 'mongodb://tendata_corp:TD_corpqyk22@192.168.11.27:21868/?authSource=tendata_corp'
        self.database = "tendata_corp"
        self.collection_alias = "company_aliases"
        self.tid_field = 'tid'
        self.alias_field = 'alias'
        self.country_field = 'country_code3'
        self.mongo_client = MongoDBHandler(self.uri).mongo_client
        self.company_aliases = self.mongo_client.get_database(self.database).get_collection(self.collection_alias)


class HBaseTidGenerator(TidGenerator):
    """Placeholder for an HBase-backed generator; cannot be instantiated yet."""

    def __init__(self):
        raise NotImplementedError("not implemented yet")


# Custom exception class
class UnsupportedDimensionError(Exception):
    """Raised when the factory is asked for a dimension it does not know."""

    def __init__(self, dimension):
        # Keep the offending value available to callers that catch the error.
        self.dimension = dimension
        super().__init__(f"Not supported generator dimension: {dimension}")


class TidGeneratorFactory(object):
    """Factory that builds the tid generator for a named dimension."""

    @staticmethod
    def createTidGenerator(dimension: str):
        """Create the generator registered for ``dimension``.

        The registry maps names to classes and the class is instantiated only
        after the dimension has been validated, so an unsupported dimension
        no longer constructs an ``EnterpriseTidGenerator`` (and its MongoDB
        client) before failing.

        Args:
            dimension: Registry key; currently only ``'Enterprise'``.

        Returns:
            A new generator instance for the dimension.

        Raises:
            ValueError: if ``dimension`` is ``None``.
            UnsupportedDimensionError: if ``dimension`` is not registered.
        """
        if dimension is None:
            raise ValueError("Dimension cannot be None")
        generator_classes = {
            'Enterprise': EnterpriseTidGenerator,
        }
        generator_class = generator_classes.get(dimension)
        if generator_class is None:
            raise UnsupportedDimensionError(dimension)
        return generator_class()


if __name__ == '__main__':
    pass