spark_mmq_udf.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530
  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. import hashlib
  4. import json
  5. import re
  6. from typing import List
  7. from urllib.parse import urlparse
  8. import requests
  9. from pyspark.sql.functions import udf
  10. from pyspark.sql.types import *
  11. def array_to_json(arr: List):
  12. return json.dumps(arr, ensure_ascii=False)
  13. def get_hs_code(arr: List):
  14. url = 'https://api.tendata.cn/data/customs/v1/imports/china_stat,panama,kenya,uganda,liberia,botswana,lesotho,namibia,south_africa_stat,new_zealand,australia,ivory_coast,turkey,thailand,venezuela_bol,moldova,costarica,nigeria,indonesia_stat,russia_rail,canada_stat,honduras,hongkong_stat,fiji,zimbabwe,ghana,cameroon,chad,honduras_stat,central_african_republic,maritime_silk_bol,burundi,eurasian_eu,taiwan_stat,ecuador_bol,tanzania,tanzania_tboe,bolivia_stat,spain_co,mexico,rwanda,malawi,congo_kinshasa,south_korea_co,england_stat,angola_stat,mexico_bol,nicaragua,canada,salvador_stat,salvador,guatemala,argentina,america,paraguay,brazil_stat,brazil,brazil_bol,peru,peru_exp,bolivia,ecuador,colombia,venezuela,uruguay,america_stat,chile,russia,ukraine,england,spain,european_union,eurasian_bol,cis,pakistan,pakistan_bol,south_korea,south_korea_stat,india,india_exp,vietnam,taiwan,philippines,dominica,philippines_stat,kazakhstan,kyrghyzstan,sri_lanka,uzbekistan,indonesia,japan,bangladesh,turkey_stat,thailand_stat,ethiopia/report?page=1&size=100000'
  15. # 请求参数
  16. params = {
  17. "reportType": "hs_code",
  18. "parameters": {
  19. "sort": "sum_of_money,desc",
  20. "reportName": "海关编码汇总报告"
  21. },
  22. "query": {
  23. "startDate": "2023-01-01",
  24. "endDate": "2023-12-31",
  25. "filterBlank": False,
  26. "filterLogistics": False,
  27. "conditionGroups": [
  28. {
  29. "conditions": [
  30. {
  31. "param": "exporter",
  32. "value": arr
  33. }
  34. ]
  35. }
  36. ]
  37. },
  38. "sortField": "trades,desc",
  39. "reportName": "海关编码汇总报告"
  40. }
  41. headers = {
  42. 'x-api-key': 'l0EEiokwMKLywfwbW08ESzCzea1egvMreXmehIII'
  43. }
  44. try:
  45. response = requests.post(url=url, json=params, headers=headers)
  46. res = response.json()
  47. if res:
  48. items = res["results"]["items"]
  49. return json.dumps(items, ensure_ascii=False)
  50. return None
  51. except Exception as e:
  52. return None
  53. def get_mdg_code(arr: List):
  54. url = 'https://api.tendata.cn/data/customs/v1/imports/china_stat,panama,kenya,uganda,liberia,botswana,lesotho,namibia,south_africa_stat,new_zealand,australia,ivory_coast,turkey,thailand,venezuela_bol,moldova,costarica,nigeria,indonesia_stat,russia_rail,canada_stat,honduras,hongkong_stat,fiji,zimbabwe,ghana,cameroon,chad,honduras_stat,central_african_republic,maritime_silk_bol,burundi,eurasian_eu,taiwan_stat,ecuador_bol,tanzania,tanzania_tboe,bolivia_stat,spain_co,mexico,rwanda,malawi,congo_kinshasa,south_korea_co,england_stat,angola_stat,mexico_bol,nicaragua,canada,salvador_stat,salvador,guatemala,argentina,america,paraguay,brazil_stat,brazil,brazil_bol,peru,peru_exp,bolivia,ecuador,colombia,venezuela,uruguay,america_stat,chile,russia,ukraine,england,spain,european_union,eurasian_bol,cis,pakistan,pakistan_bol,south_korea,south_korea_stat,india,india_exp,vietnam,taiwan,philippines,dominica,philippines_stat,kazakhstan,kyrghyzstan,sri_lanka,uzbekistan,indonesia,japan,bangladesh,turkey_stat,thailand_stat,ethiopia/report?page=1&size=100000'
  55. # 请求参数
  56. params = {
  57. "reportType": "country_of_destination_code",
  58. "parameters": {
  59. "sort": "trades,desc",
  60. "reportName": "目的国汇总报告"
  61. },
  62. "query": {
  63. "startDate": "2023-01-01",
  64. "endDate": "2023-12-31",
  65. "filterBlank": False,
  66. "filterLogistics": False,
  67. "conditionGroups": [
  68. {
  69. "conditions": [
  70. {
  71. "param": "exporter",
  72. "value": arr
  73. }
  74. ]
  75. }
  76. ]
  77. },
  78. "sortField": "trades,desc",
  79. "reportName": "目的国汇总报告"
  80. }
  81. headers = {
  82. 'x-api-key': 'l0EEiokwMKLywfwbW08ESzCzea1egvMreXmehIII'
  83. }
  84. try:
  85. response = requests.post(url=url, json=params, headers=headers)
  86. res = response.json()
  87. if res:
  88. items = res["results"]["items"]
  89. return json.dumps(items, ensure_ascii=False)
  90. return None
  91. except Exception as e:
  92. return None
  93. @udf(returnType=StructType([
  94. StructField("hs_code_4_list", ArrayType(StringType()), False),
  95. StructField("trades_sum_list_str", StringType()),
  96. StructField("trades_sum_total", IntegerType(), False),
  97. StructField("sumOfMoney_sum_total", FloatType(), False),
  98. StructField("sumOfMoney_sum_list_str", StringType()),
  99. StructField("weight_sum_total", FloatType(), False),
  100. StructField("weight_sum_list_str", StringType()),
  101. StructField("quantity_sum_total", FloatType(), False),
  102. StructField("quantity_sum_list_str", StringType())
  103. ]))
  104. def get_hs_code_count(content: str):
  105. if content:
  106. content_json_arr = json.loads(content)
  107. result_dict_trades_sum = {}
  108. result_dict_sumOfMoney_sum = {}
  109. result_dict_weight_sum = {}
  110. result_dict_quantity_sum = {}
  111. for item in content_json_arr:
  112. hs_code_4 = item['__gk'][:4]
  113. item_new = {
  114. "hs_code": hs_code_4,
  115. "trades_sum": int(item['trades_sum']),
  116. "sumOfMoney_sum": round(item['sumOfMoney_sum'], 2),
  117. "weight_sum": round(item['weight_sum'], 2),
  118. "quantity_sum": round(item['quantity_sum'], 2)
  119. }
  120. # 处理 trades_sum
  121. if hs_code_4 in result_dict_trades_sum:
  122. existing_item = result_dict_trades_sum[hs_code_4]
  123. existing_item["trades_sum"] += item_new["trades_sum"]
  124. else:
  125. result_dict_trades_sum[hs_code_4] = {"hs_code": hs_code_4, "trades_sum": item_new["trades_sum"]}
  126. # 处理 sumOfMoney_sum
  127. if hs_code_4 in result_dict_sumOfMoney_sum:
  128. existing_item = result_dict_sumOfMoney_sum[hs_code_4]
  129. existing_item["sumOfMoney_sum"] += item_new["sumOfMoney_sum"]
  130. else:
  131. result_dict_sumOfMoney_sum[hs_code_4] = {"hs_code": hs_code_4,
  132. "sumOfMoney_sum": item_new["sumOfMoney_sum"]}
  133. # 处理 weight_sum
  134. if hs_code_4 in result_dict_weight_sum:
  135. existing_item = result_dict_weight_sum[hs_code_4]
  136. existing_item["weight_sum"] += item_new["weight_sum"]
  137. else:
  138. result_dict_weight_sum[hs_code_4] = {"hs_code": hs_code_4, "weight_sum": item_new["weight_sum"]}
  139. # 处理 quantity_sum
  140. if hs_code_4 in result_dict_quantity_sum:
  141. existing_item = result_dict_quantity_sum[hs_code_4]
  142. existing_item["quantity_sum"] += item_new["quantity_sum"]
  143. else:
  144. result_dict_quantity_sum[hs_code_4] = {"hs_code": hs_code_4, "quantity_sum": item_new["quantity_sum"]}
  145. # 对每个列表按照 "trades_sum" 的值降序排序
  146. trades_sum_list = sorted(list(result_dict_trades_sum.values()), key=lambda x: x["trades_sum"], reverse=True)
  147. sumOfMoney_sum_list = sorted(list(result_dict_sumOfMoney_sum.values()), key=lambda x: x["sumOfMoney_sum"],
  148. reverse=True)
  149. weight_sum_list = sorted(list(result_dict_weight_sum.values()), key=lambda x: x["weight_sum"], reverse=True)
  150. quantity_sum_list = sorted(list(result_dict_quantity_sum.values()), key=lambda x: x["quantity_sum"],
  151. reverse=True)
  152. # return list(result_dict_trades_sum.values()), list(result_dict_sumOfMoney_sum.values()), list(
  153. # result_dict_weight_sum.values()), list(result_dict_quantity_sum.values())
  154. hs_code_4_list = [obj['hs_code'] for obj in trades_sum_list]
  155. if 'N/A' in hs_code_4_list:
  156. hs_code_4_list.remove('N/A')
  157. total_trades_sum = 0
  158. for obj in trades_sum_list:
  159. total_trades_sum += obj["trades_sum"]
  160. trades_sum_tatal = int(total_trades_sum)
  161. trades_sum_list_str = ",".join(
  162. ['{' + f'{i["hs_code"]},贸易次数:{i["trades_sum"]}' + '}' for i in trades_sum_list])
  163. total_sumOfMoney_sum = 0.0
  164. for obj in sumOfMoney_sum_list:
  165. total_sumOfMoney_sum += obj["sumOfMoney_sum"]
  166. sumOfMoney_sum_tatal = round(total_sumOfMoney_sum, 2)
  167. sumOfMoney_sum_list_str = ",".join(
  168. ['{' + f'{i["hs_code"]},美元总价:{i["sumOfMoney_sum"]}' + '}' for i in sumOfMoney_sum_list])
  169. total_weight_sum = 0.0
  170. for obj in weight_sum_list:
  171. total_weight_sum += obj["weight_sum"]
  172. weight_sum_tatal = round(total_weight_sum, 2)
  173. weight_sum_list_str = ",".join(
  174. ['{' + f'{i["hs_code"]},千克毛重:{i["weight_sum"]}' + '}' for i in weight_sum_list])
  175. total_quantity_sum = 0.0
  176. for obj in quantity_sum_list:
  177. total_quantity_sum += obj["quantity_sum"]
  178. quantity_sum_tatal = round(total_quantity_sum, 2)
  179. quantity_sum_list_str = ",".join(
  180. ['{' + f'{i["hs_code"]},数量:{i["quantity_sum"]}' + '}' for i in quantity_sum_list])
  181. return hs_code_4_list, trades_sum_list_str, trades_sum_tatal, sumOfMoney_sum_tatal, sumOfMoney_sum_list_str, weight_sum_tatal, weight_sum_list_str, quantity_sum_tatal, quantity_sum_list_str
  182. return None, None, None, None, None, None, None, None, None
  183. def calculate_total_and_list(data_list, key, unit):
  184. total = sum(item[key] for item in data_list)
  185. formatted_list = ', '.join([f"{{{i['hs_code']},{unit}:{i[key]}}}" for i in data_list])
  186. return total, formatted_list
  187. @udf(returnType=StructType([
  188. StructField("hs_code_4", ArrayType(StringType()), False),
  189. StructField("sum_str", StringType()),
  190. StructField("total", FloatType(), False),
  191. ]))
  192. def get_hs_code_count_str(content: str, target_key: str, unit: str):
  193. if content:
  194. content_json_arr = json.loads(content)
  195. result_dict = {}
  196. for item in content_json_arr:
  197. hs_code_4 = item['__gk'][:4]
  198. item_new = {
  199. "hs_code": hs_code_4,
  200. target_key: int(item[target_key]) if target_key == "trades_sum" else round(item[target_key], 2),
  201. }
  202. if hs_code_4 in result_dict:
  203. existing_item = result_dict[hs_code_4]
  204. existing_item[target_key] += item_new[target_key]
  205. else:
  206. result_dict[hs_code_4] = {"hs_code": hs_code_4, target_key: item_new[target_key], "unit": unit}
  207. # 生成结果的列表
  208. hs_code_4_list = [key for key in result_dict.keys() if key != 'N/A']
  209. total, formatted_list = calculate_total_and_list(result_dict.values(), target_key, unit)
  210. return hs_code_4_list, formatted_list, total
  211. return None, None, None
  212. @udf(returnType=StructType([
  213. StructField("des_ctry_code", StringType()),
  214. StructField("sum_str", StringType())
  215. ]))
  216. def get_destination_ctry_count(content: str):
  217. result_dict_trades_sum = {}
  218. if content:
  219. content_json_arr = json.loads(content)
  220. for item in content_json_arr:
  221. des_ctry_code = item['__gk']
  222. item_new = {
  223. "des_ctry_code": des_ctry_code,
  224. "trades_sum": int(item['trades_sum']),
  225. }
  226. if des_ctry_code in result_dict_trades_sum:
  227. existing_item = result_dict_trades_sum[des_ctry_code]
  228. existing_item["trades_sum"] += item_new["trades_sum"]
  229. else:
  230. result_dict_trades_sum[des_ctry_code] = {"des_ctry_code": des_ctry_code,
  231. "trades_sum": item_new["trades_sum"]}
  232. trades_sum_list = sorted(list(result_dict_trades_sum.values()), key=lambda x: x["trades_sum"], reverse=True)
  233. destination_ctry_5_list = [obj['des_ctry_code'] for obj in trades_sum_list][0:5]
  234. return json.dumps(destination_ctry_5_list, ensure_ascii=False), json.dumps(trades_sum_list, ensure_ascii=False)
  235. return None, None
  236. @udf(returnType=ArrayType(StringType()))
  237. def arr_str_to_arr(json_str: str) -> list:
  238. if json_str:
  239. return json.loads(json_str)
  240. return []
  241. @udf(ArrayType(StringType()))
  242. def array_slice(input_array, start, end):
  243. if input_array:
  244. result_array = input_array[start:end]
  245. return result_array
  246. return []
  247. def get_union_tax_no(tax_no1, tax_no2):
  248. tax_set = set()
  249. if tax_no1:
  250. tax_set.add(tax_no1)
  251. if tax_no2:
  252. try:
  253. tax_no2_arr = json.loads(tax_no2)
  254. if isinstance(tax_no2_arr, list):
  255. tax_set.update(tax_no2_arr)
  256. else:
  257. tax_set.add(tax_no2)
  258. except json.JSONDecodeError:
  259. tax_set.add(tax_no2)
  260. if not tax_set:
  261. return None
  262. return json.dumps(list(tax_set))
  # Spark UDF: normalise a JSON array of {"name", "link"} social-media entries.
  # The input string is decoded with json.loads; when it decodes to a list, an
  # entry whose short name (fb, yt, li, gp, tw, eb, ig, wa, pi) is found in
  # name_link_map gets its name replaced by the full name and "<short>.com" in
  # its link replaced by the full domain. For "wa" the original link (before
  # the domain rewrite) is split on "<br>" and one JSON object is emitted per
  # piece. Returns a list of JSON strings; returns [] for falsy input, for
  # input that is not valid JSON, and for JSON that is not a list.
  # NOTE(review): indentation was lost in this copy of the file, so it is
  # unclear whether entries whose name is NOT in name_link_map are dropped or
  # appended unchanged (depends on which "if" the "else" below pairs with);
  # confirm against the original source.
  # NOTE(review): link.replace is called without a None check; an entry that
  # has a known name but no "link" would raise AttributeError, which the
  # except clause (json.JSONDecodeError only) does not catch.
  263. @udf(ArrayType(StringType()))
  264. def tran_social_media(social_media):
  265. if social_media:
  266. try:
  267. social_media_arr = json.loads(social_media)
  268. if isinstance(social_media_arr, list):
  # short name -> (full name, full domain)
  269. name_link_map = {
  270. 'fb': ('facebook', 'facebook.com'),
  271. 'yt': ('youtube', 'youtube.com'),
  272. 'li': ('linkedin', 'linkedin.com'),
  273. 'gp': ('google', 'google.com'),
  274. 'tw': ('twitter', 'twitter.com'),
  275. 'eb': ('ebay', 'ebay.com'),
  276. 'ig': ('instagram', 'instagram.com'),
  277. 'wa': ('whatsapp', 'whatsapp.com'),
  278. 'pi': ('pinterest', 'pinterest.com')
  279. }
  280. cleaned_social_media_arr = []
  # NOTE: the loop variable reuses (shadows) the parameter name.
  281. for social_media in social_media_arr:
  282. name = social_media.get('name')
  283. link = social_media.get('link')
  284. if name in name_link_map:
  285. new_name, new_domain = name_link_map[name]
  286. social_media['name'] = new_name
  287. social_media['link'] = link.replace(f'{name}.com', new_domain)
  288. # For WhatsApp, handle links that contain <br> (several numbers in one field)
  289. if name == 'wa':
  290. phone_numbers = link.split('<br>')
  291. for number in phone_numbers:
  292. cleaned_social_media_arr.append(json.dumps({
  293. 'name': new_name,
  294. 'link': number.strip()
  295. }))
  296. else:
  297. cleaned_social_media_arr.append(json.dumps(social_media))
  298. return cleaned_social_media_arr
  299. except json.JSONDecodeError:
  300. return []
  301. return []
  302. @udf(StringType())
  303. def clean_phone_string(input_str):
  304. if not input_str:
  305. return None
  306. # 转英文逗号
  307. cleaned_str = input_str.replace(',', ',')
  308. # 将前后都是数字的逗号替换为空
  309. cleaned_str = re.sub(r'(\d),(\d)', r'\1\2', cleaned_str)
  310. # "<br>" 转换为英文逗号
  311. cleaned_str = cleaned_str.replace('<br>', ',').replace('<br', ',').replace('br>', ',')
  312. # 去除所有的空格
  313. cleaned_str = cleaned_str.replace(' ', '')
  314. # 去除所有输入法的特殊字符
  315. cleaned_str = re.sub(r'[-()()]', '', cleaned_str)
  316. # 去除“.”
  317. cleaned_str = cleaned_str.replace('.', '')
  318. # "+"号前面若是数字,增加英文逗号
  319. cleaned_str = re.sub(r'(\d)\+', r'\1,+', cleaned_str)
  320. # 去除所有的引号
  321. cleaned_str = cleaned_str.replace('"', '').replace("'", '').replace("‘", '').replace("’", '').replace("“",
  322. '').replace(
  323. "”", '').replace("[", '').replace("]", '')
  324. return json.dumps(cleaned_str.split(','))
  325. @udf(ArrayType(StringType()))
  326. def str_to_json_arr(json_str):
  327. if json_str:
  328. try:
  329. str_arr = json.loads(json_str)
  330. if isinstance(str_arr, list):
  331. return [json.dumps(sm) for sm in str_arr]
  332. except json.JSONDecodeError:
  333. return []
  334. return []
  335. def extract_domain(url):
  336. if not url:
  337. return None
  338. if not url.startswith(('http://', 'https://')):
  339. url = 'http://' + url
  340. try:
  341. domain = urlparse(url).netloc
  342. return domain[4:] if domain.startswith('www.') else domain
  343. except Exception:
  344. return None
  345. def add_company_item(input_string: str, input_list_str: str):
  346. if not input_list_str:
  347. input_list = []
  348. else:
  349. try:
  350. input_list = json.loads(input_list_str)
  351. if not isinstance(input_list, list):
  352. raise ValueError("input_list_str must be a JSON representation of a list")
  353. except json.JSONDecodeError:
  354. raise ValueError("input_list_str is not a valid JSON")
  355. if input_string:
  356. try:
  357. potential_list = json.loads(input_string)
  358. if isinstance(potential_list, list):
  359. input_list.extend(potential_list)
  360. else:
  361. raise ValueError
  362. except (json.JSONDecodeError, ValueError):
  363. input_list.append(input_string)
  364. unique_list = list(set(input_list))
  365. return json.dumps(unique_list)
  366. def taiwan_company_status_mapping(status):
  367. mapping = {
  368. "撤回認許已清算完結": "Dissolved",
  369. "臺中市政府": None, # 特殊值,置为 None
  370. "廢止": "Dissolved",
  371. "撤銷許可": "Status unknown",
  372. "廢止許可": "Dissolved",
  373. "廢止登記已清算完結": "Dissolved",
  374. "核准登記": "Active",
  375. "廢止認許": "Dissolved",
  376. "解散": "Dissolved",
  377. "破產": "Bankruptcy",
  378. "撤銷登記": "Dissolved",
  379. "合併解散": "Dissolved (merger or take-over)",
  380. "撤回登記": "Dissolved",
  381. "撤銷公司設立": "Status unknown",
  382. "停業": "Inactive (no precision)",
  383. "破產已清算完結": "Dissolved (bankruptcy)",
  384. "廢止已清算完結": "Dissolved",
  385. "Dissolution / Closed / Deregistration": "Dissolved",
  386. "廢止登記": "Dissolved",
  387. "撤回認許": "Dissolved",
  388. "核准設立,但已命令解散": "Dissolved",
  389. "解散已清算完結": "Dissolved",
  390. "撤銷已清算完結": "Dissolved",
  391. "撤銷": "Dissolved",
  392. "核准設立": "Active",
  393. "Establishment approved": "Active"
  394. }
  395. return mapping.get(status, None)
  396. def clean_ven_website(website):
  397. if not website:
  398. return None
  399. # 1. 过滤掉包含 @ 的网址
  400. if '@' in website:
  401. return None
  402. # 2. 过滤掉没有 `.` 的网址
  403. if '.' not in website:
  404. return None
  405. # 3. 清洗成域名格式,去掉 "http", "www." 并转换为小写
  406. cleaned_url = website.lower()
  407. cleaned_url = re.sub(r'^https?://|^https?//|^https?:\\\\|^https?:', '', cleaned_url) # 去掉 http 或 https
  408. # cleaned_url = cleaned_url.replace('http:\\\\', '')
  409. cleaned_url = re.sub(r'^www\.|^wwww\.|^www\d*\.|^www,|^www//|^www/|^www |^www:|^www', '', cleaned_url) #
  410. if '.' not in cleaned_url:
  411. return None
  412. if not re.search(r'[a-zA-Z]', cleaned_url):
  413. return None
  414. cleaned_url = cleaned_url.replace('; www.antriol.com.ve', '').replace(', www.velastindari.com.ve', '')
  415. return cleaned_url
  416. def common_clean_website(url):
  417. if url:
  418. cleaned_url = url.lower()
  419. # 去除前缀符号
  420. cleaned_url = re.sub(r'^[^a-z0-9]*', '', cleaned_url)
  421. # 去除前缀http
  422. cleaned_url = re.sub(r'^(web)?h?https?[^a-z0-9]*', '', cleaned_url)
  423. # 去除前缀www
  424. cleaned_url = re.sub(r'^www[0-9]*[^a-z0-9]*', '', cleaned_url)
  425. cleaned_url = re.sub(r'^www[^a-z0-9]*', '', cleaned_url)
  426. # 删除匹配符号后的内容
  427. pattern = r'[?&,,/](.*)'
  428. match = re.search(pattern, cleaned_url)
  429. if match:
  430. cleaned_url = cleaned_url[:match.start()]
  431. # 去除后缀符号
  432. cleaned_url = re.sub(r'[^a-z0-9]*$', '', cleaned_url)
  433. # 去除后缀http
  434. cleaned_url = re.sub(r'[^a-z0-9]*https?$', '', cleaned_url)
  435. # 去除后缀www
  436. cleaned_url = re.sub(r'[^a-z0-9]*www$', '', cleaned_url)
  437. if '.' not in cleaned_url:
  438. return None
  439. if '@' in cleaned_url:
  440. return None
  441. if not re.search(r'[a-z]', cleaned_url):
  442. return None
  443. return cleaned_url
  444. return None
  445. def format_state_name(state_name):
  446. if not state_name:
  447. return None
  448. words = state_name.split()
  449. formatted_words = [word.capitalize() for word in words]
  450. return ' '.join(formatted_words)
  451. if __name__ == '__main__':
  452. result = "http://elcore.kr"
  453. print(common_clean_website(result))
  454. pass