| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- import re
- from pyspark.sql.functions import udf
- from pyspark.sql.types import StructType, StructField, IntegerType , StringType
- # 定义结构体类型
- # schema = StructType([
- # StructField("type", IntegerType(), False),
- # StructField("contact", StringType(), True)
- # ])
@udf(returnType=StructType([
    StructField("type", IntegerType(), False),
    StructField("contact", StringType(), True)
]))
def format_phone(s: str) -> (int, str):
    """Classify a raw contact string and return ``(type, contact)``.

    The parsers are tried in order: mobile number, landline number, e-mail.
    The first one that recognises the input wins; otherwise the e-mail
    parser's result is returned, which is ``(99, s)`` for unrecognised input.

    Mobile format: country code + separator + 10-digit mobile number.
      * country code: 91, +91, (+91), (91), or absent
      * separator: a single half-width space, a hyphen, or absent
      * mobile number: 10 digits, or 5 digits + separator + 5 digits;
        the first digit is 6-9
      * examples: "+91 9987654321", "919987654321", "+91 99876-54321"

    Type codes:
      1  - mobile number
      2  - landline
      3  - e-mail
      4  - address (listed in the code table; nothing in this function
           produces it)
      99 - other / unrecognised

    Args:
        s: Raw contact text; may be ``None`` for a null column value.

    Returns:
        ``(type, contact)``. A ``None`` input yields ``(99, None)`` instead of
        raising inside the regex calls.
    """
    # Null column values arrive as None; the regex helpers only accept str.
    if s is None:
        return (99, None)

    # `kind` rather than `type`, so the builtin is not shadowed.
    for parser in (parse_phone, parse_fixed_phone):
        kind, contact = parser(s)
        if kind != 99:
            return (kind, contact)
    return parse_email(s)
@udf(returnType=IntegerType())
def check_email_type(s: str) -> int:
    """Return only the type code that ``parse_email`` assigns to ``s``."""
    kind, _ = parse_email(s)
    return kind
def parse_email(s: str) -> (int, str):
    """Recognise an e-mail address.

    The whole string must be of the form ``local@domain``: dot-separated
    labels made of letters, digits, ``_`` and ``-``, with the final domain
    label(s) made of letters only.

    Args:
        s: Candidate text; may be ``None``.

    Returns:
        ``(3, address)`` when ``s`` is an e-mail address, otherwise
        ``(99, s)``. ``None`` is returned as ``(99, None)`` rather than
        raising ``TypeError`` from the regex engine.
    """
    if s is None:
        return (99, s)
    matched = re.search(
        r'^[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)*@[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)*(\.[a-zA-Z]+)+$',
        s,
    )
    if matched is None:
        return (99, s)
    return (3, matched.group(0))
def parse_fixed_phone(s: str) -> (int, str):
    """Try to read ``s`` as a landline number.

    An optional leading country code (+91, 91, (91) or (+91)) is removed,
    then the remainder is passed to ``get_fixed_phone_res`` together with the
    pattern for an optional area code (bare or in parentheses, with or
    without the leading 0).
    """
    area_pattern = r'^([- ]*)(011|11|022|22|033|33|044|44|020|20|040|40|080|80|0141|141|\(011\)|\(11\)|\(022\)|\(22\)|\(033\)|\(33\)|\(044\)|\(44\)|\(020\)|\(20\)|\(040\)|\(40\)|\(080\)|\(80\)|\(0141\)|\(141\))?[- ]*(.*)'
    country = re.search(r'^(\+91|91|\(91\)|\(\+91\))(.*)', s)
    # Without a country code the whole input is the candidate number.
    remainder = s if country is None else country.group(2)
    return get_fixed_phone_res(s, remainder, area_pattern)
def parse_phone(s: str) -> (int, str):
    """Try to read ``s`` as a 10-digit mobile number.

    An optional leading country code (+91, 91, (91) or (+91)) is removed and
    the remainder is validated by ``get_phone_res``.

    A mobile number written without a country code may itself begin with
    "91" (e.g. "9198765432"). Stripping those two digits leaves only eight,
    so the stripped attempt fails. In that case the unstripped input is
    tried as well, so such numbers are still recognised as mobiles.

    Returns:
        ``(1, "+91 ddddd ddddd")`` on success, otherwise ``(99, s)``.
    """
    start_pattern = r'^[6-9- 0]'
    prefixed = re.search(r'^(\+91|91|\(91\)|\(\+91\))(.*)', s)
    if prefixed:
        result = get_phone_res(s, prefixed.group(2), start_pattern)
        if result[0] != 99:
            return result
        # The apparent country code may be the first digits of the number
        # itself; fall through and validate the complete input.
    return get_phone_res(s, s, start_pattern)
def get_fixed_phone_res(phone_str: str, last_sub_phone: str, start_pattern: str) -> (int, str):
    """Validate and normalise a landline number.

    Expected layout: country code + separator + area code + separator +
    7/8/10-digit subscriber number.
      * separator: a single half-width space, a hyphen, or absent
      * area code: 011, 022, 033, 044, 020, 040, 080, 0141; the leading 0
        may be omitted and the code may be wrapped in parentheses
      * 7-digit number: 7 digits, or 3 digits + separator + 4 digits
      * 8-digit number: 8 digits, or 4 digits + separator + 4 digits
      * 10-digit number: 10 digits, or 4 digits + separator + 6 digits

    Args:
        phone_str: The original input, returned unchanged on failure.
        last_sub_phone: The input with any country code already removed.
        start_pattern: Regex whose group 2 is the optional area code and
            whose group 3 is the rest of the number.

    Returns:
        ``(2, "+91 [(0AREA)] LEFT RIGHT")`` on success, otherwise
        ``(99, phone_str)``. Every path returns a 2-tuple; an area-code match
        that contains no digits is reported as unrecognised instead of
        falling off the end of the function and returning ``None``.
    """
    unrecognised = (99, phone_str)

    found = re.search(start_pattern, last_sub_phone)
    if found is None:
        return unrecognised

    area, fixed_phone = found.group(2), found.group(3)
    left, right = split_fixed_phone(fixed_phone)

    if left is None:
        # The digits taken for an area code may really be the start of the
        # subscriber number: glue them back on and retry.
        if area:
            left, right = split_fixed_phone(area + fixed_phone)
            if left:
                return 2, "{} {} {}".format("+91", left, right)
        return unrecognised

    if not area:
        return 2, "{} {} {}".format("+91", left, right)

    area_digits = re.search(r'\d+', area)
    if area_digits is None:
        return unrecognised

    area_num = area_digits.group(0)
    # Normalise the area code to the form with a leading 0, in parentheses.
    if not area_num.startswith('0'):
        area_num = "0" + area_num
    return 2, "{} ({}) {} {}".format("+91", area_num, left, right)
def get_phone_res(phone_str: str, last_sub_phone: str, start_pattern: str) -> (int, str):
    """Validate and normalise a mobile number.

    ``last_sub_phone`` (the input without country code) must begin as
    ``start_pattern`` requires. Starting at its first digit, with one leading
    0 dropped if another digit follows, the rest must be a 10-digit mobile
    number: 10 digits, or 5 digits + separator(s) + 5 digits, where the first
    digit is 6-9 and the separator is a half-width space or a hyphen.

    Returns:
        ``(1, "+91 ddddd ddddd")`` on success, otherwise ``(99, phone_str)``.
    """
    rejected = (99, phone_str)

    if re.match(start_pattern, last_sub_phone) is None:
        return rejected

    digits_found = re.search(r'([0]?)(\d.*$)', last_sub_phone)
    if digits_found is None:
        return rejected

    candidate = digits_found.group(2)
    if re.match(r'^[6-9]{1}[0-9]{4}[- ]*[0-9]{5}$', candidate) is None:
        return rejected

    # First and last five characters are the two digit groups; separators
    # between them are dropped.
    return 1, "{} {} {}".format("+91", candidate[:5], candidate[-5:])
def split_fixed_phone(fixed_phone: str) -> (str, str):
    """Split a landline subscriber number into its two digit groups.

    Accepted layouts, tried in this order (the separator is any run of
    spaces/hyphens, possibly empty; the first digit is 1-9):
      * 10 digits: 4 + 6
      * 8 digits:  4 + 4
      * 7 digits:  3 + 4

    Returns:
        ``(left, right)`` for the first layout that fits the whole string,
        or ``(None, None)`` when none does.
    """
    layouts = (
        r'^([1-9]{1}[0-9]{3})([- ]*)([0-9]{6})$',
        r'^([1-9]{1}[0-9]{3})([- ]*)([0-9]{4})$',
        r'^([1-9]{1}[0-9]{2})([- ]*)([0-9]{4})$',
    )
    for layout in layouts:
        hit = re.search(layout, fixed_phone)
        if hit is not None:
            return hit.group(1), hit.group(3)
    return None, None
# Public UDF name for callers. format_phone is already wrapped by @udf with its
# struct return type, so it is exposed here under this name as-is. Wrapping it
# in udf() a second time would register the default string return type and
# make executors call a driver-side UDF wrapper instead of the plain function.
get_type_and_format_phone = format_phone
|