| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.8.10
- # Date : 2025/4/14 11:22
- import time
- import schedule
- from mysql_pool import MySQLConnectionPool
- from settings import *
- from get_kw_sign import get_sign
# Base URL of the cardplayd API.
# BUG FIX: the original line ended with a trailing comma, which made
# baseUrl a one-element tuple ("https://...",) instead of a string.
baseUrl = "https://app.cardplayd.com/"
def parse_shop_list(log, sql_pool, data_info_list, sql_shop_list):
    """Filter out shops that are already stored and bulk-insert the rest.

    Mutates ``sql_shop_list`` in place by appending every newly seen shop id,
    so later pages in the same run also skip them.

    :param log: logger object
    :param sql_pool: MySQL connection pool (provides insert_many)
    :param data_info_list: raw shop entries from the API page
    :param sql_shop_list: ids already present in kawan_shop_record
    """
    func_name = inspect.currentframe().f_code.co_name
    new_rows = []
    for info in data_info_list:
        if info is None:
            continue
        shop_id = info.get('shopId')
        if shop_id in sql_shop_list:
            log.debug(f'{func_name} Shop {shop_id} already exists in the database.')
            continue
        new_rows.append({
            "shop_id": shop_id,
            "shop_name": info.get('shopName'),
            "fans_num": info.get('fansCount'),
            "group_num": info.get('collageSuccessCount'),
        })
        sql_shop_list.append(shop_id)
    if not new_rows:
        log.debug(f'{func_name} No new shop data found............')
        return
    sql_pool.insert_many(table='kawan_shop_record', data_list=new_rows)
def get_shop_one_page(log, headers, page_num=1):
    """Fetch one page (10 entries) of the shop list.

    Signs the request via get_sign and returns whatever make_request yields
    (parsed JSON dict, or presumably None on failure — callers check for it).
    """
    log.debug(f'{inspect.currentframe().f_code.co_name} Request page_num: {page_num}')
    url = "https://app.cardplayd.com/app/system/shop/queryShopList"
    nonce, ts, sign = get_sign(url)
    headers.update(timestamp=ts, nonce=nonce, signature=sign)
    query = {
        "pageNum": str(page_num),
        "pageSize": "10",
    }
    return make_request(log, 'GET', url, params=query, headers=headers)
def get_shop_list(log, sql_pool, sql_shop_list, headers):
    """Page through the full shop list, persisting unseen shops as we go.

    Stops when a page comes back short (fewer than 10 entries), when the
    reported total has been reached, or when a request fails outright.
    """
    func_name = inspect.currentframe().f_code.co_name
    page = 1
    fetched = 0
    while True:
        log.debug(f'{func_name} Requesting.............')
        payload = get_shop_one_page(log, headers, page)
        if payload is None:
            log.error("Failed to fetch shop list. Exiting...")
            break
        data = payload.get('data', {})
        page_items = data.get('dataInfo', [])
        fetched += len(page_items)
        # Persist the new shops from this page.
        parse_shop_list(log, sql_pool, page_items, sql_shop_list)
        total = data.get('total', 0)
        # An empty page also satisfies len < 10, so one check covers both.
        if len(page_items) < 10:
            log.info("No more shops found. Stopping requests.")
            break
        page += 1
        # Stop once the cumulative count reaches the server-reported total.
        if fetched >= total:
            log.info("Total shops fetched. Stopping requests.")
            break
    log.info(f"Total shops fetched: {fetched}")
- # ----------------------------------------------------------------------------------------------------------------------
def get_acticity_xplain(log, product_id, headers):
    """Fetch the activity-explanation entry for one product.

    Returns a ``(explain_name, explain_info)`` pair, or ``(None, None)`` when
    the response carries no data or any step of the parsing raises.
    """
    func_name = inspect.currentframe().f_code.co_name
    url = f"https://app.cardplayd.com/app/system/cardCollage/queryCollageActivityExplainList/{product_id}"
    nonce, ts, sign = get_sign(url)
    headers.update(timestamp=ts, nonce=nonce, signature=sign)
    response = make_request(log, 'GET', url, headers=headers)
    try:
        entries = response.get('data', [{}])
        if not entries:
            log.warning(f'{func_name} Request product_id: {product_id}, Error: No data')
            return None, None
        first = entries[0]
        return first.get('explainName'), first.get('explainInfo')
    except Exception as e:
        # Also reached when response is None (AttributeError on .get).
        log.error(f'{func_name} Request product_id: {product_id}, Error: {e}')
        return None, None
def parse_sold_list(log, sql_pool, data_info_list, sql_product_id_list, headers, shop_name):
    """Convert sold-product API entries into rows and bulk-insert the unseen ones.

    Mutates ``sql_product_id_list`` in place by appending each new product id.
    Each new product costs one extra request (its activity explanation).

    :param shop_name: stored alongside each row, since the entry itself only
        carries the shop id.
    """
    func_name = inspect.currentframe().f_code.co_name
    rows = []
    for info in data_info_list:
        product_id = info.get('id')
        if product_id in sql_product_id_list:
            log.debug(f'{func_name} Product {product_id} already exists in the database.')
            continue
        # Activity explanation requires a dedicated per-product request.
        explain_name, explain_info = get_acticity_xplain(log, product_id, headers)
        rows.append({
            "product_id": product_id,
            "no": info.get('collageCode'),
            "title": info.get('collageName'),
            "img": info.get('rotationImagePath'),
            "price_sale": info.get('firstBuyPrice'),
            "original_price": info.get('unitPrice'),        # list price
            "total_price": info.get('totalPrice'),
            "sale_num": info.get('lotCount'),
            "play_way": info.get('playWay'),
            "spec_config": info.get('specifications'),
            "spec_config_count": info.get('count'),
            "sheets_number": info.get('sheetNumber'),
            "state": info.get('collageStatus'),
            "shop_id": info.get('shopId'),
            "shop_name": shop_name,
            "category_id": info.get('cardCategoryId'),
            "on_sale_time": info.get('startTime'),          # time the sale opened
            "end_time": info.get('applyTime'),              # scheduled completion time
            "finish_time": info.get('endTime'),             # actual completion time
            "begin_live_time": info.get('beginLiveTime'),   # live-stream start
            "live_complete_time": info.get('liveCompleteTime'),  # live-stream end
            "explain_name": explain_name,
            "explain_info": explain_info,
            "video_url": info.get('liveBackPath'),
        })
        sql_product_id_list.append(product_id)
    if rows:
        sql_pool.insert_many(table="kawan_product_record", data_list=rows)
def get_sold_one_page(log, shopId, headers, page_num=1):
    """Fetch one page (10 entries) of a shop's sold-collage list.

    status=6 / timeLimit=true mirror the app's own query — presumably
    "completed" sales; confirm against the API if this ever changes.
    """
    log.debug(f'{inspect.currentframe().f_code.co_name} Request page_num: {page_num}')
    url = "https://app.cardplayd.com/app/system/shopInfo/collageList"
    query = {
        "pageNum": str(page_num),
        "pageSize": "10",
        "status": "6",
        "timeLimit": "true",
        "shopId": shopId,
    }
    nonce, ts, sign = get_sign(url)
    headers.update(timestamp=ts, nonce=nonce, signature=sign)
    return make_request(log, 'GET', url, params=query, headers=headers)
def get_sold_list(log, sql_pool, shopId, shop_name, sql_product_id_list, headers):
    """Page through one shop's sold products, persisting the unseen ones."""
    func_name = inspect.currentframe().f_code.co_name
    page = 1
    fetched = 0
    log.debug(f'{func_name} Requesting with shopId: {shopId}.............')
    while True:
        payload = get_sold_one_page(log, shopId, headers, page)
        if payload is None:
            log.error("Failed to fetch sold list. Exiting...")
            break
        data = payload.get('data', {})
        # NOTE(review): 'total' is read from the top level here, while the
        # shop endpoint keeps it under 'data' — presumably the two endpoints
        # differ; confirm against real responses.
        total = payload.get('total', 0)
        if total == 0:
            log.warning(
                f"Warning {func_name}: total_solds == 0, shop_id:{shopId}没有已售数据")
            break
        items = data.get('dataInfo', [])
        if not items:
            log.warning(
                f"Warning {func_name}: sold_info_list为空, shop_id:{shopId}没有已售数据")
            break
        fetched += len(items)
        # Persist the new sold products from this page.
        parse_sold_list(log, sql_pool, items, sql_product_id_list, headers, shop_name)
        # Empty pages broke out above, so a short page is the only end signal here.
        if len(items) < 10:
            log.info("No more sold_info_list found. Stopping requests.")
            break
        page += 1
        # Stop once the cumulative count reaches the server-reported total.
        if fetched >= total:
            log.info("Total sold_info_list fetched. Stopping requests.")
            break
    log.info(f"Total sold_info_list fetched: {fetched}")
- # ----------------------------------------------------------------------------------------------------------------------
def get_product_detail(log, product_id, headers):
    """Fetch the full detail of a single collage product by id.

    Currently unused — kept as a backup (original note: 暂时用不到 备用).
    NOTE(review): unlike the rest of the module this calls requests directly
    (no make_request wrapper) and prints the raw response instead of logging
    or returning it; it always returns None.
    """
    log.debug(f'{inspect.currentframe().f_code.co_name} Request product_id: {product_id}')
    url = "https://app.cardplayd.com/app/system/cardCollage/getCardCollageInfoById"
    params = {
        # Example id captured during development:
        # "collageId": "1911391864602927105"
        "collageId": product_id
    }
    nonce, a, sign = get_sign(url)
    headers.update(timestamp=a, nonce=nonce, signature=sign)
    response = requests.get(url, headers=headers, params=params)
    print(response.text)
    print(response)
- # ----------------------------------------------------------------------------------------------------------------------
def get_player_list(log, cardCollageId):
    """Fetch the player/publicity list for one collage (experimental).

    NOTE(review): this helper is still in a debugging state — it builds its
    own headers with a HARD-CODED bearer token (will expire; should come from
    the kawan_token table like kawan_main does), uses requests directly, and
    prints the raw response instead of logging or returning it. The large
    commented-out strings below are captured encrypted request payloads kept
    for reference while reverse-engineering the API.
    """
    log.debug(f'{inspect.currentframe().f_code.co_name} Request cardCollageId: {cardCollageId}')
    url = "https://app.cardplayd.com/app/system/cardReport/getCarmiPublicityVoByTypeList"
    # data = {
    # "F9gls72L1UmU0/fdUzTFS3ry8fQCShi/nU0HmHsW3WtEtgxZ9j3kG2SQKu3iQ3FDtBlp4bnHXXxDZqeXjyEiEICm1Xo4QJTYKIB9kijJy3mA2V2Ayt2X5Rqf+eipjEX+5ES+7D3gZdEmcdT9gPOjjn69z4hqnweX3thbvg5/LXJ1531bkv/otiMYFshgbbMQ51el/Tlh20zDkpj952Y8Gg": "="
    # }
    # data = "DuDAqydlmgNlK/1CtwT2hIVdQuBreX0MQC0hjznCSJyH3ZIKgyk7yEK1+Fs3E3eFoR9kKSrQnDVis5jh0SwKmDlQ9cEdQMPa1facZd5asXV10oQrud4aONB4/RjyQh/iNL7tWTIT2HZCtwJIz61kA/6kRqIIpnBfoOkFTegD6TvzG1XhmYlMcZ70PWIpF4o+VMYhAVyTFfsLa7kBFJUdqA=="
    player_headers = {
        "User-Agent": "Dart/3.6 (dart:io)",
        "Accept-Encoding": "gzip",
        "Content-Type": "application/json",
        # HACK: hard-coded credential captured from the app — replace with the
        # token stored in kawan_token before relying on this function.
        "authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJsb2dpblR5cGUiOiJhcHBMb2dpbiIsImxvZ2luSWQiOiJhcHBfdXNlcjoxOTExNjI3MDU4NDk2ODM5NjgyIiwicm5TdHIiOiJoUTNtS2VwQ210RWZ4VFkzQVNIUTcxV1RRZlo1ajBkNCIsImNsaWVudGlkIjoiNDI4YTgzMTBjZDQ0Mjc1N2FlNjk5ZGY1ZDg5NGYwNTEiLCJ1c2VySWQiOjE5MTE2MjcwNTg0OTY4Mzk2ODIsImludml0ZUNvZGUiOiIyMDc3MTYiLCJwaG9uZSI6IjEzMDE0NjE3NjE0In0.IuWoS8kCmG4OQFh1XINJOHpbeKMZKlMmticVglAVF_Y",
        "content-language": "zh_CN",
        # "encrypt-key": "n8XhkgeVYg26D8/nY3MGThxNzKI59EMd69AjUF3Jk5ZT9ixwo21PABLWhwLMJFuSXqASVsUaq2KhUnjsaaIXDA==",
        "app-version": "1.0.12",
        # "isencrypt": "true",
        # "isencrypt": "false",
    }
    # data = "916L0IKDzAb1hrnjyzFDCH+prEuxPR0LqfU5m79fYlfZTCvFQhehf43vS0P9Gz91+ySAFH8cvuIaC8f2A6Awo3HXmjJY4GzUXPTDNNehEgMugpVAXsS1ly9tWuWgQp0nnZuFZzWL281CNuo9cY8XkrcyL9p2QqVs5GDNnSFNi2Y8LRPk+1aiED2n+rvY7j0stupez5m9+1AcNGAUyKO/hQ=="
    # data = "eyJjYXJkQ29sbGFnZUlkIjogIjE5MTEzOTE4NjQ2MDI5MjcxMDUiLCJwYWdlU2l6ZSI6ICIxMCIsInBhZ2VOdW0iOiAiMSIsImVuY3J5cHQta2V5IjogIm44WGhrZ2VWWWcyNkQ4L25ZM01HVGh4TnpLSTU5RU1kNjlBalVGM0prNVpUOWl4d28yMVBBQkxXaHdMTUpGdVNYcUFTVnNVYXEyS2hVbmpzYWFJWERBPT0ifQ=="
    # data = {"type": "1", "cardCollageId": "1906194540964519937", "userAnonymous": None, "pageSize": 100, "pageNum": 1,
    # "isAsc": "", "orderByColumn": "create_time", "filterInfo": ""}
    data = {"type": "1", "cardCollageId": cardCollageId, "userAnonymous": None, "pageSize": 100, "pageNum": 1,
            "isAsc": "", "orderByColumn": "create_time", "filterInfo": ""}
    nonce, a, sign = get_sign(url)
    player_headers.update(timestamp=a, nonce=nonce, signature=sign)
    # NOTE(review): passed via data= (form-encoded), despite the JSON
    # Content-Type header — confirm which one the server actually expects.
    response = requests.post(url, headers=player_headers, data=data)
    # response = make_request(log, 'POST', url, headers=headers, data=data)
    print(response.text)
    print(response)
- # ----------------------------------------------------------------------------------------------------------------------
def get_report_one_page(log, collageId, headers, page_num=1):
    """Fetch one page (10 entries) of a collage's open-card reports."""
    url = "https://app.cardplayd.com/app/system/cardReport/getOpenReportInfo"
    query = {
        "cardCollageId": collageId,
        "pageSize": "10",
        "pageNum": str(page_num),
    }
    nonce, ts, sign = get_sign(url)
    headers.update(timestamp=ts, nonce=nonce, signature=sign)
    return make_request(log, 'GET', url, headers=headers, params=query)
def parse_report_list(sql_pool, report_info_list, collageId):
    """Map raw open-card report entries to rows and bulk-insert them.

    No dedupe here — callers gate on report_state so a product's reports are
    only fetched once.
    """
    rows = [
        {
            "product_id": collageId,
            "card_name": entry.get("carmiInfo"),
            "open_card_time": entry.get("openCardTime"),
            "imgs": entry.get("frontImagePath"),
        }
        for entry in report_info_list
    ]
    if rows:
        sql_pool.insert_many(table="kawan_report_record", data_list=rows)
def get_report_list(log, sql_pool, collageId, headers):
    """Page through a product's open-card reports, then mark it collected.

    On success (including "no data" early exits via break) the product's
    report_state is set to 1 so it is not re-fetched next run.
    """
    func_name = inspect.currentframe().f_code.co_name
    page = 1
    fetched = 0
    while True:
        payload = get_report_one_page(log, collageId, headers, page)
        if payload is None:
            log.error("Failed to fetch report list. Exiting...")
            break
        data = payload.get('data', {})
        total = data.get('total', 0)
        if total == 0:
            log.warning(
                f"Warning {func_name}: {payload['msg']}, collageId:{collageId}没有 report 数据")
            break
        reports = data.get('otherCardReportResultList', [])
        if not reports:
            log.warning(
                f"Warning {func_name}: {payload['msg']}, collageId:{collageId}没有 report 数据")
            break
        fetched += len(reports)
        parse_report_list(sql_pool, reports, collageId)
        # Empty pages broke out above, so a short page is the only end signal here.
        if len(reports) < 10:
            log.info("No more report_info_list found. Stopping requests.")
            break
        page += 1
        if fetched >= total:
            log.info("Total report_info_list fetched. Stopping requests.")
            break
    log.info(f"Total report_info_list fetched: {fetched}")
    # Flag the product so the next run skips its reports.
    sql_pool.update_one("update kawan_product_record set report_state = 1 where product_id = %s", (collageId,))
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def kawan_main(log):
    """
    Main crawler task: refresh shops, then sold products, then open-card reports.

    Retried up to 100 times with a 1-hour wait between attempts (tenacity
    decorator — presumably imported via `from settings import *`; confirm).

    :param log: logger object
    :raises RuntimeError: when the MySQL connection pool health check fails
        (deliberately raised so the retry decorator re-runs the task).
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")
    try:
        # Auth token is maintained externally in the kawan_token table.
        sql_token = sql_pool.select_one("SELECT token FROM kawan_token")
        # sql_token_str = 'Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJsb2dpblR5cGUiOiJhcHBMb2dpbiIsImxvZ2luSWQiOiJhcHBfdXNlcjoxOTExNjI0MDUzMzQ5MzI2ODQ5Iiwicm5TdHIiOiJvZURBT2QwTEFvYmlmTFR2Y0xVVXpNQ0haaWVLWXRrUyIsImNsaWVudGlkIjoiNDI4YTgzMTBjZDQ0Mjc1N2FlNjk5ZGY1ZDg5NGYwNTEiLCJ1c2VySWQiOjE5MTE2MjQwNTMzNDkzMjY4NDksImludml0ZUNvZGUiOiI0NjI2MTgiLCJwaG9uZSI6IjE5NTIxNTAwODUwIn0.PY7l7OvS2fOHsgl-YsHcEy1TyKsIgkmSxSV4RZxWaxc'
        headers = {
            "User-Agent": "Dart/3.6 (dart:io)",
            "Accept-Encoding": "gzip",
            # "authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJsb2dpblR5cGUiOiJhcHBMb2dpbiIsImxvZ2luSWQiOiJhcHBfdXNlcjoxOTExNjI3MDU4NDk2ODM5NjgyIiwicm5TdHIiOiJoUTNtS2VwQ210RWZ4VFkzQVNIUTcxV1RRZlo1ajBkNCIsImNsaWVudGlkIjoiNDI4YTgzMTBjZDQ0Mjc1N2FlNjk5ZGY1ZDg5NGYwNTEiLCJ1c2VySWQiOjE5MTE2MjcwNTg0OTY4Mzk2ODIsImludml0ZUNvZGUiOiIyMDc3MTYiLCJwaG9uZSI6IjEzMDE0NjE3NjE0In0.IuWoS8kCmG4OQFh1XINJOHpbeKMZKlMmticVglAVF_Y",
            # NOTE(review): sql_token[0] raises TypeError when the table is
            # empty (select_one presumably returns None) — caught by the
            # outer except below, which then skips the whole run.
            "authorization": sql_token[0],
            # "authorization": sql_token_str,
            "content-type": "application/json",
            "app-version": "1.0.12",
            "content-language": "zh_CN",
            # "nonce": "ceac8160-18e3-11f0-bb6e-95de6e5ff903",
            "isencrypt": "false"
        }
        # Step 1: refresh the shop list.
        try:
            sql_shop_list = sql_pool.select_all("SELECT shop_id FROM kawan_shop_record")
            sql_shop_list = [item[0] for item in sql_shop_list]
            get_shop_list(log, sql_pool, sql_shop_list, headers)
            sql_shop_list.clear()
        except Exception as e:
            log.error(f"Error fetching last_product_id: {e}")
        time.sleep(5)
        # Step 2: fetch sold products for every known shop.
        try:
            sql_shop_id_list = sql_pool.select_all("SELECT shop_id, shop_name FROM kawan_shop_record")
            # sql_shop_id_list = [item[0] for item in sql_shop_id_list]
            # Known product ids, used to skip already-stored products.
            sql_product_id_list = sql_pool.select_all("SELECT product_id FROM kawan_product_record")
            sql_product_id_list = [item[0] for item in sql_product_id_list]
            for shop_id_name in sql_shop_id_list:
                shop_id = shop_id_name[0]
                shop_name = shop_id_name[1]
                log.info(f"开始获取商家:{shop_id} 已售商品")
                # Per-shop try/except so one failing shop doesn't stop the rest.
                try:
                    get_sold_list(log, sql_pool, shop_id, shop_name, sql_product_id_list, headers)
                except Exception as e:
                    log.error(f"Error fetching get_sold_list for shop_id:{shop_id}, {e}")
            sql_product_id_list.clear()
        except Exception as e:
            log.error(f"Error fetching sql_shop_id_list: {e}")
        time.sleep(5)
        # Step 3: fetch open-card reports for products not yet collected.
        try:
            sql_product_id_list_for_report = sql_pool.select_all(
                "SELECT product_id FROM kawan_product_record WHERE report_state = 0")
            sql_product_id_list_for_report = [item[0] for item in sql_product_id_list_for_report]
            for product_id in sql_product_id_list_for_report:
                log.info(f"开始获取商品:{product_id} 拆卡报告")
                try:
                    get_report_list(log, sql_pool, product_id, headers)
                except Exception as e:
                    log.error(f"Error fetching reports for product_id:{product_id}, {e}")
                    # report_state = 2 marks "failed" so it isn't retried forever.
                    sql_pool.update_one("update kawan_product_record set report_state = 2 where product_id = %s",
                                        (product_id,))
        except Exception as e:
            log.error(f"Error fetching reports: {e}")
        # time.sleep(5)
        # Step 4 (disabled): fetch per-product player lists.
        # try:
        #     get_player_list(log, sql_pool)
        # except Exception as e:
        #     log.error(f"Error fetching players: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
def schedule_task():
    """Crawler entry point: run kawan_main every day at 00:01 and block forever."""
    # To kick off a run immediately at startup, call kawan_main(log=logger)
    # here before registering the schedule.
    schedule.every().day.at("00:01").do(kawan_main, log=logger)
    # Busy-wait loop with a 1-second tick, as the schedule library requires.
    while True:
        schedule.run_pending()
        time.sleep(1)
if __name__ == '__main__':
    # Ad-hoc single-function invocations kept for manual debugging:
    # get_shop_list(logger, None)
    # get_sold_list(logger)
    # get_acticity_xplain(logger, '1910557299676192770')
    # get_product_detail(logger, '1910557299676192770')
    # get_player_list(logger)
    # get_report_list(logger, '1910557299676192770')
    # kawan_main(logger)
    schedule_task()
|