# -*- coding: utf-8 -*- # Author : Charley # Python : 3.10.8 # Date : 2025/6/30 17:59 # -*- coding: utf-8 -*- # Author : Charley # Python : 3.10.8 # Date : 2025/6/19 14:12 import time from datetime import datetime import utils import random import inspect import pymysql import schedule from loguru import logger from mysql_pool import MySQLConnectionPool from tenacity import retry, stop_after_attempt, wait_fixed logger.remove() logger.add("./logs/bag_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00", format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}", level="DEBUG", retention="7 day") def after_log(retry_state): """ retry 回调 :param retry_state: RetryCallState 对象 """ # 检查 args 是否存在且不为空 if retry_state.args and len(retry_state.args) > 0: log = retry_state.args[0] # 获取传入的 logger else: log = logger # 使用全局 logger if retry_state.outcome.failed: log.warning( f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times") else: log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded") def get_luckybag_list(log, sql_pool, sql_luckybag_list): page = 1 max_page = 500 total_count = 0 while page <= max_page: len_item, totalCount = get_luckybag_one_page(log, page, sql_pool,sql_luckybag_list) if len_item < 10: log.debug(f"--------------- page {page}, len_item: {len_item} totalCount: {totalCount} ---------------") break total_count += len_item if total_count >= int(totalCount): log.debug(f"total_count: {total_count} totalCount: {totalCount}") break page += 1 time.sleep(random.uniform(0.1, 1)) @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log) def get_luckybag_one_page(log, page, sql_pool,sql_luckybag_list): log.debug(f" {inspect.currentframe().f_code.co_name} for page:{page}.....") url = "https://api.qiandao.cn/c2c-web/v1/luckybag/list" params = { "limit": "10", # "offset": "0", "offset": str((page - 1) * 10), "hostId": "228946998408215136", "orderBy": "updatedAt.desc", "filterFollow": "false", "tagIds": "1429578", "typeIds": "1405184,1132249,1000375,49695,1443553,1002041" } resp_json = utils.request_get_data(log, url, params) if not resp_json: log.error("get_luckybag_one_page error") raise RuntimeError("get_luckybag_one_page error") rows = resp_json.get("data", {}).get("rows", []) total_count = resp_json.get("data", {}).get("count", 0) try: parse_luckybag_data(log, rows, sql_pool,sql_luckybag_list) except Exception as e: log.error(f"parse_luckybag_data error: {e}") return len(rows), total_count def parse_luckybag_data(log, items, sql_pool,sql_luckybag_list): # info_list = [] for item in items: luckybag_id = item.get("id") if luckybag_id in sql_luckybag_list: log.info(f"luckybag_id:{luckybag_id} is exist, skip.......") continue name = item.get("name") original_price = item.get("price") ice_breaking_price = item.get("iceBreakingPrice") seller_id = item.get("sellerId") seller_name = item.get("sellerName") remain_count = item.get("remainCount") # 剩下数量 total_count = item.get("totalCount") # 总数 createdAt = item.get("createdAt") created_at = utils.transform_ms(log, createdAt) updatedAt = item.get("updatedAt") updated_at = utils.transform_ms(log, updatedAt) info_dict = { "luckybag_id": luckybag_id, "luckybag_name": name, "original_price": original_price, "ice_breaking_price": ice_breaking_price, "seller_id": seller_id, "seller_name": seller_name, "remain_count": remain_count, "total_count": total_count, "created_at": created_at, "updated_at": updated_at } # print(info_dict) # info_list.append(info_dict) try: sql_pool.insert_one_or_dict( table="qiandao_luckybag_list_record", data=info_dict ) except Exception as e: log.error(f" {inspect.currentframe().f_code.co_name} 插入失败:", e) sql_luckybag_list.append(luckybag_id) # if info_list: # try: # sql_pool.insert_many(table="qiandao_luckybag_list_record", data_list=info_list) # except pymysql.err.IntegrityError as e: # if "Duplicate entry" in str(e): # logger.warning("存在重复的 luckybag_id,跳过插入") # else: # raise e # ------------------------------------------------------------------------------------------------------------------- @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log) def get_luckybag_detail(log, luckyBagId: str): """ 获取福袋详情 :param luckyBagId: luckyBagId :param log: logger对象 """ url = "https://api.qiandao.cn/c2c-web/v1/luckybag" params = { # "luckyBagId": "876352952506283967" "luckyBagId": luckyBagId } resp_json = utils.request_get_data(log, url, params) if not resp_json: log.error("get_luckybag_detail error") raise RuntimeError("get_luckybag_detail error") return resp_json def parse_luckybag_detail_basic_info_data(log, resp_json, sql_pool, luckyBagId): try: basic_info = resp_json.get("data", {}).get("basicInfo", {}) sell_end_time = basic_info.get("sellEndTime") sell_end_time = utils.transform_ms(log, sell_end_time) # moderate_status = basic_info.get("moderateStatus") status = basic_info.get("status") # 更新 qiandao_luckybag_list_record 表 updata_dict = { "sell_end_time": sell_end_time, # "moderate_status": moderate_status, "status": status } # print(f"updata_dict:{updata_dict}") sql_pool.update_one_or_dict( table="qiandao_luckybag_list_record", data=updata_dict, condition={ "luckybag_id": luckyBagId } ) except Exception as e: log.error(f"{inspect.currentframe().f_code.co_name} for basic_info error: {e}") def parse_luckybag_detail_product_list_data(log, resp_json, sql_pool, luckybag_id): try: product_list = resp_json.get("data", {}).get("products", []) # info_product_list = [] for product in product_list: product_id = product.get("id") product_name = product.get("name") image = product.get("image") reference_price = product.get("referencePrice") # 参考价格 spu_id = product.get("spuId") spec_list = product.get("specValues", [{}]) spec_name_list = [spec.get("name") for spec in spec_list] spec_name = "".join(spec_name_list) # 规格名称 spec_value_list = [spec.get("value") for spec in spec_list] spec_value = "".join(spec_value_list) # 规格值 remain_count = product.get("remainCount") # 剩余数量 total_count = product.get("totalCount") # 总数 product_dict = { "luckybag_id": luckybag_id, "product_id": product_id, "product_name": product_name, "image": image, "reference_price": reference_price, "spu_id": spu_id, "spec_name": spec_name, "spec_value": spec_value, "remain_count": remain_count, "total_count": total_count } # print(f"product_dict:{product_dict}") try: sql_pool.insert_one_or_dict(table="qiandao_luckybag_products_record", data=product_dict) sql_pool.update_one_or_dict( table="qiandao_luckybag_list_record", data={ "detail_state": 1 }, condition={ "luckybag_id": luckybag_id } ) except Exception as e: log.error(f"{inspect.currentframe().f_code.co_name} for product_list error: {e}") # info_product_list.append(product_dict) # if info_product_list: # try: # sql_pool.insert_many(table="qiandao_luckybag_products_record", data_list=info_product_list) # sql_pool.update_one_or_dict( # table="qiandao_luckybag_list_record", # data={ # "detail_state": 1 # }, # condition={ # "luckybag_id": luckybag_id # } # ) # except pymysql.err.IntegrityError as e: # if "Duplicate entry" in str(e): # logger.warning("存在重复的 product_id,跳过插入") # else: # raise e except Exception as e: log.error(f"{inspect.currentframe().f_code.co_name} for product_list error: {e}") def parse_luckybag_detail_order_list_data(log, resp_json, sql_pool, luckybag_id): try: order_list = [] # 先查询该福袋 购买数据的最大时间 max_luckybay_time = sql_pool.select_one( query="SELECT MAX(order_created_at) FROM qiandao_luckybag_orders_record WHERE luckybag_id = %s", args=(luckybag_id,)) max_luckybay_time = max_luckybay_time[0] if max_luckybay_time else None records = resp_json.get("data", {}).get("records", []) for record in records: product_id = record.get("productId") # product_name = record.get("productName") serial_number = record.get("serialNumber") # 序列号 order_id = record.get("orderId") trading_time = record.get("orderCreatedAt") trading_time_str = utils.transform_ms(log, trading_time) if not trading_time_str: log.debug(f"Invalid trading time: {trading_time_str}") continue # 跳过无效时间 # 字符串 -> datetime trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S") if max_luckybay_time and trading_time <= max_luckybay_time: log.debug(f"trading_time: {trading_time_str} <= max_luckybay_time: {max_luckybay_time}, 跳过旧数据") break buyerNickname = record.get("buyerNickname") order_dict = { "luckybag_id": luckybag_id, "product_id": product_id, "order_id": order_id, "serial_number": serial_number, "order_created_at": trading_time_str, "buyer_nickname": buyerNickname } # print(f"order_dict:{order_dict}") # try: # sql_pool.insert_one_or_dict(table="qiandao_luckybag_orders_record", data=order_dict) # sql_pool.update_one_or_dict( # table="qiandao_luckybag_list_record", # data={ # "order_state": 1 # }, # condition={ # "luckybag_id": luckybag_id # } # ) # except Exception as e: # log.error(f"{inspect.currentframe().f_code.co_name} for order_list error: {e}") order_list.append(order_dict) if order_list: try: sql_pool.insert_many(table="qiandao_luckybag_orders_record", data_list=order_list) sql_pool.update_one_or_dict( table="qiandao_luckybag_list_record", data={ "order_state": 1 }, condition={ "luckybag_id": luckybag_id } ) except pymysql.err.IntegrityError as e: if "Duplicate entry" in str(e): logger.warning("存在重复的 order_id,跳过插入") else: # raise e log.warning(f"{str(e)[:200]}") sql_pool.update_one_or_dict( table="qiandao_luckybag_list_record", data={ "order_state": 2 }, condition={ "luckybag_id": luckybag_id } ) except Exception as e: log.error(f"{inspect.currentframe().f_code.co_name} for order_list error: {e}") def get_detail_main(log, sql_pool): """ 1. 将 qiandao_luckybag_list_record 表中 detail_state = 0 的数据进行更新 插入 product list 数据 共用一个状态 2. 将 qiandao_luckybag_list_record 表中 order_state = 0 并且 status 不是 'ON_SALE' 的数据进行更新 :param log: logger对象 :param sql_pool: MySQLConnectionPool对象 """ sql_detail_list = sql_pool.select_all("SELECT luckybag_id FROM qiandao_luckybag_list_record WHERE order_state = 0") sql_detail_list = [item[0] for item in sql_detail_list] for luckyBagId in sql_detail_list: try: resp_json = get_luckybag_detail(log, luckyBagId) except Exception as e: log.error(f"{inspect.currentframe().f_code.co_name} for sql_detail_list: {luckyBagId}, error: {e}") resp_json = {} # parse_luckybag_detail_basic_info_data(log, resp_json, sql_pool, luckyBagId) # parse_luckybag_detail_product_list_data(log, resp_json, sql_pool, luckyBagId) parse_luckybag_detail_order_list_data(log, resp_json, sql_pool, luckyBagId) # --------------------------------------------------------------------------------------- # order 信息待后续分析 status 后 再运行 # sql_order_list = sql_pool.select_all("SELECT luckybag_id FROM qiandao_luckybag_list_record WHERE order_state = 0 AND status != 'ON_SALE'") # sql_order_list = [item[0] for item in sql_order_list] # # for luckyBagId2 in sql_order_list: # try: # resp_json = get_luckybag_detail(log, luckyBagId2) # except Exception as e: # log.error(f"{inspect.currentframe().f_code.co_name} for sql_order_list: {luckyBagId2}, error: {e}") # resp_json = {} # # parse_luckybag_detail_order_list_data(log, resp_json, sql_pool, luckyBagId2) @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log) def qd_lb_main(log): """ 主函数 :param log: logger对象 """ log.info( f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................') # 配置 MySQL 连接池 sql_pool = MySQLConnectionPool(log=log) if not sql_pool.check_pool_health(): log.error("数据库连接池异常") raise RuntimeError("数据库连接池异常") try: # 获取 福袋 列表 log.info("-------------------------------------- 开始 获取 福袋 列表 --------------------------------------") # sql_luckybag_list = sql_pool.select_all("SELECT luckybag_id FROM qiandao_luckybag_list_record") # sql_luckybag_list = [item[0] for item in sql_luckybag_list] # try: # get_luckybag_list(log, sql_pool,sql_luckybag_list) # except Exception as e: # log.error(f"Error fetching get_luckybag_list: {e}") # # sql_luckybag_list.clear() # # time.sleep(5) # 获取 福袋 详情 log.info("-------------------------------------- 开始获取 福袋 详情 --------------------------------------") try: get_detail_main(log, sql_pool) except Exception as e: log.error(f"Error fetching get_detail_main: {e}") except Exception as e: log.error(f'{inspect.currentframe().f_code.co_name} error: {e}') finally: log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............') # EmailSender().send(subject="【千岛 - 爬虫通知】今日任务已完成", # content="数据采集和处理已全部完成,请查收结果。\n\n ------ 来自 Python 爬虫系统。") if __name__ == '__main__': qd_lb_main(logger)