| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/6/30 17:59
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/6/19 14:12
- import time
- from datetime import datetime
- import utils
- import random
- import inspect
- import pymysql
- import schedule
- from loguru import logger
- from mysql_pool import MySQLConnectionPool
- from tenacity import retry, stop_after_attempt, wait_fixed
# Replace every previously registered loguru sink (including the default
# stderr sink) with a single DEBUG-level file sink: one file per day,
# rotated at midnight, old files kept for seven days.
logger.remove()
logger.add(
    "./logs/bag_{time:YYYYMMDD}.log",
    level="DEBUG",
    format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
    encoding='utf-8',
    rotation="00:00",
    retention="7 day",
)
def after_log(retry_state):
    """
    tenacity ``after`` callback: log the result of the attempt that just ran.

    The retried functions in this module take a logger as their first
    positional argument. That logger is used when it is present and looks
    like a logger; otherwise the module-level ``logger`` is used.

    :param retry_state: tenacity ``RetryCallState`` for the finished attempt
    """
    args = retry_state.args
    # Only trust args[0] when it quacks like a logger; otherwise a retried
    # function whose first argument is something else would make this
    # callback itself crash inside the retry machinery.
    if args and hasattr(args[0], "warning") and hasattr(args[0], "info"):
        log = args[0]
    else:
        log = logger

    fn_name = getattr(retry_state.fn, "__name__", repr(retry_state.fn))
    attempt = retry_state.attempt_number
    outcome = retry_state.outcome
    if outcome.failed:
        # Include the exception so a failing attempt is diagnosable from the log.
        log.warning(f"Function '{fn_name}', Attempt {attempt} failed: {outcome.exception()!r}")
    else:
        log.info(f"Function '{fn_name}', Attempt {attempt} succeeded")
def get_luckybag_list(log, sql_pool, sql_luckybag_list, max_page=500):
    """
    Page through the lucky-bag list API, storing every new entry.

    Pagination stops when a page returns fewer than 10 rows (the page size
    requested by ``get_luckybag_one_page``), when the number of rows seen
    reaches the total reported by the API, or after ``max_page`` pages.

    :param log: logger object
    :param sql_pool: MySQLConnectionPool object, passed to the page fetcher
    :param sql_luckybag_list: luckybag_ids already stored; passed to the page
        fetcher (the parser appends to it)
    :param max_page: upper bound on the number of pages requested (default 500)
    """
    page_size = 10  # must match the "limit" sent by get_luckybag_one_page
    fetched = 0
    for page in range(1, max_page + 1):
        len_item, reported_total = get_luckybag_one_page(log, page, sql_pool, sql_luckybag_list)
        if len_item < page_size:
            log.debug(f"--------------- page {page}, len_item: {len_item} totalCount: {reported_total} ---------------")
            break
        fetched += len_item
        try:
            total = int(reported_total)
        except (TypeError, ValueError):
            # A null/garbled count must not abort the crawl; fall back on the
            # short-page check and max_page to terminate.
            log.warning(f"page {page}: unexpected totalCount {reported_total!r}, continuing")
            total = None
        if total is not None and fetched >= total:
            log.debug(f"total_count: {fetched} totalCount: {reported_total}")
            break
        # Random pause between pages to avoid hammering the API.
        time.sleep(random.uniform(0.1, 1))
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_luckybag_one_page(log, page, sql_pool, sql_luckybag_list):
    """
    Fetch one page (10 rows) of the lucky-bag list and persist new rows.

    Retried by tenacity on any exception (up to 5 attempts, 1 s apart),
    e.g. the RuntimeError raised for an empty response.

    :param log: logger object
    :param page: 1-based page number, converted into the request offset
    :param sql_pool: MySQLConnectionPool object
    :param sql_luckybag_list: luckybag_ids already stored (extended by the parser)
    :return: (number of rows on this page, total count reported by the API)
    :raises RuntimeError: when the request returns an empty/falsy response
    """
    log.debug(f" {inspect.currentframe().f_code.co_name} for page:{page}.....")
    url = "https://api.qiandao.cn/c2c-web/v1/luckybag/list"
    params = {
        "limit": "10",
        "offset": str((page - 1) * 10),
        "hostId": "228946998408215136",
        "orderBy": "updatedAt.desc",
        "filterFollow": "false",
        "tagIds": "1429578",
        "typeIds": "1405184,1132249,1000375,49695,1443553,1002041"
    }
    resp_json = utils.request_get_data(log, url, params)
    if not resp_json:
        log.error("get_luckybag_one_page error")
        raise RuntimeError("get_luckybag_one_page error")

    # "data" / "rows" may be present but null in the JSON; treat that as missing
    # instead of raising (which would needlessly burn the retries).
    data = resp_json.get("data") or {}
    rows = data.get("rows") or []
    total_count = data.get("count", 0)

    # Parse/insert failures are logged rather than raised so they do not
    # trigger a retry that would re-fetch and re-process the same page.
    try:
        parse_luckybag_data(log, rows, sql_pool, sql_luckybag_list)
    except Exception as e:
        log.error(f"parse_luckybag_data error: {e}")
    return len(rows), total_count
def parse_luckybag_data(log, items, sql_pool, sql_luckybag_list):
    """
    Insert each lucky-bag list item that is not already recorded.

    :param log: logger object
    :param items: list of row dicts from the list API
    :param sql_pool: MySQLConnectionPool object
    :param sql_luckybag_list: luckybag_ids already stored; every processed id is
        appended (even when its insert fails) so it is skipped for the rest of
        this run
    """
    for item in items:
        luckybag_id = item.get("id")
        if luckybag_id in sql_luckybag_list:
            log.info(f"luckybag_id:{luckybag_id} is exist, skip.......")
            continue

        info_dict = {
            "luckybag_id": luckybag_id,
            "luckybag_name": item.get("name"),
            "original_price": item.get("price"),
            "ice_breaking_price": item.get("iceBreakingPrice"),
            "seller_id": item.get("sellerId"),
            "seller_name": item.get("sellerName"),
            "remain_count": item.get("remainCount"),  # remaining quantity
            "total_count": item.get("totalCount"),  # total quantity
            "created_at": utils.transform_ms(log, item.get("createdAt")),
            "updated_at": utils.transform_ms(log, item.get("updatedAt")),
        }
        try:
            sql_pool.insert_one_or_dict(
                table="qiandao_luckybag_list_record",
                data=info_dict
            )
        except Exception as e:
            # Embed the exception in the message: loguru only formats extra
            # positional args into `{}` placeholders, so passing `e` separately
            # would silently drop it.
            log.error(f" {inspect.currentframe().f_code.co_name} 插入失败: {e}")
        sql_luckybag_list.append(luckybag_id)
- # -------------------------------------------------------------------------------------------------------------------
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_luckybag_detail(log, luckyBagId: str):
    """
    Fetch the detail JSON of a single lucky bag.

    Retried by tenacity on any exception (up to 5 attempts, 1 s apart).

    :param log: logger object
    :param luckyBagId: id of the lucky bag to look up
    :return: the response JSON from ``utils.request_get_data``
    :raises RuntimeError: when the response is empty/falsy
    """
    resp_json = utils.request_get_data(
        log,
        "https://api.qiandao.cn/c2c-web/v1/luckybag",
        {"luckyBagId": luckyBagId},
    )
    if resp_json:
        return resp_json
    log.error("get_luckybag_detail error")
    raise RuntimeError("get_luckybag_detail error")
def parse_luckybag_detail_basic_info_data(log, resp_json, sql_pool, luckyBagId):
    """
    Copy the sell-end time and status from a detail response into the bag's
    row in qiandao_luckybag_list_record.

    Nothing is written when the response has no ``basicInfo`` (for example
    the ``{}`` a caller passes after a failed fetch), so existing column
    values are not overwritten with NULLs. Errors are logged, not raised.

    :param log: logger object
    :param resp_json: detail response JSON (may be empty)
    :param sql_pool: MySQLConnectionPool object
    :param luckyBagId: id of the lucky bag whose row is updated
    """
    try:
        data = (resp_json or {}).get("data") or {}
        basic_info = data.get("basicInfo") or {}
        if not basic_info:
            log.warning(f"{inspect.currentframe().f_code.co_name}: no basicInfo for luckybag_id {luckyBagId}, skip update")
            return

        update_dict = {
            "sell_end_time": utils.transform_ms(log, basic_info.get("sellEndTime")),
            "status": basic_info.get("status"),
        }
        sql_pool.update_one_or_dict(
            table="qiandao_luckybag_list_record",
            data=update_dict,
            condition={"luckybag_id": luckyBagId},
        )
    except Exception as e:
        log.error(f"{inspect.currentframe().f_code.co_name} for basic_info error: {e}")
def _join_spec_field(spec_list, key):
    """Concatenate ``key`` from each spec dict, treating missing/None values as ""."""
    return "".join(
        "" if spec.get(key) is None else str(spec.get(key))
        for spec in spec_list
        if isinstance(spec, dict)
    )


def parse_luckybag_detail_product_list_data(log, resp_json, sql_pool, luckybag_id):
    """
    Insert every product of a lucky bag into qiandao_luckybag_products_record.

    If at least one insert succeeds, set ``detail_state = 1`` on the bag's
    row in qiandao_luckybag_list_record. A failed insert is logged and the
    remaining products are still processed.

    :param log: logger object
    :param resp_json: detail response JSON (may be empty)
    :param sql_pool: MySQLConnectionPool object
    :param luckybag_id: id of the lucky bag the products belong to
    """
    func_name = inspect.currentframe().f_code.co_name
    try:
        data = (resp_json or {}).get("data") or {}
        product_list = data.get("products") or []
        inserted_any = False
        for product in product_list:
            # specValues may be absent/null and individual names/values may be
            # null; "".join() rejects None, so each entry is coerced to text.
            spec_list = product.get("specValues") or []
            product_dict = {
                "luckybag_id": luckybag_id,
                "product_id": product.get("id"),
                "product_name": product.get("name"),
                "image": product.get("image"),
                "reference_price": product.get("referencePrice"),  # reference price
                "spu_id": product.get("spuId"),
                "spec_name": _join_spec_field(spec_list, "name"),  # spec names
                "spec_value": _join_spec_field(spec_list, "value"),  # spec values
                "remain_count": product.get("remainCount"),  # remaining quantity
                "total_count": product.get("totalCount"),  # total quantity
            }
            try:
                sql_pool.insert_one_or_dict(table="qiandao_luckybag_products_record", data=product_dict)
                inserted_any = True
            except Exception as e:
                log.error(f"{func_name} for product_list error: {e}")

        if inserted_any:
            # One status update per bag rather than one per product.
            sql_pool.update_one_or_dict(
                table="qiandao_luckybag_list_record",
                data={"detail_state": 1},
                condition={"luckybag_id": luckybag_id},
            )
    except Exception as e:
        log.error(f"{func_name} for product_list error: {e}")
def _set_order_state(sql_pool, luckybag_id, state):
    """Set ``order_state`` on the qiandao_luckybag_list_record row for ``luckybag_id``."""
    sql_pool.update_one_or_dict(
        table="qiandao_luckybag_list_record",
        data={"order_state": state},
        condition={"luckybag_id": luckybag_id},
    )


def parse_luckybag_detail_order_list_data(log, resp_json, sql_pool, luckybag_id):
    """
    Insert purchase records newer than those already stored for a lucky bag.

    The newest stored ``order_created_at`` for the bag is read first. Records
    are then scanned in response order, and scanning stops at the first record
    that is not newer. After a successful batch insert ``order_state`` is set
    to 1. A duplicate-key IntegrityError is logged and ignored; any other
    IntegrityError sets ``order_state`` to 2. All other errors are logged,
    not raised.

    :param log: logger object
    :param resp_json: detail response JSON (may be empty)
    :param sql_pool: MySQLConnectionPool object
    :param luckybag_id: id of the lucky bag
    """
    func_name = inspect.currentframe().f_code.co_name
    try:
        row = sql_pool.select_one(
            query="SELECT MAX(order_created_at) FROM qiandao_luckybag_orders_record WHERE luckybag_id = %s",
            args=(luckybag_id,))
        # NOTE(review): assumes the driver returns a datetime here so it can be
        # compared with the parsed trading time below — confirm column type.
        max_order_time = row[0] if row else None

        data = (resp_json or {}).get("data") or {}
        records = data.get("records") or []

        order_list = []
        for record in records:
            raw_time = record.get("orderCreatedAt")
            trading_time_str = utils.transform_ms(log, raw_time)
            if not trading_time_str:
                log.debug(f"Invalid trading time: {raw_time}")
                continue  # skip records without a usable timestamp
            try:
                trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
            except (TypeError, ValueError):
                # One malformed timestamp must not abort the whole bag.
                log.debug(f"Invalid trading time: {trading_time_str!r}")
                continue
            # NOTE(review): `break` assumes records arrive newest-first — confirm against the API.
            if max_order_time and trading_time <= max_order_time:
                log.debug(f"trading_time: {trading_time_str} <= max_luckybay_time: {max_order_time}, 跳过旧数据")
                break
            order_list.append({
                "luckybag_id": luckybag_id,
                "product_id": record.get("productId"),
                "order_id": record.get("orderId"),
                "serial_number": record.get("serialNumber"),  # serial number
                "order_created_at": trading_time_str,
                "buyer_nickname": record.get("buyerNickname"),
            })

        if not order_list:
            return
        try:
            sql_pool.insert_many(table="qiandao_luckybag_orders_record", data_list=order_list)
            _set_order_state(sql_pool, luckybag_id, 1)
        except pymysql.err.IntegrityError as e:
            if "Duplicate entry" in str(e):
                log.warning("存在重复的 order_id,跳过插入")
            else:
                log.warning(f"{str(e)[:200]}")
                _set_order_state(sql_pool, luckybag_id, 2)
    except Exception as e:
        log.error(f"{func_name} for order_list error: {e}")
def get_detail_main(log, sql_pool):
    """
    Fetch detail data for every lucky bag with ``order_state = 0`` and store
    its purchase records.

    Bags whose detail request still fails after the fetcher's own retries are
    logged and skipped. Nothing updates their row, so they remain at
    ``order_state = 0`` and are selected again on the next run.

    :param log: logger object
    :param sql_pool: MySQLConnectionPool object
    """
    func_name = inspect.currentframe().f_code.co_name
    rows = sql_pool.select_all("SELECT luckybag_id FROM qiandao_luckybag_list_record WHERE order_state = 0") or []
    pending_ids = [row[0] for row in rows]
    for luckyBagId in pending_ids:
        try:
            resp_json = get_luckybag_detail(log, luckyBagId)
        except Exception as e:
            log.error(f"{func_name} for sql_detail_list: {luckyBagId}, error: {e}")
            continue  # nothing to parse; avoid a pointless DB round-trip
        # Basic-info and product parsing are currently disabled; re-enable as needed:
        # parse_luckybag_detail_basic_info_data(log, resp_json, sql_pool, luckyBagId)
        # parse_luckybag_detail_product_list_data(log, resp_json, sql_pool, luckyBagId)
        parse_luckybag_detail_order_list_data(log, resp_json, sql_pool, luckyBagId)
    # TODO: once `status` is populated, restrict order collection to bags with
    # status != 'ON_SALE'.
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def qd_lb_main(log):
    """
    Run one crawl: check the MySQL pool, then collect lucky-bag details.

    A failed pool health check raises RuntimeError, which the retry decorator
    retries (up to 100 attempts, one hour apart). Errors from the crawl steps
    themselves are logged and not re-raised.

    :param log: logger object
    """
    this_fn = inspect.currentframe().f_code.co_name
    log.info(
        f'开始运行 {this_fn} 爬虫任务....................................................')

    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        log.info("-------------------------------------- 开始 获取 福袋 列表 --------------------------------------")
        # List crawling is currently disabled. To re-enable:
        # known_ids = [row[0] for row in sql_pool.select_all("SELECT luckybag_id FROM qiandao_luckybag_list_record")]
        # try:
        #     get_luckybag_list(log, sql_pool, known_ids)
        # except Exception as e:
        #     log.error(f"Error fetching get_luckybag_list: {e}")
        # known_ids.clear()
        # time.sleep(5)

        log.info("-------------------------------------- 开始获取 福袋 详情 --------------------------------------")
        try:
            get_detail_main(log, sql_pool)
        except Exception as exc:
            log.error(f"Error fetching get_detail_main: {exc}")
    except Exception as exc:
        log.error(f'{this_fn} error: {exc}')
    finally:
        log.info(f'爬虫程序 {this_fn} 运行结束,等待下一轮的采集任务............')
if __name__ == "__main__":
    # Run a single crawl, logging through the module-level loguru logger.
    qd_lb_main(logger)
|