
update 6.13.1

lei.chen 6 months ago
parent
commit
9438be5a06

+ 14 - 0
ags_spider/README.md

@@ -0,0 +1,14 @@
+## 1. AGS daily update spider task
+
+```bash
+# start command
+python ags_new_daily.py
+```
+
+## 2. History task spider
+
+```bash
+# start command
+python ags_history_spider.py
+```
+
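Both entry points can also be launched together via `ags_spider/start_ags_spider.py`, which is added later in this commit. A minimal sketch of what that wrapper does (same script names as above):

```python
import subprocess
import time

# Launch each spider in its own process; each script carries its own schedule,
# so the parent only needs to start them.
for script in ("ags_new_daily.py", "ags_history_spider.py"):
    subprocess.Popen(["python", script])
    time.sleep(1)  # brief stagger between start-ups
```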

+ 74 - 0
ags_spider/YamlLoader.py

@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+#
+import os, re
+import yaml
+
+regex = re.compile(r'^\$\{(?P<ENV>[A-Z_\-]+\:)?(?P<VAL>[\w\.]+)\}$')
+
+class YamlConfig:
+    def __init__(self, config):
+        self.config = config
+
+    def get(self, key:str):
+        return YamlConfig(self.config.get(key))
+    
+    def getValueAsString(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] is not None:
+                env = group['ENV'][:-1]
+                return os.getenv(env, group['VAL'])
+            # ${value} placeholder without an env name: return the literal value
+            return group['VAL']
+        except Exception:
+            return self.config[key]
+    
+    def getValueAsInt(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] is not None:
+                env = group['ENV'][:-1]
+                return int(os.getenv(env, group['VAL']))
+            # ${value} placeholder without an env name: return the literal value
+            return int(group['VAL'])
+        except Exception:
+            return int(self.config[key])
+        
+    def getValueAsBool(self, key: str, env: str = None):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] is not None:
+                env = group['ENV'][:-1]
+                value = os.getenv(env, group['VAL'])
+            else:
+                value = group['VAL']
+            # bool() on any non-empty string is True, so compare against common truthy values
+            return str(value).lower() in ('1', 'true', 'yes', 'on')
+        except Exception:
+            return bool(self.config[key])
+    
+def readYaml(path: str = 'application.yml', profile: str = None) -> YamlConfig:
+    conf = {}
+    if os.path.exists(path):
+        with open(path) as fd:
+            conf = yaml.load(fd, Loader=yaml.FullLoader)
+
+    if profile is not None:
+        result = path.split('.')
+        profiledYaml = f'{result[0]}-{profile}.{result[1]}'
+        if os.path.exists(profiledYaml):
+            with open(profiledYaml) as fd:
+                conf.update(yaml.load(fd, Loader=yaml.FullLoader))
+
+    return YamlConfig(conf)
+
+# Example usage (kept as comments):
+# conf = readYaml()
+# mysqlConf = conf.get('mysql')
+# host = mysqlConf.getValueAsString("host")
+# port = mysqlConf.getValueAsInt("port")
+# username = mysqlConf.getValueAsString("username")
+# password = mysqlConf.getValueAsString("password")
+# mysql_db = mysqlConf.getValueAsString("db")
+# print(host, port, username, password, mysql_db)
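For reference, a hedged sketch of how the `${ENV_VAR:default}` placeholders resolve: `readYaml` keeps the raw placeholder string, and `getValueAsString`/`getValueAsInt` call `os.getenv` at access time, falling back to the value after the colon. The defaults below are the ones from `application.yml` in this commit; the override value is illustrative only.

```python
import os
import YamlLoader

conf = YamlLoader.readYaml("application.yml")
mysql_conf = conf.get("mysql")

# no MYSQL_HOST in the environment -> the default after the colon is used
print(mysql_conf.getValueAsString("host"))   # 100.64.0.23
print(mysql_conf.getValueAsInt("port"))      # 3306

# setting the named environment variable overrides the default
os.environ["MYSQL_HOST"] = "127.0.0.1"       # illustrative override
print(mysql_conf.getValueAsString("host"))   # 127.0.0.1
```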

+ 47 - 0
ags_spider/add_task.py

@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/6/3 16:56
+from loguru import logger
+
+from mysq_pool import MySQLConnectionPool
+
+def get_add_cert(start_, end_):
+    num_list = []
+    # generate 8-digit id strings for every number in the range
+    for num in range(start_, end_ + 1):
+        # zfill pads the number to 8 characters with leading zeros
+        formatted_num = str(num).zfill(8)
+        # print(formatted_num)
+        num_list.append(formatted_num)
+    return num_list
+
+sql_pool = MySQLConnectionPool(log=logger)
+
+# logger.remove()
+# logger.add(lambda record: print(record), colorize=True, format="<green>{time}</green> <level>{message}</level>")
+# query the last record, add 3000, and insert the new tasks into the table
+# max_cert = sql_pool.select_one("SELECT cert_id FROM ags_record ORDER BY id DESC LIMIT 1")
+start_max_cert = 270000
+end_max_cert = 1000000
+new_id_list = get_add_cert(start_max_cert, end_max_cert)
+new_id_list = [str(ni) for ni in new_id_list]
+# logger.debug(f'Latest id is {max_cert[0]}, generating new ids and adding them to the task table.........')
+
+# avoid adding duplicate tasks
+# state0_id_list = sql_pool.select_all("SELECT cert_id FROM ags_task")
+# print(len(state0_id_list))
+# state0_id_list = [i[0] for i in state0_id_list]
+#
+#
+# filtered_new_id_list = []
+#
+# for ni in new_id_list:
+#     if str(ni) in state0_id_list:
+#         logger.debug(f"{ni} already exists, skipping")
+#     else:
+#         filtered_new_id_list.append(ni)
+# print(f'Found {len(filtered_new_id_list)} new tasks, adding them to the task table......')
+# print(filtered_new_id_list)
+
+sql_pool.insert_all("INSERT INTO ags_task(cert_id) VALUES (%s)", new_id_list)
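`add_task.py` and both spiders assume an `ags_task` queue table with an auto-increment `id`, a `cert_id`, and a `state` flag (0 = pending, 1 = completed, 2 = grades not yet available, matching the UPDATE statements in the spiders). The DDL is not part of this commit; a plausible definition, stated as an assumption only:

```python
# Hypothetical DDL for the ags_task queue table -- not included in this commit.
# A unique key on cert_id would also make the commented-out de-duplication above unnecessary.
AGS_TASK_DDL = """
CREATE TABLE IF NOT EXISTS ags_task (
    id      BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
    cert_id VARCHAR(16)     NOT NULL,
    state   TINYINT         NOT NULL DEFAULT 0,
    UNIQUE KEY uk_cert_id (cert_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
```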

+ 274 - 0
ags_spider/ags_history_spider.py

@@ -0,0 +1,274 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/6/3 17:33
+import json
+import time
+import requests
+import schedule
+import user_agent
+import concurrent.futures
+from loguru import logger
+from retrying import retry
+from parsel import Selector
+from datetime import datetime
+
+from mysq_pool import MySQLConnectionPool
+
+logger.remove()
+logger.add("logs/his_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+           level="DEBUG", retention="30 day")
+
+
+@retry(stop_max_attempt_number=3, wait_fixed=1000)
+def get_proxys_(log):
+    """
+    Get proxy settings
+    :return: proxies dict
+    """
+    tunnel = "x371.kdltps.com:15818"
+    kdl_username = "t13753103189895"
+    kdl_password = "o0yefv6z"
+    try:
+        proxies = {
+            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
+            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
+        }
+        return proxies
+    except Exception as e:
+        log.error(f"Error getting proxy: {e}")
+        raise e
+
+@retry(stop_max_attempt_number=5, wait_fixed=1000)
+def get_proxys():
+    # purchased account, North America
+    # http_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36927"
+    # https_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36927"
+    http_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36928"
+    https_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36928"
+
+    # url = "https://ifconfig.me"
+    try:
+        proxySettings = {
+            "http": http_proxy,
+            "https": https_proxy,
+        }
+        return proxySettings
+    except Exception as e:
+        logger.error(f"Error getting proxy: {e}")
+        raise e
+
+
+@retry(stop_max_attempt_number=3, wait_fixed=2000)
+def get_price(ucid):
+    # ucid = '204224'
+    headers = {
+        "accept": "application/json, text/plain, */*",
+        "referer": "https://robograding.com/feed/00205223/view",
+        "user-agent": user_agent.generate_user_agent()
+        # "x-xsrf-token": "eyJpdiI6ImVBTksreFRBejJXS0tRWmQvUERjRVE9PSIsInZhbHVlIjoiWXA0VktNMGFpQ0RydkR0SmZrVDNhU0ZCQTU3T0pMcExIUFI0VkFkSm85NURGeEdNUXJKU1hxdlIwOHJkbXJGdVcrMXlpVlJPUDRUR3dBV0ZaQ0d2bFhHeTQzaG40dDFMb1lBSVpmcTF6cDh6UHVwTEZEZFViYXo1RnVjQ0dCQ2siLCJtYWMiOiJjYTdkNDJhMDkyNmE3ZmVjZmRiMWY2ZTBjMDBhNzJjMjhkZWYwY2M4NjAwNDAxMTU0ZmI5YjE4NWZhNTVhNWM3IiwidGFnIjoiIn0="
+    }
+    url = f"https://robograding.com/api/v3/card-price/{ucid}"
+    response = requests.get(url, headers=headers, proxies=get_proxys())
+    # response = requests.get(url, headers=headers)
+
+    if response.status_code != 200:
+        logger.debug('Request failed, retrying......................')
+        raise Exception('Request failed, retrying......................')
+    price = response.json().get('price')
+    return price
+
+
+def transform_date(date_str):
+    """
+    Convert a date string like "November 13, 2020" into YYYY-MM-DD format.
+    :param date_str: date string
+    :return: formatted_date, or None when the date is missing or unparseable
+    """
+    if not date_str:
+        return None
+    try:
+        # parse the long-form date
+        date_obj = datetime.strptime(date_str, "%B %d, %Y")
+    except ValueError:
+        return None
+    # format as ISO date
+    formatted_date = date_obj.strftime("%Y-%m-%d")
+    return formatted_date
+
+
+def save_data(sql_pool, info):
+    sql = "INSERT INTO ags_record (cert_id, name, title, score, card_type, release_date, series, card, ags_set, owner, centering_overall, surface_overall, edges_overall, corners_overall, price, front_img, back_img) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
+    sql_pool.insert_one(sql, info)
+
+
+def parse_data(cert_id, resp_text, sql_pool):
+    selector = Selector(text=resp_text)
+    name = selector.xpath('//div[@class="feed-view__header__content"]//h1/text()').get()
+    title = selector.xpath('//div[@class="feed-view__header__content"]//h2/text()').get()
+    score = selector.xpath(
+        '//div[@class="feed-view__header__content"]//p[@class="feed-view__header__grade-score"]/text()').get()
+
+    tr_list = selector.xpath('//div[@class="feed-view__right-side"]//tbody/tr[not(contains(@class, "feed-view"))]')
+    result_dict = {'card_type': '',
+                   'release_date': '',
+                   'series': '',
+                   'card': '',
+                   'set': '',
+                   'owner': ''}
+    for tr in tr_list:
+        item_key = tr.xpath('./td[1]/h3/text()').get()
+        item_val = tr.xpath('./td[2]/text()').get()
+        try:
+            item_key = item_key.strip().replace(':', '')
+            item_val = item_val.strip().replace(':', '')
+            # print('item_key,item_val', item_key, item_val)
+            key_snake_case = item_key.replace(' ', '_').replace('/', '_').lower()
+            if key_snake_case in list(result_dict.keys()):
+                # print(key_snake_case, item_val)
+                result_dict[key_snake_case] = item_val
+        except Exception as e:
+            logger.debug(e)
+
+    # overall
+    centering_overall = selector.xpath(
+        '//div[@class="feed-view__breakdown__scores feed-view__breakdown__scores--contained"]/div[1]/p[2]/text()').get()
+    surface_overall = selector.xpath(
+        '//div[@class="feed-view__breakdown__scores feed-view__breakdown__scores--contained"]/div[2]/p[2]/text()').get()
+    edges_overall = selector.xpath(
+        '//div[@class="feed-view__breakdown__scores feed-view__breakdown__scores--contained"]/div[3]/p[2]/text()').get()
+    corners_overall = selector.xpath(
+        '//div[@class="feed-view__breakdown__scores feed-view__breakdown__scores--contained"]/div[4]/p[2]/text()').get()
+
+    # get the user card id used by the price API
+    user_card_id = selector.xpath(
+        '//div[@class="feed-view__breakdown__scores-holder feed-view__breakdown__scores-card-price"]/div/@data-user-card-id').get()
+    if user_card_id:
+        price = get_price(user_card_id)
+    else:
+        price = None
+
+    img_json = selector.xpath(
+        '//section[@class="feed-view__content"]//div[@class="feed-view__card"]/div/@data-images').get()
+    img_dict = json.loads(img_json)
+    front_img = img_dict.get('front_slab_image')
+    back_img = img_dict.get('back_slab_image')
+    if not front_img:
+        front_img = img_dict.get('image_path')
+        back_img = None
+
+    info = (cert_id, name, title, score, result_dict.get('card_type'), transform_date(result_dict.get('release_date')),
+            result_dict.get('series'), result_dict.get('card'), result_dict.get('set'), result_dict.get('owner'),
+            centering_overall, surface_overall, edges_overall, corners_overall, price, front_img, back_img)
+    # print(info)
+    save_data(sql_pool, info)
+
+
+@retry(stop_max_attempt_number=5, wait_fixed=2000)
+def get_data(ags_id_list, sql_pool):
+    sql_id = ags_id_list[0]
+    cert_id = ags_id_list[1]
+    headers = {
+        "user-agent": user_agent.generate_user_agent()
+    }
+    url = f"https://robograding.com/feed/{cert_id}/view"
+
+    try:
+        response = requests.get(url, headers=headers, proxies=get_proxys())
+        # response = requests.get(url, headers=headers)
+        response.raise_for_status()
+
+        if "Grades are not available yet" in response.text:
+            logger.debug("Grades are not available yet in response.text......................")
+            # mark the task as not yet graded (state=2)
+            sql_pool.update_one("UPDATE ags_task SET state=2 WHERE id=%s", (sql_id,))
+        else:
+            parse_data(cert_id, response.text, sql_pool)
+            # mark the task as completed (state=1)
+            sql_pool.update_one("UPDATE ags_task SET state=1 WHERE id=%s", (sql_id,))
+    except requests.RequestException as e:
+        logger.error(f"Request error: {e}")
+        raise  # re-raise so the retry decorator / caller can handle it
+
+
+def process_urls(ids, mysql_pool, batch_size=1000, max_workers=5):
+    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+        for i in range(0, len(ids), batch_size):
+            batch = ids[i:i + batch_size]
+            try:
+                futures_to_urls = {executor.submit(get_data, url, mysql_pool): url for url in batch}
+                for future in concurrent.futures.as_completed(futures_to_urls):
+                    url = futures_to_urls[future]
+                    try:
+                        future.result()
+                        logger.debug(f"Processed {url} successfully")
+                    except Exception as exc:
+                        logger.debug(f"Error processing {url}: {exc}")
+            except Exception as e:
+                logger.error(f"Failed to submit batch: {e}")
+
+
+# def get_add_cert(start_, end_):
+#     num_list = []
+#     # generate 8-digit, zero-padded id strings
+#     for num in range(start_, end_ + 1):
+#         # zfill pads the number to 8 characters with leading zeros
+#         formatted_num = str(num).zfill(8)
+#         # print(formatted_num)
+#         num_list.append(formatted_num)
+#     return num_list
+
+
+def get_new_task(sql_pool):
+    # look up the latest record, add 3000, and insert the new ids as tasks
+    # max_cert = sql_pool.select_one("SELECT cert_id FROM ags_record ORDER BY id DESC LIMIT 1")
+    # start_max_cert = int(max_cert[0]) + 1
+    # end_max_cert = start_max_cert + 3000
+    # new_id_list = get_add_cert(start_max_cert, end_max_cert)
+    # logger.debug(f'Latest id is {max_cert[0]}, generating new ids and adding them to the task table.........')
+    #
+    # # avoid adding duplicate tasks
+    # state0_id_list = sql_pool.select_all("SELECT cert_id FROM ags_task WHERE state = 0")
+    # state0_id_list = [i[0] for i in state0_id_list]
+    # for i in state0_id_list:
+    #     if i in new_id_list:
+    #         new_id_list.remove(i)
+    # sql_pool.insert_all("INSERT INTO ags_task(cert_id) VALUES (%s)", new_id_list)
+
+    # query the pending task list
+    ags_id_list = sql_pool.select_all("SELECT id,cert_id FROM ags_task WHERE state = 2 ORDER BY id DESC")
+    # ags_id_list = sql_pool.select_all("SELECT id,cert_id FROM ags_task WHERE state = 0 LIMIT 5000")
+    ags_id_list = [i for i in ags_id_list]
+    return ags_id_list
+
+
+@retry(stop_max_attempt_number=5, wait_fixed=2000)
+def main():
+    """
+    Main spider entry point
+    """
+    try:
+        logger.info("Starting the tag_spider crawl job............................................................")
+        sql_pool = MySQLConnectionPool(log=logger)
+        if not sql_pool:
+            logger.error("Database connection failed")
+            raise Exception("Database connection failed")
+        new_task = get_new_task(sql_pool)
+        try:
+            process_urls(new_task, sql_pool, batch_size=1000, max_workers=5)
+        except Exception as e:
+            logger.error(f'process urls: {e}')
+
+    except Exception as e:
+        logger.error(f'error:{e}')
+    finally:
+        logger.info("Spider run finished, waiting for the next round of collection.............")
+
+
+def schedule_task():
+    main()
+    schedule.every(15).days.at("00:01").do(main)
+    while True:
+        schedule.run_pending()
+        time.sleep(1)
+
+
+if __name__ == '__main__':
+    schedule_task()

+ 275 - 0
ags_spider/ags_new_daily.py

@@ -0,0 +1,275 @@
+# -*- coding: utf-8 -*-
+# Author  : Charley
+# Python  : 3.8.10
+# Date: 2024-10-14 11:01
+import json
+import time
+import requests
+import schedule
+import user_agent
+import concurrent.futures
+from loguru import logger
+from retrying import retry
+from parsel import Selector
+from datetime import datetime
+
+from mysq_pool import MySQLConnectionPool
+
+logger.remove()
+logger.add("logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+           level="DEBUG", retention="7 day")
+
+
+@retry(stop_max_attempt_number=3, wait_fixed=2000)
+def get_proxys_(log):
+    """
+    Get proxy settings
+    :return: proxies dict
+    """
+    tunnel = "x371.kdltps.com:15818"
+    kdl_username = "t13753103189895"
+    kdl_password = "o0yefv6z"
+    try:
+        proxies = {
+            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
+            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
+        }
+        return proxies
+    except Exception as e:
+        log.error(f"Error getting proxy: {e}")
+        raise e
+
+
+@retry(stop_max_attempt_number=5, wait_fixed=1000)
+def get_proxys():
+    # purchased account, North America
+    # http_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36927"
+    # https_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36927"
+    http_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36928"
+    https_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36928"
+
+    # url = "https://ifconfig.me"
+    try:
+        proxySettings = {
+            "http": http_proxy,
+            "https": https_proxy,
+        }
+        return proxySettings
+    except Exception as e:
+        logger.error(f"Error getting proxy: {e}")
+        raise e
+
+
+@retry(stop_max_attempt_number=3, wait_fixed=2000)
+def get_price(ucid):
+    # ucid = '204224'
+    headers = {
+        "accept": "application/json, text/plain, */*",
+        "referer": "https://robograding.com/feed/00205223/view",
+        "user-agent": user_agent.generate_user_agent()
+        # "x-xsrf-token": "eyJpdiI6ImVBTksreFRBejJXS0tRWmQvUERjRVE9PSIsInZhbHVlIjoiWXA0VktNMGFpQ0RydkR0SmZrVDNhU0ZCQTU3T0pMcExIUFI0VkFkSm85NURGeEdNUXJKU1hxdlIwOHJkbXJGdVcrMXlpVlJPUDRUR3dBV0ZaQ0d2bFhHeTQzaG40dDFMb1lBSVpmcTF6cDh6UHVwTEZEZFViYXo1RnVjQ0dCQ2siLCJtYWMiOiJjYTdkNDJhMDkyNmE3ZmVjZmRiMWY2ZTBjMDBhNzJjMjhkZWYwY2M4NjAwNDAxMTU0ZmI5YjE4NWZhNTVhNWM3IiwidGFnIjoiIn0="
+    }
+    url = f"https://robograding.com/api/v3/card-price/{ucid}"
+    response = requests.get(url, headers=headers, proxies=get_proxys())
+    # response = requests.get(url, headers=headers)
+
+    if response.status_code != 200:
+        logger.debug('Request failed, retrying......................')
+        raise Exception('Request failed, retrying......................')
+    price = response.json().get('price')
+    return price
+
+
+def transform_date(date_str):
+    """
+    Convert a date string like "November 13, 2020" into YYYY-MM-DD format.
+    :param date_str: date string
+    :return: formatted_date, or None when the date is missing or unparseable
+    """
+    if not date_str:
+        return None
+    try:
+        # parse the long-form date
+        date_obj = datetime.strptime(date_str, "%B %d, %Y")
+    except ValueError:
+        return None
+    # format as ISO date
+    formatted_date = date_obj.strftime("%Y-%m-%d")
+    return formatted_date
+
+
+def save_data(sql_pool, info):
+    sql = "INSERT INTO ags_record (cert_id, name, title, score, card_type, release_date, series, card, ags_set, owner, centering_overall, surface_overall, edges_overall, corners_overall, price, front_img, back_img) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
+    sql_pool.insert_one(sql, info)
+
+
+def parse_data(cert_id, resp_text, sql_pool):
+    selector = Selector(text=resp_text)
+    name = selector.xpath('//div[@class="feed-view__header__content"]//h1/text()').get()
+    title = selector.xpath('//div[@class="feed-view__header__content"]//h2/text()').get()
+    score = selector.xpath(
+        '//div[@class="feed-view__header__content"]//p[@class="feed-view__header__grade-score"]/text()').get()
+
+    tr_list = selector.xpath('//div[@class="feed-view__right-side"]//tbody/tr[not(contains(@class, "feed-view"))]')
+    result_dict = {'card_type': '',
+                   'release_date': '',
+                   'series': '',
+                   'card': '',
+                   'set': '',
+                   'owner': ''}
+    for tr in tr_list:
+        item_key = tr.xpath('./td[1]/h3/text()').get()
+        item_val = tr.xpath('./td[2]/text()').get()
+        try:
+            item_key = item_key.strip().replace(':', '')
+            item_val = item_val.strip().replace(':', '')
+            # print('item_key,item_val', item_key, item_val)
+            key_snake_case = item_key.replace(' ', '_').replace('/', '_').lower()
+            if key_snake_case in list(result_dict.keys()):
+                # print(key_snake_case, item_val)
+                result_dict[key_snake_case] = item_val
+        except Exception as e:
+            logger.debug(e)
+
+    # overall
+    centering_overall = selector.xpath(
+        '//div[@class="feed-view__breakdown__scores feed-view__breakdown__scores--contained"]/div[1]/p[2]/text()').get()
+    surface_overall = selector.xpath(
+        '//div[@class="feed-view__breakdown__scores feed-view__breakdown__scores--contained"]/div[2]/p[2]/text()').get()
+    edges_overall = selector.xpath(
+        '//div[@class="feed-view__breakdown__scores feed-view__breakdown__scores--contained"]/div[3]/p[2]/text()').get()
+    corners_overall = selector.xpath(
+        '//div[@class="feed-view__breakdown__scores feed-view__breakdown__scores--contained"]/div[4]/p[2]/text()').get()
+
+    # get the user card id used by the price API
+    user_card_id = selector.xpath(
+        '//div[@class="feed-view__breakdown__scores-holder feed-view__breakdown__scores-card-price"]/div/@data-user-card-id').get()
+    if user_card_id:
+        price = get_price(user_card_id)
+    else:
+        price = None
+
+    img_json = selector.xpath(
+        '//section[@class="feed-view__content"]//div[@class="feed-view__card"]/div/@data-images').get()
+    img_dict = json.loads(img_json)
+    front_img = img_dict.get('front_slab_image')
+    back_img = img_dict.get('back_slab_image')
+    if not front_img:
+        front_img = img_dict.get('image_path')
+        back_img = None
+
+    info = (cert_id, name, title, score, result_dict.get('card_type'), transform_date(result_dict.get('release_date')),
+            result_dict.get('series'), result_dict.get('card'), result_dict.get('set'), result_dict.get('owner'),
+            centering_overall, surface_overall, edges_overall, corners_overall, price, front_img, back_img)
+    # print(info)
+    save_data(sql_pool, info)
+
+
+@retry(stop_max_attempt_number=5, wait_fixed=2000)
+def get_data(ags_id_list, sql_pool):
+    sql_id = ags_id_list[0]
+    cert_id = ags_id_list[1]
+    headers = {
+        "user-agent": user_agent.generate_user_agent()
+    }
+    url = f"https://robograding.com/feed/{cert_id}/view"
+
+    try:
+        response = requests.get(url, headers=headers, proxies=get_proxys())
+        # response = requests.get(url, headers=headers)
+        response.raise_for_status()
+
+        if "Grades are not available yet" in response.text:
+            logger.debug("Grades are not available yet in response.text......................")
+            # mark the task as not yet graded (state=2)
+            sql_pool.update_one("UPDATE ags_task SET state=2 WHERE id=%s", (sql_id,))
+        else:
+            parse_data(cert_id, response.text, sql_pool)
+            # mark the task as completed (state=1)
+            sql_pool.update_one("UPDATE ags_task SET state=1 WHERE id=%s", (sql_id,))
+    except requests.RequestException as e:
+        logger.error(f"Request error: {e}")
+        raise  # re-raise so the retry decorator / caller can handle it
+
+
+def process_urls(ids, mysql_pool, batch_size=1000, max_workers=5):
+    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+        for i in range(0, len(ids), batch_size):
+            batch = ids[i:i + batch_size]
+            try:
+                futures_to_urls = {executor.submit(get_data, url, mysql_pool): url for url in batch}
+                for future in concurrent.futures.as_completed(futures_to_urls):
+                    url = futures_to_urls[future]
+                    try:
+                        future.result()
+                        logger.debug(f"Processed {url} successfully")
+                    except Exception as exc:
+                        logger.debug(f"Error processing {url}: {exc}")
+            except Exception as e:
+                logger.error(f"Failed to submit batch: {e}")
+
+
+# def get_add_cert(start_, end_):
+#     num_list = []
+#     # generate 8-digit, zero-padded id strings
+#     for num in range(start_, end_ + 1):
+#         # zfill pads the number to 8 characters with leading zeros
+#         formatted_num = str(num).zfill(8)
+#         # print(formatted_num)
+#         num_list.append(formatted_num)
+#     return num_list
+
+
+def get_new_task(sql_pool):
+    # look up the latest record, add 3000, and insert the new ids as tasks
+    # max_cert = sql_pool.select_one("SELECT cert_id FROM ags_record ORDER BY id DESC LIMIT 1")
+    # start_max_cert = int(max_cert[0]) + 1
+    # end_max_cert = start_max_cert + 3000
+    # new_id_list = get_add_cert(start_max_cert, end_max_cert)
+    # logger.debug(f'Latest id is {max_cert[0]}, generating new ids and adding them to the task table.........')
+    #
+    # # avoid adding duplicate tasks
+    # state0_id_list = sql_pool.select_all("SELECT cert_id FROM ags_task WHERE state = 0")
+    # state0_id_list = [i[0] for i in state0_id_list]
+    # for i in state0_id_list:
+    #     if i in new_id_list:
+    #         new_id_list.remove(i)
+    # sql_pool.insert_all("INSERT INTO ags_task(cert_id) VALUES (%s)", new_id_list)
+
+    # query the pending task list
+    # ags_id_list = sql_pool.select_all("SELECT id,cert_id FROM ags_task WHERE state != 1")
+    ags_id_list = sql_pool.select_all("SELECT id,cert_id FROM ags_task WHERE state = 0 LIMIT 5000")
+    ags_id_list = [i for i in ags_id_list]
+    return ags_id_list
+
+
+@retry(stop_max_attempt_number=5, wait_fixed=2000)
+def main():
+    """
+    Main spider entry point
+    """
+    try:
+        logger.info("Starting the tag_spider crawl job............................................................")
+        sql_pool = MySQLConnectionPool(log=logger)
+        if not sql_pool:
+            logger.error("Database connection failed")
+            raise Exception("Database connection failed")
+        new_task = get_new_task(sql_pool)
+        try:
+            process_urls(new_task, sql_pool, batch_size=1000, max_workers=5)
+        except Exception as e:
+            logger.error(f'process urls: {e}')
+
+    except Exception as e:
+        logger.error(f'error:{e}')
+    finally:
+        logger.info("Spider run finished, waiting for the next round of collection.............")
+
+
+def schedule_task():
+    # main()
+    schedule.every().day.at("00:01").do(main)
+    while True:
+        schedule.run_pending()
+        time.sleep(1)
+
+
+if __name__ == '__main__':
+    schedule_task()

+ 11 - 0
ags_spider/application.yml

@@ -0,0 +1,11 @@
+mysql:
+  host: ${MYSQL_HOST:100.64.0.23}
+  port: ${MYSQL_PORT:3306}
+  username: ${MYSQL_USERNAME:crawler}
+  password: ${MYSQL_PASSWORD:Pass2022}
+  db: ${MYSQL_DATABASE:crawler}
+
+fluent:
+  host: ${FLUENT_HOST:192.168.66.152}
+  port: ${FLUENT_PORT:24225}
+  appname: ${FLUENT_APPNAME:psa_spider.log}

+ 191 - 0
ags_spider/mysq_pool.py

@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-
+# Author  : Charley
+# Python  : 3.8.10
+# Date: 2024-08-05 19:42
+import pymysql
+import YamlLoader
+from loguru import logger
+from retrying import retry
+from dbutils.pooled_db import PooledDB
+
+# load the yaml configuration
+yaml = YamlLoader.readYaml()
+mysqlYaml = yaml.get("mysql")
+sql_host = mysqlYaml.getValueAsString("host")
+sql_port = mysqlYaml.getValueAsInt("port")
+sql_user = mysqlYaml.getValueAsString("username")
+sql_password = mysqlYaml.getValueAsString("password")
+sql_db = mysqlYaml.getValueAsString("db")
+
+
+class MySQLConnectionPool:
+    """
+    MySQL connection pool
+    """
+
+    def __init__(self, mincached=1, maxcached=3, maxconnections=5, log=None):
+        """
+        Initialise the connection pool
+        :param mincached: minimum number of idle connections created at start-up (0 means none are created)
+        :param maxcached: maximum number of idle connections kept in the pool (0 or None means unlimited)
+        :param maxconnections: maximum number of connections allowed (0 or None means unlimited)
+        """
+        # use loguru's logger by default; if another logger is passed in, use that instead
+        self.log = log or logger
+        self.pool = PooledDB(
+            creator=pymysql,
+            mincached=mincached,
+            maxcached=maxcached,
+            maxconnections=maxconnections,
+            blocking=True,  # block and wait when no connection is available; False would raise an error instead
+            host=sql_host,
+            port=sql_port,
+            user=sql_user,
+            password=sql_password,
+            database=sql_db
+        )
+
+    @retry(stop_max_attempt_number=100, wait_fixed=600000)
+    def _get_connection(self):
+        """
+        Get a connection from the pool
+        :return: connection
+        """
+        try:
+            return self.pool.connection()
+        except Exception as e:
+            self.log.error(f"Failed to get connection from pool: {e}, wait 10 mins retry")
+            raise e
+
+    @staticmethod
+    def _close_connection(conn):
+        """
+        Close the connection (returns it to the pool)
+        :param conn: connection
+        """
+        if conn:
+            conn.close()
+
+    @retry(stop_max_attempt_number=5, wait_fixed=1000)
+    def _execute(self, query, args=None, commit=False):
+        """
+        Execute a SQL statement
+        :param query: SQL statement
+        :param args: SQL parameters
+        :param commit: whether to commit the transaction
+        :return: the cursor holding the query result
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self._get_connection()
+            cursor = conn.cursor()
+            cursor.execute(query, args)
+            if commit:
+                conn.commit()
+            self.log.debug(f"sql _execute , Query: {query}, Rows: {cursor.rowcount}")
+            return cursor
+        except Exception as e:
+            if conn and not commit:
+                conn.rollback()
+            self.log.error(f"Error executing query: {e}")
+            raise e
+        finally:
+            if cursor:
+                cursor.close()
+            self._close_connection(conn)
+
+    def select_one(self, query, args=None):
+        """
+        Execute a query and return a single row
+        :param query: query statement
+        :param args: query parameters
+        :return: query result
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchone()
+
+    def select_all(self, query, args=None):
+        """
+        Execute a query and return all rows
+        :param query: query statement
+        :param args: query parameters
+        :return: query result
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchall()
+
+    def insert_one(self, query, args):
+        """
+        Execute a single insert statement
+        :param query: insert statement
+        :param args: insert parameters
+        """
+        # self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one saving to database>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        return self._execute(query, args, commit=True)
+
+    def insert_all(self, query, args_list):
+        """
+        Execute a batch insert; if it fails, fall back to inserting row by row
+        :param query: insert statement
+        :param args_list: list of insert parameters
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self._get_connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql insert_all , SQL: {query}, Rows: {cursor.rowcount}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all saving to database>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except Exception as e:
+            if conn:
+                conn.rollback()
+            self.log.error(f"Batch insertion failed. Trying single inserts. Error: {e}")
+            # if the batch insert fails, fall back to inserting row by row
+            rowcount = 0
+            for args in args_list:
+                self.insert_one(query, args)
+                rowcount += 1
+            self.log.debug(f"Batch insertion failed. Inserted {rowcount} rows individually.")
+        finally:
+            if cursor:
+                cursor.close()
+            self._close_connection(conn)
+
+    def update_one(self, query, args):
+        """
+        Execute a single update statement
+        :param query: update statement
+        :param args: update parameters
+        """
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one updating>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        return self._execute(query, args, commit=True)
+
+    def update_all(self, query, args_list):
+        """
+        Execute a batch update; if it fails, fall back to updating row by row
+        :param query: update statement
+        :param args_list: list of update parameters
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self._get_connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql update_all , SQL: {query}, Rows: {cursor.rowcount}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all updating>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except Exception as e:
+            if conn:
+                conn.rollback()
+            self.log.error(f"Error executing query: {e}")
+            # if the batch update fails, fall back to updating row by row
+            rowcount = 0
+            for args in args_list:
+                self.update_one(query, args)
+                rowcount += 1
+            self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')
+
+        finally:
+            if cursor:
+                cursor.close()
+            self._close_connection(conn)
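A short usage sketch of the pool as the spiders use it (queries follow the shape of those in `ags_new_daily.py`; assumes the `ags_task` table described earlier exists):

```python
from loguru import logger
from mysq_pool import MySQLConnectionPool

pool = MySQLConnectionPool(log=logger)

# fetch a few pending tasks
rows = pool.select_all("SELECT id, cert_id FROM ags_task WHERE state = 0 LIMIT 5")
for task_id, cert_id in rows:
    print(task_id, cert_id)

if rows:
    # mark the first task as completed
    pool.update_one("UPDATE ags_task SET state=1 WHERE id=%s", (rows[0][0],))
```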

+ 10 - 0
ags_spider/requirements.txt

@@ -0,0 +1,10 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.0
+loguru==0.7.3
+parsel==1.10.0
+PyMySQL==1.1.1
+PyYAML==6.0.2
+requests==2.32.3
+retrying==1.3.4
+schedule==1.2.2
+user_agent==0.1.10

+ 22 - 0
ags_spider/start_ags_spider.py

@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/6/3 17:37
+import subprocess
+import time
+
+
+def start_spider(script_name):
+    """启动指定的爬虫脚本"""
+    print(f"Starting {script_name}...")
+    # 使用subprocess.Popen启动一个新的进程,这样即使父进程退出,子进程也能继续运行
+    subprocess.Popen(["python", script_name])
+
+
+# start the spiders; each one has its own built-in schedule
+start_spider("ags_new_daily.py")
+time.sleep(1)
+
+start_spider("ags_history_spider.py")
+
+print("All topps now spiders have been started. Press Ctrl+C to stop the main process.")