# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.8.10
# Date   : 2025/4/14 11:22
import inspect
import time

import requests
import schedule

from mysql_pool import MySQLConnectionPool
# The star import is assumed to provide make_request, logger, retry,
# stop_after_attempt, wait_fixed and after_log used below.
from settings import *
from get_kw_sign import get_sign

baseUrl = "https://app.cardplayd.com/"


def parse_shop_list(log, sql_pool, data_info_list, sql_shop_list):
    data_list = []
    for data_info in data_info_list:
        if data_info is None:
            continue
        shop_id = data_info.get('shopId')
        if shop_id in sql_shop_list:
            log.debug(f'{inspect.currentframe().f_code.co_name} Shop {shop_id} already exists in the database.')
            continue
        data_dict = {
            "shop_id": shop_id,
            "shop_name": data_info.get('shopName'),
            "fans_num": data_info.get('fansCount'),
            "group_num": data_info.get('collageSuccessCount')
        }
        data_list.append(data_dict)
        sql_shop_list.append(shop_id)
    if data_list:
        sql_pool.insert_many(table='kawan_shop_record', data_list=data_list)
    else:
        log.debug(f'{inspect.currentframe().f_code.co_name} No new shop data found............')


def get_shop_one_page(log, headers, page_num=1):
    log.debug(f'{inspect.currentframe().f_code.co_name} Request page_num: {page_num}')
    url = "https://app.cardplayd.com/app/system/shop/queryShopList"
    nonce, timestamp, sign = get_sign(url)
    headers.update(timestamp=timestamp, nonce=nonce, signature=sign)
    params = {
        "pageNum": str(page_num),
        "pageSize": "10"
    }
    response = make_request(log, 'GET', url, params=params, headers=headers)
    return response


def get_shop_list(log, sql_pool, sql_shop_list, headers):
    page_num = 1
    len_all_shops = 0
    while True:
        log.debug(f'{inspect.currentframe().f_code.co_name} Requesting.............')
        response_json = get_shop_one_page(log, headers, page_num)
        if response_json is None:
            log.error("Failed to fetch shop list. Exiting...")
            break
        data = response_json.get('data', {})
        data_info_list = data.get('dataInfo', [])
        len_all_shops += len(data_info_list)
        # Parse the shop list
        parse_shop_list(log, sql_pool, data_info_list, sql_shop_list)
        total_shops = data.get('total', 0)
        if not data_info_list or len(data_info_list) < 10:
            log.info("No more shops found. Stopping requests.")
            break
        page_num += 1
        # Stop once the number of shops fetched reaches or exceeds the reported total
        if len_all_shops >= total_shops:
            log.info("Total shops fetched. Stopping requests.")
            break
    log.info(f"Total shops fetched: {len_all_shops}")
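
# For reference, the response envelope that get_shop_list/parse_shop_list assume,
# inferred from the fields read above (not an official schema; field types are guesses):
#
#   {
#       "data": {
#           "total": 123,
#           "dataInfo": [
#               {"shopId": "...", "shopName": "...", "fansCount": 0, "collageSuccessCount": 0},
#               ...
#           ]
#       }
#   }
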
Stopping requests.") break log.info(f"Total shops fetched: {len_all_shops}") # ---------------------------------------------------------------------------------------------------------------------- def get_acticity_xplain(log, product_id, headers): url = f"https://app.cardplayd.com/app/system/cardCollage/queryCollageActivityExplainList/{product_id}" # url_p = "https://app.cardplayd.com/app/system/cardCollage/queryCollageActivityExplainList" nonce, a, sign = get_sign(url) headers.update(timestamp=a, nonce=nonce, signature=sign) # response = requests.get(url, headers=headers) response = make_request(log, 'GET', url, headers=headers) # print(response) try: json_data = response.get('data', [{}]) if json_data: json_data_list = json_data[0] explain_name = json_data_list.get('explainName') explain_info = json_data_list.get('explainInfo') # print(explain_name, explain_info) return explain_name, explain_info else: log.warning(f'{inspect.currentframe().f_code.co_name} Request product_id: {product_id}, Error: No data') return None, None except Exception as e: log.error(f'{inspect.currentframe().f_code.co_name} Request product_id: {product_id}, Error: {e}') return None, None def parse_sold_list(log, sql_pool, data_info_list, sql_product_id_list, headers, shop_name): data_list = [] for data_info in data_info_list: product_id = data_info.get('id') if product_id in sql_product_id_list: log.debug(f'{inspect.currentframe().f_code.co_name} Product {product_id} already exists in the database.') continue no = data_info.get('collageCode') title = data_info.get('collageName') img = data_info.get('rotationImagePath') price_sale = data_info.get('firstBuyPrice') original_price = data_info.get('unitPrice') # 原价 total_price = data_info.get('totalPrice') sale_num = data_info.get('lotCount') play_way = data_info.get('playWay') spec_config = data_info.get('specifications') spec_config_count = data_info.get('count') sheets_number = data_info.get('sheetNumber') state = data_info.get('collageStatus') shop_id = data_info.get('shopId') category_id = data_info.get('cardCategoryId') on_sale_time = data_info.get('startTime') # 开售时间 end_time = data_info.get('applyTime') # 理论完成时间 finish_time = data_info.get('endTime') # 实际完成时间 begin_live_time = data_info.get('beginLiveTime') # 开始直播时间 live_complete_time = data_info.get('liveCompleteTime') # 直播完成时间 explain_name, explain_info = get_acticity_xplain(log, product_id, headers) # 活动说明 # 活动说明信息 video_url = data_info.get('liveBackPath') sold_data = { "product_id": product_id, "no": no, "title": title, "img": img, "price_sale": price_sale, "original_price": original_price, "total_price": total_price, "sale_num": sale_num, "play_way": play_way, "spec_config": spec_config, "spec_config_count": spec_config_count, "sheets_number": sheets_number, "state": state, "shop_id": shop_id, "shop_name": shop_name, "category_id": category_id, "on_sale_time": on_sale_time, "end_time": end_time, "finish_time": finish_time, "begin_live_time": begin_live_time, "live_complete_time": live_complete_time, "explain_name": explain_name, "explain_info": explain_info, "video_url": video_url } data_list.append(sold_data) sql_product_id_list.append(product_id) # print(data_list) if data_list: sql_pool.insert_many(table="kawan_product_record", data_list=data_list) def get_sold_one_page(log, shopId, headers, page_num=1): log.debug(f'{inspect.currentframe().f_code.co_name} Request page_num: {page_num}') url = "https://app.cardplayd.com/app/system/shopInfo/collageList" params = { "pageNum": str(page_num), "pageSize": 
"10", "status": "6", "timeLimit": "true", "shopId": shopId } nonce, a, sign = get_sign(url) headers.update(timestamp=a, nonce=nonce, signature=sign) # response = requests.get(url, headers=headers, params=params) response = make_request(log, 'GET', url, params=params, headers=headers) # print(response) return response def get_sold_list(log, sql_pool, shopId, shop_name, sql_product_id_list, headers): page_num = 1 len_all_sold = 0 log.debug(f'{inspect.currentframe().f_code.co_name} Requesting with shopId: {shopId}.............') while True: response_json = get_sold_one_page(log, shopId, headers, page_num) if response_json is None: log.error("Failed to fetch sold list. Exiting...") break data = response_json.get('data', {}) total_solds = response_json.get('total', 0) if total_solds == 0: log.warning( f"Warning {inspect.currentframe().f_code.co_name}: total_solds == 0, shop_id:{shopId}没有已售数据") break sold_info_list = data.get('dataInfo', []) # print(sold_info_list) if not sold_info_list: log.warning( f"Warning {inspect.currentframe().f_code.co_name}: sold_info_list为空, shop_id:{shopId}没有已售数据") break len_all_sold += len(sold_info_list) # 解析已售列表 parse_sold_list(log, sql_pool, sold_info_list, sql_product_id_list, headers, shop_name) if not sold_info_list or len(sold_info_list) < 10: log.info("No more sold_info_list found. Stopping requests.") break page_num += 1 # 如果当前已获取的店铺数量达到或超过总数量total,停止请求 if len_all_sold >= total_solds: log.info("Total sold_info_list fetched. Stopping requests.") break log.info(f"Total sold_info_list fetched: {len_all_sold}") # ---------------------------------------------------------------------------------------------------------------------- def get_product_detail(log, product_id, headers): # 暂时用不到 备用 log.debug(f'{inspect.currentframe().f_code.co_name} Request product_id: {product_id}') url = "https://app.cardplayd.com/app/system/cardCollage/getCardCollageInfoById" params = { # "collageId": "1911391864602927105" "collageId": product_id } nonce, a, sign = get_sign(url) headers.update(timestamp=a, nonce=nonce, signature=sign) response = requests.get(url, headers=headers, params=params) print(response.text) print(response) # ---------------------------------------------------------------------------------------------------------------------- def get_player_list(log, cardCollageId): log.debug(f'{inspect.currentframe().f_code.co_name} Request cardCollageId: {cardCollageId}') url = "https://app.cardplayd.com/app/system/cardReport/getCarmiPublicityVoByTypeList" # data = { # "F9gls72L1UmU0/fdUzTFS3ry8fQCShi/nU0HmHsW3WtEtgxZ9j3kG2SQKu3iQ3FDtBlp4bnHXXxDZqeXjyEiEICm1Xo4QJTYKIB9kijJy3mA2V2Ayt2X5Rqf+eipjEX+5ES+7D3gZdEmcdT9gPOjjn69z4hqnweX3thbvg5/LXJ1531bkv/otiMYFshgbbMQ51el/Tlh20zDkpj952Y8Gg": "=" # } # data = "DuDAqydlmgNlK/1CtwT2hIVdQuBreX0MQC0hjznCSJyH3ZIKgyk7yEK1+Fs3E3eFoR9kKSrQnDVis5jh0SwKmDlQ9cEdQMPa1facZd5asXV10oQrud4aONB4/RjyQh/iNL7tWTIT2HZCtwJIz61kA/6kRqIIpnBfoOkFTegD6TvzG1XhmYlMcZ70PWIpF4o+VMYhAVyTFfsLa7kBFJUdqA==" player_headers = { "User-Agent": "Dart/3.6 (dart:io)", "Accept-Encoding": "gzip", "Content-Type": "application/json", "authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJsb2dpblR5cGUiOiJhcHBMb2dpbiIsImxvZ2luSWQiOiJhcHBfdXNlcjoxOTExNjI3MDU4NDk2ODM5NjgyIiwicm5TdHIiOiJoUTNtS2VwQ210RWZ4VFkzQVNIUTcxV1RRZlo1ajBkNCIsImNsaWVudGlkIjoiNDI4YTgzMTBjZDQ0Mjc1N2FlNjk5ZGY1ZDg5NGYwNTEiLCJ1c2VySWQiOjE5MTE2MjcwNTg0OTY4Mzk2ODIsImludml0ZUNvZGUiOiIyMDc3MTYiLCJwaG9uZSI6IjEzMDE0NjE3NjE0In0.IuWoS8kCmG4OQFh1XINJOHpbeKMZKlMmticVglAVF_Y", "content-language": "zh_CN", # 
"encrypt-key": "n8XhkgeVYg26D8/nY3MGThxNzKI59EMd69AjUF3Jk5ZT9ixwo21PABLWhwLMJFuSXqASVsUaq2KhUnjsaaIXDA==", "app-version": "1.0.12", # "isencrypt": "true", # "isencrypt": "false", } # data = "916L0IKDzAb1hrnjyzFDCH+prEuxPR0LqfU5m79fYlfZTCvFQhehf43vS0P9Gz91+ySAFH8cvuIaC8f2A6Awo3HXmjJY4GzUXPTDNNehEgMugpVAXsS1ly9tWuWgQp0nnZuFZzWL281CNuo9cY8XkrcyL9p2QqVs5GDNnSFNi2Y8LRPk+1aiED2n+rvY7j0stupez5m9+1AcNGAUyKO/hQ==" # data = "eyJjYXJkQ29sbGFnZUlkIjogIjE5MTEzOTE4NjQ2MDI5MjcxMDUiLCJwYWdlU2l6ZSI6ICIxMCIsInBhZ2VOdW0iOiAiMSIsImVuY3J5cHQta2V5IjogIm44WGhrZ2VWWWcyNkQ4L25ZM01HVGh4TnpLSTU5RU1kNjlBalVGM0prNVpUOWl4d28yMVBBQkxXaHdMTUpGdVNYcUFTVnNVYXEyS2hVbmpzYWFJWERBPT0ifQ==" # data = {"type": "1", "cardCollageId": "1906194540964519937", "userAnonymous": None, "pageSize": 100, "pageNum": 1, # "isAsc": "", "orderByColumn": "create_time", "filterInfo": ""} data = {"type": "1", "cardCollageId": cardCollageId, "userAnonymous": None, "pageSize": 100, "pageNum": 1, "isAsc": "", "orderByColumn": "create_time", "filterInfo": ""} nonce, a, sign = get_sign(url) player_headers.update(timestamp=a, nonce=nonce, signature=sign) response = requests.post(url, headers=player_headers, data=data) # response = make_request(log, 'POST', url, headers=headers, data=data) print(response.text) print(response) # ---------------------------------------------------------------------------------------------------------------------- def get_report_one_page(log, collageId, headers, page_num=1): url = "https://app.cardplayd.com/app/system/cardReport/getOpenReportInfo" params = { "cardCollageId": collageId, "pageSize": "10", "pageNum": str(page_num) } nonce, a, sign = get_sign(url) headers.update(timestamp=a, nonce=nonce, signature=sign) # response = requests.get(url, headers=headers, params=params) response = make_request(log, 'GET', url, headers=headers, params=params) # print(response) return response def parse_report_list(sql_pool, report_info_list, collageId): data_list = [] for report_info in report_info_list: data = { "product_id": collageId, "card_name": report_info.get("carmiInfo"), "open_card_time": report_info.get("openCardTime"), "imgs": report_info.get("frontImagePath") } data_list.append(data) if data_list: sql_pool.insert_many(table="kawan_report_record", data_list=data_list) def get_report_list(log, sql_pool, collageId, headers): page_num = 1 len_all_report = 0 while True: response_json = get_report_one_page(log, collageId, headers, page_num) if response_json is None: log.error("Failed to fetch report list. Exiting...") break data = response_json.get('data', {}) total_reports = data.get('total', 0) if total_reports == 0: log.warning( f"Warning {inspect.currentframe().f_code.co_name}: {response_json['msg']}, collageId:{collageId}没有 report 数据") break report_info_list = data.get('otherCardReportResultList', []) if not report_info_list: log.warning( f"Warning {inspect.currentframe().f_code.co_name}: {response_json['msg']}, collageId:{collageId}没有 report 数据") break len_all_report += len(report_info_list) parse_report_list(sql_pool, report_info_list, collageId) if not report_info_list or len(report_info_list) < 10: log.info("No more report_info_list found. Stopping requests.") break page_num += 1 if len_all_report >= total_reports: log.info("Total report_info_list fetched. 
Stopping requests.") break log.info(f"Total report_info_list fetched: {len_all_report}") sql_pool.update_one("update kawan_product_record set report_state = 1 where product_id = %s", (collageId,)) @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log) def kawan_main(log): """ 主函数 :param log: logger对象 """ log.info( f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................') # 配置 MySQL 连接池 sql_pool = MySQLConnectionPool(log=log) if not sql_pool.check_pool_health(): log.error("数据库连接池异常") raise RuntimeError("数据库连接池异常") try: sql_token = sql_pool.select_one("SELECT token FROM kawan_token") # sql_token_str = 'Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJsb2dpblR5cGUiOiJhcHBMb2dpbiIsImxvZ2luSWQiOiJhcHBfdXNlcjoxOTExNjI0MDUzMzQ5MzI2ODQ5Iiwicm5TdHIiOiJvZURBT2QwTEFvYmlmTFR2Y0xVVXpNQ0haaWVLWXRrUyIsImNsaWVudGlkIjoiNDI4YTgzMTBjZDQ0Mjc1N2FlNjk5ZGY1ZDg5NGYwNTEiLCJ1c2VySWQiOjE5MTE2MjQwNTMzNDkzMjY4NDksImludml0ZUNvZGUiOiI0NjI2MTgiLCJwaG9uZSI6IjE5NTIxNTAwODUwIn0.PY7l7OvS2fOHsgl-YsHcEy1TyKsIgkmSxSV4RZxWaxc' headers = { "User-Agent": "Dart/3.6 (dart:io)", "Accept-Encoding": "gzip", # "authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJsb2dpblR5cGUiOiJhcHBMb2dpbiIsImxvZ2luSWQiOiJhcHBfdXNlcjoxOTExNjI3MDU4NDk2ODM5NjgyIiwicm5TdHIiOiJoUTNtS2VwQ210RWZ4VFkzQVNIUTcxV1RRZlo1ajBkNCIsImNsaWVudGlkIjoiNDI4YTgzMTBjZDQ0Mjc1N2FlNjk5ZGY1ZDg5NGYwNTEiLCJ1c2VySWQiOjE5MTE2MjcwNTg0OTY4Mzk2ODIsImludml0ZUNvZGUiOiIyMDc3MTYiLCJwaG9uZSI6IjEzMDE0NjE3NjE0In0.IuWoS8kCmG4OQFh1XINJOHpbeKMZKlMmticVglAVF_Y", "authorization": sql_token[0], # "authorization": sql_token_str, "content-type": "application/json", "app-version": "1.0.12", "content-language": "zh_CN", # "nonce": "ceac8160-18e3-11f0-bb6e-95de6e5ff903", "isencrypt": "false" } # 获取 商家 列表 try: sql_shop_list = sql_pool.select_all("SELECT shop_id FROM kawan_shop_record") sql_shop_list = [item[0] for item in sql_shop_list] get_shop_list(log, sql_pool, sql_shop_list, headers) sql_shop_list.clear() except Exception as e: log.error(f"Error fetching last_product_id: {e}") time.sleep(5) # 获取已售商品 try: sql_shop_id_list = sql_pool.select_all("SELECT shop_id, shop_name FROM kawan_shop_record") # sql_shop_id_list = [item[0] for item in sql_shop_id_list] # 获取 product_id_list sql_product_id_list = sql_pool.select_all("SELECT product_id FROM kawan_product_record") sql_product_id_list = [item[0] for item in sql_product_id_list] for shop_id_name in sql_shop_id_list: shop_id = shop_id_name[0] shop_name = shop_id_name[1] log.info(f"开始获取商家:{shop_id} 已售商品") try: get_sold_list(log, sql_pool, shop_id, shop_name, sql_product_id_list, headers) except Exception as e: log.error(f"Error fetching get_sold_list for shop_id:{shop_id}, {e}") sql_product_id_list.clear() except Exception as e: log.error(f"Error fetching sql_shop_id_list: {e}") time.sleep(5) # 获取拆卡报告 try: sql_product_id_list_for_report = sql_pool.select_all( "SELECT product_id FROM kawan_product_record WHERE report_state = 0") sql_product_id_list_for_report = [item[0] for item in sql_product_id_list_for_report] for product_id in sql_product_id_list_for_report: log.info(f"开始获取商品:{product_id} 拆卡报告") try: get_report_list(log, sql_pool, product_id, headers) except Exception as e: log.error(f"Error fetching reports for product_id:{product_id}, {e}") sql_pool.update_one("update kawan_product_record set report_state = 2 where product_id = %s", (product_id,)) except Exception as e: log.error(f"Error fetching reports: {e}") # time.sleep(5) # 获取商品玩家 # try: # get_player_list(log, 

def schedule_task():
    """
    Entry point for the crawler module.
    """
    # Run the task once immediately
    # kawan_main(log=logger)

    # Schedule the recurring task: run daily at 00:01
    schedule.every().day.at("00:01").do(kawan_main, log=logger)
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == '__main__':
    # get_shop_list(logger, None)
    # get_sold_list(logger)
    # get_acticity_xplain(logger, '1910557299676192770')
    # get_product_detail(logger, '1910557299676192770')
    # get_player_list(logger)
    # get_report_list(logger, '1910557299676192770')
    # kawan_main(logger)
    schedule_task()