| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/8/26 10:47
- import inspect
- import random
- import time
- import requests
- import user_agent
- from loguru import logger
- from parsel import Selector
- from urllib.parse import quote
- from tenacity import retry, stop_after_attempt, wait_fixed
- from mysql_pool import MySQLConnectionPool
- from wx_pokemon_aes_tool import pokemon_aes_encrypt, pokemon_aes_decrypt, api_sign
# Hard upper bound on the page number any pagination loop in this module will request.
max_page = 1000
# Language tag stored in the crawler_language column and used to filter category rows.
crawler_language = "简中"

# Drop loguru's default sink and log to a dated file instead.
logger.remove()
logger.add(
    "./logs/jian_{time:YYYYMMDD}.log",
    level="DEBUG",
    format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
    rotation="00:00",
    retention="7 day",
    encoding='utf-8',
)
def after_log(retry_state):
    """Log the outcome of a single attempt; used as the ``after=`` callback of ``@retry``.

    :param retry_state: tenacity ``RetryCallState`` of the attempt that just finished.
        When the wrapped call received positional arguments, the first one is used
        as the logger; otherwise the module-level ``logger`` is used.
    """
    positional = retry_state.args
    # NOTE(review): assumes the wrapped function's first positional argument is a logger.
    log = positional[0] if positional else logger

    if not retry_state.outcome.failed:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
        return
    log.warning(
        f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """Build the ``requests``-style proxy mapping for the tunnel proxy.

    :param log: logger used to report a failure while building the mapping
    :return: dict with ``"http"`` and ``"https"`` keys, both set to the same
        authenticated ``http://user:password@host:port/`` proxy URL
    """
    # NOTE(review): proxy credentials are hard-coded in source — consider moving
    # them to configuration or environment variables.
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"

    try:
        proxy_url = f"http://{kdl_username}:{kdl_password}@{tunnel}/"
        return {"http": proxy_url, "https": proxy_url}
    except Exception as e:
        log.error(f"Error getting proxy: {e}")
        raise e
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_parent_single_page(log, page, sql_pool, request_timeout=30):
    """Request one page of the parent-series list and return the decrypted response.

    The call is retried by tenacity for up to 5 attempts, 1 second apart.

    :param log: logger; also picked up by ``after_log`` as the first positional argument
    :param page: page number to request (sent as ``pageNum``; page size is fixed at 20)
    :param sql_pool: unused here; kept so existing callers keep working
    :param request_timeout: seconds ``requests`` waits for the server before giving up,
        so a stalled connection fails (and is retried) instead of hanging forever
    :return: the value returned by ``pokemon_aes_decrypt`` for the response body;
        callers in this file treat it as a dict with ``code`` / ``message`` / ``data``
    :raises requests.HTTPError: when the server answers with an error status
    """
    log.debug(f'Request {inspect.currentframe().f_code.co_name} for page: {page} .................')

    # timeout=1221 is an opaque value handed to api_sign; its meaning is not visible here.
    sign_result = api_sign(
        timeout=1221,
        user_token="",
        params={"pageNum": str(page), "pageSize": "20"},
    )
    headers = {
        "Content-Type": "application/json",
        "User-Agent": user_agent.generate_user_agent(),
        "Api-Access-Token": "",
        "Nonce": str(sign_result["nonce"]),
        "Signature": sign_result["signature"],
        "Timestamp": sign_result["timestamp"],
    }
    payload = {
        "encryptionBodyParams": pokemon_aes_encrypt(sign_result["secretJsonParams"])
    }
    url = "https://app-api.pokemon-tcg.cn/app-api/v1/app/commodity/queryParent"

    response = requests.post(url, headers=headers, json=payload, timeout=request_timeout)
    response.raise_for_status()
    return pokemon_aes_decrypt(response.text)
def parse_parent_list(log, list_data, sql_pool):
    """Walk one page of parent series and crawl the child series of each entry.

    :param log: logger
    :param list_data: list of parent-series dicts (``id``, ``name``, ``imageUrl`` are read)
    :param sql_pool: database pool forwarded to ``get_child_list``
    :return: None
    """
    if not list_data:
        log.error(f"{inspect.currentframe().f_code.co_name} list_data is None")
        return

    func_name = inspect.currentframe().f_code.co_name
    for entry in list_data:
        parent_id = entry.get("id")
        series_name = entry.get("name")
        series_img = entry.get("imageUrl")
        # A failure for one parent is logged and must not stop the remaining parents.
        try:
            log.debug(f"{func_name} Request -> get_child_list, parent_id: {parent_id}")
            get_child_list(log, sql_pool, parent_id, series_name, series_img)
        except Exception as e:
            log.error(f"Error parsing child list: {e}")
def get_parent_list(log, sql_pool):
    """Page through the parent-series list and process every page.

    Each page is handed to ``parse_parent_list``. Paging stops when the API reports
    an error code, a page is short, the reported page count or total is reached,
    ``max_page`` is exceeded, or a request raises.

    :param log: logger
    :param sql_pool: database pool forwarded to the page/parse helpers
    :return: None
    """
    page = 1
    total_fetched = 0      # records seen so far, including the current page
    total_expected = None  # "total" reported by the first successful response

    while page <= max_page:
        try:
            result = get_parent_single_page(log, page, sql_pool)
            if result.get("code") != 0:
                log.error(f"请求失败: {result.get('message')}")
                break

            # "data" / "list" may be present but null; treat that like an empty page.
            data = result.get("data") or {}
            list_data = data.get("list") or []
            parse_parent_list(log, list_data, sql_pool)

            pages = data.get("pages")
            if total_expected is None:
                total_expected = data.get("total")
                log.info(f"总条数: {total_expected}")

            # Count the page before the stop check so the last page is included
            # in the final summary.
            page_count = len(list_data)
            total_fetched += page_count

            if (
                    page_count < 20  # short page (page size requested is 20)
                    or (pages and page >= pages)  # reached the reported page count
                    or (total_expected and total_fetched >= total_expected)  # everything fetched
            ):
                log.info(f"停止翻页,当前页: {page}, 已获取: {total_fetched} 条")
                break

            log.info(f"第 {page} 页获取 {page_count} 条,累计 {total_fetched} 条")
            page += 1
        except Exception as e:
            log.error(f"第 {page} 页请求异常: {e}")
            break

    log.info(f"共获取 {total_fetched} 条数据")
- # ---------------------------------------------------------------------------------------------------------------------
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_child_single_page(log, page, parentId, request_timeout=30):
    """Request one page of the child series belonging to a parent series.

    The call is retried by tenacity for up to 5 attempts, 1 second apart.

    :param log: logger; also picked up by ``after_log`` as the first positional argument
    :param page: page number to request (sent as ``pageNum``; page size is fixed at 20)
    :param parentId: id of the parent series to query
    :param request_timeout: seconds ``requests`` waits for the server before giving up,
        so a stalled connection fails (and is retried) instead of hanging forever
    :return: the value returned by ``pokemon_aes_decrypt`` for the response body;
        callers in this file treat it as a dict with ``code`` / ``message`` / ``data``
    :raises requests.HTTPError: when the server answers with an error status
    """
    log.debug(f'Request {inspect.currentframe().f_code.co_name} for page: {page} .................')

    # timeout=1221 is an opaque value handed to api_sign; its meaning is not visible here.
    sign_result = api_sign(
        timeout=1221,
        user_token="",
        params={'pageNum': str(page), 'pageSize': '20', 'parentId': str(parentId)},
    )
    headers = {
        "Content-Type": "application/json",
        "User-Agent": user_agent.generate_user_agent(),
        "Api-Access-Token": "",
        "Nonce": str(sign_result["nonce"]),
        "Signature": sign_result["signature"],
        "Timestamp": sign_result["timestamp"],
    }
    payload = {
        "encryptionBodyParams": pokemon_aes_encrypt(sign_result["secretJsonParams"])
    }
    url = "https://app-api.pokemon-tcg.cn/app-api/v1/app/commodity/queryByParentId"

    response = requests.post(url, headers=headers, json=payload, timeout=request_timeout)
    response.raise_for_status()
    return pokemon_aes_decrypt(response.text)
def parse_child_list(log, list_data, sql_pool, parentId, expansion_series, expansion_img):
    """Convert one page of child-series items into rows and bulk-insert them.

    :param log: logger
    :param list_data: list of child-series dicts from the API
    :param sql_pool: pool whose ``insert_many`` writes to ``pokemon_jianz_category``
    :param parentId: id of the parent series these children belong to
    :param expansion_series: parent series name, stored on every row
    :param expansion_img: parent series image URL, stored on every row
    :return: None
    """
    if not list_data:
        log.error(f"{inspect.currentframe().f_code.co_name} list_data is None")
        return

    info_list = []
    for item in list_data:
        # The description arrives as HTML; keep only the text of its <p> elements.
        # An item without a description must not abort the whole page, so it is
        # stored as an empty string instead of being passed to Selector.
        description_html = item.get("description")
        if description_html:
            paragraphs = Selector(description_html).xpath('//p/text()').getall()
            description = '\n'.join(paragraphs)
        else:
            description = ""

        info_list.append({
            "parent_id": parentId,
            "expansion_series": expansion_series,
            "expansion_img": expansion_img,
            "child_id": item.get("id"),
            "child_name": item.get("name"),
            "commodity_code": item.get("commodityCode"),
            "child_image_url": item.get("imageUrl"),
            "sales_date": item.get("salesDate"),
            "description": description,
            "crawler_language": crawler_language,
        })

    if info_list:
        sql_pool.insert_many(table="pokemon_jianz_category", data_list=info_list, ignore=True)
def get_child_list(log, sql_pool, parent_id, expansion_series, expansion_img):
    """Page through the child series of one parent series and store every page.

    Each page is handed to ``parse_child_list``. Paging stops when the API reports
    an error code, a page is short, the reported page count or total is reached,
    ``max_page`` is exceeded, or a request raises.

    :param log: logger
    :param sql_pool: database pool forwarded to ``parse_child_list``
    :param parent_id: id of the parent series to query
    :param expansion_series: parent series name, stored with each child row
    :param expansion_img: parent series image URL, stored with each child row
    :return: None
    """
    page = 1
    total_fetched = 0      # records seen so far, including the current page
    total_expected = None  # "total" reported by the first successful response

    while page <= max_page:
        try:
            result = get_child_single_page(log, page, parent_id)
            if result.get("code") != 0:
                log.error(f"请求失败: {result.get('message')}")
                break

            # "data" / "list" may be present but null; treat that like an empty page.
            data = result.get("data") or {}
            list_data = data.get("list") or []
            parse_child_list(log, list_data, sql_pool, parent_id, expansion_series, expansion_img)

            pages = data.get("pages")
            if total_expected is None:
                total_expected = data.get("total")
                log.info(f"总条数: {total_expected}")

            # Count the page before the stop check so the last page is included
            # in the final summary.
            page_count = len(list_data)
            total_fetched += page_count

            if (
                    page_count < 20  # short page (page size requested is 20)
                    or (pages and page >= pages)  # reached the reported page count
                    or (total_expected and total_fetched >= total_expected)  # everything fetched
            ):
                log.info(f"停止翻页,当前页: {page}, 已获取: {total_fetched} 条")
                break

            log.info(f"第 {page} 页获取 {page_count} 条,累计 {total_fetched} 条")
            page += 1
        except Exception as e:
            log.error(f"第 {page} 页请求异常: {e}")
            break

    log.info(f"共获取 {total_fetched} 条数据")
- # ---------------------------------------------------------------------------------------------------------------------
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_series_single_page(log, item_tuple, page, request_timeout=30):
    """Request one page of the card list for a single child series.

    The call is retried by tenacity for up to 5 attempts, 1 second apart.

    :param log: logger; also picked up by ``after_log`` as the first positional argument
    :param item_tuple: category row; indexes 0-3 are read as
        ``(child_id, child_name, commodity_code, sales_date)``
    :param page: page number to request (sent as ``pageNum``; page size is fixed at 50)
    :param request_timeout: seconds ``requests`` waits for the server before giving up,
        so a stalled connection fails (and is retried) instead of hanging forever
    :return: the value returned by ``pokemon_aes_decrypt`` for the response body;
        callers in this file treat it as a dict with ``code`` / ``message`` / ``data``
    :raises requests.HTTPError: when the server answers with an error status
    """
    child_id = item_tuple[0]
    child_name = item_tuple[1]
    commodity_code = item_tuple[2]
    sales_date = item_tuple[3]
    log.debug(f'Request {inspect.currentframe().f_code.co_name} for ID: {child_id}, page: {page} .................')

    par = {
        "banCardFlag": "0",
        "commodityIds": str(child_id),
        "commoditySelectedList": [
            {
                "id": str(child_id),
                "commodityName": child_name,
                "commodityCode": commodity_code,
                "salesDate": sales_date,
            }
        ],
        "pageNum": str(page),
        "pageSize": "50",
    }
    # timeout=1221 is an opaque value handed to api_sign; its meaning is not visible here.
    sign_result = api_sign(
        timeout=1221,
        user_token="",
        params=par,
        need_md5=True
    )
    # NOTE(review): unlike the category endpoints, the encrypted body is built from the
    # raw params dict rather than sign_result["secretJsonParams"] — confirm this is intended.
    req_data = pokemon_aes_encrypt(par)

    headers = {
        "Content-Type": "application/json",
        "User-Agent": user_agent.generate_user_agent(),
        "Api-Access-Token": "",
        "Nonce": str(sign_result["nonce"]),
        "Signature": sign_result["signature"],
        "Timestamp": sign_result["timestamp"],
    }
    url = "https://app-api.pokemon-tcg.cn/app-api/v1/app/card/query"

    response = requests.post(
        url,
        headers=headers,
        json={"encryptionBodyParams": req_data},
        timeout=request_timeout,
    )
    response.raise_for_status()
    return pokemon_aes_decrypt(response.text)
def parse_series(log, list_data, sql_pool, item_tuple):
    """Fetch and store the detail page of every card on one list page.

    :param log: logger
    :param list_data: list of card dicts from the list endpoint (only ``id`` is read)
    :param sql_pool: database pool forwarded to ``get_details_page``
    :param item_tuple: category row forwarded to ``get_details_page``
    :return: None
    """
    if not list_data:
        log.error(f"{inspect.currentframe().f_code.co_name} list_data is None")
        return

    func_name = inspect.currentframe().f_code.co_name
    for card in list_data:
        detail_id = card.get("id")
        # A failing card is logged and skipped so the rest of the page is still processed.
        try:
            get_details_page(log, detail_id, item_tuple, sql_pool)
        except Exception as e:
            log.error(f"{func_name} {detail_id} 请求异常: {e}")
        # Pause 0.1-0.5 s between detail requests.
        time.sleep(random.randint(1, 5) / 10)
def get_series_list(log, item_tuple, sql_pool):
    """Page through the card list of one child series and process every page.

    Each page is handed to ``parse_series``. Paging stops when the API reports an
    error code, a page is short, the reported page count or total is reached,
    ``max_page`` is exceeded, or a request raises.

    :param log: logger
    :param item_tuple: category row forwarded to the page/parse helpers
    :param sql_pool: database pool forwarded to ``parse_series``
    :return: None
    """
    page = 1
    total_fetched = 0      # records seen so far, including the current page
    total_expected = None  # "total" reported by the first successful response

    while page <= max_page:
        try:
            result = get_series_single_page(log, item_tuple, page)
            if result.get("code") != 0:
                log.error(f"请求失败: {result.get('message')}")
                break

            # "data" / "list" may be present but null; treat that like an empty page.
            data = result.get("data") or {}
            list_data = data.get("list") or []
            parse_series(log, list_data, sql_pool, item_tuple)

            pages = data.get("pages")
            if total_expected is None:
                total_expected = data.get("total")
                log.info(f"总条数: {total_expected}")

            page_count = len(list_data)
            total_fetched += page_count

            # NOTE(review): the list endpoint is requested with pageSize 50, yet the
            # short-page threshold here is 20. The pages/total checks normally end the
            # loop first; confirm which threshold is intended before changing it.
            if (
                    page_count < 20
                    or (pages and page >= pages)  # reached the reported page count
                    or (total_expected and total_fetched >= total_expected)  # everything fetched
            ):
                log.info(f"停止翻页,当前页: {page}, 已获取: {total_fetched} 条")
                break

            log.info(f"第 {page} 页获取 {page_count} 条,累计 {total_fetched} 条")
            page += 1
        except Exception as e:
            log.error(f"第 {page} 页请求异常: {e}")
            break
- # ---------------------------------------------------------------------------------------------------------------------
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_details_page(log, card_id, item_tuple, sql_pool, request_timeout=30):
    """Fetch one card's detail record and hand it to ``parse_details`` for storage.

    The request is retried by tenacity for up to 5 attempts, 1 second apart.
    Errors raised while parsing/storing are logged and swallowed, so they do not
    trigger a retry.

    :param log: logger; also picked up by ``after_log`` as the first positional argument
    :param card_id: id of the card to fetch
    :param item_tuple: category row forwarded to ``parse_details``
    :param sql_pool: database pool forwarded to ``parse_details``
    :param request_timeout: seconds ``requests`` waits for the server before giving up,
        so a stalled connection fails (and is retried) instead of hanging forever
    :return: None
    :raises requests.HTTPError: when the server answers with an error status
    """
    log.debug(
        f'Request {inspect.currentframe().f_code.co_name} for card_id: {card_id} .................')

    par = {"id": str(card_id)}
    # timeout=1221 is an opaque value handed to api_sign; its meaning is not visible here.
    sign_result = api_sign(
        timeout=1221,
        user_token="",
        params=par,
    )

    # The web client sends GET parameters as encodeURIComponent(<encrypted params>).
    # quote() leaves "/" unescaped by default (safe="/"), so safe='' is required to
    # percent-encode it as encodeURIComponent does.
    # NOTE(review): requests percent-encodes query values again when it builds the URL,
    # so this value is double-encoded on the wire — presumably what the server
    # expects; confirm before changing.
    req_data = quote(pokemon_aes_encrypt(par), safe='')

    headers = {
        "Content-Type": "application/json",
        "User-Agent": user_agent.generate_user_agent(),
        "Api-Access-Token": "",
        "Nonce": str(sign_result["nonce"]),
        "Signature": sign_result["signature"],
        "Timestamp": sign_result["timestamp"]
    }
    url = "https://app-api.pokemon-tcg.cn/app-api/v1/app/card/get"

    response = requests.get(
        url,
        headers=headers,
        params={"encryptionUrlParams": req_data},
        timeout=request_timeout,
    )
    response.raise_for_status()
    decrypted_res = pokemon_aes_decrypt(response.text)

    try:
        parse_details(log, decrypted_res, sql_pool, card_id, item_tuple)
    except Exception as e:
        log.error(f"解析详情页数据异常: {e}")
def parse_details(log, list_data, sql_pool, card_id, item_tuple):
    """Build one ``pokemon_card_record`` row from a detail response and insert it.

    :param log: logger
    :param list_data: decrypted detail response; its ``data`` entry holds the card fields
    :param sql_pool: pool whose ``insert_one_or_dict`` writes the row
    :param card_id: id of the card the response belongs to
    :param item_tuple: category row; indexes 0, 1, 2 and 4 are read as
        ``child_id``, ``child_name``, ``commodity_code`` and ``expansion_series``
    :return: None
    """
    # Column mapping used below:
    #   pg_value            <- commodity_code
    #   pg_label            <- child_name
    #   major_category_name <- expansion_series
    child_id, child_name, commodity_code = item_tuple[0], item_tuple[1], item_tuple[2]
    expansion_series = item_tuple[4]

    if not list_data:
        log.error(f"{inspect.currentframe().f_code.co_name} list_data is None")
        return

    card = list_data.get("data")
    if not card:
        log.error(f"{inspect.currentframe().f_code.co_name} item is None")
        return

    record = {
        "child_id": child_id,
        "major_category_name": expansion_series,
        "pg_value": commodity_code,
        "pg_label": child_name,
        "card_id": card_id,
        "card_name": card.get("cardName"),
        "card_no": card.get("collectionNumber"),
        "rarity": card.get("rarityText"),
        "img": card.get("imgUrl"),
        "crawler_language": crawler_language,
    }
    sql_pool.insert_one_or_dict(table="pokemon_card_record", data=record, ignore=True)
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def jz_pokemon_main(log):
    """Run one crawl: load the stored child series and fetch the cards of each.

    NOTE(review): every error inside the crawl is caught and logged below, so the
    ``@retry`` wrapper only re-runs this function when creating or health-checking
    the connection pool fails.

    :param log: logger; also picked up by ``after_log`` as the first positional argument
    :raises RuntimeError: when the connection pool health check fails
    """
    func_name = inspect.currentframe().f_code.co_name
    log.info(f'开始运行 {func_name} 爬虫任务.............................................')

    # Set up the MySQL connection pool and refuse to start on an unhealthy pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        log.debug("........... 获取商品详情 ..........")
        category_rows = sql_pool.select_all(
            f"SELECT DISTINCT child_id,child_name,commodity_code,sales_date,expansion_series FROM pokemon_jianz_category WHERE crawler_language='{crawler_language}'")
        log.debug(f"获取商品详情长度为: {len(category_rows)}")

        for item_tuple in category_rows:
            # One failing series is logged and skipped; the remaining series still run.
            try:
                get_series_list(log, item_tuple, sql_pool)
            except Exception as e:
                log.error(f"Request get_details error: {e}")
    except Exception as e:
        log.error(f'{func_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {func_name} 运行结束,等待下一轮的采集任务............')
if __name__ == "__main__":
    # Script entry point: run the crawl with the module-level loguru logger.
    jz_pokemon_main(logger)
|