Răsfoiți Sursa

update bgs 10.29.1

lei.chen 1 lună în urmă
părinte
comite
bb8ef29306
2 a modificat fișierele cu 228 adăugiri și 8 ștergeri
  1. 12 8
      bgs_spider/bgs_new_daily_spider.py
  2. 216 0
      bgs_spider/bgs_update_missing_data.py

+ 12 - 8
bgs_spider/bgs_new_daily_spider.py

@@ -164,19 +164,23 @@ def bgs_main(log):
             log.error("数据库连接失败")
             raise Exception("数据库连接失败")
 
-        # max_bgs_id = sql_pool.select_one("SELECT MAX(number) AS max_number FROM beckett_bgs_record")
-        # # print(max_bgs_id_list)
-        # max_bgs_id = max_bgs_id[0]
-        # log.info(f"max_bgs_id 从 {max_bgs_id} 开始爬取.........................")
+        max_bgs_id = sql_pool.select_one("SELECT MAX(number) AS max_number FROM beckett_bgs_record")
+        # print(max_bgs_id_list)
+        max_bgs_id = max_bgs_id[0]
+        log.info(f"max_bgs_id 从 {max_bgs_id} 开始爬取.........................")
         # bgs_id_list = [i for i in range(max_bgs_id, max_bgs_id + 3001)]
-        #
-        # sql_pool.insert_all("INSERT INTO bgs_task(auth_code) VALUES (%s)", bgs_id_list)
 
-        # 20250923 查询到Friday, May 16, 2025  往后进行
+
+        # 20251029 往前2000  往后3000
         while True:
             # 倒序查 5000个
             sql_bgs_id_list = sql_pool.select_all(
-                "SELECT auth_code FROM bgs_task WHERE state!=1 AND auth_code > 17990000 LIMIT 5000")
+                "SELECT auth_code FROM bgs_task WHERE state!=1 AND auth_code BETWEEN %s AND %s LIMIT 5000",
+                (max_bgs_id - 2000, max_bgs_id + 3000))
+
+            # sql_bgs_id_list = sql_pool.select_all(
+            #     "SELECT auth_code FROM bgs_task WHERE state!=1 AND auth_code > 18239485 LIMIT 5000")
+                # "SELECT auth_code FROM bgs_task WHERE state!=1 AND auth_code > 17990000 LIMIT 5000"
             sql_bgs_id_list = [bid[0] for bid in sql_bgs_id_list]
             # for bid in sql_bgs_id_list:
             try:

+ 216 - 0
bgs_spider/bgs_update_missing_data.py

@@ -0,0 +1,216 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/10/20 13:19
+import time
+import requests
+import schedule
+import user_agent
+from loguru import logger
+import concurrent.futures
+from tenacity import stop_after_attempt, wait_fixed, retry
+from mysql_pool import MySQLConnectionPool
+
+"""
+bgs date_graded为null的数据  补充查询
+"""
+
+
+logger.remove()
+logger.add("logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+           level="DEBUG", retention="1 day")
+
+
def after_log(retry_state):
    """Tenacity ``after`` callback: log the outcome of every attempt.

    Uses the logger passed as the decorated function's first positional
    argument when available, otherwise falls back to the module logger.

    :param retry_state: RetryCallState object for the finished attempt
    """
    # All decorated functions in this module take the logger as their first
    # argument, so reuse it to keep retry messages in the same sink.
    log = retry_state.args[0] if retry_state.args else logger

    fn_name = retry_state.fn.__name__
    attempt = retry_state.attempt_number
    if retry_state.outcome.failed:
        log.warning(f"Function '{fn_name}', Attempt {attempt} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {attempt} succeeded")
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """Build the requests ``proxies`` mapping for the KDL tunnel proxy.

    :param log: logger instance (kept for interface compatibility and the
        retry callback; nothing in this body can actually raise)
    :return: dict with ``http`` and ``https`` keys pointing at the tunnel
    """
    # NOTE(review): credentials are hard-coded; consider loading them from
    # environment variables / config so they are not committed to VCS.
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    # The original try/except wrapped only a dict literal and string
    # formatting, neither of which can fail, so the error path was dead
    # code and has been removed.
    proxy_url = f"http://{kdl_username}:{kdl_password}@{tunnel}/"
    # The same authenticated endpoint serves both schemes.
    return {"http": proxy_url, "https": proxy_url}
+
+
def save_data(mysql_pool, info):
    """Insert a single graded-card row into beckett_bgs_record.

    :param mysql_pool: pool object exposing ``insert_one(sql, params)``
    :param info: 12-tuple matching the column list, ending with the BGS number
    :return: None
    """
    insert_sql = (
        "INSERT INTO beckett_bgs_record(set_name, player_name, date_graded, "
        "centering_grade, corner_grade, edges_grade, surfaces_grade, auto_grade, "
        "final_grade, total_grade, cards_grade, number) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    )
    mysql_pool.insert_one(insert_sql, info)
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_data(log, bgs_id, sql_pool):
    """Look up one BGS serial number on beckett.com and backfill its row.

    Resulting ``miss_state`` values in beckett_bgs_record:
      3 - 404 / No Record Found (do not retry)
      2 - any other non-200 response (eligible for a later pass)
      1 - graded fields successfully written

    :param log: logger instance
    :param bgs_id: BGS serial number to query
    :param sql_pool: MySQL pool exposing ``update_one(sql, params)``
    :return: None
    """
    headers = {
        "accept": "application/json, text/plain, */*",
        "user-agent": user_agent.generate_user_agent()
    }
    url = "https://www.beckett.com/api/grading/lookup"
    params = {
        "category": "BGS",
        "serialNumber": str(bgs_id)
    }
    response = requests.get(url, headers=headers, params=params, proxies=get_proxys(log), timeout=5)
    if response.status_code == 404:
        # No record found for this serial -> mark state 3 so it is skipped.
        log.warning(f"No Record Found for {bgs_id}")
        sql_pool.update_one("UPDATE beckett_bgs_record SET miss_state=3 WHERE number=%s", (bgs_id,))
        return

    if response.status_code != 200:
        # Lookup failed -> mark state 2 for a later retry pass.
        log.warning(f"Error getting data for {bgs_id}, {response.status_code}")
        sql_pool.update_one("UPDATE beckett_bgs_record SET miss_state=2 WHERE number=%s", (bgs_id,))
        return

    result_dict = response.json()
    if result_dict:
        # Sample response fields (observed payload):
        #   "set_name", "sport_name", "card_key", "player_name",
        #   "date_graded": "Thursday, April 17, 2025",
        #   "center_grade": "9.5", "corners_grade": "8.5",
        #   "edges_grade": "8.5", "surfaces_grade": "10.0",
        #   "autograph_grade": "0.0", "final_grade": "8.5"
        set_name = result_dict.get('set_name')
        player_name = result_dict.get('player_name')
        date_graded = result_dict.get('date_graded')
        centering_grade = result_dict.get('center_grade')
        corner_grade = result_dict.get('corners_grade')
        edges_grade = result_dict.get('edges_grade')
        # BUGFIX: the payload key is 'surfaces_grade' (see sample above);
        # the old 'surface_grade' lookup never matched, so this column was
        # always NULL. Keep 'surface_grade' as a fallback in case some
        # payload variants use it.
        surfaces_grade = result_dict.get('surfaces_grade', result_dict.get('surface_grade'))
        auto_grade = result_dict.get('autograph_grade')
        final_grade = result_dict.get('final_grade')
        total_grade = result_dict.get('pop_report')
        cards_grade = result_dict.get('pop_higher')
        info = (set_name, player_name, date_graded, centering_grade, corner_grade, edges_grade, surfaces_grade,
                auto_grade, final_grade, total_grade, cards_grade, str(bgs_id))

        # Check whether every value is None or empty (excluding bgs_id);
        # skip the write when nothing useful came back.
        all_none_or_empty = all(x is None or x == '' for x in info[:-1])
        if all_none_or_empty:
            log.debug("All values are empty")
        else:
            sql_pool.update_one(
                "UPDATE beckett_bgs_record SET set_name=%s, player_name=%s, date_graded=%s, centering_grade=%s, corner_grade=%s, edges_grade=%s, surfaces_grade=%s, auto_grade=%s, final_grade=%s, total_grade=%s, cards_grade=%s WHERE number=%s",
                info
            )
            # Lookup succeeded -> mark state 1.
            sql_pool.update_one("UPDATE beckett_bgs_record SET miss_state=1 WHERE number=%s", (bgs_id,))
+
+
def process_urls(log, ids, sql_pool, batch_size=1000, max_workers=5):
    """Fan out ``get_data`` over *ids* in batches using a thread pool.

    Each batch is submitted, then drained with ``as_completed`` so that
    per-id failures are logged without aborting the rest of the batch.

    :param log: logger instance
    :param ids: sequence of BGS serial numbers to process
    :param sql_pool: MySQL pool handed through to ``get_data``
    :param batch_size: number of ids submitted before draining futures
    :param max_workers: thread-pool size
    :return: None
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        for start in range(0, len(ids), batch_size):
            chunk = ids[start:start + batch_size]
            try:
                pending = {pool.submit(get_data, log, bgs_id, sql_pool): bgs_id
                           for bgs_id in chunk}
                for done in concurrent.futures.as_completed(pending):
                    bgs_id = pending[done]
                    try:
                        done.result()
                        log.debug(f"处理 {bgs_id} 成功")
                    except Exception as exc:
                        log.debug(f"处理 {bgs_id} 出错: {exc}")
            except Exception as e:
                log.error(f"提交任务失败: {e}")
+
+
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def bgs_main(log):
    """Backfill pass: re-query every record whose date_graded is NULL.

    Pulls up to 10000 unprocessed numbers (miss_state=0) per round and
    fans them out to ``get_data`` until the SELECT comes back empty.

    :param log: logger instance
    """
    try:
        log.info(
            "开始运行 bgs_main 爬虫任务............................................................")
        sql_pool = MySQLConnectionPool(log=log)
        # NOTE(review): a freshly-constructed pool object is normally truthy
        # even when the connection failed — confirm MySQLConnectionPool
        # really signals failure this way.
        if not sql_pool:
            log.error("数据库连接失败")
            raise Exception("数据库连接失败")

        while True:
            sql_bgs_id_list = sql_pool.select_all(
                "SELECT number FROM beckett_bgs_record WHERE date_graded IS NULL AND miss_state = 0 LIMIT 10000")
            sql_bgs_id_list = [bid[0] for bid in sql_bgs_id_list]
            log.info(f"开始处理 {len(sql_bgs_id_list)} 条数据..........................................")
            if not sql_bgs_id_list:
                log.info("没有需要处理的数据, 退出查询")
                break

            try:
                process_urls(log, sql_bgs_id_list, sql_pool, batch_size=1000,
                             max_workers=10)  # tune batch_size / max_workers as needed
            except Exception as e:
                # BUGFIX: loguru treats extra positional args as str.format()
                # arguments, so the old ``log.error('process urls: ', e)``
                # silently dropped the exception text (no '{}' placeholder).
                log.error(f"process urls: {e}")

            # Brief pause between rounds to go easy on the API.
            time.sleep(5)

    except Exception as e:
        log.error(e)
    finally:
        log.info("爬虫程序运行结束,等待下一轮的采集任务.....................")
+
+
def schedule_task():
    """Run the backfill once immediately, then re-run it daily at 03:01.

    Blocks forever polling the scheduler once per second.
    """
    # Kick off an immediate pass before entering the schedule loop.
    bgs_main(logger)

    # Register the daily job, then poll the scheduler forever.
    schedule.every().day.at("03:01").do(bgs_main, logger)
    while True:
        schedule.run_pending()
        time.sleep(1)
+
+
if __name__ == '__main__':
    # Run a single backfill pass immediately; the daily scheduler and the
    # single-id debug call below are currently disabled.
    # schedule_task()
    # get_data('1000743')
    bgs_main(logger)