# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/8/26 10:47
"""Crawler for the Simplified-Chinese Pokemon TCG app API (app-api.pokemon-tcg.cn).

The crawl has two phases:

1. Category phase (``get_parent_list``): parent series -> child series, stored
   in table ``pokemon_jianz_category``.
2. Card phase (``jz_pokemon_main``): for every stored child series, page through
   the card list and store each card's detail in table ``pokemon_card_record``.

Request bodies are signed with ``api_sign`` and encrypted with
``pokemon_aes_encrypt``; responses are decrypted with ``pokemon_aes_decrypt``.
"""
import inspect
import random
import time
from urllib.parse import quote

import requests
import user_agent
from loguru import logger
from parsel import Selector
from tenacity import retry, stop_after_attempt, wait_fixed

from mysql_pool import MySQLConnectionPool
from wx_pokemon_aes_tool import pokemon_aes_encrypt, pokemon_aes_decrypt, api_sign

# Hard upper bound on the number of pages requested by any paginated crawl.
max_page = 1000
# Value written to the ``crawler_language`` column of every stored row.
crawler_language = "简中"

# Seconds before an HTTP request is abandoned. Without a timeout `requests`
# waits forever on a stalled connection and the @retry decorators never fire.
_REQUEST_TIMEOUT = 30

logger.remove()
logger.add("./logs/jian_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")


def after_log(retry_state):
    """
    tenacity ``after`` callback: log the outcome of every attempt.

    The first positional argument of the retried call is used as the logger
    (every retried function in this module takes ``log`` first); the global
    loguru logger is used when the call had no positional arguments.

    :param retry_state: tenacity RetryCallState object
    """
    log = retry_state.args[0] if retry_state.args else logger

    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the proxy mapping for the tunnel proxy.

    :param log: logger object (only consumed by the retry callback)
    :return: dict with "http" and "https" proxy URLs, usable as ``proxies=`` in requests
    """
    # NOTE(review): credentials are committed in source; consider moving them
    # to configuration or environment variables.
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"

    # Both schemes go through the same HTTP tunnel endpoint.
    proxy_url = f"http://{kdl_username}:{kdl_password}@{tunnel}/"
    return {"http": proxy_url, "https": proxy_url}


def _build_headers(sign_result):
    """Build the signed request headers from the mapping returned by ``api_sign``."""
    return {
        "Content-Type": "application/json",
        "User-Agent": user_agent.generate_user_agent(),
        "Api-Access-Token": "",
        "Nonce": str(sign_result["nonce"]),
        "Signature": sign_result["signature"],
        "Timestamp": sign_result["timestamp"],
    }


def _post_encrypted(url, sign_result, req_data):
    """POST an encrypted body to ``url`` and return the decrypted response."""
    response = requests.post(
        url,
        headers=_build_headers(sign_result),
        json={"encryptionBodyParams": req_data},
        timeout=_REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return pokemon_aes_decrypt(response.text)


def _paginate(log, fetch_page, handle_page, short_page_threshold=20):
    """
    Shared pagination loop used by the parent / child / card list crawls.

    Pages are requested starting at 1 until one of these holds: the API reports
    a non-zero ``code``; the page holds fewer than ``short_page_threshold``
    items; ``page`` reaches the reported ``pages``; the running item count
    reaches the reported ``total``; ``max_page`` is exceeded; or any exception
    is raised (logged, then the loop stops).

    :param log: logger object
    :param fetch_page: callable(page) returning the decrypted response mapping
    :param handle_page: callable(list_data) that processes one page of items
    :param short_page_threshold: a page shorter than this is treated as the last one
    :return: number of items seen across all pages, including the last page
    """
    page = 1
    total_fetched = 0
    total_expected = None  # taken from the first response that reports "total"

    while page <= max_page:
        try:
            result = fetch_page(page)

            if result.get("code") != 0:
                log.error(f"请求失败: {result.get('message')}")
                break

            # `or` also covers an explicit null, which .get(key, default) does not.
            data = result.get("data") or {}
            list_data = data.get("list") or []

            handle_page(list_data)
            total_fetched += len(list_data)

            pages = data.get("pages")
            if total_expected is None:
                total_expected = data.get("total")
                log.info(f"总条数: {total_expected}")

            # The API's isLastPage field was found unreliable, so it is not used.
            if (
                    len(list_data) < short_page_threshold
                    or (pages and page >= pages)
                    or (total_expected and total_fetched >= total_expected)
            ):
                log.info(f"停止翻页,当前页: {page}, 已获取: {total_fetched} 条")
                break

            log.info(f"第 {page} 页获取 {len(list_data)} 条,累计 {total_fetched} 条")
            page += 1

        except Exception as e:
            log.error(f"第 {page} 页请求异常: {e}")
            break

    return total_fetched


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_parent_single_page(log, page, sql_pool):
    """
    Request one page of parent series.

    :param log: logger object
    :param page: 1-based page number
    :param sql_pool: unused; kept for call compatibility
    :return: decrypted response (callers treat it as a dict with "code"/"message"/"data")
    """
    log.debug(f'Request {inspect.currentframe().f_code.co_name} for page: {page} .................')
    par = {"pageNum": str(page), "pageSize": "20"}
    sign_result = api_sign(timeout=1221, user_token="", params=par)
    req_data = pokemon_aes_encrypt(sign_result["secretJsonParams"])
    return _post_encrypted(
        "https://app-api.pokemon-tcg.cn/app-api/v1/app/commodity/queryParent",
        sign_result,
        req_data,
    )


def parse_parent_list(log, list_data, sql_pool):
    """
    Walk one page of parent series and crawl the child series of each.

    A failure while crawling one parent is logged and does not stop the others.

    :param log: logger object
    :param list_data: list of parent-series dicts (keys read: id, imageUrl, name)
    :param sql_pool: database pool forwarded to the child crawl
    :return: None
    """
    if not list_data:
        log.error(f"{inspect.currentframe().f_code.co_name} list_data is None")
        return

    for item in list_data:
        parent_id = item.get("id")
        expansion_img = item.get("imageUrl")
        expansion_series = item.get("name")

        try:
            log.debug(f"{inspect.currentframe().f_code.co_name} Request -> get_child_list, parent_id: {parent_id}")
            get_child_list(log, sql_pool, parent_id, expansion_series, expansion_img)
        except Exception as e:
            log.error(f"Error parsing child list: {e}")


def get_parent_list(log, sql_pool):
    """
    Page through all parent series and crawl each one's child series.

    :param log: logger object
    :param sql_pool: database pool forwarded to the parsers
    :return: None
    """
    fetched = _paginate(
        log,
        lambda page: get_parent_single_page(log, page, sql_pool),
        lambda list_data: parse_parent_list(log, list_data, sql_pool),
    )
    log.info(f"共获取 {fetched} 条数据")


# ---------------------------------------------------------------------------------------------------------------------

@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_child_single_page(log, page, parentId):
    """
    Request one page of the child series belonging to a parent series.

    :param log: logger object
    :param page: 1-based page number
    :param parentId: id of the parent series to query
    :return: decrypted response (callers treat it as a dict with "code"/"message"/"data")
    """
    log.debug(f'Request {inspect.currentframe().f_code.co_name} for page: {page} .................')
    par = {'pageNum': str(page), 'pageSize': '20', 'parentId': str(parentId)}
    sign_result = api_sign(timeout=1221, user_token="", params=par)
    req_data = pokemon_aes_encrypt(sign_result["secretJsonParams"])
    return _post_encrypted(
        "https://app-api.pokemon-tcg.cn/app-api/v1/app/commodity/queryByParentId",
        sign_result,
        req_data,
    )


def parse_child_list(log, list_data, sql_pool, parentId, expansion_series, expansion_img):
    """
    Convert one page of child series into rows and bulk-insert them into
    ``pokemon_jianz_category``.

    :param log: logger object
    :param list_data: list of child-series dicts
    :param sql_pool: database pool providing ``insert_many``
    :param parentId: id of the parent series
    :param expansion_series: name of the parent series
    :param expansion_img: image URL of the parent series
    :return: None
    """
    if not list_data:
        log.error(f"{inspect.currentframe().f_code.co_name} list_data is None")
        return

    info_list = []
    for item in list_data:
        description_html = item.get("description")
        if description_html:
            # The description is HTML; keep only the text of its <p> elements.
            paragraphs = Selector(description_html).xpath('//p/text()').getall()
            description = '\n'.join(paragraphs)
        else:
            # Selector() raises when given None, which used to discard the
            # whole page; a missing description is stored as empty text.
            description = ''

        info_list.append({
            "parent_id": parentId,
            "expansion_series": expansion_series,
            "expansion_img": expansion_img,
            "child_id": item.get("id"),
            "child_name": item.get("name"),
            "commodity_code": item.get("commodityCode"),  # product code
            "child_image_url": item.get("imageUrl"),
            "sales_date": item.get("salesDate"),
            "description": description,
            "crawler_language": crawler_language,
        })

    if info_list:
        sql_pool.insert_many(table="pokemon_jianz_category", data_list=info_list, ignore=True)


def get_child_list(log, sql_pool, parent_id, expansion_series, expansion_img):
    """
    Page through all child series of one parent series and store them.

    :param log: logger object
    :param sql_pool: database pool forwarded to the parser
    :param parent_id: id of the parent series
    :param expansion_series: name of the parent series
    :param expansion_img: image URL of the parent series
    :return: None
    """
    fetched = _paginate(
        log,
        lambda page: get_child_single_page(log, page, parent_id),
        lambda list_data: parse_child_list(
            log, list_data, sql_pool, parent_id, expansion_series, expansion_img),
    )
    log.info(f"共获取 {fetched} 条数据")


# ---------------------------------------------------------------------------------------------------------------------

@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_series_single_page(log, item_tuple, page):
    """
    Request one page of the card list of a child series.

    :param log: logger object
    :param item_tuple: row (child_id, child_name, commodity_code, sales_date, ...)
    :param page: 1-based page number
    :return: decrypted response (callers treat it as a dict with "code"/"message"/"data")
    """
    child_id = item_tuple[0]
    child_name = item_tuple[1]
    commodity_code = item_tuple[2]
    sales_date = item_tuple[3]

    log.debug(f'Request {inspect.currentframe().f_code.co_name} for ID: {child_id}, page: {page} .................')
    par = {
        "banCardFlag": "0",
        "commodityIds": str(child_id),
        "commoditySelectedList": [
            {"id": str(child_id), "commodityName": child_name,
             "commodityCode": commodity_code, "salesDate": sales_date}],
        "pageNum": str(page),
        "pageSize": "50",
    }

    sign_result = api_sign(timeout=1221, user_token="", params=par, need_md5=True)
    # This endpoint encrypts the raw parameters rather than
    # sign_result["secretJsonParams"], unlike the two category endpoints.
    req_data = pokemon_aes_encrypt(par)
    return _post_encrypted(
        "https://app-api.pokemon-tcg.cn/app-api/v1/app/card/query",
        sign_result,
        req_data,
    )


def parse_series(log, list_data, sql_pool, item_tuple):
    """
    Walk one page of a card list and crawl the detail page of every card.

    A failure on one card is logged and does not stop the others.

    :param log: logger object
    :param list_data: list of card dicts (key read: id)
    :param sql_pool: database pool forwarded to the detail crawl
    :param item_tuple: child-series row forwarded to the detail crawl
    :return: None
    """
    if not list_data:
        log.error(f"{inspect.currentframe().f_code.co_name} list_data is None")
        return

    for item in list_data:
        detail_id = item.get("id")
        try:
            get_details_page(log, detail_id, item_tuple, sql_pool)
        except Exception as e:
            log.error(f"{inspect.currentframe().f_code.co_name} {detail_id} 请求异常: {e}")

        # Random 0.1-0.5 s pause between detail requests.
        time.sleep(random.randint(1, 5) / 10)


def get_series_list(log, item_tuple, sql_pool):
    """
    Page through the card list of one child series and crawl every card.

    :param log: logger object
    :param item_tuple: child-series row (see ``get_series_single_page``)
    :param sql_pool: database pool forwarded to the parsers
    :return: None
    """
    # NOTE(review): the short-page threshold stays at 20 although this endpoint
    # is queried with pageSize 50. A last page of 20-49 items without usable
    # pages/total costs at most one extra (empty) request; raising the threshold
    # would truncate the crawl if the server ever caps the page size.
    fetched = _paginate(
        log,
        lambda page: get_series_single_page(log, item_tuple, page),
        lambda list_data: parse_series(log, list_data, sql_pool, item_tuple),
        short_page_threshold=20,
    )
    log.info(f"共获取 {fetched} 条数据")


# ---------------------------------------------------------------------------------------------------------------------

@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_details_page(log, card_id, item_tuple, sql_pool):
    """
    Request the detail of one card and store it.

    Request errors propagate (and are retried); parsing / storage errors are
    logged and swallowed so that they do not trigger a re-request.

    :param log: logger object
    :param card_id: id of the card
    :param item_tuple: child-series row forwarded to ``parse_details``
    :param sql_pool: database pool forwarded to ``parse_details``
    :return: None
    """
    log.debug(
        f'Request {inspect.currentframe().f_code.co_name} for card_id: {card_id} .................')
    par = {"id": str(card_id)}
    sign_result = api_sign(timeout=1221, user_token="", params=par)
    req_data = pokemon_aes_encrypt(par)

    # Web client reference:
    #   E = this.encryptionEnable ? "{}" == A ? {} : n({}, "GET" == o ? "encryptionUrlParams" : "encryptionBodyParams",
    #       "GET" == o ? encodeURIComponent(P) : P) : i
    # quote() leaves "/" unescaped by default (safe='/'); safe='' is required so
    # that "/" is percent-encoded too, as encodeURIComponent() does.
    # NOTE(review): requests percent-encodes `params` values again, so the value
    # is double-encoded on the wire; presumably this mirrors the web client - confirm.
    req_data = quote(req_data, safe='')

    response = requests.get(
        "https://app-api.pokemon-tcg.cn/app-api/v1/app/card/get",
        headers=_build_headers(sign_result),
        params={"encryptionUrlParams": req_data},  # e.g. "%2BsGoBpMkJzVjwAgE2Gca2Q%3D%3D"
        timeout=_REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    decrypted_res = pokemon_aes_decrypt(response.text)

    try:
        parse_details(log, decrypted_res, sql_pool, card_id, item_tuple)
    except Exception as e:
        log.error(f"解析详情页数据异常: {e}")


def parse_details(log, list_data, sql_pool, card_id, item_tuple):
    """
    Build one ``pokemon_card_record`` row from a card-detail response and insert it.

    Column mapping from the child-series row:
        pg_value            <- commodity_code
        pg_label            <- child_name
        major_category_name <- expansion_series

    :param log: logger object
    :param list_data: decrypted detail response (a mapping with a "data" entry)
    :param sql_pool: database pool providing ``insert_one_or_dict``
    :param card_id: id of the card
    :param item_tuple: row (child_id, child_name, commodity_code, sales_date, expansion_series)
    :return: None
    """
    child_id = item_tuple[0]
    child_name = item_tuple[1]
    commodity_code = item_tuple[2]
    expansion_series = item_tuple[4]

    if not list_data:
        log.error(f"{inspect.currentframe().f_code.co_name} list_data is None")
        return

    item = list_data.get("data")
    if not item:
        log.error(f"{inspect.currentframe().f_code.co_name} item is None")
        return

    data_dict = {
        "child_id": child_id,
        "major_category_name": expansion_series,
        "pg_value": commodity_code,
        "pg_label": child_name,
        "card_id": card_id,
        "card_name": item.get("cardName"),
        "card_no": item.get("collectionNumber"),
        "rarity": item.get("rarityText"),
        "img": item.get("imgUrl"),
        "crawler_language": crawler_language,
    }
    sql_pool.insert_one_or_dict(table="pokemon_card_record", data=data_dict, ignore=True)


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def jz_pokemon_main(log):
    """
    Entry point: crawl the card details of every stored child series.

    Only the database health check can raise out of this function (and so
    trigger the hourly retry); crawl errors are logged and swallowed.

    :param log: logger object
    :raises RuntimeError: when the connection pool health check fails
    """
    log.info(f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务.............................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        # The category crawl is currently disabled; to refresh
        # pokemon_jianz_category first, run:
        #     get_parent_list(log, sql_pool)

        # Crawl card details for every stored child series.
        log.debug("........... 获取商品详情 ..........")
        child_rows = sql_pool.select_all(
            f"SELECT DISTINCT child_id,child_name,commodity_code,sales_date,expansion_series FROM pokemon_jianz_category WHERE crawler_language='{crawler_language}'")
        log.debug(f"获取商品详情长度为: {len(child_rows)}")

        for item_tuple in child_rows:
            try:
                get_series_list(log, item_tuple, sql_pool)
            except Exception as e:
                log.error(f"Request get_details error: {e}")

    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')


if __name__ == '__main__':
    # Manual single-step entry points for debugging:
    #     get_parent_list(logger, None)
    #     get_details_page(logger, 11364, None, None)
    jz_pokemon_main(logger)