"""Spark UDFs that classify and normalize Indian contact strings.

Every parser returns a ``(type, contact)`` pair.  Type codes:

    1  - mobile number
    2  - landline (fixed-line) number
    3  - e-mail address
    4  - postal address (reserved; nothing in this module produces it)
    99 - anything else; ``contact`` is then the untouched input

Accepted mobile format: country code + separator + 10-digit number.
    * country code: ``91``, ``+91``, ``(+91)``, ``(91)`` or absent
    * separator: a half-width space or a hyphen, or absent (the patterns
      below actually tolerate any run of spaces/hyphens)
    * number: 10 digits, or 5 digits + separator + 5 digits; the first digit
      must be 6-9
    * examples: ``+91 9987654321``, ``919987654321``, ``+91 99876-54321``

Accepted landline format: country code + separator + area code + separator
+ 7/8/10-digit number.
    * country code and separator: as above
    * area code: 011, 022, 033, 044, 020, 040, 080, 0141; the leading 0 may
      be omitted and the code may be wrapped in parentheses
    * 7-digit number: 7 digits or 3 digits + separator + 4 digits
    * 8-digit number: 8 digits or 4 digits + separator + 4 digits
    * 10-digit number: 10 digits or 4 digits + separator + 6 digits
"""
import re
from typing import Optional, Tuple

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

_TYPE_MOBILE = 1
_TYPE_FIXED = 2
_TYPE_EMAIL = 3
_TYPE_OTHER = 99

# Struct returned by ``format_phone``: the type code plus the normalized
# (or, for type 99, original) contact string.
_CONTACT_SCHEMA = StructType([
    StructField("type", IntegerType(), False),
    StructField("contact", StringType(), True),
])

# Optional country code at the very start; group 2 is everything after it.
_COUNTRY_CODE_RE = re.compile(r'^(\+91|91|\(91\)|\(\+91\))(.*)')

_EMAIL_RE = re.compile(
    r'^[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)*'
    r'@[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)*(\.[a-zA-Z]+)+$'
)

# What may directly follow the country code of a mobile number: the first
# digit (6-9), a separator, or a leading 0.
_MOBILE_START_PATTERN = r'^[6-9- 0]'

# group 1: leading separators, group 2: optional area code (with or without
# the leading 0 / parentheses), group 3: the rest of the string.
_FIXED_START_PATTERN = (
    r'^([- ]*)'
    r'(011|11|022|22|033|33|044|44|020|20|040|40|080|80|0141|141'
    r'|\(011\)|\(11\)|\(022\)|\(22\)|\(033\)|\(33\)|\(044\)|\(44\)'
    r'|\(020\)|\(20\)|\(040\)|\(40\)|\(080\)|\(80\)|\(0141\)|\(141\))?'
    r'[- ]*(.*)'
)

# Skips leading separators and at most one leading 0; group 2 starts at the
# first remaining digit and runs to the end of the string.
_LEADING_ZERO_RE = re.compile(r'([0]?)(\d.*$)')

_MOBILE_RE = re.compile(r'^[6-9]{1}[0-9]{4}[- ]*[0-9]{5}$')

_AREA_DIGITS_RE = re.compile(r'\d+')

# Landline layouts, tried in this order: 4+6, 4+4, 3+4 digits.
_FIXED_SPLIT_RES = (
    re.compile(r'^([1-9]{1}[0-9]{3})([- ]*)([0-9]{6})$'),
    re.compile(r'^([1-9]{1}[0-9]{3})([- ]*)([0-9]{4})$'),
    re.compile(r'^([1-9]{1}[0-9]{2})([- ]*)([0-9]{4})$'),
)


@udf(returnType=_CONTACT_SCHEMA)
def format_phone(s: str) -> Tuple[int, Optional[str]]:
    """Classify *s* as mobile, landline or e-mail and normalize it.

    The parsers are tried in that order; the first one that recognizes the
    input wins.  Unrecognized input (including a null) yields ``(99, s)``.
    """
    for parser in (parse_phone, parse_fixed_phone):
        kind, contact = parser(s)
        if kind != _TYPE_OTHER:
            return kind, contact
    return parse_email(s)


@udf(returnType=IntegerType())
def check_email_type(s: str) -> int:
    """Return 3 when *s* is an e-mail address, otherwise 99."""
    return parse_email(s)[0]


def parse_email(s: str) -> Tuple[int, Optional[str]]:
    """Return ``(3, address)`` if *s* is an e-mail address, else ``(99, s)``."""
    # Spark passes SQL NULL as None; re would raise TypeError on it.
    if s is None:
        return _TYPE_OTHER, s
    found = _EMAIL_RE.search(s)
    if found:
        return _TYPE_EMAIL, found.group(0)
    return _TYPE_OTHER, s


def parse_fixed_phone(s: str) -> Tuple[int, Optional[str]]:
    """Return ``(2, normalized)`` if *s* is a landline number, else ``(99, s)``."""
    if s is None:
        return _TYPE_OTHER, s
    prefixed = _COUNTRY_CODE_RE.search(s)
    remainder = prefixed.group(2) if prefixed else s
    return get_fixed_phone_res(s, remainder, _FIXED_START_PATTERN)


def parse_phone(s: str) -> Tuple[int, Optional[str]]:
    """Return ``(1, normalized)`` if *s* is a mobile number, else ``(99, s)``."""
    if s is None:
        return _TYPE_OTHER, s
    prefixed = _COUNTRY_CODE_RE.search(s)
    if prefixed:
        kind, contact = get_phone_res(s, prefixed.group(2), _MOBILE_START_PATTERN)
        if kind != _TYPE_OTHER:
            return kind, contact
        # A bare leading "91" may be the first two digits of the mobile
        # number itself (e.g. "9198765432") rather than a country code, so
        # retry on the unstripped input.  Inputs starting with "+" or "("
        # are rejected again by the start pattern, so this only affects the
        # bare-"91" case.
    return get_phone_res(s, s, _MOBILE_START_PATTERN)


def get_fixed_phone_res(phone_str: str, last_sub_phone: str,
                        start_pattern: str) -> Tuple[int, Optional[str]]:
    """Normalize a landline number whose country code was already removed.

    Args:
        phone_str: the original input, returned unchanged on failure.
        last_sub_phone: the input with any country code stripped.
        start_pattern: regex whose group 2 is the optional area code and
            whose group 3 is the subscriber number.

    Returns:
        ``(2, "+91 [(0AREA)] LEFT RIGHT")`` on success, ``(99, phone_str)``
        otherwise.
    """
    if last_sub_phone is None:
        return _TYPE_OTHER, phone_str
    matched = re.search(start_pattern, last_sub_phone)
    if not matched:
        return _TYPE_OTHER, phone_str

    area, number = matched.group(2), matched.group(3)
    left, right = split_fixed_phone(number)

    if left is None:
        # The "area code" may really be the first digits of a number that
        # has no area code, so try again with the two glued together.
        if area:
            left, right = split_fixed_phone(area + number)
            if left:
                return _TYPE_FIXED, "{} {} {}".format("+91", left, right)
        return _TYPE_OTHER, phone_str

    if not area:
        return _TYPE_FIXED, "{} {} {}".format("+91", left, right)

    area_digits = _AREA_DIGITS_RE.search(area)
    if not area_digits:
        # Previously fell off the end and returned None, which would break
        # tuple unpacking in the callers.
        return _TYPE_OTHER, phone_str
    area_num = area_digits.group(0)
    # Always render the area code with its leading 0, in parentheses.
    if not area_num.startswith('0'):
        area_num = '0' + area_num
    return _TYPE_FIXED, "{} ({}) {} {}".format("+91", area_num, left, right)


def get_phone_res(phone_str: str, last_sub_phone: str,
                  start_pattern: str) -> Tuple[int, Optional[str]]:
    """Normalize a mobile number whose country code was already removed.

    Args:
        phone_str: the original input, returned unchanged on failure.
        last_sub_phone: the input with any country code stripped.
        start_pattern: regex the stripped input must match at its start.

    Returns:
        ``(1, "+91 DDDDD DDDDD")`` on success, ``(99, phone_str)`` otherwise.
    """
    if last_sub_phone is None:
        return _TYPE_OTHER, phone_str
    if not re.match(start_pattern, last_sub_phone):
        return _TYPE_OTHER, phone_str

    digits_onward = _LEADING_ZERO_RE.search(last_sub_phone)
    if not digits_onward:
        return _TYPE_OTHER, phone_str

    phone = digits_onward.group(2)
    # 10 digits, optionally split 5 + 5 by separators, first digit 6-9.
    if not _MOBILE_RE.match(phone):
        return _TYPE_OTHER, phone_str
    # Slicing from both ends drops any separator in the middle.
    return _TYPE_MOBILE, "{} {} {}".format("+91", phone[0:5], phone[-5:])


def split_fixed_phone(fixed_phone: str) -> Tuple[Optional[str], Optional[str]]:
    """Split a landline subscriber number into its two display groups.

    Recognizes 10 digits (4 + 6), 8 digits (4 + 4) and 7 digits (3 + 4),
    each with optional separators between the groups and a non-zero first
    digit.  Returns ``(None, None)`` when no layout matches.
    """
    if fixed_phone is None:
        return None, None
    for layout in _FIXED_SPLIT_RES:
        found = layout.search(fixed_phone)
        if found:
            return found.group(1), found.group(3)
    return None, None


# ``format_phone`` is already a registered UDF with the struct return type.
# Wrapping it in ``udf(...)`` a second time would wrap the UDF wrapper itself
# (with the default string return type) instead of the Python function, so
# the legacy name is kept as a plain alias.
get_type_and_format_phone = format_phone