| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530 |
- #!/usr/bin/env /usr/bin/python3
- # -*- coding:utf-8 -*-
- import hashlib
- import json
- import re
- from typing import List
- from urllib.parse import urlparse
- import requests
- from pyspark.sql.functions import udf
- from pyspark.sql.types import *
def array_to_json(arr: List):
    """Serialise *arr* to a JSON string, keeping non-ASCII characters unescaped."""
    encoded = json.dumps(arr, ensure_ascii=False)
    return encoded
def get_hs_code(arr: List, start_date: str = "2023-01-01", end_date: str = "2023-12-31",
                timeout: float = 300):
    """Fetch the Tendata HS-code summary report (``reportType`` ``hs_code``).

    Posts a report request to the Tendata customs-imports API, filtered on
    ``exporter`` values in ``arr`` over ``[start_date, end_date]``. The
    defaults reproduce the previously hard-coded calendar year 2023.

    Args:
        arr: exporter values sent as the ``exporter`` condition.
        start_date: first day of the report window (``YYYY-MM-DD``).
        end_date: last day of the report window (``YYYY-MM-DD``).
        timeout: seconds passed to ``requests.post``. Without a timeout a
            stalled connection blocks the caller indefinitely.

    Returns:
        The response's ``results.items`` serialised as a JSON string (non-ASCII
        kept), or ``None`` in any of these cases: the body is empty, the
        request fails, the body is not JSON, or it lacks ``results.items``.
    """
    url = 'https://api.tendata.cn/data/customs/v1/imports/china_stat,panama,kenya,uganda,liberia,botswana,lesotho,namibia,south_africa_stat,new_zealand,australia,ivory_coast,turkey,thailand,venezuela_bol,moldova,costarica,nigeria,indonesia_stat,russia_rail,canada_stat,honduras,hongkong_stat,fiji,zimbabwe,ghana,cameroon,chad,honduras_stat,central_african_republic,maritime_silk_bol,burundi,eurasian_eu,taiwan_stat,ecuador_bol,tanzania,tanzania_tboe,bolivia_stat,spain_co,mexico,rwanda,malawi,congo_kinshasa,south_korea_co,england_stat,angola_stat,mexico_bol,nicaragua,canada,salvador_stat,salvador,guatemala,argentina,america,paraguay,brazil_stat,brazil,brazil_bol,peru,peru_exp,bolivia,ecuador,colombia,venezuela,uruguay,america_stat,chile,russia,ukraine,england,spain,european_union,eurasian_bol,cis,pakistan,pakistan_bol,south_korea,south_korea_stat,india,india_exp,vietnam,taiwan,philippines,dominica,philippines_stat,kazakhstan,kyrghyzstan,sri_lanka,uzbekistan,indonesia,japan,bangladesh,turkey_stat,thailand_stat,ethiopia/report?page=1&size=100000'
    # Request payload
    params = {
        "reportType": "hs_code",
        "parameters": {
            "sort": "sum_of_money,desc",
            "reportName": "海关编码汇总报告"
        },
        "query": {
            "startDate": start_date,
            "endDate": end_date,
            "filterBlank": False,
            "filterLogistics": False,
            "conditionGroups": [
                {
                    "conditions": [
                        {
                            "param": "exporter",
                            "value": arr
                        }
                    ]
                }
            ]
        },
        "sortField": "trades,desc",
        "reportName": "海关编码汇总报告"
    }
    # NOTE(review): the API key is hard-coded in source; move it to a secret store / config.
    headers = {
        'x-api-key': 'l0EEiokwMKLywfwbW08ESzCzea1egvMreXmehIII'
    }
    try:
        response = requests.post(url=url, json=params, headers=headers, timeout=timeout)
        res = response.json()
        if not res:
            return None
        items = res["results"]["items"]
        return json.dumps(items, ensure_ascii=False)
    except (requests.RequestException, ValueError, KeyError, TypeError):
        # Best-effort lookup. Network failures, non-JSON bodies (ValueError)
        # and unexpected response shapes (KeyError/TypeError) all yield None.
        return None
def get_mdg_code(arr: List, start_date: str = "2023-01-01", end_date: str = "2023-12-31",
                 timeout: float = 300):
    """Fetch the Tendata destination-country summary report.

    Posts a ``country_of_destination_code`` report request to the Tendata
    customs-imports API, filtered on ``exporter`` values in ``arr`` over
    ``[start_date, end_date]``. The defaults reproduce the previously
    hard-coded calendar year 2023.

    Args:
        arr: exporter values sent as the ``exporter`` condition.
        start_date: first day of the report window (``YYYY-MM-DD``).
        end_date: last day of the report window (``YYYY-MM-DD``).
        timeout: seconds passed to ``requests.post``. Without a timeout a
            stalled connection blocks the caller indefinitely.

    Returns:
        The response's ``results.items`` serialised as a JSON string (non-ASCII
        kept), or ``None`` in any of these cases: the body is empty, the
        request fails, the body is not JSON, or it lacks ``results.items``.
    """
    url = 'https://api.tendata.cn/data/customs/v1/imports/china_stat,panama,kenya,uganda,liberia,botswana,lesotho,namibia,south_africa_stat,new_zealand,australia,ivory_coast,turkey,thailand,venezuela_bol,moldova,costarica,nigeria,indonesia_stat,russia_rail,canada_stat,honduras,hongkong_stat,fiji,zimbabwe,ghana,cameroon,chad,honduras_stat,central_african_republic,maritime_silk_bol,burundi,eurasian_eu,taiwan_stat,ecuador_bol,tanzania,tanzania_tboe,bolivia_stat,spain_co,mexico,rwanda,malawi,congo_kinshasa,south_korea_co,england_stat,angola_stat,mexico_bol,nicaragua,canada,salvador_stat,salvador,guatemala,argentina,america,paraguay,brazil_stat,brazil,brazil_bol,peru,peru_exp,bolivia,ecuador,colombia,venezuela,uruguay,america_stat,chile,russia,ukraine,england,spain,european_union,eurasian_bol,cis,pakistan,pakistan_bol,south_korea,south_korea_stat,india,india_exp,vietnam,taiwan,philippines,dominica,philippines_stat,kazakhstan,kyrghyzstan,sri_lanka,uzbekistan,indonesia,japan,bangladesh,turkey_stat,thailand_stat,ethiopia/report?page=1&size=100000'
    # Request payload
    params = {
        "reportType": "country_of_destination_code",
        "parameters": {
            "sort": "trades,desc",
            "reportName": "目的国汇总报告"
        },
        "query": {
            "startDate": start_date,
            "endDate": end_date,
            "filterBlank": False,
            "filterLogistics": False,
            "conditionGroups": [
                {
                    "conditions": [
                        {
                            "param": "exporter",
                            "value": arr
                        }
                    ]
                }
            ]
        },
        "sortField": "trades,desc",
        "reportName": "目的国汇总报告"
    }
    # NOTE(review): the API key is hard-coded in source; move it to a secret store / config.
    headers = {
        'x-api-key': 'l0EEiokwMKLywfwbW08ESzCzea1egvMreXmehIII'
    }
    try:
        response = requests.post(url=url, json=params, headers=headers, timeout=timeout)
        res = response.json()
        if not res:
            return None
        items = res["results"]["items"]
        return json.dumps(items, ensure_ascii=False)
    except (requests.RequestException, ValueError, KeyError, TypeError):
        # Best-effort lookup. Network failures, non-JSON bodies (ValueError)
        # and unexpected response shapes (KeyError/TypeError) all yield None.
        return None
@udf(returnType=StructType([
    StructField("hs_code_4_list", ArrayType(StringType()), False),
    StructField("trades_sum_list_str", StringType()),
    StructField("trades_sum_total", IntegerType(), False),
    StructField("sumOfMoney_sum_total", FloatType(), False),
    StructField("sumOfMoney_sum_list_str", StringType()),
    StructField("weight_sum_total", FloatType(), False),
    StructField("weight_sum_list_str", StringType()),
    StructField("quantity_sum_total", FloatType(), False),
    StructField("quantity_sum_list_str", StringType())
]))
def get_hs_code_count(content: str):
    """Aggregate an HS-code report by 4-character HS code prefix.

    ``content`` is a JSON array string, presumably the output of
    ``get_hs_code``. Each item carries ``__gk`` (the HS code) and the numeric
    fields ``trades_sum``, ``sumOfMoney_sum``, ``weight_sum`` and
    ``quantity_sum``. Items are grouped by ``__gk[:4]``. Trade counts are
    summed as ints; the other metrics are rounded to 2 decimals, then summed
    and re-rounded.

    Returns a 9-tuple matching the UDF schema:
    ``(hs_code_4_list, trades_sum_list_str, trades_sum_total,
    sumOfMoney_sum_total, sumOfMoney_sum_list_str, weight_sum_total,
    weight_sum_list_str, quantity_sum_total, quantity_sum_list_str)``.

    - Each ``*_list_str`` is ``"{code,<label>:<value>},..."``, ordered by
      that metric's value, largest first.
    - ``hs_code_4_list`` is ordered by trade count and omits ``'N/A'``.
    - When ``content`` is empty, all nine values are ``None``.

    NOTE(review): the money/weight/quantity totals are declared ``FloatType``
    (32-bit, ~7 significant digits). Consider ``DoubleType`` if downstream
    consumers can accept the schema change.
    """
    if not content:
        return None, None, None, None, None, None, None, None, None

    # Report field -> label used in its formatted "{code,label:value}" string.
    labels = {
        "trades_sum": "贸易次数",
        "sumOfMoney_sum": "美元总价",
        "weight_sum": "千克毛重",
        "quantity_sum": "数量",
    }
    # field -> {4-character HS code -> accumulated value}; codes in first-seen order.
    sums = {field: {} for field in labels}
    for item in json.loads(content):
        hs_code_4 = item['__gk'][:4]
        values = {
            "trades_sum": int(item['trades_sum']),
            "sumOfMoney_sum": round(item['sumOfMoney_sum'], 2),
            "weight_sum": round(item['weight_sum'], 2),
            "quantity_sum": round(item['quantity_sum'], 2),
        }
        for field, value in values.items():
            per_code = sums[field]
            if hs_code_4 in per_code:
                per_code[hs_code_4] += value
            else:
                per_code[hs_code_4] = value

    list_strs = {}
    totals = {}
    hs_code_4_list = []
    for field, label in labels.items():
        per_code = sums[field]
        is_count = field == "trades_sum"
        if not is_count:
            # Repeated float additions leave artifacts such as
            # 0.30000000000000004 in the formatted strings; re-round.
            per_code = {code: round(amount, 2) for code, amount in per_code.items()}
        # Each metric is ranked by its own value, largest first.
        ranked = sorted(per_code.items(), key=lambda pair: pair[1], reverse=True)
        list_strs[field] = ",".join('{' + f'{code},{label}:{value}' + '}' for code, value in ranked)
        if is_count:
            hs_code_4_list = [code for code, _ in ranked if code != 'N/A']
            totals[field] = int(sum(value for _, value in ranked))
        else:
            # Start from 0.0 so an empty report still yields a float for FloatType.
            totals[field] = round(sum((value for _, value in ranked), 0.0), 2)

    return (
        hs_code_4_list, list_strs["trades_sum"], totals["trades_sum"],
        totals["sumOfMoney_sum"], list_strs["sumOfMoney_sum"],
        totals["weight_sum"], list_strs["weight_sum"],
        totals["quantity_sum"], list_strs["quantity_sum"],
    )
def calculate_total_and_list(data_list, key, unit):
    """Total ``key`` over *data_list* and render each entry as ``{hs_code,unit:value}``.

    Args:
        data_list: iterable of dicts with ``'hs_code'`` and ``key`` entries.
            Any iterable is accepted; it is consumed exactly once.
        key: name of the numeric field to total and display.
        unit: label placed before each value in the rendered string.

    Returns:
        ``(total, formatted)``. ``formatted`` joins the entries with ``", "``
        in input order. Empty input gives ``(0, "")``.
    """
    # Materialise first: the data is read twice (sum + format). With a
    # one-shot iterator such as a generator, the second pass would be empty.
    items = list(data_list)
    total = sum(item[key] for item in items)
    formatted_list = ', '.join(f"{{{item['hs_code']},{unit}:{item[key]}}}" for item in items)
    return total, formatted_list
@udf(returnType=StructType([
    StructField("hs_code_4", ArrayType(StringType()), False),
    StructField("sum_str", StringType()),
    StructField("total", FloatType(), False),
]))
def get_hs_code_count_str(content: str, target_key: str, unit: str):
    """Aggregate one metric of an HS-code report by 4-character HS code prefix.

    Args:
        content: JSON array string of report items, each with a ``__gk`` code
            and a numeric ``target_key`` field (presumably ``get_hs_code`` output).
        target_key: metric to aggregate. ``"trades_sum"`` is treated as an
            integer count; any other key is rounded to 2 decimals.
        unit: label used in the formatted string.

    Returns:
        A tuple ``(hs_code_4, sum_str, total)``:

        - ``hs_code_4``: codes in first-seen order, without ``'N/A'``.
        - ``sum_str``: ``"{code,unit:value}, ..."`` built by
          ``calculate_total_and_list``.
        - ``total``: the grand total as a float.

        Returns ``(None, None, None)`` when ``content`` is empty.
    """
    if not content:
        return None, None, None
    is_count = target_key == "trades_sum"
    result_dict = {}
    for item in json.loads(content):
        hs_code_4 = item['__gk'][:4]
        value = int(item[target_key]) if is_count else round(item[target_key], 2)
        if hs_code_4 in result_dict:
            result_dict[hs_code_4][target_key] += value
        else:
            result_dict[hs_code_4] = {"hs_code": hs_code_4, target_key: value, "unit": unit}
    if not is_count:
        # Repeated float additions leave artifacts such as 0.30000000000000004
        # in the formatted string; re-round each accumulated amount.
        for entry in result_dict.values():
            entry[target_key] = round(entry[target_key], 2)
    hs_code_4_list = [code for code in result_dict if code != 'N/A']
    total, formatted_list = calculate_total_and_list(result_dict.values(), target_key, unit)
    # ``total`` is declared FloatType. The (non-Arrow) PySpark UDF path does not
    # coerce a Python int to float and returns null instead. An int arises for
    # trades_sum and for an empty report, so convert explicitly.
    return hs_code_4_list, formatted_list, round(float(total), 2)
@udf(returnType=StructType([
    StructField("des_ctry_code", StringType()),
    StructField("sum_str", StringType())
]))
def get_destination_ctry_count(content: str):
    """Total trade counts per destination country from a report JSON string.

    ``content`` is a JSON array whose items carry ``__gk`` (the destination
    country code) and ``trades_sum``. Counts for the same code are added up.

    Returns a pair of JSON strings:

    - the top five country codes by trade count;
    - the full ``[{"des_ctry_code": ..., "trades_sum": ...}, ...]`` list,
      largest first.

    Returns ``(None, None)`` when ``content`` is empty.
    """
    if not content:
        return None, None
    counts = {}
    for entry in json.loads(content):
        code = entry['__gk']
        counts[code] = counts.get(code, 0) + int(entry['trades_sum'])
    ranked = sorted(
        ({"des_ctry_code": code, "trades_sum": count} for code, count in counts.items()),
        key=lambda row: row["trades_sum"],
        reverse=True,
    )
    top_five = [row["des_ctry_code"] for row in ranked[:5]]
    return json.dumps(top_five, ensure_ascii=False), json.dumps(ranked, ensure_ascii=False)
@udf(returnType=ArrayType(StringType()))
def arr_str_to_arr(json_str: str) -> list:
    """Decode a JSON array string; ``None`` or empty input gives an empty list."""
    return json.loads(json_str) if json_str else []
@udf(ArrayType(StringType()))
def array_slice(input_array, start, end):
    """Return ``input_array[start:end]``, or an empty list for a ``None``/empty array."""
    return input_array[start:end] if input_array else []
def get_union_tax_no(tax_no1, tax_no2):
    """Merge two tax-number fields into a de-duplicated JSON array string.

    ``tax_no1`` is taken as a single value. ``tax_no2`` may be either:

    - a JSON array string, in which case its elements are merged; or
    - any other string, kept verbatim (including JSON scalars such as ``"123"``).

    Values keep first-seen order (``tax_no1`` first, then the ``tax_no2``
    items), so the output is deterministic. A plain ``set`` iterates strings
    in an order that depends on per-process hash randomisation.

    Returns ``None`` when neither argument contributes a value.
    """
    merged = {}  # dict used as an insertion-ordered set
    if tax_no1:
        merged[tax_no1] = None
    if tax_no2:
        try:
            parsed = json.loads(tax_no2)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, list):
            merged.update(dict.fromkeys(parsed))
        else:
            merged[tax_no2] = None
    if not merged:
        return None
    return json.dumps(list(merged))
@udf(ArrayType(StringType()))
def tran_social_media(social_media):
    """Normalise a JSON array of social-media entries.

    ``social_media`` is a JSON array string. Its object elements have a
    ``name`` (a short code such as ``'fb'`` or ``'wa'``) and a ``link``.

    - Known codes are expanded (``'fb'`` -> ``'facebook'``, ...).
    - ``'<code>.com'`` inside the link is rewritten to the full domain.
    - WhatsApp (``'wa'``) links may hold several numbers separated by
      ``<br>``; each number becomes its own entry.

    Elements that are not objects are passed through re-encoded. A missing
    or non-string ``link`` is left untouched instead of raising an
    AttributeError, which would fail the whole row.

    Returns a list of JSON-encoded entries, or an empty list for empty,
    non-JSON or non-array input.
    """
    if not social_media:
        return []
    try:
        entries = json.loads(social_media)
    except json.JSONDecodeError:
        return []
    if not isinstance(entries, list):
        return []

    name_link_map = {
        'fb': ('facebook', 'facebook.com'),
        'yt': ('youtube', 'youtube.com'),
        'li': ('linkedin', 'linkedin.com'),
        'gp': ('google', 'google.com'),
        'tw': ('twitter', 'twitter.com'),
        'eb': ('ebay', 'ebay.com'),
        'ig': ('instagram', 'instagram.com'),
        'wa': ('whatsapp', 'whatsapp.com'),
        'pi': ('pinterest', 'pinterest.com')
    }
    cleaned_social_media_arr = []
    for entry in entries:
        if not isinstance(entry, dict):
            cleaned_social_media_arr.append(json.dumps(entry))
            continue
        name = entry.get('name')
        link = entry.get('link')
        has_text_link = isinstance(link, str)
        if name in name_link_map:
            new_name, new_domain = name_link_map[name]
            entry['name'] = new_name
            if has_text_link:
                entry['link'] = link.replace(f'{name}.com', new_domain)
        # WhatsApp: emit one entry per phone number packed into the raw link with <br>.
        if name == 'wa' and has_text_link:
            for number in link.split('<br>'):
                cleaned_social_media_arr.append(json.dumps({
                    'name': new_name,
                    'link': number.strip()
                }))
        else:
            cleaned_social_media_arr.append(json.dumps(entry))
    return cleaned_social_media_arr
@udf(StringType())
def clean_phone_string(input_str):
    """Normalise a free-text phone field into a JSON array of number strings.

    Steps, in order:

    1. Full-width commas become ASCII commas.
    2. A comma with a digit on both sides is dropped.
    3. ``<br>`` fragments become commas.
    4. Spaces, ``-``, ASCII/full-width parentheses and ``.`` are removed.
    5. A comma is inserted before any ``+`` that directly follows a digit.
    6. Quotes and square brackets are removed.

    The result is split on commas and returned via ``json.dumps``. Returns
    ``None`` for empty input.
    """
    if not input_str:
        return None
    # Full-width comma (U+FF0C) -> ASCII comma.
    cleaned_str = input_str.replace('\uff0c', ',')
    # Drop commas with a digit on both sides. Lookarounds do not consume the
    # neighbouring digits, so "1,2,3" -> "123" (a plain (\d),(\d) gives "12,3").
    cleaned_str = re.sub(r'(?<=\d),(?=\d)', '', cleaned_str)
    # "<br>" (and truncated halves of it) act as list separators.
    cleaned_str = cleaned_str.replace('<br>', ',').replace('<br', ',').replace('br>', ',')
    cleaned_str = cleaned_str.replace(' ', '')
    # Separator characters typed via IMEs: hyphen plus ASCII and full-width parentheses.
    cleaned_str = re.sub(r'[-()\uff08\uff09]', '', cleaned_str)
    cleaned_str = cleaned_str.replace('.', '')
    # A "+" right after a digit starts a new number.
    cleaned_str = re.sub(r'(\d)\+', r'\1,+', cleaned_str)
    # Remove straight/curly quotes and square brackets in one pass.
    cleaned_str = cleaned_str.translate(str.maketrans('', '', '"\'‘’“”[]'))
    return json.dumps(cleaned_str.split(','))
@udf(ArrayType(StringType()))
def str_to_json_arr(json_str):
    """Split a JSON array string into a list of per-element JSON strings.

    Returns an empty list for empty input, invalid JSON, or JSON that is not
    an array.
    """
    if not json_str:
        return []
    try:
        parsed = json.loads(json_str)
    except json.JSONDecodeError:
        return []
    if not isinstance(parsed, list):
        return []
    return list(map(json.dumps, parsed))
def extract_domain(url):
    """Return the network-location part of *url* without a leading ``www.``.

    Handling details:

    - Surrounding whitespace is ignored.
    - A scheme-less input is treated as ``http://``. The scheme check is
      case-insensitive; otherwise ``HTTP://x.com`` would be re-prefixed and
      parsed as host ``HTTP:``.
    - The ``www.`` prefix is stripped regardless of case.
    - The host's own casing and any ``user@`` / ``:port`` parts of the
      netloc are returned unchanged.

    Returns ``None`` for empty input or when ``urlparse`` rejects the URL.
    """
    if not url:
        return None
    url = url.strip()
    if not url:
        return None
    if not url.lower().startswith(('http://', 'https://')):
        url = 'http://' + url
    try:
        domain = urlparse(url).netloc
    except ValueError:
        # e.g. an unbalanced IPv6 bracket such as "http://[::1"
        return None
    return domain[4:] if domain[:4].lower() == 'www.' else domain
def add_company_item(input_string: str, input_list_str: str):
    """Add *input_string* to the JSON array in *input_list_str* and de-duplicate.

    Args:
        input_string: a JSON array string, whose elements are all added, or
            any other value, which is added verbatim. Empty/None adds nothing.
        input_list_str: empty/None, or a JSON array string.

    Returns:
        A JSON array string with duplicates removed, in first-seen order
        (existing items first). The order is deterministic, unlike a
        ``set``-based dedup.

    Raises:
        ValueError: if ``input_list_str`` is not valid JSON or not a JSON array.
    """
    if not input_list_str:
        items = []
    else:
        try:
            items = json.loads(input_list_str)
        except json.JSONDecodeError as err:
            raise ValueError("input_list_str is not a valid JSON") from err
        if not isinstance(items, list):
            raise ValueError("input_list_str must be a JSON representation of a list")
    if input_string:
        try:
            candidate = json.loads(input_string)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, list):
            items.extend(candidate)
        else:
            items.append(input_string)
    return json.dumps(list(dict.fromkeys(items)))
# Taiwan company-registry status text -> normalised English status.
# Built once at import rather than on every call.
_TAIWAN_COMPANY_STATUS_MAP = {
    "撤回認許已清算完結": "Dissolved",
    "臺中市政府": None,  # special value, explicitly mapped to None
    "廢止": "Dissolved",
    "撤銷許可": "Status unknown",
    "廢止許可": "Dissolved",
    "廢止登記已清算完結": "Dissolved",
    "核准登記": "Active",
    "廢止認許": "Dissolved",
    "解散": "Dissolved",
    "破產": "Bankruptcy",
    "撤銷登記": "Dissolved",
    "合併解散": "Dissolved (merger or take-over)",
    "撤回登記": "Dissolved",
    "撤銷公司設立": "Status unknown",
    "停業": "Inactive (no precision)",
    "破產已清算完結": "Dissolved (bankruptcy)",
    "廢止已清算完結": "Dissolved",
    "Dissolution / Closed / Deregistration": "Dissolved",
    "廢止登記": "Dissolved",
    "撤回認許": "Dissolved",
    "核准設立,但已命令解散": "Dissolved",
    # Same status written with a full-width comma (U+FF0C), in case the
    # source data uses it.
    "核准設立\uff0c但已命令解散": "Dissolved",
    "解散已清算完結": "Dissolved",
    "撤銷已清算完結": "Dissolved",
    "撤銷": "Dissolved",
    "核准設立": "Active",
    "Establishment approved": "Active"
}


def taiwan_company_status_mapping(status):
    """Map a Taiwan company status string to a normalised English status.

    Returns ``None`` for unknown statuses, for ``None`` input, and for the
    special value "臺中市政府" (Taichung City Government).
    """
    return _TAIWAN_COMPANY_STATUS_MAP.get(status)
def clean_ven_website(website):
    """Normalise a website value to a bare lowercase domain string.

    The data source is presumably Venezuelan, judging by the ``.com.ve``
    patches below.

    Returns ``None`` when the value:

    - is empty;
    - contains ``@``;
    - has no ``.`` before or after cleaning;
    - has no ASCII letter after cleaning.

    Cleaning steps:

    1. Lowercase the value.
    2. Remove two hard-coded trailing fragments seen in the data.
    3. Strip an ``http(s)`` scheme prefix, including sloppy variants.
    4. Strip a ``www`` prefix, including sloppy variants.
    """
    if not website:
        return None
    # 1. Filter out values containing '@' (presumably e-mail addresses).
    if '@' in website:
        return None
    # 2. Filter out values without a '.'.
    if '.' not in website:
        return None
    # 3. Reduce to a domain: lowercase, then drop known suffix noise and prefixes.
    cleaned_url = website.lower()
    # Done before validation, so a value consisting only of such a fragment
    # yields None rather than an empty string.
    cleaned_url = cleaned_url.replace('; www.antriol.com.ve', '').replace(', www.velastindari.com.ve', '')
    cleaned_url = re.sub(r'^https?://|^https?//|^https?:\\\\|^https?:', '', cleaned_url)  # http/https prefix
    cleaned_url = re.sub(r'^www\.|^wwww\.|^www\d*\.|^www,|^www//|^www/|^www |^www:|^www', '', cleaned_url)  # www prefix
    if '.' not in cleaned_url:
        return None
    if not re.search(r'[a-zA-Z]', cleaned_url):
        return None
    return cleaned_url
def common_clean_website(url):
    """Reduce a free-text website value to a bare lowercase host string.

    Steps, in order:

    1. Lowercase the input and strip leading symbols.
    2. Strip a leading ``http``/``https`` scheme, including sloppy forms such
       as ``hhttp`` / ``webhttp`` and a missing ``://`` before ``www``.
    3. Strip a leading ``www`` / ``www<digits>`` prefix.
    4. Cut everything from the first ``?``, ``&``, comma (ASCII or full-width)
       or ``/``.
    5. Trim trailing symbols and a trailing ``http``/``www`` fragment.

    Returns ``None`` for empty input, or when the result has no ``.``,
    contains ``@``, or has no ASCII letter.
    """
    if not url:
        return None
    cleaned_url = url.lower()
    # Leading non-alphanumeric symbols.
    cleaned_url = re.sub(r'^[^a-z0-9]*', '', cleaned_url)
    # Leading scheme. The lookahead requires a separator or "www" after it, so
    # real hosts that merely start with "http" (e.g. "httpbin.org") are kept.
    cleaned_url = re.sub(r'^(web)?h?https?(?=[^a-z0-9]|www)[^a-z0-9]*', '', cleaned_url)
    # Leading "www" (optionally numbered, e.g. "www2."). The second pass
    # removes a doubled "www.www." prefix.
    cleaned_url = re.sub(r'^www[0-9]*[^a-z0-9]*', '', cleaned_url)
    cleaned_url = re.sub(r'^www[^a-z0-9]*', '', cleaned_url)
    # Cut at the first query/path/list separator (ASCII or full-width comma).
    cleaned_url = re.split(r'[?&,\uff0c/]', cleaned_url, maxsplit=1)[0]
    # Trailing symbols, then a trailing "http(s)" or "www" fragment.
    cleaned_url = re.sub(r'[^a-z0-9]*$', '', cleaned_url)
    cleaned_url = re.sub(r'[^a-z0-9]*https?$', '', cleaned_url)
    cleaned_url = re.sub(r'[^a-z0-9]*www$', '', cleaned_url)
    if '.' not in cleaned_url or '@' in cleaned_url:
        return None
    if not re.search(r'[a-z]', cleaned_url):
        return None
    return cleaned_url
def format_state_name(state_name):
    """Capitalise each whitespace-separated word of *state_name*.

    Runs of whitespace collapse to single spaces. Returns ``None`` for
    ``None`` or an empty string.
    """
    if not state_name:
        return None
    return ' '.join(map(str.capitalize, state_name.split()))
if __name__ == '__main__':
    # Manual smoke check for common_clean_website.
    sample_url = "http://elcore.kr"
    print(common_clean_website(sample_url))
|