#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""Spark UDFs and helpers for enriching company / customs-trade records.

Contents:

* thin clients for the Tendata customs "report" API (HS-code and
  destination-country summary reports for a list of exporters);
* Spark UDFs that aggregate those report payloads;
* cleaning helpers for phone numbers, websites, tax numbers, social-media
  links and Taiwanese company statuses.
"""
import hashlib
import json
import logging
import os
import re
from typing import List
from urllib.parse import urlparse

import requests
from pyspark.sql.functions import udf
# NOTE(review): wildcard import kept because other modules may rely on the
# re-exported pyspark type names; prefer explicit imports in new code.
from pyspark.sql.types import *

logger = logging.getLogger(__name__)

# Report endpoint covering every customs dataset the reports are run against.
_TENDATA_REPORT_URL = 'https://api.tendata.cn/data/customs/v1/imports/china_stat,panama,kenya,uganda,liberia,botswana,lesotho,namibia,south_africa_stat,new_zealand,australia,ivory_coast,turkey,thailand,venezuela_bol,moldova,costarica,nigeria,indonesia_stat,russia_rail,canada_stat,honduras,hongkong_stat,fiji,zimbabwe,ghana,cameroon,chad,honduras_stat,central_african_republic,maritime_silk_bol,burundi,eurasian_eu,taiwan_stat,ecuador_bol,tanzania,tanzania_tboe,bolivia_stat,spain_co,mexico,rwanda,malawi,congo_kinshasa,south_korea_co,england_stat,angola_stat,mexico_bol,nicaragua,canada,salvador_stat,salvador,guatemala,argentina,america,paraguay,brazil_stat,brazil,brazil_bol,peru,peru_exp,bolivia,ecuador,colombia,venezuela,uruguay,america_stat,chile,russia,ukraine,england,spain,european_union,eurasian_bol,cis,pakistan,pakistan_bol,south_korea,south_korea_stat,india,india_exp,vietnam,taiwan,philippines,dominica,philippines_stat,kazakhstan,kyrghyzstan,sri_lanka,uzbekistan,indonesia,japan,bangladesh,turkey_stat,thailand_stat,ethiopia/report?page=1&size=100000'

# NOTE(review): a live API key is committed to source. Rotate it, supply it via
# the TENDATA_API_KEY environment variable, and then drop this fallback.
_DEFAULT_TENDATA_API_KEY = 'l0EEiokwMKLywfwbW08ESzCzea1egvMreXmehIII'

# (connect, read) timeout in seconds; without it a stalled request would hang
# the calling Spark task indefinitely. The read timeout is generous because
# the report asks for up to 100000 rows.
_TENDATA_TIMEOUT_SECONDS = (10, 300)


def array_to_json(arr: List):
    """Serialise *arr* to a JSON string, keeping non-ASCII characters as-is."""
    return json.dumps(arr, ensure_ascii=False)


def _build_report_params(report_type: str, sort: str, report_name: str, exporters: List) -> dict:
    """Build the request body for a 2023 Tendata report filtered on *exporters*."""
    return {
        "reportType": report_type,
        "parameters": {
            "sort": sort,
            "reportName": report_name,
        },
        "query": {
            "startDate": "2023-01-01",
            "endDate": "2023-12-31",
            "filterBlank": False,
            "filterLogistics": False,
            "conditionGroups": [
                {
                    "conditions": [
                        {
                            "param": "exporter",
                            "value": exporters,
                        }
                    ]
                }
            ],
        },
        "sortField": "trades,desc",
        "reportName": report_name,
    }


def _fetch_report_items(params: dict):
    """POST a report request and return ``results.items`` as a JSON string.

    Best-effort: returns None when the response body is empty/falsy, or when
    the request, HTTP status, JSON decoding or payload shape fails (the
    failure is logged instead of being silently discarded).
    """
    headers = {
        'x-api-key': os.environ.get('TENDATA_API_KEY', _DEFAULT_TENDATA_API_KEY)
    }
    try:
        response = requests.post(url=_TENDATA_REPORT_URL, json=params, headers=headers,
                                 timeout=_TENDATA_TIMEOUT_SECONDS)
        response.raise_for_status()
        res = response.json()
        if res:
            items = res["results"]["items"]
            return json.dumps(items, ensure_ascii=False)
        return None
    except (requests.RequestException, ValueError, KeyError, TypeError):
        logger.exception("Tendata report request failed (reportType=%s)", params.get("reportType"))
        return None


def get_hs_code(arr: List):
    """Fetch the HS-code summary report for the exporters in *arr*.

    Returns the report items as a JSON string, or None on any failure.
    """
    params = _build_report_params("hs_code", "sum_of_money,desc", "海关编码汇总报告", arr)
    return _fetch_report_items(params)


def get_mdg_code(arr: List):
    """Fetch the destination-country summary report for the exporters in *arr*.

    Returns the report items as a JSON string, or None on any failure.
    """
    params = _build_report_params("country_of_destination_code", "trades,desc", "目的国汇总报告", arr)
    return _fetch_report_items(params)


def _hs_code_prefix(item: dict) -> str:
    """Return the first 4 characters of an item's ``__gk`` group key.

    A missing or blank key is grouped under 'N/A' (the value already excluded
    from the HS-code lists) instead of crashing on ``None[:4]``.
    """
    return str(item.get('__gk') or 'N/A')[:4]


def _metric(item: dict, key: str):
    """Return ``item[key]``, treating a missing or null metric as 0."""
    value = item.get(key)
    return 0 if value is None else value


def _rank_metric(totals: dict, label: str, integral: bool = False):
    """Rank per-HS-code totals and summarise them.

    Returns ``(ranked_pairs, grand_total, display_string)``. *ranked_pairs*
    is sorted by value descending; ties keep insertion order because the sort
    is stable. The display string looks like ``{code,label:value},...``.
    Float values are rounded to 2 places for display only, so accumulation
    artefacts such as 0.30000000000000004 do not leak into the output.
    """
    ranked = sorted(totals.items(), key=lambda pair: pair[1], reverse=True)
    if integral:
        grand_total = int(sum(value for _, value in ranked))
        parts = [f'{{{code},{label}:{value}}}' for code, value in ranked]
    else:
        grand_total = round(sum((value for _, value in ranked), 0.0), 2)
        parts = [f'{{{code},{label}:{round(value, 2)}}}' for code, value in ranked]
    return ranked, grand_total, ",".join(parts)


@udf(returnType=StructType([
    StructField("hs_code_4_list", ArrayType(StringType()), False),
    StructField("trades_sum_list_str", StringType()),
    StructField("trades_sum_total", IntegerType(), False),
    StructField("sumOfMoney_sum_total", FloatType(), False),
    StructField("sumOfMoney_sum_list_str", StringType()),
    StructField("weight_sum_total", FloatType(), False),
    StructField("weight_sum_list_str", StringType()),
    StructField("quantity_sum_total", FloatType(), False),
    StructField("quantity_sum_list_str", StringType())
]))
def get_hs_code_count(content: str):
    """Aggregate an HS-code report (JSON string) by 4-character HS heading.

    For each of trades, USD value, gross weight (kg) and quantity, this sums
    the per-item values by heading and ranks the headings descending. It
    returns the heading list (ordered by trade count, 'N/A' excluded), plus
    the grand total and display string for each metric, in the struct's
    field order. When *content* is empty, every field is None.
    """
    if not content:
        return None, None, None, None, None, None, None, None, None

    trades, money, weight, quantity = {}, {}, {}, {}
    for item in json.loads(content):
        hs_code_4 = _hs_code_prefix(item)
        trades[hs_code_4] = trades.get(hs_code_4, 0) + int(_metric(item, 'trades_sum'))
        money[hs_code_4] = money.get(hs_code_4, 0) + round(_metric(item, 'sumOfMoney_sum'), 2)
        weight[hs_code_4] = weight.get(hs_code_4, 0) + round(_metric(item, 'weight_sum'), 2)
        quantity[hs_code_4] = quantity.get(hs_code_4, 0) + round(_metric(item, 'quantity_sum'), 2)

    ranked_trades, trades_total, trades_text = _rank_metric(trades, '贸易次数', integral=True)
    _, money_total, money_text = _rank_metric(money, '美元总价')
    _, weight_total, weight_text = _rank_metric(weight, '千克毛重')
    _, quantity_total, quantity_text = _rank_metric(quantity, '数量')

    hs_code_4_list = [code for code, _count in ranked_trades if code != 'N/A']
    return (hs_code_4_list, trades_text, trades_total,
            money_total, money_text,
            weight_total, weight_text,
            quantity_total, quantity_text)


def calculate_total_and_list(data_list, key, unit):
    """Sum ``item[key]`` over *data_list* and format ``{hs_code,unit:value}`` pairs.

    *data_list* is an iterable of dicts with ``'hs_code'`` and *key* entries.
    It is materialised first, so one-shot iterators (generators) work too.
    Returns ``(total, formatted_string)``; entries are joined with ', '.
    """
    items = list(data_list)
    total = sum(item[key] for item in items)
    formatted_list = ', '.join([f"{{{i['hs_code']},{unit}:{i[key]}}}" for i in items])
    return total, formatted_list


@udf(returnType=StructType([
    StructField("hs_code_4", ArrayType(StringType()), False),
    StructField("sum_str", StringType()),
    StructField("total", FloatType(), False),
]))
def get_hs_code_count_str(content: str, target_key: str, unit: str):
    """Aggregate one metric (*target_key*) of an HS-code report by 4-char heading.

    ``trades_sum`` values are summed as ints; any other metric is rounded to
    2 places per item before summing. Returns the headings in first-seen
    order ('N/A' excluded), the formatted per-heading string, and the total.
    When *content* is empty, all three fields are None. Being a UDF,
    *target_key* and *unit* must be passed as columns (presumably via
    ``lit``; confirm at the call sites).
    """
    if not content:
        return None, None, None

    result_dict = {}
    for item in json.loads(content):
        hs_code_4 = _hs_code_prefix(item)
        raw_value = _metric(item, target_key)
        value = int(raw_value) if target_key == "trades_sum" else round(raw_value, 2)
        if hs_code_4 in result_dict:
            result_dict[hs_code_4][target_key] += value
        else:
            result_dict[hs_code_4] = {"hs_code": hs_code_4, target_key: value, "unit": unit}

    hs_code_4_list = [code for code in result_dict if code != 'N/A']
    total, formatted_list = calculate_total_and_list(list(result_dict.values()), target_key, unit)
    # The "total" field is FloatType: Spark turns a Python int into null there,
    # which previously happened for target_key == "trades_sum".
    return hs_code_4_list, formatted_list, float(total)


@udf(returnType=StructType([
    StructField("des_ctry_code", StringType()),
    StructField("sum_str", StringType())
]))
def get_destination_ctry_count(content: str):
    """Aggregate a destination-country report (JSON string) by country code.

    Returns ``(top5_codes_json, ranked_list_json)``. The ranked list holds
    ``{"des_ctry_code", "trades_sum"}`` objects sorted by trade count,
    descending. When *content* is empty, both fields are None.
    """
    if not content:
        return None, None

    totals = {}
    for item in json.loads(content):
        des_ctry_code = item['__gk']
        totals[des_ctry_code] = totals.get(des_ctry_code, 0) + int(_metric(item, 'trades_sum'))

    trades_sum_list = sorted(
        [{"des_ctry_code": code, "trades_sum": count} for code, count in totals.items()],
        key=lambda entry: entry["trades_sum"],
        reverse=True,
    )
    destination_ctry_5_list = [entry['des_ctry_code'] for entry in trades_sum_list[:5]]
    return (json.dumps(destination_ctry_5_list, ensure_ascii=False),
            json.dumps(trades_sum_list, ensure_ascii=False))


@udf(returnType=ArrayType(StringType()))
def arr_str_to_arr(json_str: str) -> list:
    """Parse a JSON array string into a list; return [] for empty, invalid or non-array input."""
    if not json_str:
        return []
    try:
        parsed = json.loads(json_str)
    except json.JSONDecodeError:
        return []
    return parsed if isinstance(parsed, list) else []


@udf(ArrayType(StringType()))
def array_slice(input_array, start, end):
    """Return ``input_array[start:end]``, or [] when the array is empty/null."""
    if input_array:
        return input_array[start:end]
    return []


def get_union_tax_no(tax_no1, tax_no2):
    """Merge two tax-number inputs into a deduplicated JSON array string.

    *tax_no1* is taken as a single value. *tax_no2* may be a JSON array,
    whose elements are all added, or any other string, which is added
    verbatim. First-seen order is preserved, so the output is deterministic
    across executors; a ``set`` order depends on per-process hash seeds.
    Returns None when nothing was collected.
    """
    collected = []
    if tax_no1:
        collected.append(tax_no1)
    if tax_no2:
        try:
            parsed = json.loads(tax_no2)
        except json.JSONDecodeError:
            collected.append(tax_no2)
        else:
            if isinstance(parsed, list):
                collected.extend(parsed)
            else:
                collected.append(tax_no2)
    if not collected:
        return None
    return json.dumps(list(dict.fromkeys(collected)))


# Short platform code -> (full platform name, canonical domain).
_SOCIAL_MEDIA_NAME_MAP = {
    'fb': ('facebook', 'facebook.com'),
    'yt': ('youtube', 'youtube.com'),
    'li': ('linkedin', 'linkedin.com'),
    'gp': ('google', 'google.com'),
    'tw': ('twitter', 'twitter.com'),
    'eb': ('ebay', 'ebay.com'),
    'ig': ('instagram', 'instagram.com'),
    'wa': ('whatsapp', 'whatsapp.com'),
    'pi': ('pinterest', 'pinterest.com')
}


@udf(ArrayType(StringType()))
def tran_social_media(social_media):
    """Normalise a JSON array of ``{"name", "link"}`` social-media entries.

    Entries whose short name is in ``_SOCIAL_MEDIA_NAME_MAP`` are renamed to
    the full platform name, and ``<code>.com`` in the link is replaced with
    the canonical domain. A WhatsApp link may hold several newline-separated
    numbers, and each non-blank one becomes its own entry. Entries with an
    unknown name, and non-object entries, are dropped. Returns a list of
    JSON strings; [] for empty or invalid input.
    """
    if not social_media:
        return []
    try:
        entries = json.loads(social_media)
    except json.JSONDecodeError:
        return []
    if not isinstance(entries, list):
        return []

    cleaned_social_media_arr = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        name = entry.get('name')
        if name not in _SOCIAL_MEDIA_NAME_MAP:
            continue
        link = entry.get('link')
        new_name, new_domain = _SOCIAL_MEDIA_NAME_MAP[name]
        if name == 'wa':
            for number in (link or '').split('\n'):
                number = number.strip()
                if number:
                    cleaned_social_media_arr.append(json.dumps({
                        'name': new_name,
                        'link': number
                    }))
        else:
            entry['name'] = new_name
            if isinstance(link, str):
                entry['link'] = link.replace(f'{name}.com', new_domain)
            cleaned_social_media_arr.append(json.dumps(entry))
    return cleaned_social_media_arr


# Quote and bracket characters stripped from phone strings.
_PHONE_STRIP_TABLE = str.maketrans('', '', '"\'‘’“”[]')


@udf(StringType())
def clean_phone_string(input_str):
    """Clean a free-text phone field and return the numbers as a JSON array string.

    Steps, in order:
    1. full-width commas become ASCII commas;
    2. a comma between two digits is removed, as a digit-group separator;
    3. line breaks become separators (commas);
    4. spaces, dashes, ASCII/full-width parentheses and dots are removed;
    5. a '+' preceded by a digit starts a new number;
    6. quotes and brackets are removed.
    The result is split on commas. Returns None for empty input.
    """
    if not input_str:
        return None
    cleaned_str = input_str.replace('，', ',')
    # Lookarounds so runs such as "1,2,3" collapse fully to "123".
    cleaned_str = re.sub(r'(?<=\d),(?=\d)', '', cleaned_str)
    # NOTE(review): the second target was an empty string in the mangled
    # source (which inserts a comma between every character); '\r' assumed.
    cleaned_str = cleaned_str.replace('\n', ',').replace('\r', ',')
    cleaned_str = cleaned_str.replace(' ', '')
    cleaned_str = re.sub(r'[-()（）]', '', cleaned_str)
    cleaned_str = cleaned_str.replace('.', '')
    cleaned_str = re.sub(r'(\d)\+', r'\1,+', cleaned_str)
    cleaned_str = cleaned_str.translate(_PHONE_STRIP_TABLE)
    return json.dumps(cleaned_str.split(','))


@udf(ArrayType(StringType()))
def str_to_json_arr(json_str):
    """Parse a JSON array and return each element re-serialised as a JSON string.

    Returns [] for empty, invalid or non-array input.
    """
    if json_str:
        try:
            str_arr = json.loads(json_str)
            if isinstance(str_arr, list):
                return [json.dumps(sm) for sm in str_arr]
        except json.JSONDecodeError:
            return []
    return []


def extract_domain(url):
    """Return the network location of *url*, without a leading 'www.'.

    A scheme-less URL is parsed as http. Returns None for empty input or
    when ``urlparse`` rejects the URL.
    """
    if not url:
        return None
    if not url.startswith(('http://', 'https://')):
        url = 'http://' + url
    try:
        domain = urlparse(url).netloc
    except ValueError:
        return None
    return domain[4:] if domain.startswith('www.') else domain


def add_company_item(input_string: str, input_list_str: str):
    """Add *input_string* to the JSON list *input_list_str* and deduplicate.

    If *input_string* is itself a JSON array, its elements are added
    instead. Duplicates are removed keeping first-seen order, so the output
    is deterministic. Returns the JSON array string.

    Raises:
        ValueError: if *input_list_str* is not valid JSON or is not a JSON array.
    """
    if not input_list_str:
        input_list = []
    else:
        try:
            input_list = json.loads(input_list_str)
        except json.JSONDecodeError as err:
            raise ValueError("input_list_str is not a valid JSON") from err
        if not isinstance(input_list, list):
            raise ValueError("input_list_str must be a JSON representation of a list")

    if input_string:
        try:
            potential_list = json.loads(input_string)
        except json.JSONDecodeError:
            potential_list = None
        if isinstance(potential_list, list):
            input_list.extend(potential_list)
        else:
            input_list.append(input_string)

    return json.dumps(list(dict.fromkeys(input_list)))


# Taiwanese registry status -> normalised status label (None = discard value).
_TAIWAN_COMPANY_STATUS_MAPPING = {
    "撤回認許已清算完結": "Dissolved",
    "臺中市政府": None,  # special value (a government body, not a status): mapped to None
    "廢止": "Dissolved",
    "撤銷許可": "Status unknown",
    "廢止許可": "Dissolved",
    "廢止登記已清算完結": "Dissolved",
    "核准登記": "Active",
    "廢止認許": "Dissolved",
    "解散": "Dissolved",
    "破產": "Bankruptcy",
    "撤銷登記": "Dissolved",
    "合併解散": "Dissolved (merger or take-over)",
    "撤回登記": "Dissolved",
    "撤銷公司設立": "Status unknown",
    "停業": "Inactive (no precision)",
    "破產已清算完結": "Dissolved (bankruptcy)",
    "廢止已清算完結": "Dissolved",
    "Dissolution / Closed / Deregistration": "Dissolved",
    "廢止登記": "Dissolved",
    "撤回認許": "Dissolved",
    "核准設立,但已命令解散": "Dissolved",
    "核准設立，但已命令解散": "Dissolved",  # same status with a full-width comma
    "解散已清算完結": "Dissolved",
    "撤銷已清算完結": "Dissolved",
    "撤銷": "Dissolved",
    "核准設立": "Active",
    "Establishment approved": "Active"
}


def taiwan_company_status_mapping(status):
    """Map a Taiwanese company status string to a normalised label (None if unknown)."""
    return _TAIWAN_COMPANY_STATUS_MAPPING.get(status, None)


def clean_ven_website(website):
    """Clean a (Venezuelan-dataset) website value into a bare lower-case domain.

    Returns None for empty values, values containing '@' (e-mail
    addresses), values without a '.', and results with no letters. Strips
    http/https prefixes (including malformed ones) and www-style prefixes,
    plus two known junk suffixes.
    """
    if not website:
        return None
    if '@' in website:
        return None
    if '.' not in website:
        return None
    cleaned_url = website.lower()
    # Strip the scheme, including malformed forms such as "http//" or "http:\\".
    cleaned_url = re.sub(r'^https?://|^https?//|^https?:\\\\|^https?:', '', cleaned_url)
    # Strip "www" prefixes and their common typos/separators.
    cleaned_url = re.sub(r'^www\.|^wwww\.|^www\d*\.|^www,|^www//|^www/|^www |^www:|^www', '', cleaned_url)
    if not re.search(r'[a-zA-Z]', cleaned_url):
        return None
    return cleaned_url.replace('; www.antriol.com.ve', '').replace(', www.velastindari.com.ve', '')


def common_clean_website(url):
    """Reduce a free-text website value to a bare lower-case host name.

    Leading/trailing symbols and http(s)/www prefixes and suffixes are
    removed. Everything from the first '?', '&', ',', '，' or '/' onwards is
    dropped. Returns None when the result lacks a '.', contains '@', or has
    no letters.
    """
    if not url:
        return None
    cleaned_url = url.lower()
    # Strip leading symbols.
    cleaned_url = re.sub(r'^[^a-z0-9]*', '', cleaned_url)
    # Strip a leading http/https (also "webhttp", "hhttp" typos).
    cleaned_url = re.sub(r'^(web)?h?https?[^a-z0-9]*', '', cleaned_url)
    # Strip a leading www.
    cleaned_url = re.sub(r'^www[0-9]*[^a-z0-9]*', '', cleaned_url)
    cleaned_url = re.sub(r'^www[^a-z0-9]*', '', cleaned_url)
    # Drop everything from the first path/query/list separator onwards.
    match = re.search(r'[?&,，/](.*)', cleaned_url)
    if match:
        cleaned_url = cleaned_url[:match.start()]
    # Strip trailing symbols, then a trailing http(s) / www fragment.
    cleaned_url = re.sub(r'[^a-z0-9]*$', '', cleaned_url)
    cleaned_url = re.sub(r'[^a-z0-9]*https?$', '', cleaned_url)
    cleaned_url = re.sub(r'[^a-z0-9]*www$', '', cleaned_url)
    if '.' not in cleaned_url:
        return None
    if '@' in cleaned_url:
        return None
    if not re.search(r'[a-z]', cleaned_url):
        return None
    return cleaned_url


def format_state_name(state_name):
    """Capitalise each whitespace-separated word; collapse runs of whitespace to one space."""
    if not state_name:
        return None
    return ' '.join(word.capitalize() for word in state_name.split())


if __name__ == '__main__':
    result = "http://elcore.kr"
    print(common_clean_website(result))