# -*- coding: utf-8 -*- # Author : Charley # Python : 3.12.10 # Date : 2026/5/18 15:17 import random import time import inspect import schedule import user_agent from loguru import logger from parsel import Selector from curl_cffi import requests from curl_cffi.requests import BrowserType from mysql_pool import MySQLConnectionPool from tenacity import retry, stop_after_attempt, wait_fixed """ 目标网站:https://bid.memorylaneinc.com/lots/gallery?page=2 """ logger.remove() logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00", format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}", level="DEBUG", retention="7 day") # 直接用库内置的所有浏览器类型,不用手动维护列表 client_identifier_list = [b.value for b in BrowserType] # print(client_identifier_list) headers = { "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "user-agent": user_agent.generate_user_agent() } def after_log(retry_state): """ retry 回调 :param retry_state: RetryCallState 对象 """ # 检查 args 是否存在且不为空 if retry_state.args and len(retry_state.args) > 0: log = retry_state.args[0] # 获取传入的 logger else: log = logger # 使用全局 logger if retry_state.outcome.failed: log.warning( f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times") else: log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded") @retry(stop=stop_after_attempt(5), wait=wait_fixed(2), after=after_log) def get_proxys(log): http_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36931" https_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36931" try: proxySettings = { "http": http_proxy, "https": https_proxy, } return proxySettings except Exception as e: log.error(f"Error getting proxy: {e}") raise e @retry(stop=stop_after_attempt(5), wait=wait_fixed(2), after=after_log) def get_details(log, url, sql_pool, sql_id): """ 获取详情数据 :param log: logger对象 :param url: 详情页URL :param sql_pool: MySQL连接池 :param sql_id: 数据ID :return: 标题和描述 """ log.info(f">>>>>>>>>>>>>> 正在爬取详情数据URL: {url} <<<<<<<<<<<<<<") response = requests.get(url, headers=headers, impersonate=random.choice(client_identifier_list), timeout=10, proxies=get_proxys(log)) response.raise_for_status() selector = Selector(response.text) category = selector.xpath('//a[@id="MainContent_hCategory"]/text()').get() description = selector.xpath('//*[@id="MainContent_lblOldAuction"]/text()').getall() description = ' '.join(description).strip() if description else None # imgs = selector.xpath('//div[@class="col-md-5 col-sm-5"]//a[@class="MagicThumb-swap"]/@href').getall() imgs = selector.xpath('//div[@class="col-md-5 col-sm-5"]//a[not(@id="Zoomer")]/@href').getall() imgs = ','.join(imgs) if imgs else None # print(category, description, imgs) # 更新数据和状态 sql_pool.update_one_or_dict( table="memory_lane_record", data={"category": category, "description": description, "imgs": imgs, "state": 1}, condition={"id": sql_id} ) def get_single_page(log, page, sql_pool): """ 获取单页数据 :param log: logger对象 :param page: 页码 :param sql_pool: MySQL连接池 :return: 该页数据条数 """ log.info(f"Getting page -> {page} data....................................................") url = "https://bid.memorylaneinc.com/lots/gallery" params = {"page": page} # response = requests.get(url, headers=headers, impersonate="chrome124", params=params, timeout=10) response = requests.get(url, headers=headers, impersonate=random.choice(client_identifier_list), params=params, proxies=get_proxys(log), timeout=10) # print(response.text) response.raise_for_status() selector = Selector(response.text) tag_div_list = selector.xpath( '//div[@class="items"]/div/div[@class="row"]//div[@class="col-lg-3 col-md-4 col-sm-6"]') # print('tag_div_list:', tag_div_list) info_list = [] for tag_div in tag_div_list: title = tag_div.xpath('.//p/a/text()').get() detail_url = tag_div.xpath('.//p/a/@href').get() # img = tag_div.xpath('.//div[@class="item-image"]/a/img/@src').get() tag_div_p = tag_div.xpath('.//div/p[2]/strong/text()').getall() bids = tag_div_p[0] if tag_div_p else None opening_bid = tag_div_p[1] if len(tag_div_p) > 1 else None opening_bid = opening_bid.replace('$', '').replace(',', '').strip() if opening_bid else None status = tag_div_p[2] if len(tag_div_p) > 2 else None current_bid = tag_div.xpath('.//div[@class="item-price"]/a/text()').get() current_bid = current_bid.replace('CURRENT BID $', '').replace(',', '').strip() if current_bid else None data_dict = { "title": title, "detail_url": detail_url, # "img": img, "bids": bids, "opening_bid": opening_bid, "status": status, "current_bid": current_bid } # print('data_dict:', data_dict) info_list.append(data_dict) # 保存数据到数据库 if info_list: sql_pool.insert_many(table="memory_lane_record", data_list=info_list, ignore=True) return len(info_list) def get_sold_list(log, sql_pool): """ 获取已售列表 :param log: logger对象 :param sql_pool: MySQL连接池 :return: 无 """ page = 1 max_page = 10 while page <= max_page: try: len_list = get_single_page(log, page, sql_pool) except Exception as e: log.error(f"Error getting page {page}: {e}") continue if len_list == 0: log.warning(f"No data on page {page}, stopping further requests") break page += 1 @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log) def meml_main(log): """ 主函数 :param log: logger对象 """ log.info( f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................') # 配置 MySQL 连接池 sql_pool = MySQLConnectionPool(log=log) if not sql_pool: log.error("MySQL数据库连接失败") raise Exception("MySQL数据库连接失败") try: try: get_sold_list(log, sql_pool) except Exception as e: log.error(f'Error getting sold list: {e}') # 更新详情页 log.debug('Updating detail pages........................... started') # sql_result = sql_pool.select_all('select id, detail_url from memory_lane_record where state = 0') sql_result = sql_pool.select_all('select id, detail_url from memory_lane_record where state != 1 order by id') for row in sql_result: sql_id = row[0] detail_url = row[1] try: get_details(log, detail_url, sql_pool, sql_id) except Exception as e: log.error(f'Error getting details for {detail_url}: {e}') # 更新数据和状态 sql_pool.update_one_or_dict( table="memory_lane_record", data={"state": 2}, condition={"id": sql_id} ) except Exception as e: log.error(f'{inspect.currentframe().f_code.co_name} error: {e}') finally: log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............') def schedule_task(): """ 设置定时任务 """ meml_main(log=logger) schedule.every().day.at("05:00").do(meml_main, log=logger) while True: schedule.run_pending() time.sleep(1) if __name__ == "__main__": # get_single_page(log=logger, page=1, sql_pool=None) schedule_task()