# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/6/30 19:15
import time
import random
import inspect
import datetime

import pymysql
import schedule
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed

import utils
from mysql_pool import MySQLConnectionPool

logger.remove()
logger.add("./logs/sg_{time:YYYYMMDD}.log",
           encoding='utf-8',
           rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG",
           retention="7 day")

# Category label stamped onto every sold-record row inserted by this crawler.
CRAWL_CATEGORY = '卡牌'


def after_log(retry_state):
    """tenacity retry callback: log the outcome of each attempt.

    :param retry_state: tenacity RetryCallState object
    """
    # The wrapped crawler functions all take a logger as their first
    # positional argument; fall back to the module-level logger otherwise.
    if retry_state.args and len(retry_state.args) > 0:
        log = retry_state.args[0]
    else:
        log = logger

    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_category_list(log, sql_pool, sql_p_list):
    """Fetch the category (tag) list and crawl products under each tag.

    Raises RuntimeError (and is retried up to 5 times) when the request
    returns an empty response.

    :param log: logger
    :param sql_pool: MySQL connection pool
    :param sql_p_list: in-memory list of spu_ids already stored in MySQL
    """
    log.debug("Getting category list............")
    url = "https://api.qiandao.com/c2c-web/v1/stock-order/bargin-v3"
    data = {
        # Fixed set of tag ids this crawler is interested in.
        "tagIds": [
            1422849, 1132249, 1622975, 1583218, 1706130, 1113003, 1000375,
            1572324, 1436291, 1560387, 1276355, 1423075, 1583179, 42474,
            324, 1446179, 1533642, 1542944, 1568717, 1572231, 1532176,
            1506156, 1568683, 1541805, 1277151, 1529427, 1514792, 1515959,
            1541749
        ],
        "withStockCount": 4
    }
    resp_json = utils.request_post_data(log, url, data)
    if not resp_json:
        log.error("get_category_list error")
        raise RuntimeError("get_category_list error")
    parse_category_list(log, resp_json, sql_pool, sql_p_list)


def parse_category_list(log, resp_json, sql_pool, sql_p_list):
    """Parse the category response and crawl the product list for each tag.

    :param log: logger
    :param resp_json: JSON response from the category endpoint
    :param sql_pool: MySQL connection pool
    :param sql_p_list: in-memory list of spu_ids already stored in MySQL
    """
    items = resp_json.get("data", [])
    for item in items:
        tag_id = item.get("tagId")
        if tag_id:
            try:
                get_product_list(log, tag_id, sql_pool, sql_p_list)
            except Exception as e:
                # One bad tag must not abort the whole category sweep.
                log.error(f'parse_category_list to get_product_list error: {e}')
        else:
            log.error("tag_id is empty")


def get_product_list(log, tag_id, sql_pool, sql_p_list):
    """Page through the product list of one tag until it is exhausted.

    Stops when a page returns fewer than 10 items (page size), when the
    accumulated count reaches the server-reported totalCount, or after
    500 pages as a hard safety cap.

    :param log: logger
    :param tag_id: category tag id
    :param sql_pool: MySQL connection pool
    :param sql_p_list: in-memory list of spu_ids already stored in MySQL
    """
    log.debug(f" {inspect.currentframe().f_code.co_name} for tag_id:{tag_id}.....")
    page = 1
    max_page = 500
    total_count = 0
    while page <= max_page:
        len_item, totalCount = get_product_one_page(log, tag_id, page, sql_pool, sql_p_list)
        if len_item < 10:
            # Short page => last page.
            log.debug(f"--------------- page {page}, len_item: {len_item} totalCount: {totalCount} ---------------")
            break
        total_count += len_item
        if total_count >= int(totalCount):
            log.debug(f"total_count: {total_count} totalCount: {totalCount}")
            break
        page += 1
        # Optional throttle between pages:
        # time.sleep(random.uniform(0.1, 1))


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_product_one_page(log, tag_id, page_num, sql_pool, sql_p_list):
    """Fetch and store one page (10 items) of products for a tag.

    Raises RuntimeError (and is retried up to 5 times) when the request
    returns an empty response.

    :param log: logger
    :param tag_id: category tag id
    :param page_num: 1-based page number
    :param sql_pool: MySQL connection pool
    :param sql_p_list: in-memory list of spu_ids already stored in MySQL
    :return: (number of items on this page, server-reported totalCount)
    """
    log.debug(f"Getting product list for tag_id: {tag_id} page_num: {page_num}............")
    url = "https://api.qiandao.com/c2c-web/v1/stock-order/stock-order-zone"
    data = {
        "tagId": tag_id,
        "limit": 10,
        "offset": (page_num - 1) * 10,
        "isWished": False,
        "sort": "SELL_COUNT_DESC"
    }
    resp_json = utils.request_post_data(log, url, data)
    if not resp_json:
        log.error("get_product_one_page error")
        raise RuntimeError("get_product_one_page error")
    items = resp_json.get("data", {}).get("items", [])
    totalCount = resp_json.get("data", {}).get("totalCount", 0)
    parse_product_data(log, items, tag_id, sql_pool, sql_p_list)
    return len(items), totalCount


def parse_product_data(log, items, tag_id, sql_pool, sql_p_list):
    """Insert new products into qiandao_sg_category_record, skipping known spu_ids.

    :param log: logger
    :param items: product item dicts from the API
    :param tag_id: category tag id the items belong to
    :param sql_pool: MySQL connection pool
    :param sql_p_list: in-memory list of spu_ids already stored; updated in place
    """
    for item in items:
        spu_id = item.get("spuId", "")
        if spu_id in sql_p_list:
            log.info(f"spu_id:{spu_id} is exist, skip.......")
            continue
        data_dict = {
            "tag_id": tag_id,
            "tag_name": item.get("tagName", ""),
            "spu_id": spu_id,
            "spu_name": item.get("name", ""),
            "mega_spu_name": item.get("megaSpuName", ""),
        }
        try:
            sql_pool.insert_one_or_dict(table="qiandao_sg_category_record", data=data_dict)
        except pymysql.err.IntegrityError as e:
            if "Duplicate entry" in str(e):
                # Row already present in MySQL (unique key on spu_id).
                log.warning("存在重复的 spu_id,跳过插入")
            else:
                # Other integrity errors are logged but deliberately not
                # re-raised, so the page keeps processing.
                log.warning(f"{str(e)[:200]}")
        # Remember the spu_id in memory so later pages skip it.
        sql_p_list.append(spu_id)


# ---------------------------------------------------------------------------


def get_sg_history_list(log, sql_pool, sql_spu_id):
    """Page through the trading history of one spu, newest first.

    Only records newer than the latest trading_time already stored for
    this spu are kept; pagination stops at the first stale record, at an
    empty/short page, or after 50 pages.

    :param log: logger
    :param sql_pool: MySQL connection pool
    :param sql_spu_id: spu_id to crawl
    """
    page = 1
    max_page = 50
    # Latest trading time already stored for this spu (None if none yet).
    # NOTE(review): f-string SQL interpolation — spu_id originates from our
    # own DB, but switching to parameterized queries would be safer; confirm
    # whether MySQLConnectionPool supports placeholders.
    max_trading_time = sql_pool.select_one(
        f"SELECT MAX(trading_time) FROM qiandao_sg_sold_record WHERE spu_id = '{sql_spu_id}'"
    )
    max_trading_time = max_trading_time[0] if max_trading_time else None
    while page <= max_page:
        items = get_sg_history_sold_one_page(log, sql_spu_id, page)
        if not items:
            log.debug(f'--------------- page {page} has no items, break ---------------')
            break
        has_new_data = parse_sold_data(log, items, sql_pool, sql_spu_id, max_trading_time)
        if not has_new_data:
            log.debug(f'--------------- page {page} has no newer data, break ---------------')
            break
        if len(items) < 20:
            # Short page => last page.
            log.debug(f'--------------- page {page} has less than 20 items, break ---------------')
            break
        page += 1
        # Optional throttle between pages:
        # time.sleep(random.uniform(0.1, 1))


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_sg_history_sold_one_page(log, spu_id, page):
    """Fetch one page (20 items) of a spu's trading history.

    Raises RuntimeError (and is retried up to 5 times) when the request
    returns an empty response.

    :param log: logger
    :param spu_id: spu_id to query
    :param page: 1-based page number
    :return: list of trading-history item dicts (possibly empty)
    """
    log.debug(f"get_sg_history_sold_one_page: 开始获取第{page}页.............")
    url = "https://api.qiandao.com/c2c-web/v1/common/get-spu-trading-history-list"
    data = {
        "spuId": spu_id,
        "skuId": "0",
        # Empty tradingType returns all trade types (both "C2C" and "EXCHANGE").
        "tradingType": "",
        "limit": "20",
        "offset": f"{(page - 1) * 20}"
    }
    resp_json = utils.request_post_data(log, url, data, proxy=False)
    if not resp_json:
        log.error("get_sg_history_sold_one_page error")
        raise RuntimeError("get_sg_history_sold_one_page error")
    items = resp_json.get("data", {}).get("items", [])
    return items


def parse_sold_data(log, items, sql_pool, spu_id, max_trading_time):
    """Parse one page of trading history and batch-insert the new rows.

    Items are assumed newest-first: iteration stops at the first record
    not newer than max_trading_time.

    :param log: logger
    :param items: trading-history item dicts from the API
    :param sql_pool: MySQL connection pool
    :param spu_id: spu_id the items belong to
    :param max_trading_time: latest stored trading_time (datetime) or None
    :return: True if at least one new record was found, else False
    """
    try:
        info_list = []
        has_new_data = False
        for item in items:
            tradingTime = item.get("tradingTime", "")
            trading_time_str = utils.transform_ms(log, tradingTime)
            if not trading_time_str:
                continue
            try:
                trading_dt = datetime.datetime.strptime(trading_time_str, '%Y-%m-%d %H:%M:%S')
            except Exception as e:
                log.error(f"Error parsing trading_time: {e}")
                continue
            # With no stored history, every record is new.
            if max_trading_time is None:
                has_new_data = True
            elif trading_dt > max_trading_time:
                log.debug(f'trading_dt: {trading_dt} > max_trading_time: {max_trading_time}, 保留')
                has_new_data = True
            else:
                log.debug(f'trading_dt: {trading_dt} <= max_trading_time: {max_trading_time}, 跳过旧数据')
                break  # items are newest-first: everything after is older
            is_acquisition = 1 if item.get("isAcquisition", "") else 0
            # tradingType: "C2C" = no lightning badge, "EXCHANGE" = lightning badge.
            data_dict = {
                "spu_id": spu_id,
                "buyer_name": item.get("buyerName", ""),
                "product_name": item.get("productName", ""),
                "trading_volume": item.get("tradingVolume", ""),
                "trading_amount": item.get("tradingAmount", ""),
                "is_acquisition": is_acquisition,
                "trading_time": trading_time_str,
                "trading_type": item.get("tradingType"),
                "crawl_category": CRAWL_CATEGORY
            }
            info_list.append(data_dict)
        if info_list:
            sql_pool.insert_many(table="qiandao_sg_sold_record", data_list=info_list)
        return has_new_data
    except Exception as e:
        log.error(f" {inspect.currentframe().f_code.co_name} error: {e}")
        return False


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def qd_sg_main(log):
    """Crawler entry point: crawl sold records for every stored spu_id.

    Retried hourly (up to 100 times) if the DB pool health check fails.

    :param log: logger
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")
    try:
        log.info("-------------------------------------- 开始获取 闪购 已售 --------------------------------------")
        try:
            sql_spu_id_list = sql_pool.select_all(
                f"SELECT spu_id FROM qiandao_sg_category_record WHERE crawl_category = '{CRAWL_CATEGORY}'")
            sql_spu_id_list = [item[0] for item in sql_spu_id_list]
            for sql_spu_id in sql_spu_id_list:
                log.info(f"开始获取:{sql_spu_id} 已售商品")
                try:
                    get_sg_history_list(log, sql_pool, sql_spu_id)
                except Exception as e:
                    log.error(f"Error fetching get_sold_list for sql_spu_id:{sql_spu_id}, {e}")
            sql_spu_id_list.clear()
        except Exception as e:
            # Fixed copy-paste: this is the spu_id list, not a shop_id list.
            log.error(f"Error fetching sql_spu_id_list: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')


if __name__ == '__main__':
    qd_sg_main(log=logger)