spark_india_format_phone_udf.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. import re
  2. from pyspark.sql.functions import udf
  3. from pyspark.sql.types import StructType, StructField, IntegerType , StringType
  4. # Define the struct type (kept for reference; the same schema is inlined in the @udf decorator below)
  5. # schema = StructType([
  6. # StructField("type", IntegerType(), False),
  7. # StructField("contact", StringType(), True)
  8. # ])
  9. @udf(returnType=StructType([
  10. StructField("type", IntegerType(), False),
  11. StructField("contact", StringType(), True)
  12. ]))
  13. def format_phone(s:str)->(int,str):
  14. # "国区编号+分隔符+10位手机号码
  15. # 国区编号:91、+91、(+91)、(91)或不展示
  16. # 分隔符:一个半角空格或横杠或不展示
  17. # 10位手机号码:10位数字或5个数字+分隔符+5个数字,首个数字为6-9其中的数字"
  18. # +91 9987654321
  19. # 919987654321
  20. # +91 99876-54321
  21. # type:
  22. # 1-手机号
  23. # 2-座机
  24. # 3-邮箱
  25. # 4-地址
  26. # 99-其他
  27. type,res = parse_phone(s)
  28. if type != 99:
  29. return (type,res)
  30. else:
  31. type,res = parse_fixed_phone(s)
  32. if type != 99:
  33. return (type,res)
  34. else:
  35. return parse_email(s)
  36. @udf(returnType=IntegerType())
  37. def check_email_type(s:str)->int:
  38. return parse_email(s)[0]
  39. def parse_email(s:str)->(int,str):
  40. rex = re.search(r'^[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)*@[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)*(\.[a-zA-Z]+)+$',s)
  41. if rex:
  42. return (3,rex.group(0))
  43. else:
  44. return (99,s)
  45. def parse_fixed_phone(s:str)->(int,str):
  46. s_rex = re.search(r'^(\+91|91|\(91\)|\(\+91\))(.*)', s)
  47. start_pattern = r'^([- ]*)(011|11|022|22|033|33|044|44|020|20|040|40|080|80|0141|141|\(011\)|\(11\)|\(022\)|\(22\)|\(033\)|\(33\)|\(044\)|\(44\)|\(020\)|\(20\)|\(040\)|\(40\)|\(080\)|\(80\)|\(0141\)|\(141\))?[- ]*(.*)'
  48. if s_rex:
  49. last_sub = s_rex.group(2)
  50. return get_fixed_phone_res(s, last_sub, start_pattern)
  51. else:
  52. return get_fixed_phone_res(s, s, start_pattern)
  53. def parse_phone(s:str)->(int,str):
  54. s_rex = re.search(r'^(\+91|91|\(91\)|\(\+91\))(.*)', s)
  55. if s_rex:
  56. last_sub = s_rex.group(2)
  57. return get_phone_res(s, last_sub, r'^[6-9- 0]')
  58. else:
  59. return get_phone_res(s, s, r'^[6-9- 0]')
  60. def get_fixed_phone_res(phone_str:str,last_sub_phone:str,start_pattern:str)->(int,str):
  61. # 分隔符:一个半角空格或横杠或不展示
  62. # 国区编码+分隔符+区号+分隔符+7/8/10座机号码
  63. # 国区编号:同上
  64. # 分隔符:同上
  65. # 区号:011、022、033、044、020、040、080、0141,0可能隐藏,区号外可能带括号
  66. # 7位座机号码:7位数字或者3位数字+分隔符+4位数字
  67. # 8位座机号码:8位数字或者4位数字+分割符+4位数字
  68. # 10位座机号码:10位数字或者4位数字+分隔符+6位数字
  69. re_search_res = re.search(start_pattern, last_sub_phone)
  70. if re_search_res:
  71. area,fixed_phone = re_search_res.group(2),re_search_res.group(3)
  72. left,right = split_fixed_phone(fixed_phone)
  73. if left is None:
  74. if area:
  75. left,right = split_fixed_phone(area + fixed_phone)
  76. if left:
  77. return 2, "{} {} {}".format("+91", left, right)
  78. return 99,phone_str
  79. else:
  80. if not area:
  81. return 2, "{} {} {}".format("+91", left,right)
  82. area_num_rex = re.search(r'\d+',area)
  83. if area_num_rex:
  84. area_num = area_num_rex.group(0)
  85. if area_num.startswith('0'):
  86. area_str="".join(["(",area_num,")"])
  87. else:
  88. area_str = "".join(["(0", area_num, ")"])
  89. return 2, "{} {} {} {}".format("+91", area_str, left,right)
  90. else:
  91. return 99,phone_str
  92. def get_phone_res(phone_str:str,last_sub_phone:str,start_pattern:str)->(int,str):
  93. # 分隔符:一个半角空格或横杠或不展示
  94. if not bool(re.match(start_pattern, last_sub_phone)):
  95. return 99,phone_str
  96. rex_last_sub_phone = re.search(r'([0]?)(\d.*$)', last_sub_phone)
  97. if rex_last_sub_phone:
  98. phone = rex_last_sub_phone.group(2)
  99. # 10位手机号码:10位数字或5个数字+分隔符+5个数字,首个数字为6-9其中的数字
  100. if not bool(re.match(r'^[6-9]{1}[0-9]{4}[- ]*[0-9]{5}$', phone)):
  101. return 99,phone_str
  102. return 1,"{} {} {}".format("+91",phone[0:5],phone[-5:])
  103. else:
  104. return 99, phone_str
  105. def split_fixed_phone(fixed_phone:str)->(str,str):
  106. p1 = re.search(r'^([1-9]{1}[0-9]{3})([- ]*)([0-9]{6})$', fixed_phone)
  107. if p1:
  108. return p1.group(1),p1.group(3)
  109. p2 = re.search(r'^([1-9]{1}[0-9]{3})([- ]*)([0-9]{4})$', fixed_phone)
  110. if p2:
  111. return p2.group(1),p2.group(3)
  112. p3 = re.search(r'^([1-9]{1}[0-9]{2})([- ]*)([0-9]{4})$', fixed_phone)
  113. if p3:
  114. return p3.group(1),p3.group(3)
  115. return None,None
  116. # 注册UDF并指定返回类型
  117. get_type_and_format_phone = udf(format_phone)