| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427 |
# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/6/30 19:15 (originally created 2025/6/19 11:27)
- import time
- import pymysql
- import utils
- import random
- import inspect
- import datetime
- import schedule
- from loguru import logger
- from mysql_pool import MySQLConnectionPool
- from tenacity import retry, stop_after_attempt, wait_fixed
# Drop loguru's default stderr sink and log to a daily file instead.
logger.remove()
# Rotate at midnight, keep DEBUG and above.
# NOTE(review): loguru retention strings are normally plural ("7 days");
# confirm the installed loguru version accepts the singular "7 day".
logger.add("./logs/sg_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")

# Category label stamped onto every scraped sold record.
CRAWL_CATEGORY = '卡牌'
def after_log(retry_state):
    """
    tenacity ``after`` callback: log the result of every retry attempt.

    :param retry_state: tenacity RetryCallState object
    """
    # The wrapped functions all take the logger as their first positional
    # argument; fall back to the module-level logger when none was passed.
    log = retry_state.args[0] if retry_state.args and len(retry_state.args) > 0 else logger
    fn_name = retry_state.fn.__name__
    attempt = retry_state.attempt_number
    if retry_state.outcome.failed:
        log.warning(f"Function '{fn_name}', Attempt {attempt} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {attempt} succeeded")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_category_list(log, sql_pool, sql_p_list):
    """
    Fetch the category (tag) list and hand it to the parser.

    :param log: logger object
    :param sql_pool: MySQL connection pool
    :param sql_p_list: in-memory list of spu_ids already stored this run
    :raises RuntimeError: when the endpoint returns no JSON (triggers retry)
    """
    log.debug("Getting category list............")
    url = "https://api.qiandao.com/c2c-web/v1/stock-order/bargin-v3"
    # Fixed set of tag ids this crawler is interested in.
    tag_ids = [
        1422849, 1132249, 1622975, 1583218, 1706130, 1113003, 1000375,
        1572324, 1436291, 1560387, 1276355, 1423075, 1583179, 42474,
        324, 1446179, 1533642, 1542944, 1568717, 1572231, 1532176,
        1506156, 1568683, 1541805, 1277151, 1529427, 1514792, 1515959,
        1541749,
    ]
    payload = {"tagIds": tag_ids, "withStockCount": 4}
    resp_json = utils.request_post_data(log, url, payload)
    if not resp_json:
        log.error("get_category_list error")
        raise RuntimeError("get_category_list error")
    parse_category_list(log, resp_json, sql_pool, sql_p_list)
def parse_category_list(log, resp_json, sql_pool, sql_p_list):
    """
    Walk the category response and crawl the product list of each tag.

    :param log: logger object
    :param resp_json: JSON response from the category endpoint
    :param sql_pool: MySQL connection pool
    :param sql_p_list: in-memory list of spu_ids already stored this run
    """
    for entry in resp_json.get("data", []):
        tag_id = entry.get("tagId")
        # Guard clause: skip entries with no usable tag id.
        if not tag_id:
            log.error("tag_id is empty")
            continue
        try:
            get_product_list(log, tag_id, sql_pool, sql_p_list)
        except Exception as e:
            # One failing tag must not abort the remaining tags.
            log.error(f'parse_category_list to get_product_list error: {e}')
def get_product_list(log, tag_id, sql_pool, sql_p_list):
    """
    Page through the product listing of one tag until it is exhausted.

    Stops when a page comes back short (fewer than 10 items), when the
    accumulated count reaches the server-reported total, or after 500 pages.

    :param log: logger object
    :param tag_id: category tag id being crawled
    :param sql_pool: MySQL connection pool
    :param sql_p_list: in-memory list of spu_ids already stored this run
    """
    log.debug(f" {inspect.currentframe().f_code.co_name} for tag_id:{tag_id}.....")
    max_page = 500
    total_count = 0
    for page in range(1, max_page + 1):
        len_item, totalCount = get_product_one_page(log, tag_id, page, sql_pool, sql_p_list)
        if len_item < 10:
            # Short page => last page reached.
            log.debug(f"--------------- page {page}, len_item: {len_item} totalCount: {totalCount} ---------------")
            break
        total_count += len_item
        if total_count >= int(totalCount):
            # We have seen as many items as the server says exist.
            log.debug(f"total_count: {total_count} totalCount: {totalCount}")
            break
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_product_one_page(log, tag_id, page_num, sql_pool, sql_p_list):
    """
    Fetch and store a single page (10 items) of a tag's product listing.

    :param log: logger object
    :param tag_id: category tag id being crawled
    :param page_num: 1-based page number
    :param sql_pool: MySQL connection pool
    :param sql_p_list: in-memory list of spu_ids already stored this run
    :return: (number of items on this page, server-reported totalCount)
    :raises RuntimeError: when the endpoint returns no JSON (triggers retry)
    """
    log.debug(f"Getting product list for tag_id: {tag_id} page_num: {page_num}............")
    url = "https://api.qiandao.com/c2c-web/v1/stock-order/stock-order-zone"
    payload = {
        "tagId": tag_id,
        "limit": 10,
        "offset": (page_num - 1) * 10,
        "isWished": False,
        "sort": "SELL_COUNT_DESC"
    }
    resp_json = utils.request_post_data(log, url, payload)
    if not resp_json:
        log.error("get_product_one_page error")
        raise RuntimeError("get_product_one_page error")
    data_node = resp_json.get("data", {})
    items = data_node.get("items", [])
    totalCount = data_node.get("totalCount", 0)
    parse_product_data(log, items, tag_id, sql_pool, sql_p_list)
    return len(items), totalCount
def parse_product_data(log, items, tag_id, sql_pool, sql_p_list):
    """
    Insert newly-seen SPUs from a product page into the category table.

    Each spu_id is recorded in ``sql_p_list`` so the same SPU is not
    inserted twice within one run; duplicate-key errors from the database
    are logged and tolerated.

    :param log: logger object
    :param items: product item dicts from the listing endpoint
    :param tag_id: category tag id the items belong to
    :param sql_pool: MySQL connection pool
    :param sql_p_list: in-memory list of spu_ids already stored this run
    """
    for item in items:
        spu_id = item.get("spuId", "")
        if spu_id in sql_p_list:
            log.info(f"spu_id:{spu_id} is exist, skip.......")
            continue
        record = {
            "tag_id": tag_id,
            "tag_name": item.get("tagName", ""),
            "spu_id": spu_id,
            "spu_name": item.get("name", ""),
            "mega_spu_name": item.get("megaSpuName", ""),
        }
        try:
            sql_pool.insert_one_or_dict(table="qiandao_sg_category_record", data=record)
        except pymysql.err.IntegrityError as e:
            if "Duplicate entry" in str(e):
                # Row already exists in the DB; safe to ignore.
                log.warning("存在重复的 spu_id,跳过插入")
            else:
                # Other integrity errors are logged (truncated) but not fatal.
                log.warning(f"{str(e)[:200]}")
        # Mark as seen for this run regardless of insert outcome.
        sql_p_list.append(spu_id)
- # -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
def get_sg_history_list(log, sql_pool, sql_spu_id):
    """
    Fetch the sale history of one SPU, stopping at already-stored data.

    Pages are requested until one is empty, contains no data newer than
    the latest stored trading time, or is short (fewer than 20 items).

    :param log: logger object
    :param sql_pool: MySQL connection pool
    :param sql_spu_id: spu_id whose history should be crawled
    """
    max_page = 50
    # Latest trading time already persisted for this SPU (None when the
    # SPU has no stored records yet).
    # NOTE(review): spu_id is interpolated into the SQL string directly;
    # presumably it only ever comes from our own DB — confirm, or switch
    # to a parameterized query if select_one supports it.
    row = sql_pool.select_one(
        f"SELECT MAX(trading_time) FROM qiandao_sg_sold_record WHERE spu_id = '{sql_spu_id}'"
    )
    max_trading_time = row[0] if row else None
    for page in range(1, max_page + 1):
        items = get_sg_history_sold_one_page(log, sql_spu_id, page)
        if not items:
            log.debug(f'--------------- page {page} has no items, break ---------------')
            break
        if not parse_sold_data(log, items, sql_pool, sql_spu_id, max_trading_time):
            log.debug(f'--------------- page {page} has no newer data, break ---------------')
            break
        if len(items) < 20:
            log.debug(f'--------------- page {page} has less than 20 items, break ---------------')
            break
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_sg_history_sold_one_page(log, spu_id, page):
    """
    Fetch one page (20 rows) of an SPU's trading history.

    :param log: logger object
    :param spu_id: SPU whose history is requested
    :param page: 1-based page number
    :return: list of trading-history item dicts (possibly empty)
    :raises RuntimeError: when the endpoint returns no JSON (triggers retry)
    """
    log.debug(f"get_sg_history_sold_one_page: 开始获取第{page}页.............")
    url = "https://api.qiandao.com/c2c-web/v1/common/get-spu-trading-history-list"
    payload = {
        "spuId": spu_id,
        "skuId": "0",
        # Empty tradingType returns all trade kinds (e.g. C2C and EXCHANGE).
        "tradingType": "",
        "limit": "20",
        "offset": str((page - 1) * 20)
    }
    resp_json = utils.request_post_data(log, url, payload, proxy=False)
    if not resp_json:
        log.error("get_sg_history_sold_one_page error")
        raise RuntimeError("get_sg_history_sold_one_page error")
    return resp_json.get("data", {}).get("items", [])
def parse_sold_data(log, items, sql_pool, spu_id, max_trading_time):
    """
    Parse one page of trading-history items and bulk-insert the new rows.

    Items are assumed to arrive newest-first: as soon as a row at or
    before ``max_trading_time`` appears, the remainder of the page is
    skipped.

    :param log: logger object
    :param items: trading-history item dicts for one page
    :param sql_pool: MySQL connection pool
    :param spu_id: SPU the rows belong to
    :param max_trading_time: newest stored trading time (datetime) or None
    :return: True if at least one item was newer than the stored maximum,
             False otherwise (including on any unexpected error)
    """
    try:
        records = []
        found_newer = False
        for item in items:
            raw_ts = item.get("tradingTime", "")
            ts_str = utils.transform_ms(log, raw_ts)
            if not ts_str:
                continue
            try:
                trading_dt = datetime.datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S')
            except Exception as e:
                log.error(f"Error parsing trading_time: {e}")
                continue
            if max_trading_time is None:
                # No stored history: keep everything.
                found_newer = True
            elif trading_dt > max_trading_time:
                log.debug(f'trading_dt: {trading_dt} > max_trading_time: {max_trading_time}, 保留')
                found_newer = True
            else:
                log.debug(f'trading_dt: {trading_dt} <= max_trading_time: {max_trading_time}, 跳过旧数据')
                break  # Older data from here on; stop scanning this page.
            # tradingType: "C2C" for plain listings, "EXCHANGE" for
            # lightning-flagged trades (per the original author's note).
            records.append({
                "spu_id": spu_id,
                "buyer_name": item.get("buyerName", ""),
                "product_name": item.get("productName", ""),
                "trading_volume": item.get("tradingVolume", ""),
                "trading_amount": item.get("tradingAmount", ""),
                "is_acquisition": 1 if item.get("isAcquisition", "") else 0,
                "trading_time": ts_str,
                "trading_type": item.get("tradingType"),
                "crawl_category": CRAWL_CATEGORY
            })
        if records:
            sql_pool.insert_many(table="qiandao_sg_sold_record", data_list=records)
        return found_newer
    except Exception as e:
        # Boundary catch: report the failure and signal "no new data" so the
        # caller stops paginating this SPU.
        log.error(f" {inspect.currentframe().f_code.co_name} error: {e}")
        return False
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def qd_sg_main(log):
    """
    Entry point: crawl the sale history of every known SPU in the category.

    On any RuntimeError (e.g. unhealthy DB pool) tenacity retries hourly,
    up to 100 times.

    :param log: logger object
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
    # Set up the MySQL connection pool and verify it before doing any work.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")
    try:
        log.info("-------------------------------------- 开始获取 闪购 已售 --------------------------------------")
        try:
            rows = sql_pool.select_all(
                f"SELECT spu_id FROM qiandao_sg_category_record WHERE crawl_category = '{CRAWL_CATEGORY}'")
            spu_ids = [row[0] for row in rows]
            for spu_id in spu_ids:
                log.info(f"开始获取:{spu_id} 已售商品")
                try:
                    get_sg_history_list(log, sql_pool, spu_id)
                except Exception as e:
                    # One failing SPU must not stop the rest of the batch.
                    log.error(f"Error fetching get_sold_list for sql_spu_id:{spu_id}, {e}")
            # Release the id list once the batch is done.
            # NOTE(review): in the mangled original the clear() indentation is
            # ambiguous; placed after the loop, as clearing mid-iteration
            # would abort the batch — confirm against the original file.
            spu_ids.clear()
        except Exception as e:
            log.error(f"Error fetching sql_shop_id_list: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
if __name__ == '__main__':
    # Run the crawler with the module-level loguru logger.
    qd_sg_main(log=logger)
|