#!/usr/bin/env /usr/bin/python3
# -*- coding:utf-8 -*-
import hashlib
import json
import re
from typing import List
from urllib.parse import urlparse
import requests
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def array_to_json(arr: List):
    """Serialise *arr* to a JSON string, keeping non-ASCII characters unescaped."""
    encoded = json.dumps(arr, ensure_ascii=False)
    return encoded
def get_hs_code(arr: List, timeout: float = 300):
    """Fetch the Tendata "HS code summary" report (海关编码汇总报告) for the given exporters.

    Sends a single POST covering 2023-01-01 .. 2023-12-31 with ``arr`` as the value of
    the ``exporter`` condition.

    :param arr: exporter value(s) to filter on; sent verbatim in the request body.
    :param timeout: seconds to wait for the HTTP response; ``None`` disables the timeout.
        A bound is needed because a stalled connection would otherwise block the caller
        indefinitely. 300 s is a generous default for the large (size=100000) report.
    :return: ``res["results"]["items"]`` serialised as a JSON string, or ``None`` when the
        request fails, returns a non-2xx status, is not JSON, is empty, or lacks
        ``results.items``.
    """
    url = 'https://api.tendata.cn/data/customs/v1/imports/china_stat,panama,kenya,uganda,liberia,botswana,lesotho,namibia,south_africa_stat,new_zealand,australia,ivory_coast,turkey,thailand,venezuela_bol,moldova,costarica,nigeria,indonesia_stat,russia_rail,canada_stat,honduras,hongkong_stat,fiji,zimbabwe,ghana,cameroon,chad,honduras_stat,central_african_republic,maritime_silk_bol,burundi,eurasian_eu,taiwan_stat,ecuador_bol,tanzania,tanzania_tboe,bolivia_stat,spain_co,mexico,rwanda,malawi,congo_kinshasa,south_korea_co,england_stat,angola_stat,mexico_bol,nicaragua,canada,salvador_stat,salvador,guatemala,argentina,america,paraguay,brazil_stat,brazil,brazil_bol,peru,peru_exp,bolivia,ecuador,colombia,venezuela,uruguay,america_stat,chile,russia,ukraine,england,spain,european_union,eurasian_bol,cis,pakistan,pakistan_bol,south_korea,south_korea_stat,india,india_exp,vietnam,taiwan,philippines,dominica,philippines_stat,kazakhstan,kyrghyzstan,sri_lanka,uzbekistan,indonesia,japan,bangladesh,turkey_stat,thailand_stat,ethiopia/report?page=1&size=100000'
    # Request payload
    params = {
        "reportType": "hs_code",
        "parameters": {
            "sort": "sum_of_money,desc",
            "reportName": "海关编码汇总报告"
        },
        "query": {
            "startDate": "2023-01-01",
            "endDate": "2023-12-31",
            "filterBlank": False,
            "filterLogistics": False,
            "conditionGroups": [
                {
                    "conditions": [
                        {
                            "param": "exporter",
                            "value": arr
                        }
                    ]
                }
            ]
        },
        "sortField": "trades,desc",
        "reportName": "海关编码汇总报告"
    }
    # NOTE(review): the API key is committed in source; consider loading it from
    # configuration or a secret store instead.
    headers = {
        'x-api-key': 'l0EEiokwMKLywfwbW08ESzCzea1egvMreXmehIII'
    }
    try:
        response = requests.post(url=url, json=params, headers=headers, timeout=timeout)
        response.raise_for_status()
        res = response.json()
    except (requests.RequestException, ValueError):
        # Network/HTTP failures and undecodable bodies are treated as "no data"
        # (best-effort lookup); ValueError covers JSON decode errors.
        return None
    if not res:
        return None
    try:
        items = res["results"]["items"]
    except (KeyError, TypeError):
        # Response JSON did not have the expected results.items shape.
        return None
    return json.dumps(items, ensure_ascii=False)
def get_mdg_code(arr: List, timeout: float = 300):
    """Fetch the Tendata "destination country summary" report (目的国汇总报告) for exporters.

    Sends a single POST (reportType ``country_of_destination_code``) covering
    2023-01-01 .. 2023-12-31 with ``arr`` as the value of the ``exporter`` condition.

    :param arr: exporter value(s) to filter on; sent verbatim in the request body.
    :param timeout: seconds to wait for the HTTP response; ``None`` disables the timeout.
        A bound is needed because a stalled connection would otherwise block the caller
        indefinitely. 300 s is a generous default for the large (size=100000) report.
    :return: ``res["results"]["items"]`` serialised as a JSON string, or ``None`` when the
        request fails, returns a non-2xx status, is not JSON, is empty, or lacks
        ``results.items``.
    """
    url = 'https://api.tendata.cn/data/customs/v1/imports/china_stat,panama,kenya,uganda,liberia,botswana,lesotho,namibia,south_africa_stat,new_zealand,australia,ivory_coast,turkey,thailand,venezuela_bol,moldova,costarica,nigeria,indonesia_stat,russia_rail,canada_stat,honduras,hongkong_stat,fiji,zimbabwe,ghana,cameroon,chad,honduras_stat,central_african_republic,maritime_silk_bol,burundi,eurasian_eu,taiwan_stat,ecuador_bol,tanzania,tanzania_tboe,bolivia_stat,spain_co,mexico,rwanda,malawi,congo_kinshasa,south_korea_co,england_stat,angola_stat,mexico_bol,nicaragua,canada,salvador_stat,salvador,guatemala,argentina,america,paraguay,brazil_stat,brazil,brazil_bol,peru,peru_exp,bolivia,ecuador,colombia,venezuela,uruguay,america_stat,chile,russia,ukraine,england,spain,european_union,eurasian_bol,cis,pakistan,pakistan_bol,south_korea,south_korea_stat,india,india_exp,vietnam,taiwan,philippines,dominica,philippines_stat,kazakhstan,kyrghyzstan,sri_lanka,uzbekistan,indonesia,japan,bangladesh,turkey_stat,thailand_stat,ethiopia/report?page=1&size=100000'
    # Request payload
    params = {
        "reportType": "country_of_destination_code",
        "parameters": {
            "sort": "trades,desc",
            "reportName": "目的国汇总报告"
        },
        "query": {
            "startDate": "2023-01-01",
            "endDate": "2023-12-31",
            "filterBlank": False,
            "filterLogistics": False,
            "conditionGroups": [
                {
                    "conditions": [
                        {
                            "param": "exporter",
                            "value": arr
                        }
                    ]
                }
            ]
        },
        "sortField": "trades,desc",
        "reportName": "目的国汇总报告"
    }
    # NOTE(review): the API key is committed in source; consider loading it from
    # configuration or a secret store instead.
    headers = {
        'x-api-key': 'l0EEiokwMKLywfwbW08ESzCzea1egvMreXmehIII'
    }
    try:
        response = requests.post(url=url, json=params, headers=headers, timeout=timeout)
        response.raise_for_status()
        res = response.json()
    except (requests.RequestException, ValueError):
        # Network/HTTP failures and undecodable bodies are treated as "no data"
        # (best-effort lookup); ValueError covers JSON decode errors.
        return None
    if not res:
        return None
    try:
        items = res["results"]["items"]
    except (KeyError, TypeError):
        # Response JSON did not have the expected results.items shape.
        return None
    return json.dumps(items, ensure_ascii=False)
def _sum_by_hs_code_4(items, key, convert):
    """Sum ``convert(item[key])`` per 4-character prefix of ``item['__gk']``.

    Returns ``[{"hs_code": prefix, key: total}, ...]`` sorted by total, largest first;
    ties keep first-seen order because ``sorted`` is stable.
    """
    totals = {}
    for item in items:
        hs_code_4 = item['__gk'][:4]
        value = convert(item[key])
        if hs_code_4 in totals:
            totals[hs_code_4] += value
        else:
            totals[hs_code_4] = value
    rows = [{"hs_code": hs_code_4, key: total} for hs_code_4, total in totals.items()]
    return sorted(rows, key=lambda row: row[key], reverse=True)


def _format_hs_code_4_rows(rows, key, label):
    """Render rows as ``{hs_code,label:value}`` joined by "," with values rounded to 2 dp."""
    # Rounding at render time hides float noise from summing already-rounded values
    # (e.g. 0.1 + 0.2 would otherwise print as 0.30000000000000004). Ints are unchanged.
    return ",".join('{' + f'{row["hs_code"]},{label}:{round(row[key], 2)}' + '}' for row in rows)


# NOTE(review): several fields are declared non-nullable, yet the no-data path returns
# None for all of them; FloatType is also single precision, so large money totals lose
# cents. Confirm both are intended.
@udf(returnType=StructType([
    StructField("hs_code_4_list", ArrayType(StringType()), False),
    StructField("trades_sum_list_str", StringType()),
    StructField("trades_sum_total", IntegerType(), False),
    StructField("sumOfMoney_sum_total", FloatType(), False),
    StructField("sumOfMoney_sum_list_str", StringType()),
    StructField("weight_sum_total", FloatType(), False),
    StructField("weight_sum_list_str", StringType()),
    StructField("quantity_sum_total", FloatType(), False),
    StructField("quantity_sum_list_str", StringType())
]))
def get_hs_code_count(content: str):
    """Aggregate an HS-code report (JSON array string) by 4-character HS-code prefix.

    Each array element is expected to carry ``__gk`` (HS code) plus the numeric fields
    ``trades_sum``, ``sumOfMoney_sum``, ``weight_sum`` and ``quantity_sum``.
    Trade counts are converted with ``int``; the other metrics are rounded to 2 dp per
    element before being summed.

    Returns a 9-tuple in schema order:
    - the prefixes ordered by trade count, with ``'N/A'`` excluded;
    - then, for trades, money, weight and quantity in turn, the grand total and a
      ``{prefix,<label>:value}`` list string ordered by that metric (descending).

    Falsy content, or content that is JSON ``null``, yields a tuple of ``None``.
    """
    if not content:
        return (None,) * 9
    items = json.loads(content)
    if items is None:
        # "null" (what json.dumps(None) produces) would make the loops below raise TypeError.
        return (None,) * 9

    def round2(value):
        return round(value, 2)

    # Each list is sorted by its own metric, descending.
    trades_rows = _sum_by_hs_code_4(items, "trades_sum", int)
    money_rows = _sum_by_hs_code_4(items, "sumOfMoney_sum", round2)
    weight_rows = _sum_by_hs_code_4(items, "weight_sum", round2)
    quantity_rows = _sum_by_hs_code_4(items, "quantity_sum", round2)

    hs_code_4_list = [row["hs_code"] for row in trades_rows if row["hs_code"] != 'N/A']

    trades_sum_total = int(sum(row["trades_sum"] for row in trades_rows))
    # Start float sums at 0.0 so an empty report still yields a float for FloatType.
    sum_of_money_total = round(sum((row["sumOfMoney_sum"] for row in money_rows), 0.0), 2)
    weight_sum_total = round(sum((row["weight_sum"] for row in weight_rows), 0.0), 2)
    quantity_sum_total = round(sum((row["quantity_sum"] for row in quantity_rows), 0.0), 2)

    return (
        hs_code_4_list,
        _format_hs_code_4_rows(trades_rows, "trades_sum", "贸易次数"),
        trades_sum_total,
        sum_of_money_total,
        _format_hs_code_4_rows(money_rows, "sumOfMoney_sum", "美元总价"),
        weight_sum_total,
        _format_hs_code_4_rows(weight_rows, "weight_sum", "千克毛重"),
        quantity_sum_total,
        _format_hs_code_4_rows(quantity_rows, "quantity_sum", "数量"),
    )
def calculate_total_and_list(data_list, key, unit):
    """Sum ``item[key]`` over *data_list* and render every item as ``{hs_code,unit:value}``.

    Returns ``(total, text)``, where *text* joins the rendered items with ", ".
    *data_list* is traversed twice, so pass a re-iterable collection (list, dict view),
    not a one-shot iterator.
    """
    total = sum(map(lambda entry: entry[key], data_list))
    pieces = []
    for entry in data_list:
        pieces.append('{' + f"{entry['hs_code']},{unit}:{entry[key]}" + '}')
    return total, ', '.join(pieces)
@udf(returnType=StructType([
    StructField("hs_code_4", ArrayType(StringType()), False),
    StructField("sum_str", StringType()),
    StructField("total", FloatType(), False),
]))
def get_hs_code_count_str(content: str, target_key: str, unit: str):
    """Aggregate an HS-code report (JSON array string) by 4-character prefix for one metric.

    :param content: JSON array whose elements carry ``__gk`` (HS code) and *target_key*.
    :param target_key: metric to sum. ``"trades_sum"`` is summed as an int; any other
        key is rounded to 2 dp per element before summing.
    :param unit: label rendered in the summary string, e.g. ``{8471,<unit>:12}``.
    :return: ``(prefixes, summary, total)``:
        - ``prefixes`` is in first-seen order, with ``'N/A'`` excluded;
        - ``summary`` is the ", "-joined rendering from ``calculate_total_and_list``;
        - ``total`` is a float rounded to 2 dp.
        Falsy or JSON-``null`` content yields ``(None, None, None)``.
    """
    if not content:
        return None, None, None
    items = json.loads(content)
    if items is None:
        # "null" would otherwise make the loop below raise TypeError.
        return None, None, None
    is_count = target_key == "trades_sum"
    result_dict = {}
    for item in items:
        hs_code_4 = item['__gk'][:4]
        value = int(item[target_key]) if is_count else round(item[target_key], 2)
        if hs_code_4 in result_dict:
            result_dict[hs_code_4][target_key] += value
        else:
            result_dict[hs_code_4] = {"hs_code": hs_code_4, target_key: value}
    hs_code_4_list = [code for code in result_dict if code != 'N/A']
    total, formatted_list = calculate_total_and_list(result_dict.values(), target_key, unit)
    # "total" is declared FloatType. A Python int (trade counts, or 0 for an empty
    # report) is not coerced by non-Arrow PySpark UDFs and would come back null,
    # so convert explicitly.
    return hs_code_4_list, formatted_list, float(round(total, 2))
@udf(returnType=StructType([
    StructField("des_ctry_code", StringType()),
    StructField("sum_str", StringType())
]))
def get_destination_ctry_count(content: str):
    """Aggregate a destination-country report (JSON array string) by country code.

    Each array element is expected to carry ``__gk`` (country code) and ``trades_sum``.

    :return: two JSON strings:
        - the (up to) five codes with the most trades;
        - the full ``[{"des_ctry_code": ..., "trades_sum": ...}, ...]`` list, sorted by
          trades descending (ties keep first-seen order).
        Falsy or JSON-``null`` content yields ``(None, None)``.
    """
    if not content:
        return None, None
    items = json.loads(content)
    if items is None:
        # "null" would otherwise make the loop below raise TypeError.
        return None, None
    trades_by_code = {}
    for item in items:
        code = item['__gk']
        trades_by_code[code] = trades_by_code.get(code, 0) + int(item['trades_sum'])
    trades_sum_list = sorted(
        ({"des_ctry_code": code, "trades_sum": trades} for code, trades in trades_by_code.items()),
        key=lambda row: row["trades_sum"],
        reverse=True,
    )
    top_5_codes = [row["des_ctry_code"] for row in trades_sum_list[:5]]
    return json.dumps(top_5_codes, ensure_ascii=False), json.dumps(trades_sum_list, ensure_ascii=False)
@udf(returnType=ArrayType(StringType()))
def arr_str_to_arr(json_str: str) -> list:
    """Decode a JSON array string for a Spark array column; falsy input gives ``[]``."""
    return json.loads(json_str) if json_str else []
@udf(ArrayType(StringType()))
def array_slice(input_array, start, end):
    """Return ``input_array[start:end]``, or ``[]`` when the array is null or empty."""
    if not input_array:
        return []
    return input_array[start:end]
def get_union_tax_no(tax_no1, tax_no2):
    """Merge two tax-number sources into one de-duplicated JSON array string.

    :param tax_no1: a single tax number; added as-is when truthy.
    :param tax_no2: a JSON array of tax numbers, or a single raw value. If it does not
        decode to a JSON list (invalid JSON or another JSON type), the raw value itself
        is added.
    :return: JSON array string in first-seen order (``tax_no1`` first), or ``None``
        when nothing was collected.
    """
    collected = []
    if tax_no1:
        collected.append(tax_no1)
    if tax_no2:
        try:
            decoded = json.loads(tax_no2)
        except json.JSONDecodeError:
            decoded = None
        if isinstance(decoded, list):
            collected.extend(decoded)
        else:
            collected.append(tax_no2)
    if not collected:
        return None
    # dict.fromkeys de-duplicates while keeping first-seen order. Iterating a set of
    # strings would give an order that changes between interpreter runs (hash
    # randomisation), making the output nondeterministic.
    return json.dumps(list(dict.fromkeys(collected)))
@udf(ArrayType(StringType()))
def tran_social_media(social_media):
    """Normalise a JSON array of social-media entries into a list of JSON strings.

    Each entry is expected to be an object with ``name`` (a short code such as ``"fb"``)
    and ``link``. For known codes:
    - the name is expanded (``"fb"`` -> ``"facebook"``);
    - ``"<code>.com"`` inside a string link is rewritten to the full domain;
    - a WhatsApp (``"wa"``) link may hold several numbers separated by newlines, and
      each non-blank one becomes its own ``{"name": "whatsapp", "link": number}`` entry.

    Entries with unknown codes, and array elements that are not JSON objects, are
    dropped. Empty input, invalid JSON and non-array JSON yield ``[]``.
    """
    if not social_media:
        return []
    try:
        entries = json.loads(social_media)
    except json.JSONDecodeError:
        return []
    if not isinstance(entries, list):
        return []
    name_link_map = {
        'fb': ('facebook', 'facebook.com'),
        'yt': ('youtube', 'youtube.com'),
        'li': ('linkedin', 'linkedin.com'),
        'gp': ('google', 'google.com'),
        'tw': ('twitter', 'twitter.com'),
        'eb': ('ebay', 'ebay.com'),
        'ig': ('instagram', 'instagram.com'),
        'wa': ('whatsapp', 'whatsapp.com'),
        'pi': ('pinterest', 'pinterest.com')
    }
    cleaned_social_media_arr = []
    for entry in entries:
        # Non-object elements would raise AttributeError on .get() and fail the task.
        if not isinstance(entry, dict):
            continue
        name = entry.get('name')
        link = entry.get('link')
        if not isinstance(name, str) or name not in name_link_map:
            continue
        new_name, new_domain = name_link_map[name]
        entry['name'] = new_name
        # Only rewrite string links; a missing/null link used to crash on .replace().
        if isinstance(link, str):
            entry['link'] = link.replace(f'{name}.com', new_domain)
        if name == 'wa':
            # WhatsApp links may contain several numbers separated by "\n".
            phone_numbers = link.split('\n') if isinstance(link, str) else []
            for number in phone_numbers:
                number = number.strip()
                if number:
                    cleaned_social_media_arr.append(json.dumps({
                        'name': new_name,
                        'link': number
                    }))
        else:
            cleaned_social_media_arr.append(json.dumps(entry))
    return cleaned_social_media_arr
@udf(StringType())
def clean_phone_string(input_str):
    """Normalise a free-text phone field into a JSON array string of phone numbers.

    Separators (ASCII/full-width commas and line breaks) split numbers. Spaces, dashes,
    ASCII/full-width parentheses, dots, quotes and square brackets are removed. A ``+``
    directly after a digit starts a new number. Empty segments are dropped.
    Returns ``None`` for falsy input.
    """
    if not input_str:
        return None
    # Full-width comma -> ASCII comma, so there is a single separator to split on.
    cleaned_str = input_str.replace('，', ',')
    # Remove every comma that sits between two digits (e.g. "1,234").
    # Lookarounds keep the digits unconsumed, so runs like "1,2,3" are joined fully;
    # the old r'(\d),(\d)' form skipped every other comma.
    cleaned_str = re.sub(r'(?<=\d),(?=\d)', '', cleaned_str)
    # Any line break (CRLF, CR or LF) separates numbers.
    cleaned_str = re.sub(r'\r\n|\r|\n', ',', cleaned_str)
    # Remove all spaces.
    cleaned_str = cleaned_str.replace(' ', '')
    # Remove dashes and ASCII / full-width (IME) parentheses.
    cleaned_str = re.sub(r'[-()（）]', '', cleaned_str)
    # Remove dots.
    cleaned_str = cleaned_str.replace('.', '')
    # A "+" preceded by a digit starts a new number: insert a comma before it.
    cleaned_str = re.sub(r'(\d)\+', r'\1,+', cleaned_str)
    # Remove ASCII and curly quotes plus square brackets (e.g. remnants of a JSON list).
    cleaned_str = cleaned_str.translate(str.maketrans('', '', '"\'‘’“”[]'))
    # Drop empty segments left by leading, trailing or doubled separators.
    return json.dumps([part for part in cleaned_str.split(',') if part])
@udf(ArrayType(StringType()))
def str_to_json_arr(json_str):
    """Split a JSON array string into a list holding each element re-encoded as JSON.

    Empty input, invalid JSON, or JSON that is not an array yields ``[]``.
    """
    if not json_str:
        return []
    try:
        decoded = json.loads(json_str)
    except json.JSONDecodeError:
        return []
    if not isinstance(decoded, list):
        return []
    return list(map(json.dumps, decoded))
def extract_domain(url):
    """Return the host of *url* without a leading ``www.``, or ``None`` if there is none.

    Scheme-less input such as ``example.com/path`` is parsed as if it were http.
    Surrounding whitespace is ignored. The host is lower-cased, and any port or
    user-info is dropped.
    """
    if not url:
        return None
    url = url.strip()
    # Compare the scheme case-insensitively; otherwise "HTTP://x.com" would be
    # prefixed again and parse to the bogus host "HTTP:".
    if not url.lower().startswith(('http://', 'https://')):
        url = 'http://' + url
    try:
        # .hostname (unlike .netloc) excludes "user:pw@" and ":port" and is lower-cased.
        host = urlparse(url).hostname
    except ValueError:
        # e.g. an unbalanced "[" in an IPv6-style host.
        return None
    if not host:
        return None
    return host[4:] if host.startswith('www.') else host
def add_company_item(input_string: str, input_list_str: str):
    """Add one or more items to a JSON array string and de-duplicate the result.

    :param input_string: an item to add, or a JSON array of items to add. A string that
        is not a JSON array (including other valid JSON such as ``"123"``) is added
        verbatim. Falsy values add nothing.
    :param input_list_str: JSON array string to extend; falsy means an empty list.
    :return: JSON array string without duplicates, in first-seen order.
    :raises ValueError: if *input_list_str* is not valid JSON or is not a JSON array.
    """
    if not input_list_str:
        input_list = []
    else:
        try:
            input_list = json.loads(input_list_str)
        except json.JSONDecodeError as exc:
            raise ValueError("input_list_str is not a valid JSON") from exc
        if not isinstance(input_list, list):
            raise ValueError("input_list_str must be a JSON representation of a list")
    if input_string:
        try:
            potential_list = json.loads(input_string)
        except json.JSONDecodeError:
            potential_list = None
        if isinstance(potential_list, list):
            input_list.extend(potential_list)
        else:
            input_list.append(input_string)
    # dict.fromkeys keeps first-seen order; list(set(...)) produced an order that varied
    # between interpreter runs because of string hash randomisation.
    return json.dumps(list(dict.fromkeys(input_list)))
def taiwan_company_status_mapping(status):
    """Map a Taiwan company-registry status string to an English status label.

    Surrounding whitespace is ignored, and a full-width comma (，) is treated like an
    ASCII comma, so both spellings of comma-bearing statuses match.
    Returns ``None`` for unknown statuses and for the special value "臺中市政府".
    """
    mapping = {
        "撤回認許已清算完結": "Dissolved",
        "臺中市政府": None,  # special value, deliberately mapped to None
        "廢止": "Dissolved",
        "撤銷許可": "Status unknown",
        "廢止許可": "Dissolved",
        "廢止登記已清算完結": "Dissolved",
        "核准登記": "Active",
        "廢止認許": "Dissolved",
        "解散": "Dissolved",
        "破產": "Bankruptcy",
        "撤銷登記": "Dissolved",
        "合併解散": "Dissolved (merger or take-over)",
        "撤回登記": "Dissolved",
        "撤銷公司設立": "Status unknown",
        "停業": "Inactive (no precision)",
        "破產已清算完結": "Dissolved (bankruptcy)",
        "廢止已清算完結": "Dissolved",
        "Dissolution / Closed / Deregistration": "Dissolved",
        "廢止登記": "Dissolved",
        "撤回認許": "Dissolved",
        "核准設立,但已命令解散": "Dissolved",
        "解散已清算完結": "Dissolved",
        "撤銷已清算完結": "Dissolved",
        "撤銷": "Dissolved",
        "核准設立": "Active",
        "Establishment approved": "Active"
    }
    if isinstance(status, str):
        status = status.strip().replace('，', ',')
    return mapping.get(status, None)
def clean_ven_website(website):
    """Clean a website value into a bare lower-case domain-like string, or return None.

    Rejects empty values, values containing ``@`` and values without a dot. Removes a
    leading http/https scheme (including malformed variants) and a leading ``www``
    prefix. Rejects results without a dot or without any letter. Finally strips two
    known trailing junk fragments.
    """
    # Guard clauses: empty, e-mail-like, or no dot at all.
    if not website or '@' in website or '.' not in website:
        return None
    url = website.lower()
    # Drop "http://", "https//", "http:\\", "https:" style prefixes.
    url = re.sub(r'^https?://|^https?//|^https?:\\\\|^https?:', '', url)
    # Drop "www."-style prefixes, including typos such as "wwww." / "www," / "www3.".
    url = re.sub(r'^www\.|^wwww\.|^www\d*\.|^www,|^www//|^www/|^www |^www:|^www', '', url)
    if '.' not in url or not re.search(r'[a-zA-Z]', url):
        return None
    for junk in ('; www.antriol.com.ve', ', www.velastindari.com.ve'):
        url = url.replace(junk, '')
    return url
def common_clean_website(url):
    """Reduce a free-text website value to a bare lower-case domain, or return None.

    The following are stripped:
    - leading and trailing punctuation;
    - an http/https scheme, including malformed variants such as ``http//``,
      ``hhttp:`` or ``webhttp:``;
    - leading ``www`` labels;
    - everything from the first ``?``, ``&``, comma (ASCII or full-width) or ``/``
      onwards.

    Results without a dot, containing ``@``, or without any letter are rejected.
    """
    if not url:
        return None
    cleaned_url = url.lower()
    # Leading punctuation / whitespace.
    cleaned_url = re.sub(r'^[^a-z0-9]*', '', cleaned_url)
    # Scheme prefix. A separator must follow it (or "www" directly, as in
    # "httpwww.example.com"); otherwise real domains that merely start with "http",
    # e.g. "httpbin.org", would be truncated.
    cleaned_url = re.sub(r'^(web)?h?https?(?:[^a-z0-9]+|(?=www))', '', cleaned_url)
    # "www" prefix; the second pass removes a doubled prefix such as "www.www.".
    cleaned_url = re.sub(r'^www[0-9]*[^a-z0-9]*', '', cleaned_url)
    cleaned_url = re.sub(r'^www[^a-z0-9]*', '', cleaned_url)
    # Cut at the first path/query/list separator (ASCII or full-width comma included).
    separator = re.search(r'[?&,，/]', cleaned_url)
    if separator:
        cleaned_url = cleaned_url[:separator.start()]
    # Trailing punctuation, then a dangling "http(s)" or "www" suffix.
    cleaned_url = re.sub(r'[^a-z0-9]*$', '', cleaned_url)
    cleaned_url = re.sub(r'[^a-z0-9]*https?$', '', cleaned_url)
    cleaned_url = re.sub(r'[^a-z0-9]*www$', '', cleaned_url)
    if '.' not in cleaned_url or '@' in cleaned_url or not re.search(r'[a-z]', cleaned_url):
        return None
    return cleaned_url
def format_state_name(state_name):
    """Capitalise each whitespace-separated word and join the words with single spaces.

    Returns None for empty/None input.
    """
    if not state_name:
        return None
    return ' '.join(word.capitalize() for word in state_name.split())
if __name__ == '__main__':
    # Manual smoke check: clean a sample URL and print the result.
    sample_url = "http://elcore.kr"
    print(common_clean_website(sample_url))