# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/7/8 16:51
import inspect
import threading
import time
from datetime import datetime

import schedule
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed

import utils
from mysql_pool import MySQLConnectionPool

logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")


def after_log(retry_state):
    """
    tenacity ``after`` callback: log the outcome of an attempt.

    :param retry_state: tenacity ``RetryCallState`` for the attempt that just finished.
    """
    # Every retried function in this module takes its logger as the first
    # positional argument; fall back to the module-level logger otherwise.
    log = retry_state.args[0] if retry_state.args else logger
    fn_name = retry_state.fn.__name__
    if retry_state.outcome.failed:
        # Include the exception so the log says *why* the attempt failed.
        log.warning(
            f"Function '{fn_name}', Attempt {retry_state.attempt_number} Times, "
            f"error: {retry_state.outcome.exception()}")
    else:
        log.info(f"Function '{fn_name}', Attempt {retry_state.attempt_number} succeeded")


def _first_cover_image(images):
    """
    Return the first URL in ``images["coverImages"]``, or "" when missing/empty.

    :param images: the ``images`` / ``goodsImages`` object from the API (may be None)
    """
    covers = (images or {}).get("coverImages") or []
    return covers[0] if covers else ""


def get_live_list(log, sql_pool, sql_live_id_list):
    """
    Crawl the live-room list page by page and store rooms not seen before.

    Stops on a short page (fewer than ``page_size`` rows), when the accumulated
    row count reaches the server-reported total, or after ``max_page`` pages.

    :param log: logger
    :param sql_pool: MySQL connection pool used for inserts
    :param sql_live_id_list: live_ids already stored; newly seen ids are appended in place
    """
    log.debug(f"{inspect.currentframe().f_code.co_name} crawl start ..........")
    page_size = 20  # must match the "limit" sent by get_live_one_page
    max_page = 50
    total_count = 0
    for page in range(1, max_page + 1):
        len_item, totalCount = get_live_one_page(log, page, sql_pool, sql_live_id_list)
        if len_item < page_size:
            log.debug(f"--------------- page {page}, len_item: {len_item} totalCount: {totalCount} ---------------")
            break
        total_count += len_item
        # The server may omit the total; in that case rely on the short-page check.
        if totalCount is not None and total_count >= int(totalCount):
            log.debug(f"total_count: {total_count} totalCount: {totalCount}")
            break
        # time.sleep(random.uniform(0.1, 1))


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_live_one_page(log, page, sql_pool, sql_live_id_list):
    """
    Fetch one page of live rooms (status PREPARE / UNDERWAY) and store the new ones.

    An empty response raises RuntimeError so ``@retry`` re-attempts the request
    (5 attempts, 1 s apart). Parsing/storage errors are logged, not raised.

    :param log: logger
    :param page: 1-based page number
    :param sql_pool: MySQL connection pool
    :param sql_live_id_list: live_ids already stored (appended in place)
    :return: ``(number of rows on this page, server-reported total count)``
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.debug(f"{fn_name} for page:{page}..........")
    # NOTE(review): this endpoint uses api.qiandao.com while every other endpoint
    # in this file uses api.qiandao.cn — confirm that is intentional.
    url = "https://api.qiandao.com/live/list"
    page_size = 20
    data = {
        "limit": page_size,
        "offset": (page - 1) * page_size,
        "status": [
            "PREPARE",
            "UNDERWAY"
        ],
        "type": ""
    }
    resp_json = utils.request_post_data(log, url, data)
    if not resp_json:
        log.error(f"{fn_name} error")
        raise RuntimeError(f"{fn_name} error")

    res_data = resp_json.get("data") or {}
    rows = res_data.get("rows") or []
    total_count = res_data.get("count", 0)
    try:
        parse_live_data(log, rows, sql_pool, sql_live_id_list)
    except Exception as e:
        log.error(f"parse_live_data error: {e}")
    return len(rows), total_count


def parse_live_data(log, rows, sql_pool, sql_live_id_list):
    """
    Convert raw live-room rows into DB records and bulk-insert the unseen ones.

    :param log: logger
    :param rows: ``data.rows`` list from the live-list API
    :param sql_pool: MySQL connection pool
    :param sql_live_id_list: live_ids already stored; each newly collected id is
                             appended so duplicates within this run are skipped too
    """
    live_list = []
    for row in rows:
        live_id = row.get("id")
        if live_id in sql_live_id_list:
            log.info(f"live_id:{live_id} is exist, skip .......")
            continue

        user = row.get("user") or {}  # the API may send null here
        start_time = utils.transform_ms(log, row.get("startTime"))
        end_time = row.get("endTime")
        # '0' appears to mean "no end time yet"; it is stored unconverted.
        if end_time != '0':
            end_time = utils.transform_ms(log, end_time)

        live_list.append({
            "live_id": live_id,
            "owner_id": row.get("ownerId"),
            "title": row.get("title"),
            "images": _first_cover_image(row.get("images")),
            "start_time": start_time,
            "end_time": end_time,
            "status": row.get("status"),
            "pull_url": row.get("pullUrl"),
            "created_at": row.get("createdAt"),
            "updated_at": row.get("updatedAt"),
            "user_id": user.get("id"),
            "official_user_id": user.get("officialUserId"),
            "nickname": user.get("nickname"),
            "live_type": row.get("type")
        })
        sql_live_id_list.append(live_id)

    if live_list:
        try:
            sql_pool.insert_many(table="qiandao_live_list_record", data_list=live_list)
        except Exception as e:
            log.error(f"insert_many error: {e}")


# def get_live_detail(log, live_id):
#     url = "https://api.qiandao.cn/live/detail"
#     params = {
#         "liveId": "883752924533007599"
#     }
#     resp_json = utils.request_get_data(log, url, params)
#     print(resp_json)
#     if resp_json.get("code") == 0:
#         res_data = resp_json.get("data", {})
#         ownerId = res_data.get("ownerId")
#         title = res_data.get("title")
#     else:
#         log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")


def get_live_product_list(log, live_id, sql_pool, sql_product_list):
    """
    Crawl the goods list of one live room page by page and store unseen goods.

    :param log: logger
    :param live_id: live room id
    :param sql_pool: MySQL connection pool
    :param sql_product_list: goods_ids already stored (appended in place)
    """
    page_size = 50  # must match the "limit" sent by get_product_single_page
    max_page = 10
    total_count = 0
    for page in range(1, max_page + 1):
        len_item, totalCount = get_product_single_page(log, live_id, page, sql_pool, sql_product_list)
        if len_item < page_size:
            log.debug(
                f"--------------- page {page}, len_item: {len_item} totalCount: {totalCount}, Less than 50, break ---------------")
            break
        total_count += len_item
        # "count" may be absent (None); then rely on the short-page check.
        if totalCount is not None and total_count >= int(totalCount):
            log.debug(f"total_count: {total_count} totalCount: {totalCount}")
            break
        # time.sleep(random.uniform(0.1, 1))


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_product_single_page(log, live_id, page, sql_pool, sql_product_list):
    """
    Fetch one page of a live room's goods and store the new ones.

    An empty response raises RuntimeError so ``@retry`` re-attempts the request.

    :param log: logger
    :param live_id: live room id
    :param page: 1-based page number
    :param sql_pool: MySQL connection pool
    :param sql_product_list: goods_ids already stored (appended in place)
    :return: ``(rows on this page, server-reported total)``; ``(0, 0)`` on an API
             error code or an empty page
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.debug(f"{fn_name} for live_id:{live_id}, .... page:{page}..........")
    url = "https://api.qiandao.cn/live/goods/list"
    page_size = 50
    params = {
        "liveId": live_id,
        "limit": str(page_size),
        "offset": str((page - 1) * page_size),
    }
    resp_json = utils.request_get_data(log, url, params)
    if not resp_json:
        log.error(f"{fn_name} error: empty response")
        raise RuntimeError(f"{fn_name} error: empty response")

    if resp_json.get("code") != 0:
        log.error(f"{fn_name} error: {resp_json.get('message')}")
        return 0, 0

    res_data = resp_json.get("data") or {}
    totalCount = res_data.get("count")
    rows = res_data.get("rows") or []
    if not rows:
        return 0, 0
    parse_product_data(log, rows, sql_pool, sql_product_list, live_id)
    return len(rows), totalCount


def parse_product_data(log, items, sql_pool, sql_product_list, live_id):
    """
    Convert raw goods rows into DB records and bulk-insert the unseen ones.

    :param log: logger
    :param items: ``data.rows`` list from the goods-list API
    :param sql_pool: MySQL connection pool
    :param sql_product_list: goods_ids already stored; newly collected ids are appended
    :param live_id: live room the goods belong to
    """
    log.debug(f"{inspect.currentframe().f_code.co_name} parse product start, live_id:{live_id} ..........")
    info_list = []
    for row in items:
        goodsId = row.get("goodsId")
        if goodsId in sql_product_list:
            log.info(f"live_id:{live_id} goodsId:{goodsId} is exist, skip .......")
            continue

        info_list.append({
            "live_id": live_id,
            "row_id": row.get("id"),
            "goods_id": goodsId,
            "goods_type": row.get("goodsType"),
            "goods_name": row.get("goodsName"),
            "goods_image": _first_cover_image(row.get("goodsImages")),
            "goods_status": row.get("status"),
            "price": row.get("price"),
            "is_sold_out": row.get("isSoldOut")
        })
        sql_product_list.append(goodsId)

    if info_list:
        try:
            sql_pool.insert_many(table="qiandao_live_product_record", data_list=info_list)
        except Exception as e:
            log.warning(f"插入失败: {e}")
# ----------------------------------------------------------------------------------------------------------------------
# Shared helpers for the order-record crawlers
# ----------------------------------------------------------------------------------------------------------------------
def _request_json(log, url, params, fn_name):
    """
    GET ``url`` via ``utils.request_get_data`` and return the decoded JSON.

    Raises RuntimeError on an empty response so the caller's ``@retry`` re-attempts it.
    """
    resp_json = utils.request_get_data(log, url, params)
    if not resp_json:
        log.error(f"{fn_name} error: empty response")
        raise RuntimeError(f"{fn_name} error: empty response")
    return resp_json


def _get_max_trading_time(sql_pool, goods_id):
    """
    Return the latest ``trading_time`` already stored for ``goods_id``, or None.

    Used as an incremental-crawl watermark: records at or before it are skipped.
    """
    row = sql_pool.select_one(
        query="SELECT MAX(trading_time) FROM qiandao_live_order_record WHERE goods_id = %s",
        args=(goods_id,))
    return row[0] if row else None


def _parse_trading_time(log, raw_time):
    """
    Convert a raw API timestamp with ``utils.transform_ms``.

    :return: ``(trading_time_str, trading_time_datetime)``, or ``(None, None)`` when
             the timestamp could not be converted
    """
    trading_time_str = utils.transform_ms(log, raw_time)
    if not trading_time_str:
        log.debug(f"Invalid trading time: {raw_time}")
        return None, None
    # Assumes utils.transform_ms returns "YYYY-mm-dd HH:MM:SS" (as parsed below).
    return trading_time_str, datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")


def _is_already_stored(log, trading_time, trading_time_str, max_trading_time):
    """Return True when ``trading_time`` is at or before the stored watermark."""
    if max_trading_time and trading_time <= max_trading_time:
        log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
        return True
    return False


def _insert_order_records(log, sql_pool, info_list, caller):
    """Bulk-insert order records; failures are logged (with the error) and swallowed."""
    if not info_list:
        return
    try:
        sql_pool.insert_many(table="qiandao_live_order_record", data_list=info_list)
    except Exception as e:
        # Interpolate the error directly: loguru only substitutes extra positional
        # args into "{}" placeholders, so ``log.error(msg, e)`` silently dropped it.
        log.error(f" {caller} 插入失败: {e}")


# ----------------------------------------------------------------------------------------------------------------------
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_order_first_group(log, live_id, goods_id, sql_pool):
    """
    Case 1 (goodsType "LIVE_DRAW"): look up the draw ``groupId`` of a goods item,
    then crawl its draw records.

    :param log: logger
    :param live_id: live room id
    :param goods_id: goods/shelf id
    :param sql_pool: MySQL connection pool
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.debug(f"{fn_name} get groupId start, live_id:{live_id}, goods_id:{goods_id} ..........")
    url = "https://api.qiandao.cn/box/live-draw/group"
    params = {
        "shelfId": goods_id
    }
    resp_json = _request_json(log, url, params, fn_name)
    if resp_json.get("code") != 0:
        log.error(f"{fn_name} error: {resp_json.get('message')}")
        return

    groupId = (resp_json.get("data") or {}).get("groupId")
    if not groupId:
        log.warning(f"live_id:{live_id}, 未获取到groupId............")
        return

    # Read the latest stored trading_time first so already-saved records are skipped.
    max_trading_time = _get_max_trading_time(sql_pool, goods_id)
    get_first_list(log, groupId, live_id, goods_id, sql_pool, max_trading_time)


def get_first_list(log, groupId, live_id, goods_id, sql_pool, max_trading_time):
    """
    Case 1: page through the draw records of ``groupId``.

    Stops on a short page (which also signals that stored data was reached) or
    after ``max_page`` pages.

    :param log: logger
    :param groupId: draw group id
    :param live_id: live room id
    :param goods_id: goods/shelf id
    :param sql_pool: MySQL connection pool
    :param max_trading_time: latest stored trading_time for this goods, or None
    """
    page_size = 20  # must match the "limit" sent by get_order_records_first
    max_page = 50
    for page in range(1, max_page + 1):
        len_item = get_order_records_first(log, groupId, live_id, goods_id, sql_pool, page, max_trading_time)
        if len_item < page_size:
            log.debug(f"--------------- page {page}, len_item: {len_item} ---------------")
            break


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_order_records_first(log, groupId, live_id, goods_id, sql_pool, page, max_trading_time):
    """
    Case 1 (goodsType "LIVE_DRAW"): fetch one page of draw records and store the new ones.

    :param log: logger
    :param groupId: draw group id
    :param live_id: live room id
    :param goods_id: goods/shelf id
    :param sql_pool: MySQL connection pool
    :param page: 1-based page number
    :param max_trading_time: latest stored trading_time for this goods, or None
    :return: number of records on the page, or 0 on an API error, an empty page,
             or once already-stored data is reached (tells the caller to stop)
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.debug(f"{fn_name} get order start, groupId:{groupId} ..........")
    url = "https://api.qiandao.cn/box/live-draw/draw-records"
    page_size = 20
    params = {
        "limit": str(page_size),
        "offset": str((page - 1) * page_size),
        "groupId": groupId
    }
    resp_json = _request_json(log, url, params, fn_name)
    if resp_json.get("code") != 0:
        log.error(f"{fn_name} error: {resp_json.get('message')}")
        return 0

    records = (resp_json.get("data") or {}).get("records") or []
    if not records:
        log.debug(f"{fn_name}, live_id:{live_id} no records")
        return 0

    info_list = []
    reached_stored = False
    for row in records:
        trading_time_str, trading_time = _parse_trading_time(log, row.get("createdAt", ""))
        if trading_time is None:
            continue  # skip records with an invalid time
        # Records are assumed newest-first (the original watermark break relies on it).
        if _is_already_stored(log, trading_time, trading_time_str, max_trading_time):
            reached_stored = True
            break
        info_list.append({
            "live_id": live_id,
            "goods_id": goods_id,
            "group_id": groupId,
            "status": row.get("status"),
            "draw_result": row.get("drawResult"),  # the drawn number
            "user_name": row.get("userName"),
            "trading_time": trading_time_str,
            "is_good_product": row.get("isGoodProduct")  # whether it is a "lucky" number
        })

    _insert_order_records(log, sql_pool, info_list, fn_name)
    # Later pages are older still, so report 0 to end pagination instead of
    # requesting up to max_page pages that would all be skipped.
    return 0 if reached_stored else len(records)


# ----------------------------------------------------------------------------------------------------------------------
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_order_records_second(log, live_id, goods_id, sql_pool):
    """
    Case 2 (goodsType "MALL"): fetch the recent purchases of a goods item and store the new ones.

    :param log: logger
    :param live_id: live room id
    :param goods_id: goods/shelf id
    :param sql_pool: MySQL connection pool
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.debug(f"{fn_name} get order start, live_id:{live_id}, goods_id:{goods_id} ..........")
    url = "https://api.qiandao.cn/mall/dynamic-detail"
    params = {
        "shelfId": goods_id,
        "deliverPatterns": "0"
    }
    resp_json = _request_json(log, url, params, fn_name)
    if resp_json.get("code") != 0:
        log.error(f"{fn_name} error: {resp_json.get('message')}")
        return

    res_data = resp_json.get("data") or {}
    rows = (res_data.get("recentPurchase") or {}).get("rows") or []
    if not rows:
        log.debug(f"{fn_name}, live_id:{live_id} no rows")
        return

    # Read the latest stored trading_time first so already-saved records are skipped.
    max_trading_time = _get_max_trading_time(sql_pool, goods_id)
    info_list = []
    for row in rows:
        trading_time_str, trading_time = _parse_trading_time(log, row.get("paidAt"))
        if trading_time is None:
            continue  # skip records with an invalid time
        if _is_already_stored(log, trading_time, trading_time_str, max_trading_time):
            break
        info_list.append({
            "live_id": live_id,
            "goods_id": goods_id,
            "product_name": row.get("productName"),
            "product_count": row.get("productCount"),
            "trading_time": trading_time_str,
            "user_name": row.get("userNickname")
        })

    _insert_order_records(log, sql_pool, info_list, fn_name)


# ----------------------------------------------------------------------------------------------------------------------
def get_third_list(log, live_id, goods_id, sql_pool):
    """
    Case 3 (goodsType "UNBOX"): page through the purchase records of a goods item.

    :param log: logger
    :param live_id: live room id
    :param goods_id: goods/shelf id
    :param sql_pool: MySQL connection pool
    """
    page_size = 10  # must match the "limit" sent by get_order_records_third
    max_page = 50
    # Read the latest stored trading_time first so already-saved records are skipped.
    max_trading_time = _get_max_trading_time(sql_pool, goods_id)
    for page in range(1, max_page + 1):
        len_item = get_order_records_third(log, live_id, goods_id, sql_pool, page, max_trading_time)
        if len_item < page_size:
            log.debug(f"--------------- page {page}, len_item: {len_item} ---------------")
            break


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_order_records_third(log, live_id, goods_id, sql_pool, page, max_trading_time):
    """
    Case 3 (goodsType "UNBOX"): fetch one page of purchase records and store the new ones.

    :param log: logger
    :param live_id: live room id
    :param goods_id: goods/shelf id
    :param sql_pool: MySQL connection pool
    :param page: 1-based page number
    :param max_trading_time: latest stored trading_time for this goods, or None
    :return: number of rows on the page, or 0 on an API error, an empty page,
             or once already-stored data is reached (tells the caller to stop)
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.debug(f"{fn_name} get order start, live_id:{live_id}, goods_id:{goods_id} ..........")
    url = "https://api.qiandao.cn/box/unbox/recent-purchase-record"
    page_size = 10
    params = {
        "shelfId": goods_id,
        "limit": str(page_size),
        "offset": str((page - 1) * page_size)
    }
    resp_json = _request_json(log, url, params, fn_name)
    if resp_json.get("code") != 0:
        log.error(f"{fn_name} error: {resp_json.get('message')}")
        return 0

    rows = (resp_json.get("data") or {}).get("rows") or []
    if not rows:
        log.debug(f"{fn_name}, live_id:{live_id} no rows")
        return 0

    info_list = []
    reached_stored = False
    for row in rows:
        trading_time_str, trading_time = _parse_trading_time(log, row.get("paidAt"))
        if trading_time is None:
            continue  # skip records with an invalid time
        if _is_already_stored(log, trading_time, trading_time_str, max_trading_time):
            reached_stored = True
            break
        info_list.append({
            "live_id": live_id,
            "goods_id": goods_id,
            "product_name": row.get("productName"),
            "product_count": row.get("productCount"),
            "trading_time": trading_time_str,
            "user_name": row.get("userNickname"),
            "order_id": row.get("orderId")
        })

    _insert_order_records(log, sql_pool, info_list, fn_name)
    # Later pages are older still; report 0 so get_third_list stops paginating.
    return 0 if reached_stored else len(rows)


# ----------------------------------------------------------------------------------------------------------------------
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_order_records_fourth(log, live_id, goods_id, sql_pool):
    """
    Case 4 (goodsType "AUCTION"): fetch the bid list of an auction and store the new bids.

    The auction id is the goods id.

    :param log: logger
    :param live_id: live room id
    :param goods_id: goods id (used as ``auction_id``)
    :param sql_pool: MySQL connection pool
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.debug(f"{fn_name} get order start, live_id:{live_id}, goods_id:{goods_id} ..........")
    url = "https://api.qiandao.cn/auctioneer/bid/list"
    params = {
        "auction_id": goods_id,
        "max_results": "200"
    }
    resp_json = _request_json(log, url, params, fn_name)
    if resp_json.get("code") != 0:
        log.error(f"{fn_name} error: {resp_json.get('message')}")
        return

    items = (resp_json.get("data") or {}).get("items") or []
    if not items:
        log.debug(f"{fn_name}, live_id:{live_id} no items")
        return

    # Read the latest stored trading_time first so already-saved bids are skipped.
    max_trading_time = _get_max_trading_time(sql_pool, goods_id)
    info_list = []
    for item in items:
        trading_time_str, trading_time = _parse_trading_time(log, item.get("bid_time"))
        if trading_time is None:
            continue  # skip records with an invalid time
        if _is_already_stored(log, trading_time, trading_time_str, max_trading_time):
            break
        info_list.append({
            "live_id": live_id,
            "goods_id": goods_id,
            "user_id": item.get("user_id"),
            "user_name": item.get("nickname"),
            "price": item.get("price"),
            "trading_time": trading_time_str,
            "hammered": item.get("hammered")  # whether this bid won the auction
        })

    _insert_order_records(log, sql_pool, info_list, fn_name)


def test():
    """Manual debugging helper: print the bid list of a hard-coded auction."""
    url = "https://api.qiandao.cn/auctioneer/bid/list"
    params = {
        "auction_id": "883831521360768262",
        "max_results": "200"
    }
    resp_json = utils.request_get_data(logger, url, params)
    print(resp_json)


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def list_main(log):
    """
    Main job for the list pages: crawl live rooms, then the goods of every stored room.

    Raises RuntimeError (retried hourly, up to 100 times) when the DB pool is
    unhealthy; any other error is logged and swallowed.

    :param log: logger
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.info(f'开始运行 {fn_name} 爬虫任务....................................................')

    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        live_rows = sql_pool.select_all("SELECT live_id FROM qiandao_live_list_record") or []
        sql_live_id_list = [item[0] for item in live_rows]
        # get_live_list appends newly found live_ids to sql_live_id_list, so the
        # goods loop below also covers rooms discovered in this run.
        get_live_list(log, sql_pool, sql_live_id_list)
        time.sleep(10)

        pid_rows = sql_pool.select_all("SELECT goods_id FROM qiandao_live_product_record") or []
        sql_pid_list = [item[0] for item in pid_rows]
        for live_id in sql_live_id_list:
            try:
                get_live_product_list(log, live_id, sql_pool, sql_pid_list)
            except Exception as e:
                log.error(f"{fn_name} error: {e}")
    except Exception as e:
        log.error(f"{fn_name} error: {e}")


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def order_main(log):
    """
    Main job for the order (purchase record) pages.

    For every stored goods item, dispatch to the crawler matching its goods_type:
    LIVE_DRAW, MALL, UNBOX, AUCTION (processed in that order).

    :param log: logger
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.info(f'开始运行 {fn_name} 爬虫任务....................................................')

    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    # (query selecting the goods of one type, crawler for that type)
    order_jobs = (
        ("SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'LIVE_DRAW'",
         get_order_first_group),
        ("SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'MALL'",
         get_order_records_second),
        ("SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'UNBOX'",
         get_third_list),
        ("SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'AUCTION'",
         get_order_records_fourth),
    )
    try:
        for query, crawler in order_jobs:
            for live_id, goods_id in sql_pool.select_all(query):
                try:
                    crawler(log, live_id, goods_id, sql_pool)
                except Exception as e:
                    log.error(f"{fn_name} error: {e}")
    except Exception as e:
        log.error(f"{fn_name} error: {e}")


def run_threaded(job_func, *args, **kwargs):
    """
    Run ``job_func(*args, **kwargs)`` in a new thread so the scheduler loop is not blocked.

    :param job_func: target function
    :param args: positional arguments
    :param kwargs: keyword arguments
    """
    job_thread = threading.Thread(target=job_func, args=args, kwargs=kwargs, name=job_func.__name__)
    job_thread.start()


def schedule_task():
    """
    Entry point of the scheduled crawler: run the list job once immediately,
    then every 30 minutes; run the order job daily at 01:06.
    """
    list_main(log=logger)
    # order_main(log=logger)

    schedule.every(30).minutes.do(run_threaded, list_main, log=logger)
    schedule.every().day.at("01:06").do(run_threaded, order_main, log=logger)

    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == '__main__':
    # get_live_list(logger, None, [])
    # get_live_detail(logger, "883752924533007599")
    # get_live_product_list(logger, "883752924533007599", None, [])
    # test()
    schedule_task()