# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/7/8 16:51
"""Collect order / draw / bid records for live-stream goods.

For every product row in ``qiandao_live_product_record`` the matching
``api.qiandao.cn`` endpoint is queried (one endpoint per ``goods_type``) and
the records newer than what is already stored are inserted into
``qiandao_live_order_record``.
"""
import inspect
from datetime import datetime

from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed

import utils
from mysql_pool import MySQLConnectionPool

logger.remove()
logger.add("./logs/add_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")

# Format of the strings produced by utils.transform_ms, as parsed below.
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
# Table every fetcher in this module writes to.
_ORDER_TABLE = "qiandao_live_order_record"
# Latest stored trading_time for one goods item (incremental-load watermark).
_MAX_TIME_BY_GOODS_SQL = (
    "SELECT MAX(trading_time) FROM qiandao_live_order_record WHERE goods_id = %s"
)


def after_log(retry_state):
    """
    tenacity ``after`` callback: log the outcome of an attempt.

    The logger is taken from the first positional argument of the retried
    call, then from its ``log`` keyword argument, and finally falls back to
    the module-level logger.

    :param retry_state: tenacity RetryCallState object
    """
    if retry_state.args:
        log = retry_state.args[0]
    else:
        # order_main is invoked as order_main(log=logger), i.e. by keyword.
        log = retry_state.kwargs.get("log", logger)

    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")


def _fetch_max_trading_time(sql_pool, query, args):
    """
    Run a ``SELECT MAX(trading_time) ...`` query and return its single value.

    :param sql_pool: connection pool exposing ``select_one(query=, args=)``
    :param query: SQL text with ``%s`` placeholders
    :param args: tuple of values for the placeholders
    :return: the stored maximum as a datetime, or None when nothing is stored
    :raises ValueError: if the stored value is a string in an unexpected format
    """
    row = sql_pool.select_one(query=query, args=args)
    latest = row[0] if row else None
    if isinstance(latest, str):
        # The fetched rows are compared as datetime objects; a text column
        # would otherwise make that comparison raise TypeError.
        latest = datetime.strptime(latest, _TIME_FORMAT)
    return latest


def _collect_new_rows(log, rows, time_key, max_trading_time, build_row, time_default=None):
    """
    Turn API rows into insertable dicts, stopping at the first stored row.

    :param log: logger object
    :param rows: list of dicts returned by the API
    :param time_key: key holding the raw timestamp passed to utils.transform_ms
    :param max_trading_time: newest trading time already stored, or None
    :param build_row: callable ``(row, trading_time_str) -> dict`` to insert
    :param time_default: value used when ``time_key`` is missing from a row
    :return: ``(new_rows, reached_old)``; ``reached_old`` is True when a row
             not newer than ``max_trading_time`` was encountered
    """
    new_rows = []
    for row in rows:
        trading_time_str = utils.transform_ms(log, row.get(time_key, time_default))
        if not trading_time_str:
            log.debug(f"Invalid trading time: {trading_time_str}")
            continue  # skip rows without a usable timestamp

        trading_time = datetime.strptime(trading_time_str, _TIME_FORMAT)
        if max_trading_time and trading_time <= max_trading_time:
            # Everything from here on is treated as already stored
            # (presumably the API returns newest rows first -- TODO confirm).
            log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
            return new_rows, True

        new_rows.append(build_row(row, trading_time_str))
    return new_rows, False


def _insert_rows(log, sql_pool, info_list, caller):
    """
    Best-effort bulk insert into the order table; failures are logged only.

    :param log: logger object
    :param sql_pool: connection pool exposing ``insert_many(table=, data_list=)``
    :param info_list: list of row dicts; nothing happens when it is empty
    :param caller: name of the calling function, used in the error message
    """
    if not info_list:
        return
    try:
        sql_pool.insert_many(table=_ORDER_TABLE, data_list=info_list)
    except Exception as e:
        # The exception must be part of the message itself: loguru treats
        # extra positional arguments as str.format() arguments, so passing
        # `e` separately to a message without placeholders drops it.
        log.error(f" {caller} 插入失败: {e}")


# ----------------------------------------------------------------------------------------------------------------------
# Case 1: "goodsType": "LIVE_DRAW"
# ----------------------------------------------------------------------------------------------------------------------
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_order_first_group(log, live_id, goods_id, sql_pool):
    """
    Case 1 (LIVE_DRAW): resolve the groupId of a goods item, then page
    through its draw records.

    :param log: logger object
    :param live_id: id of the live stream the goods item belongs to
    :param goods_id: shelf id of the goods item
    :param sql_pool: MySQL connection pool
    """
    func_name = inspect.currentframe().f_code.co_name
    log.debug(
        f"{func_name} get groupId start, live_id:{live_id}, goods_id:{goods_id} ..........")
    url = "https://api.qiandao.cn/box/live-draw/group"
    params = {
        "shelfId": goods_id  # e.g. "876650715072717042"
    }
    resp_json = utils.request_get_data(log, url, params)

    if resp_json.get("code") != 0:
        log.error(f"{func_name} error: {resp_json.get('message')}")
        return

    groupId = resp_json.get("data", {}).get("groupId")
    if not groupId:
        log.warning(f"live_id:{live_id}, 未获取到groupId............")
        return

    # Look up the newest stored trading_time first to avoid duplicates. The
    # filter has to match what get_order_records_first inserts
    # (goods_id=goods_id, group_id=groupId).
    # NOTE(review): assumes the table has a group_id column, as the inserted
    # dict keys suggest -- confirm against the schema.
    max_trading_time = _fetch_max_trading_time(
        sql_pool,
        "SELECT MAX(trading_time) FROM qiandao_live_order_record "
        "WHERE goods_id = %s AND group_id = %s",
        (goods_id, groupId))
    get_first_list(log, groupId, live_id, goods_id, sql_pool, max_trading_time)


def get_first_list(log, groupId, live_id, goods_id, sql_pool, max_trading_time):
    """
    Case 1 (LIVE_DRAW): page through the draw records of one group.

    Paging stops after 50 pages or as soon as a page reports fewer than 20
    items.

    :param log: logger object
    :param groupId: draw group id
    :param live_id: id of the live stream
    :param goods_id: shelf id of the goods item
    :param sql_pool: MySQL connection pool
    :param max_trading_time: newest trading time already stored, or None
    :return: None
    """
    max_page = 50
    for page in range(1, max_page + 1):
        len_item = get_order_records_first(log, groupId, live_id, goods_id, sql_pool, page, max_trading_time)
        if len_item < 20:
            log.debug(f"--------------- page {page}, len_item: {len_item} ---------------")
            break


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_order_records_first(log, groupId, live_id, goods_id, sql_pool, page, max_trading_time):
    """
    Case 1 (LIVE_DRAW, the goods page opens directly on the purchase view):
    fetch one page of draw records and store the new ones.

    :param log: logger object
    :param groupId: draw group id
    :param live_id: id of the live stream
    :param goods_id: shelf id of the goods item
    :param sql_pool: MySQL connection pool
    :param page: 1-based page number (20 records per page)
    :param max_trading_time: newest trading time already stored, or None
    :return: number of records on the page, or 0 when the caller should stop
             paging (empty page, API error, or already-stored data reached)
    """
    func_name = inspect.currentframe().f_code.co_name
    log.debug(f"{func_name} get order start, groupId:{groupId} ..........")
    url = "https://api.qiandao.cn/box/live-draw/draw-records"
    params = {
        "limit": "20",
        "offset": str((page - 1) * 20),
        "groupId": groupId  # e.g. "883762283266720281"
    }
    resp_json = utils.request_get_data(log, url, params)

    if resp_json.get("code") != 0:
        log.error(f"{func_name} error: {resp_json.get('message')}")
        return 0

    records = resp_json.get("data", {}).get("records", [])
    if not records:
        log.debug(f"{func_name}, live_id:{live_id} no records")
        return 0

    def build_row(row, trading_time_str):
        return {
            "live_id": live_id,
            "goods_id": goods_id,
            "group_id": groupId,
            "status": row.get("status"),
            "draw_result": row.get("drawResult"),  # number that was drawn
            "user_name": row.get("userName"),
            "trading_time": trading_time_str,
            "is_good_product": row.get("isGoodProduct")  # lucky-number flag
        }

    info_list, reached_old = _collect_new_rows(
        log, records, "createdAt", max_trading_time, build_row, time_default="")
    _insert_rows(log, sql_pool, info_list, func_name)

    # Once stored data has been reached the following pages are older still,
    # so report 0 to make the pager stop instead of walking all 50 pages.
    return 0 if reached_old else len(records)


# ----------------------------------------------------------------------------------------------------------------------
# Case 2: "goodsType": "MALL"
# ----------------------------------------------------------------------------------------------------------------------
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_order_records_second(log, live_id, goods_id, sql_pool):
    """
    Case 2 (MALL): fetch the recent purchases of a goods item and store the
    new ones.

    :param log: logger object
    :param live_id: id of the live stream
    :param goods_id: shelf id of the goods item
    :param sql_pool: MySQL connection pool
    """
    func_name = inspect.currentframe().f_code.co_name
    log.debug(
        f"{func_name} get order start, live_id:{live_id}, goods_id:{goods_id} ..........")
    url = "https://api.qiandao.cn/mall/dynamic-detail"
    params = {
        "shelfId": goods_id,  # e.g. "873540181645335865"
        "deliverPatterns": "0"
    }
    resp_json = utils.request_get_data(log, url, params)

    if resp_json.get("code") != 0:
        log.error(f"{func_name} error: {resp_json.get('message')}")
        return

    rows = resp_json.get("data", {}).get("recentPurchase", {}).get("rows", [])
    if not rows:
        log.debug(f"{func_name}, live_id:{live_id} no rows")
        return

    # Newest stored trading_time for this goods item, to avoid duplicates.
    max_trading_time = _fetch_max_trading_time(sql_pool, _MAX_TIME_BY_GOODS_SQL, (goods_id,))

    def build_row(row, trading_time_str):
        return {
            "live_id": live_id,
            "goods_id": goods_id,
            "product_name": row.get("productName"),
            "product_count": row.get("productCount"),
            "trading_time": trading_time_str,
            "user_name": row.get("userNickname")
        }

    info_list, _ = _collect_new_rows(log, rows, "paidAt", max_trading_time, build_row)
    _insert_rows(log, sql_pool, info_list, func_name)


# ----------------------------------------------------------------------------------------------------------------------
# Case 3: "goodsType": "UNBOX"
# ----------------------------------------------------------------------------------------------------------------------
def get_third_list(log, live_id, goods_id, sql_pool):
    """
    Case 3 (UNBOX): page through the recent purchase records of a goods item.

    Paging stops after 50 pages or as soon as a page reports fewer than 10
    items.

    :param log: logger object
    :param live_id: id of the live stream
    :param goods_id: shelf id of the goods item
    :param sql_pool: MySQL connection pool
    """
    max_page = 50

    # Newest stored trading_time for this goods item, to avoid duplicates.
    max_trading_time = _fetch_max_trading_time(sql_pool, _MAX_TIME_BY_GOODS_SQL, (goods_id,))

    for page in range(1, max_page + 1):
        len_item = get_order_records_third(log, live_id, goods_id, sql_pool, page, max_trading_time)
        if len_item < 10:
            log.debug(f"--------------- page {page}, len_item: {len_item} ---------------")
            break


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_order_records_third(log, live_id, goods_id, sql_pool, page, max_trading_time):
    """
    Case 3 (UNBOX): fetch one page of purchase records and store the new ones.

    :param log: logger object
    :param live_id: id of the live stream
    :param goods_id: shelf id of the goods item
    :param sql_pool: MySQL connection pool
    :param page: 1-based page number (10 records per page)
    :param max_trading_time: newest trading time already stored, or None
    :return: number of rows on the page, or 0 when the caller should stop
             paging (empty page, API error, or already-stored data reached)
    """
    func_name = inspect.currentframe().f_code.co_name
    log.debug(
        f"{func_name} get order start, live_id:{live_id}, goods_id:{goods_id} ..........")
    url = "https://api.qiandao.cn/box/unbox/recent-purchase-record"
    params = {
        "shelfId": goods_id,  # e.g. "876400156344348863"
        "limit": "10",
        "offset": str((page - 1) * 10)
    }
    resp_json = utils.request_get_data(log, url, params)

    if resp_json.get("code") != 0:
        log.error(f"{func_name} error: {resp_json.get('message')}")
        return 0

    rows = resp_json.get("data", {}).get("rows", [])
    if not rows:
        log.debug(f"{func_name}, live_id:{live_id} no rows")
        return 0

    def build_row(row, trading_time_str):
        return {
            "live_id": live_id,
            "goods_id": goods_id,
            "product_name": row.get("productName"),
            "product_count": row.get("productCount"),
            "trading_time": trading_time_str,
            "user_name": row.get("userNickname"),
            "order_id": row.get("orderId")
        }

    info_list, reached_old = _collect_new_rows(log, rows, "paidAt", max_trading_time, build_row)
    _insert_rows(log, sql_pool, info_list, func_name)

    # Stop the pager once already-stored data has been reached.
    return 0 if reached_old else len(rows)


# ----------------------------------------------------------------------------------------------------------------------
# Case 4: "goodsType": "AUCTION"
# ----------------------------------------------------------------------------------------------------------------------
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_order_records_fourth(log, live_id, goods_id, sql_pool):
    """
    Case 4 (AUCTION, auction_id = goodsId): fetch the bid list of an auction
    and store the new bids.

    :param log: logger object
    :param live_id: id of the live stream
    :param goods_id: shelf id of the goods item, used as the auction id
    :param sql_pool: MySQL connection pool
    """
    func_name = inspect.currentframe().f_code.co_name
    log.debug(
        f"{func_name} get order start, live_id:{live_id}, goods_id:{goods_id} ..........")
    url = "https://api.qiandao.cn/auctioneer/bid/list"
    params = {
        "auction_id": goods_id,  # e.g. "883831521360768262"
        "max_results": "200"
    }
    resp_json = utils.request_get_data(log, url, params)

    if resp_json.get("code") != 0:
        log.error(f"{func_name} error: {resp_json.get('message')}")
        return

    items = resp_json.get("data", {}).get("items", [])
    if not items:
        log.debug(f"{func_name}, live_id:{live_id} no items")
        return

    # Newest stored trading_time for this goods item, to avoid duplicates.
    max_trading_time = _fetch_max_trading_time(sql_pool, _MAX_TIME_BY_GOODS_SQL, (goods_id,))

    def build_row(item, trading_time_str):
        return {
            "live_id": live_id,
            "goods_id": goods_id,
            "user_id": item.get("user_id"),
            "user_name": item.get("nickname"),
            "price": item.get("price"),
            "trading_time": trading_time_str,
            "hammered": item.get("hammered")  # whether this bid closed the deal
        }

    info_list, _ = _collect_new_rows(log, items, "bid_time", max_trading_time, build_row)
    _insert_rows(log, sql_pool, info_list, func_name)


def test():
    """Manual smoke test: print the raw bid list of one hard-coded auction."""
    url = "https://api.qiandao.cn/auctioneer/bid/list"
    params = {
        "auction_id": "883831521360768262",
        "max_results": "200"
    }
    resp_json = utils.request_get_data(logger, url, params)
    print(resp_json)


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def order_main(log):
    """
    Entry point: collect order records for every stored product, grouped by
    goods type.

    A failure for a single product is logged and does not stop the run. Only
    an unhealthy connection pool raises, which lets the retry decorator
    re-run the whole task after its wait period.

    :param log: logger object
    :raises RuntimeError: when the connection pool health check fails
    """
    func_name = inspect.currentframe().f_code.co_name
    log.info(
        f'开始运行 {func_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    # goods_type value -> function that collects its records
    handlers = (
        ("LIVE_DRAW", get_order_first_group),
        ("MALL", get_order_records_second),
        ("UNBOX", get_third_list),
        ("AUCTION", get_order_records_fourth),
    )

    try:
        for goods_type, handler in handlers:
            # goods_type comes from the fixed tuple above, never from input.
            goods_list = sql_pool.select_all(
                f"SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = '{goods_type}'")
            for live_id, goods_id in goods_list:
                try:
                    handler(log, live_id, goods_id, sql_pool)
                except Exception as e:
                    log.error(f"{func_name} error: {e}")
    except Exception as e:
        log.error(f"{func_name} error: {e}")


if __name__ == '__main__':
    order_main(log=logger)