"""Cleaning helpers for company-registry data (URLs, phones, e-mails, names).

The functions are plain Python callables; they take a single string (or
``None``) and return a cleaned string or ``None`` when the value is unusable.
Several parameters are named ``str`` and therefore shadow the builtin inside
the function body; the names are kept so keyword callers keep working.
"""
import codecs
import json
import re

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

from dw_base.spark.udf.customs.common_clean import clean_company_name

# Noise fragments stripped from every URL. Order matters: longer variants are
# listed before the shorter fragments they contain.
url_bad_list = [
    'www.,',
    'www.',
    '/web:',
    'http:////',
    'http:///',
    'http://',
    'https://',
    'web:',
    'ww�w.',
    'www�',
    'w�',
]

# Domain suffixes recognised for Chinese URLs (checked in list order).
china_url_suff_list = [
    '.com.cn',
    '.com',
    '.cn',
    '.org.cn',  # a missing comma here used to merge this entry with '.org'
    '.org',
    '.net',
    '.info',
]

# Russian domain suffixes (checked in list order).
russia_url_suff_list = [
    # -- generic top-level domains
    '.group',
    '.hu',
    '.it',
    '.com.cy',
    # '.company' must precede '.com', otherwise '.com' matches first and the
    # URL is cut to 'xxx.com'.
    '.company',
    '.com',
    '.net',
    '.org',
    '.int',
    '.edu',
    '.tech',
    '.eco',
    '.eu',
    '.info',
    # -- Russian domains
    # a missing comma used to merge '.company' and '.рф' into one entry
    '.рф',
    '.ru',
    '.su',
    '.by',
    '.biz',
    '.pro',
    '.coop',
    '.aero',
    '.museum',
    '.xyz',
    '.online',
    '.site',
]


# China business-registry URL cleaning
def clean_url_china(url):
    """Normalise a Chinese company URL.

    Lower-cases the value, rejects the literal junk values ``'ltd.'``,
    ``'.ltd.'`` and anything ending in ``',ltd.'``, removes every fragment in
    ``url_bad_list`` and truncates the result right after the first suffix of
    ``china_url_suff_list`` that occurs in it.

    Returns ``None`` for ``None`` input or rejected values; otherwise the
    cleaned string (possibly empty).
    """
    if url is not None:
        url = url.lower()
        if url in ['ltd.', '.ltd.']:
            return None
        if url.endswith(',ltd.'):
            return None
        for bad in url_bad_list:
            url = url.replace(bad, '')
        for suffix in china_url_suff_list:
            if suffix in url:
                # Drop everything (path, query, ...) after the suffix.
                return url[:url.index(suffix)] + suffix
    return url


# Russia URL cleaning
def clean_url_russia(url):
    """Normalise a Russian company URL.

    Non-empty input is lower-cased, stripped of ``url_bad_list`` fragments,
    rejected (``None``) when it contains no dot, and truncated after the first
    matching suffix of ``russia_url_suff_list``. ``None`` and ``''`` are
    returned unchanged.
    """
    if url is not None and url != '':
        url = url.lower()
        for bad in url_bad_list:
            url = url.replace(bad, '')
        if '.' not in url:
            return None
        for suffix in russia_url_suff_list:
            if suffix in url:
                return url[:url.index(suffix)] + suffix
    return url


# America business-registry URL cleaning
def clean_url_america(url):
    """Reduce a US company URL to its bare host name.

    Strips ``url_bad_list`` fragments, then drops the port (text after the
    first ``:``) and the path (text after the first ``/``). Values that contain
    a dotted-quad IP address are rejected.

    Returns the host, or ``None`` for empty input and IP-based URLs.
    """
    if url:
        url = url.lower()
        for bad in url_bad_list:
            url = url.replace(bad, '')
        if ':' in url:
            # Keep only the part in front of the port number.
            parts = url.split(':', 1)
            url = parts[0]
        # If a slash is still present, keep only what precedes it.
        if '/' in url:
            parts = url.split('/', 1)
            url = parts[0]
        if re.search(r'(\d+\.\d+\.\d+\.\d+)', url):
            return None
        return url
    return None


# Generic URL cleaning rule
def clean_url_common(url):
    """Strip ``url_bad_list`` fragments and any path from ``url``.

    Returns the text before the first ``/``, or ``None`` when the input is
    empty or nothing is left after stripping.
    """
    if url:
        url = url.lower()
        for bad in url_bad_list:
            url = url.replace(bad, '')
        if not url:
            return None
        if '/' in url:
            parts = url.split('/', 1)
            return parts[0]
        else:
            return url
    return None


# URL tests
# if __name__ == '__main__':
#     test_case_list = [
#         'https://www.ianshaw.biz/p/contact-management.php',
#         'https://charnleyfertilisers.co.uk/',
#         'https://nyulangone.org/doctors/1205925765/carol-dunetz?cid=syn_yext\u0026y_entity_id=1205925765-primary\u0026y_source=1_MjU0NTEyNzEtNDgzLWxvY2F0aW9uLndlYnNpdGU%3D',
#         'https://www.carolleviandcompany.it/',
#         'https://schrotthandel-heinen.de/',
#         'http://201.149.15.54:88/',
#         'http://190.107.176.73/~prodinwe/www2/inicio.html',
#         'https://findadoctor.atlantichealth.org/provider/Joseph+C+Lugo/1140352?unified=lugo\u0026sort=networks%2Crelevance\u0026_ga=2.142101431.428278081.1637589591-505885973.1636636554\u0026_gac=1.36491028.1637590169.EAIaIQobChMImKmH3JKs9AIVl4TICh2yrwEXEAAYASAAEgKlDvD_BwE'
#         #
#         #
#     ]
#     for url in test_case_list:
#         print(f'url: {url} ----> {clean_url_america(url)}')


# Per-country business-registry URL cleaning
def clean_url(country, url):
    """Dispatch ``url`` to the cleaner for ``country``.

    Supported countries are ``'China'``, ``'Russia'`` and ``'America'``; any
    other value yields ``None``.
    """
    if country == 'China':
        return clean_url_china(url)
    if country == 'Russia':
        return clean_url_russia(url)
    if country == 'America':
        return clean_url_america(url)
    return None


# Vietnam phone numbers: substrings to be replaced by a separator
vietnam_tel_split_list = [
    'faxno',
    'fax-',
    '-fax',
    'fax.',
    'fax',
    'tele',
]

vietnam_tel_bad_list = [
    'f',
    'awelexports@gmailcom',
    'm-',
    'axno',
    'ax',
    'no',
    '(ext',
    'linhkt',
    '.',
    'nhnh3',
]

reverse_str_list = [
    '.',
    '/',
]


# Output the string with its parts reversed
def reverse_str(str):
    """Reverse the separator-delimited parts of ``str`` and join them with '-'.

    Each separator in ``reverse_str_list`` is applied in turn, so a value that
    contains both ``.`` and ``/`` is split and reversed twice. A value without
    any separator is returned unchanged; empty or ``None`` input gives ``None``.
    """
    if str:
        for str1 in reverse_str_list:
            if str1 in str:
                parts = str.split(str1)
                # Reverse the order of the split parts.
                reversed_parts = parts[::-1]
                # Re-assemble the reversed parts into one string.
                str = '-'.join(reversed_parts)
        return str
    return None


# Replace English letters and whitespace with ''
def replace_english_and_space(str):
    """Remove ASCII letters and whitespace from ``str``.

    Returns ``None`` for ``None`` input (previously this raised ``TypeError``,
    unlike the other cleaners in this module).
    """
    if str is None:
        return None
    result = re.sub(r'[a-zA-Z\s]', '', str)
    return result


# De-duplicate array elements
def array_remove_duplicates(str):
    """De-duplicate the comma-separated items of ``str``.

    The first occurrence of each item is kept and the original order is
    preserved, so the result is deterministic across runs and executors.
    Empty or ``None`` input gives ``None``.
    """
    if str:
        str_array = str.split(',')
        # dict.fromkeys keeps insertion order; a set would not.
        unique_str = list(dict.fromkeys(str_array))
        return ','.join(unique_str)
    return None


# Intentionally disabled demo ('__main__1' never matches).
if __name__ == '__main__1':
    test_case_list = [
        '[]',
        '[91220101123911541QCHN]',
        '[, 91220101123911541QCHN]'
    ]
    for arraystr in test_case_list:
        print(f'tel: {arraystr} ----> {array_remove_duplicates(arraystr)}')

company_name_pattern1 = r'(^[0-9]{2}\.[0-9]{3}\.[0-9]{3})(.*)'  # 12.575.462 ALVARO PEREIRA DA SILVEIRA FILHO
company_name_pattern2 = r'.+( [0-9]+)$'  # HEBE DE ABREU VILELA CPF 027116806149


# Company-name cleaning: drop the leading xx.xxx.xxx prefix
def clean_brazil_company_name(name):
    """Clean a Brazilian company name.

    * A leading ``NN.NNN.NNN`` registration prefix is removed.
    * Otherwise a trailing run of digits is removed when the captured group
      (the digits plus their leading space) is longer than 8 characters.
      Note that ``str.replace`` removes every occurrence of that group.

    The remainder is passed through ``clean_company_name``. Empty or ``None``
    input gives ``None``.
    """
    if name:
        namepattern1_match = re.search(company_name_pattern1, name)
        if namepattern1_match:
            namepattern1 = namepattern1_match.group(2)
            return clean_company_name(namepattern1)
        namepattern2_match = re.search(company_name_pattern2, name)
        if namepattern2_match:
            namepattern2 = namepattern2_match.group(1)
            if len(namepattern2) > 8:
                return clean_company_name(name.replace(namepattern2, ''))
        return clean_company_name(name)
    else:
        return None


# Turkey: comma-separated phones; entries that are not 10 characters are dropped
def phone_clean_turkey(phone):
    """Keep only the 10-character entries of a comma-separated phone list.

    Returns the surviving entries joined by ``,`` or ``None`` when none is
    left or the input is empty.
    """
    if phone:
        # Split the input string into a list.
        phone_arr = phone.split(',')
        # Keep only the entries whose length is exactly 10.
        phone_arr_new = [item for item in phone_arr if len(item) == 10]
        # Re-join the kept entries; None when nothing is left.
        phone_str = ','.join(phone_arr_new) if phone_arr_new else None
        return phone_str
    return None


# Turkey fax: 11 chars starting with 9 -> null; 11 chars starting with 0 ->
# drop the 0; other lengths (e.g. 1 or 12) -> null
def fax_clean_turkey(fax):
    """Normalise a Turkish fax number to 10 characters.

    A 10-character value is returned as is; an 11-character value starting
    with ``0`` loses that leading zero; everything else gives ``None``.
    """
    if fax:
        fax_len = len(fax)
        if fax_len == 10:
            return fax
        elif fax_len == 11 and fax.startswith('0'):
            return fax[1:]
    return None


# Intentionally disabled demo ('__main__1' never matches).
if __name__ == '__main__1':
    test_case_list = [
        # turkey-phone-alltype
        '4443361',
        '2164708444',
        '2122772674,4',
        '2123518966,67',
        '4449911,4441311',
        '2126944565,4444080',
        '214511936,2125037861',
        '2123225997,2123228911',
        '2165274671,2162663626,4441158',
        '2163782062,2163782649,2163787830',
        '',
        None,
        # turkey-fax-alltype
        '021648847322',
        '02164884732',
        '92164884732',
        '2164884732',
        '0'
    ]
    # Loop variable renamed so the builtin ``str`` is not rebound at module level.
    for case in test_case_list:
        print(f'tel: {case} ----> {fax_clean_turkey(case)}')

# Industry-code cleaning
pattern = r'\d{2}\.\d{2}\.\d{2}'


def turkey_nicecode(nicecode):
    """Extract every ``NN.NN.NN`` industry code from ``nicecode``.

    The codes are joined with ``', '`` and their dots are removed. Returns an
    empty string when no code is found and ``None`` for empty input.
    """
    if nicecode:
        codes = re.findall(pattern, nicecode)
        result = ', '.join(codes)
        result = result.replace('.', '')
        return result
    return None


# Intentionally disabled demo ('__main__1' never matches).
if __name__ == '__main__1':
    test_case_list = [
        '["10.71.01-Taze pastane ürünleri imalatı (yaş pasta, kuru pasta, poğaça, kek, börek, pay, turta, waffles vb.)"]',
        '["15.12.07-Deri, kösele, karma deri ve diğer malzemelerden bavul, el çantası, cüzdan, okul çantası, evrak çantası, deriden sigaralık, deri ayakkabı bağı, kişisel bakım, dikiş, vb. amaçlı seyahat seti, vb. ürünlerin imalatı"]',
        '["07.29.06-Krom madenciliği"]',
        '["35.12.13-Elektrik enerjisinin iletimi (elektrik üretim kaynağından dağıtım sistemine aktaran iletim sistemlerinin işletilmesi)","42.22.02-Enerji santralleri inşaatı (hidroelektrik santrali, termik santral, nükleer enerji üretim santralleri vb.)","35.11.19-Elektrik enerjisi üretimi"]'
    ]
    # for str in test_case_list:
    #     print(f'tel: {str} ----> {turkey_nicecode(str)}')

email_pattern1 = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9]+@[a-zA-Z.]+'  # CONTADOR@JANSENANDRE@HOTMAIL.COM
email_pattern2 = r'.*@$'  # XXXXXXXXX@XXXXX@   @@@@@@@@@@2   @@@@@@@@@@
brazil_bad_email = [
    '@',
    '*',
    '-',
    '.',
    ','
]

# Compiled once at import time instead of on every call.
_BRAZIL_EMAIL_RE = re.compile(r'[a-zA-Z0-9]+[a-zA-Z0-9._%+-]+@[a-zA-Z0-9._%+-]+\.[a-zA-Z]*')


# Brazil e-mail cleaning
def email_clean_brazil(email):
    """Clean a Brazilian e-mail field.

    The value is lower-cased and a few typos are repaired (``@@``, ``,.``,
    ``.,``). It is rejected (``None``) when it starts or ends with a character
    from ``brazil_bad_email``, contains no dot, equals a known bad value, or
    looks like ``a@b@c``.

    Returns:
        * ``str`` - the address, when the field holds exactly one ``@`` and
          matches the address pattern;
        * ``list`` of ``str`` - every address found, in all remaining accepted
          cases (the list may be empty);
        * ``None`` - for empty or rejected input.

    NOTE(review): the mixed str/list return type is kept for backward
    compatibility; confirm what callers expect.
    """
    if email:
        email = email.lower().replace('@@', '@').replace(',.', '.').replace('.,', '.')
        bad_edges = tuple(brazil_bad_email)
        if email.startswith(bad_edges) or email.endswith(bad_edges):
            return None
        if '.' not in email:
            return None
        if email == 'flr@flr.@bol.com.br':
            return None
        if email.count('@') == 1:
            # A comma in a single address is treated as a mistyped dot.
            email = email.replace(',', '.')
            if _BRAZIL_EMAIL_RE.search(email):
                return email
        if re.search(email_pattern1, email):
            return None
        emails = _BRAZIL_EMAIL_RE.findall(email)
        return emails
    return None


if __name__ == '__main__':
    test_case_list = [
        'HUGO.SANSIL@GMAIL.COM E HUGO@SISTEMAFIEG.ORG.BR',
        "laaltenhofen@brturbo.com.br ou luialtenhofen@hotmail.com",
        "SANDRA_MMC@BOL.COM.BR NAIRMOTADIAS@HOTMAIL.COM",
        "fundesco@ig.com.br /e ou juliocesarcoelho@ig.com.br",
        "choco.mixgold@hotmail.com / ou elisangela.sena@gmail.com",
        "SALES@ZGC.COM / REPAIR@ZGC.COM / WWW.ZGC.COM",
        "veronica@beereayres.com.br veronicabeer@uol.com.br advocacia@beereayres.com.br",
        "emanoel@amazoniaim.aginaria@org.br",
        "emerson.pires@contabilidadepires@.com.br",
        '@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@',
        '@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@2',
        'XXXXXXXXXXXXX@XXXXX@',
        '@',
        'lcregina@terra.com.br+phcontabil@brturb',
        'lcregina@terra.com.br, phcontabil@brturb',
        'abc.abc@abcabc@brturb',
        'alguizardi@hotmail.com -Terceiros.10@hotmail.com',
        'JURIDICO@LSCONTABILIDADE,COM.BR',
        'LUCIANO.KLEIMAN@B4WASTE.COM.,BR',
        'flr@flr.@bol.com.br',
        'aurenirrodrigues@ig,com.br',
        'kallfotosdigital@hotmailcom',
        'jurandireleicao2020@gmail',
        ',,CICERO.BONFIM@HOTMAIL.COM',
        ',japsjcampos@ig.combr',
        'FERNANDO@TWINFORMATICA.COM.BR'
    ]
    # Loop variable renamed so the builtin ``str`` is not rebound at module level.
    for case in test_case_list:
        print(f'tel: {case} ----> {email_clean_brazil(case)}')


def arr_str_to_str(str):
    """Turn a JSON array string into a comma-joined string.

    Every ``[]`` substring is removed first; if nothing is left, or the input
    is empty, ``None`` is returned. ``json.loads`` errors on malformed input
    are not caught here.
    """
    # Check whether the input string is empty.
    if str:
        str = str.replace('[]', '')
        if str:
            # Parse the JSON string, then join the list into one string.
            return ','.join(json.loads(str))
    # Empty input (or only '[]'): return None.
    return None


# Intentionally disabled demo ('__main__1' never matches).
if __name__ == '__main__1':
    test_case_list = [
        '[]',
        '["accounting","financial services"]',
        '["staffing & recruiting"]',
        '["management consulting","business consulting & services"]',
        '',
        None
    ]
    for arraystr in test_case_list:
        print(f'tel: {arraystr} ----> {arr_str_to_str(arraystr)}')

bad_tel_part1 = r'[^0-9+]'
bad_tel_part2 = re.compile(r'^(.*?)([^\d]+)$')  # (r'^(.*?)([a-zA-Z\-\(\) ]+)$')


def clean_tel_apollo(str):
    """Clean a free-form phone number.

    Every character other than a digit or ``+`` becomes a space, runs of
    whitespace are collapsed, and a trailing non-digit tail is cut off. The
    result is returned only when it holds at least 7 digits; otherwise, and
    for empty input, ``None`` is returned.
    """
    if str:
        # str = str.lower()
        # for bad_tel_str in bad_tel_list:
        #     str= str.replace(bad_tel_str,'')
        clean_str1 = re.sub(bad_tel_part1, ' ', str)
        str = ' '.join(clean_str1.split())
        bad_match = bad_tel_part2.search(str)
        if bad_match:
            str = bad_match.group(1).strip()
        else:
            str = str.strip()
        # Check the number of digits.
        cleane_str2 = re.sub(r'[^\d]', '', str)
        if len(cleane_str2) < 7:
            return None
        else:
            return str
    return None


# Intentionally disabled demo ('__main__1' never matches).
if __name__ == '__main__1':
    test_case_list = [
        '+1-866-344-7857 ext. 311',
        '(678)826-BUY1',
        '(844)800-BULL',
        '+ (373) 68 488 807 MDA',
        '++420 606 075 787 (Po - Pá)',
        '+1 412-281-4100 ext 212',
        '',
        None
    ]
    for str_tel in test_case_list:
        print(f'tel: {str_tel} ----> {clean_tel_apollo(str_tel)}')

# Social-media type -> URL fragment after which the account handle starts.
type_url = {
    "author": "tw.com/",
    "facebook": "facebook.com/",
    "google": "google.com/",
    "google|twcamp": "tw.com/",
    "instagram": "instagram.com/",
    "linkedin": "linkedin.com/",
    "pinterest": "pinterest.com/",
    "serp|twgr": "tw.com/",
    "tfw": "tw.com/",
    "tfw&screen_name=ferrespanola&tw_p=followbutton": "tw.com/",
    "twitter": "twitter.com/",
    "youtube": "youtube.com/",
    "crunchbase": "crunchbase.com/",
    "angellist": "angel.co/"
}
bad_url_list = [
    'https:',
    'https://www',
    'www'
]


def socialmedia_url(socialtype, url):
    """Extract the account part of a social-media URL.

    When ``socialtype`` is a key of ``type_url`` and its fragment occurs in the
    lower-cased URL, the text after the last occurrence of that fragment is
    returned with trailing ``/|#>+-.;?@}`` characters stripped. Otherwise the
    whole lower-cased URL is returned with the same stripping. Results listed
    in ``bad_url_list`` and empty input give ``None``.
    """
    if not url:
        return None
    # Check whether the type is present in the lookup dict.
    if socialtype in type_url:
        url_split = type_url[socialtype]
        url = url.lower()
        if url_split in url:
            url_clean = url.split(url_split)[-1].rstrip('/|#>+-.;?@}')
            if url_clean in bad_url_list:
                return None
            else:
                return url_clean
    url = url.lower().rstrip('/|#>+-.;?@}')
    if url in bad_url_list:
        return None
    else:
        return url


# Intentionally disabled demo ('__main__1' never matches).
if __name__ == '__main__1':
    test_case_list = [
        ("youtube", "https://youtube.com/user/BrotherCanadaEn"),
        ("facebook", "https://www.facebook.com/eastwesteng/"),
        ("google", "https://google.com/search?q=test"),
        ("author", "https://tw.com/SRAMroad?ref_src=twsrc"),
        ("tfw&screen_name=ferrespanola&tw_p=followbutton", "https://tw.com/search?q=test"),
        ("serp|twgr", "https://tw.com/search?q=test"),
        ("twitter", "https://twitter.com/#"),
        ("linkedin", "https://www.linkedin.com/in/meb-jsc/#"),
        ("instagram", "https://www.instagram.com/##############/"),
        ("facebook", "https://www.facebook.com/https://www.facebook.com/komlider38/"),
        ("pinterest", "https://www.pinterest.com/lampstore/https://www.pinterest.com/lampstore/"),
        ("linkedin", "https://www.linkedin.com/start/join?session_redirect=https://www.linkedin.com/company/swelect-energy-systems-ltd?trk=biz-companies-cym&source=D8E90337EA&trk=login_reg"),
        ("google", "https://twitter.com/search?q=test"),
        ("whatsapp", "919822025525"),
        ("nonexistent", "https://nonexistent.com/page"),
        ("", "919822025525"),
        ("twitter", "https://twitter.com/92342/3#4"),
        ("twitter", "https://twitter.com/@#dfw}kdn|"),
        ("twitter", "https://twitter.com/euroledwwwhttps:"),
        ("facebook", "https://facebook.com/alburoojrealestate/"),
        (None, ""),
        ("", None),
        (None, None)
    ]
    for socialtype, url in test_case_list:
        suffix = socialmedia_url(socialtype, url)
        print(f'category: {socialtype}, url: {url} ----> {suffix}')


def hongkong_previous_name_clean(str):
    """Strip the leading marker from a Hong Kong previous-name value.

    A value starting with ``'-- '`` loses those 3 characters; any other value
    loses its first 12 characters (the demo data shows a ``DD-MON-YYYY `` date
    prefix there). Empty or ``None`` input gives ``None``.
    """
    if str:
        if str.startswith('-- '):
            str = str[3:]
        else:
            str = str[12:]
        return str
    return None


# Intentionally disabled demo ('__main__1' never matches).
if __name__ == '__main__1':
    test_case_list = [
        '-- PACIFIC PRODUCTS LIMITED AUSTRALIAN PRODUCTS LIMITED',
        # A missing comma used to merge the '' case into this string.
        '03-MAY-2013 Fuente Union Import And Export Limited 福恩特聯合進出口有限公司',
        '',
        None
    ]
    for str_tel in test_case_list:
        print(f':{str_tel}---->{hongkong_previous_name_clean(str_tel)}')

# UK crawler: match the share-ownership percentage
sharepercent_pattern = re.compile(r'\["ownership-of-shares-(.+?)-percent')


def uk_sharepercent(str):
    """Extract the share range from a UK ownership descriptor.

    Returns the text between ``["ownership-of-shares-`` and the next
    ``-percent`` (e.g. ``'25-to-50'``), or ``None`` when the input is empty or
    does not match.
    """
    if str:
        sharepercent_match = re.search(sharepercent_pattern, str)
        if sharepercent_match:
            sharepercent = sharepercent_match.group(1)
            return sharepercent
    return None


# Intentionally disabled demo ('__main__1' never matches).
if __name__ == '__main__1':
    test_case_list = [
        '["ownership-of-shares-25-to-50-percent","voting-rights-25-to-50-percent"]',
        '["ownership-of-shares-more-than-25-percent-registered-overseas-entity"]',
        '',
        None
    ]
    for str_tel in test_case_list:
        print(f':{str_tel}---->{uk_sharepercent(str_tel)}')