소스 검색

add 6.12.1

lei.chen 6 달 전
부모
커밋
48f1950458
6개의 변경된 파일504개의 추가작업 그리고 0개의 파일을 삭제
  1. 9 0
      bgs_spider/README.md
  2. 74 0
      bgs_spider/YamlLoader.py
  3. 11 0
      bgs_spider/application.yml
  4. 209 0
      bgs_spider/bgs_new_daily_spider.py
  5. 191 0
      bgs_spider/mysq_pool.py
  6. 10 0
      bgs_spider/requirements.txt

+ 9 - 0
bgs_spider/README.md

@@ -0,0 +1,9 @@
+## 1. bgs 爬虫
+
+[bgs_spider.py](https://git.hobbystocks.cn/xian/bgs_spider/src/master/bgs_spider.py)
+
+```python
+# 启动命令
+python bgs_new_daily_spider.py
+```
+

+ 74 - 0
bgs_spider/YamlLoader.py

@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+#
+import os, re
+import yaml
+
+regex = re.compile(r'^\$\{(?P<ENV>[A-Z_\-]+\:)?(?P<VAL>[\w\.]+)\}$')
+
+class YamlConfig:
+    def __init__(self, config):
+        self.config = config
+
+    def get(self, key:str):
+        return YamlConfig(self.config.get(key))
+    
+    def getValueAsString(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] != None:
+                env = group['ENV'][:-1]
+                return os.getenv(env, group['VAL'])
+            return None
+        except:
+            return self.config[key]
+    
+    def getValueAsInt(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] != None:
+                env = group['ENV'][:-1]
+                return int(os.getenv(env, group['VAL']))
+            return 0
+        except:
+            return int(self.config[key])
+        
+    def getValueAsBool(self, key: str, env: str = None):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] != None:
+                env = group['ENV'][:-1]
+                return bool(os.getenv(env, group['VAL']))
+            return False
+        except:
+            return bool(self.config[key])
+    
+def readYaml(path:str = 'application.yml', profile:str = None) -> YamlConfig:
+    if os.path.exists(path):
+        with open(path) as fd:
+            conf = yaml.load(fd, Loader=yaml.FullLoader)
+
+    if profile != None:
+        result = path.split('.')
+        profiledYaml = f'{result[0]}-{profile}.{result[1]}'
+        if os.path.exists(profiledYaml):
+            with open(profiledYaml) as fd:
+                conf.update(yaml.load(fd, Loader=yaml.FullLoader))
+
+    return YamlConfig(conf)
+
+# res = readYaml()
+# mysqlConf = res.get('mysql')
+# print(mysqlConf)
+
+# print(res.getValueAsString("host"))
+# mysqlYaml = mysqlConf.getValueAsString("host")
+# print(mysqlYaml)
+# host = mysqlYaml.get("host").split(':')[-1][:-1]
+# port = mysqlYaml.get("port").split(':')[-1][:-1]
+# username = mysqlYaml.get("username").split(':')[-1][:-1]
+# password = mysqlYaml.get("password").split(':')[-1][:-1]
+# mysql_db = mysqlYaml.get("db").split(':')[-1][:-1]
+# print(host,port,username,password)

+ 11 - 0
bgs_spider/application.yml

@@ -0,0 +1,11 @@
+mysql:
+  host: ${MYSQL_HOST:100.64.0.23}
+  port: ${MYSQL_PROT:3306}
+  username: ${MYSQL_USERNAME:crawler}
+  password: ${MYSQL_PASSWORD:Pass2022}
+  db: ${MYSQL_DATABASE:crawler}
+
+fluent:
+  host: ${FIUENT_HOST:192.168.66.152}
+  port: ${FIUENT_PORT:24225}
+  appname: ${FIUENT_APPNAME:psa_spider.log}

+ 209 - 0
bgs_spider/bgs_new_daily_spider.py

@@ -0,0 +1,209 @@
+# -*- coding: utf-8 -*-
+# Author  : Charley
+# Python  : 3.8.10
+# Date: 2024-11-12 16:40
+import time
+import requests
+import schedule
+import user_agent
+from loguru import logger
+import concurrent.futures
+from tenacity import stop_after_attempt, wait_fixed, retry
+from mysq_pool import MySQLConnectionPool
+
+logger.remove()
+logger.add("logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+           level="DEBUG", retention="1 day")
+
+
+def after_log(retry_state):
+    """
+    retry 回调
+    :param retry_state: RetryCallState 对象
+    """
+    # 检查 args 是否存在且不为空
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]  # 获取传入的 logger
+    else:
+        log = logger  # 使用全局 logger
+
+    if retry_state.outcome.failed:
+        log.warning(
+            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_proxys(log):
+    tunnel = "x371.kdltps.com:15818"
+    kdl_username = "t13753103189895"
+    kdl_password = "o0yefv6z"
+    try:
+        proxies = {
+            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
+            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
+        }
+        return proxies
+    except Exception as e:
+        log.error(f"Error getting proxy: {e}")
+        raise e
+
+
+def save_data(mysql_pool, info):
+    """
+    :param mysql_pool:
+    :param info:
+    :return:
+    """
+    sql = "INSERT INTO beckett_bgs_record(set_name, player_name, date_graded, centering_grade, corner_grade, edges_grade, surfaces_grade, auto_grade, final_grade, total_grade, cards_grade, number) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
+    mysql_pool.insert_one(sql, info)
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_data(log, bgs_id, mysql_pool):
+    """
+    :param log:
+    :param bgs_id:
+    :param mysql_pool:
+    :return:
+    """
+    headers = {
+        "accept": "application/json, text/plain, */*",
+        "user-agent": user_agent.generate_user_agent()
+    }
+    url = "https://www.beckett.com/api/grading/lookup"
+    params = {
+        "category": "BGS",
+        "serialNumber": str(bgs_id)
+    }
+    response = requests.get(url, headers=headers, params=params, proxies=get_proxys(log), timeout=5)
+    if response.status_code == 404:
+        # 没有数据 No Record Found 将状态改为3
+        log.warning(f"No Record Found for {bgs_id}")
+        mysql_pool.update_one("UPDATE bgs_task SET state=3 WHERE auth_code=%s", (bgs_id,))
+        return
+
+    if response.status_code != 200:
+        # 查询失败  将状态改为2
+        log.warning(f"Error getting data for {bgs_id}, {response.status_code}")
+        mysql_pool.update_one("UPDATE bgs_task SET state=2 WHERE auth_code=%s", (bgs_id,))
+        return
+
+    # print(response.json())
+    result_dict = response.json()
+    if result_dict:
+        """
+            "label": "silver",
+            "non_bccg_card_total": 0,
+            "item_id": "17932864",
+            "set_name": "2024 Magic the Gathering Secret Lair Dungeons & Dragons 50th Anniversary Bonus Card Foil",
+            "sport_name": "Magic",
+            "card_key": "0879",
+            "player_name": "Minsc & Boo, Timeless Heroes M",
+            "date_graded": "Thursday, April 17, 2025",
+            "center_grade": "9.5",
+            "corners_grade": "8.5",
+            "edges_grade": "8.5",
+            "surfaces_grade": "10.0",
+            "autograph_grade": "0.0",
+            "final_grade": "8.5",
+        """
+        set_name = result_dict.get('set_name')
+        player_name = result_dict.get('player_name')
+        date_graded = result_dict.get('date_graded')
+        centering_grade = result_dict.get('center_grade')
+        corner_grade = result_dict.get('corners_grade')
+        edges_grade = result_dict.get('edges_grade')
+        surfaces_grade = result_dict.get('surface_grade')
+        auto_grade = result_dict.get('autograph_grade')
+        final_grade = result_dict.get('final_grade')
+        total_grade = result_dict.get('pop_report')
+        cards_grade = result_dict.get('pop_higher')
+        info = (set_name, player_name, date_graded, centering_grade, corner_grade, edges_grade, surfaces_grade,
+                auto_grade, final_grade, total_grade, cards_grade, int(bgs_id))
+
+        # 检查所有值是否都为 None或空字符串, 不包含bgs_id
+        all_none_or_empty = all(x is None or x == '' for x in info[:-1])
+        if all_none_or_empty:
+            log.debug("All values are empty")
+        else:
+            # print(info)
+            save_data(mysql_pool, info)
+            # 查询成功  将状态改为1
+            mysql_pool.update_one("UPDATE bgs_task SET state=1 WHERE auth_code=%s", (bgs_id,))
+
+
+def process_urls(log, ids, mysql_pool, batch_size=1000, max_workers=5):
+    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+        for i in range(0, len(ids), batch_size):
+            # print(i)
+            batch = ids[i:i + batch_size]
+            # print(batch)
+            try:
+                futures_to_urls = {executor.submit(get_data, log, url, mysql_pool): url for url in batch}
+                for future in concurrent.futures.as_completed(futures_to_urls):
+                    url = futures_to_urls[future]
+                    try:
+                        future.result()
+                        log.debug(f"处理 {url} 成功")
+                    except Exception as exc:
+                        log.debug(f"处理 {url} 出错: {exc}")
+            except Exception as e:
+                log.error(f"提交任务失败: {e}")
+
+
+@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
+def bgs_main(log):
+    try:
+        log.info(
+            "开始运行 bgs_main 爬虫任务............................................................")
+        sql_pool = MySQLConnectionPool(log=log)
+        if not sql_pool:
+            log.error("数据库连接失败")
+            raise Exception("数据库连接失败")
+
+        max_bgs_id = sql_pool.select_one("SELECT MAX(number) AS max_number FROM beckett_bgs_record")
+        # print(max_bgs_id_list)
+        max_bgs_id = max_bgs_id[0]
+        log.info(f"max_bgs_id 从 {max_bgs_id} 开始爬取.........................")
+        bgs_id_list = [i for i in range(max_bgs_id, max_bgs_id + 3001)]
+
+        sql_pool.insert_all("INSERT INTO bgs_task(auth_code) VALUES (%s)", bgs_id_list)
+
+        # 倒序查 5000个
+        sql_bgs_id_list = sql_pool.select_all(
+            "SELECT auth_code FROM bgs_task WHERE state!=1 ORDER BY id DESC LIMIT 5000")
+        sql_bgs_id_list = [bid[0] for bid in sql_bgs_id_list]
+        # for bid in sql_bgs_id_list:
+        try:
+            process_urls(log, sql_bgs_id_list, sql_pool, batch_size=1000,
+                         max_workers=10)  # 根据需要调整batch_size和max_workers
+            # get_data(bid, mysql_pool)
+        except Exception as e:
+            log.error('process urls: ', e)
+
+    except Exception as e:
+        log.error(e)
+    finally:
+        log.info("爬虫程序运行结束,等待下一轮的采集任务.....................")
+
+
+def schedule_task():
+    """
+    设置定时任务
+    """
+    # 立即运行一次任务
+    # bgs_main(logger)
+
+    # 设置定时任务
+    schedule.every().day.at("03:01").do(bgs_main, logger)
+    while True:
+        schedule.run_pending()
+        time.sleep(1)
+
+
+if __name__ == '__main__':
+    schedule_task()
+    # get_data('1000743')

+ 191 - 0
bgs_spider/mysq_pool.py

@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-
+# Author  : Charley
+# Python  : 3.8.10
+# Date: 2024-08-05 19:42
+import pymysql
+import YamlLoader
+from loguru import logger
+from retrying import retry
+from dbutils.pooled_db import PooledDB
+
+# 获取yaml配置
+yaml = YamlLoader.readYaml()
+mysqlYaml = yaml.get("mysql")
+sql_host = mysqlYaml.getValueAsString("host")
+sql_port = mysqlYaml.getValueAsInt("port")
+sql_user = mysqlYaml.getValueAsString("username")
+sql_password = mysqlYaml.getValueAsString("password")
+sql_db = mysqlYaml.getValueAsString("db")
+
+
+class MySQLConnectionPool:
+    """
+    MySQL连接池
+    """
+
+    def __init__(self, mincached=10, maxcached=5, maxconnections=20, log=None):
+        """
+        初始化连接池
+        :param mincached: 初始化时,链接池中至少创建的链接,0表示不创建
+        :param maxcached: 池中空闲连接的最大数目(0 或 None 表示池大小不受限制)
+        :param maxconnections: 允许的最大连接数(0 或 None 表示任意数量的连接)
+        """
+        # 使用 loguru 的 logger,如果传入了其他 logger,则使用传入的 logger
+        self.log = log or logger
+        self.pool = PooledDB(
+            creator=pymysql,
+            mincached=mincached,
+            maxcached=maxcached,
+            maxconnections=maxconnections,
+            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
+            host=sql_host,
+            port=sql_port,
+            user=sql_user,
+            password=sql_password,
+            database=sql_db
+        )
+
+    @retry(stop_max_attempt_number=100, wait_fixed=600000)
+    def _get_connection(self):
+        """
+        获取连接
+        :return: 连接
+        """
+        try:
+            return self.pool.connection()
+        except Exception as e:
+            self.log.error(f"Failed to get connection from pool: {e}, wait 10 mins retry")
+            raise e
+
+    @staticmethod
+    def _close_connection(conn):
+        """
+        关闭连接
+        :param conn: 连接
+        """
+        if conn:
+            conn.close()
+
+    @retry(stop_max_attempt_number=5, wait_fixed=1000)
+    def _execute(self, query, args=None, commit=False):
+        """
+        执行SQL
+        :param query: SQL语句
+        :param args: SQL参数
+        :param commit: 是否提交事务
+        :return: 查询结果
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self._get_connection()
+            cursor = conn.cursor()
+            cursor.execute(query, args)
+            if commit:
+                conn.commit()
+            self.log.debug(f"sql _execute , Query: {query}, Rows: {cursor.rowcount}")
+            return cursor
+        except Exception as e:
+            if conn and not commit:
+                conn.rollback()
+            self.log.error(f"Error executing query: {e}")
+            raise e
+        finally:
+            if cursor:
+                cursor.close()
+            self._close_connection(conn)
+
+    def select_one(self, query, args=None):
+        """
+        执行查询,返回单个结果
+        :param query: 查询语句
+        :param args: 查询参数
+        :return: 查询结果
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchone()
+
+    def select_all(self, query, args=None):
+        """
+        执行查询,返回所有结果
+        :param query: 查询语句
+        :param args: 查询参数
+        :return: 查询结果
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchall()
+
+    def insert_one(self, query, args):
+        """
+        执行单条插入语句
+        :param query: 插入语句
+        :param args: 插入参数
+        """
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        return self._execute(query, args, commit=True)
+
+    def insert_all(self, query, args_list):
+        """
+        执行批量插入语句,如果失败则逐条插入
+        :param query: 插入语句
+        :param args_list: 插入参数列表
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self._get_connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql insert_all , SQL: {query}, Rows: {cursor.rowcount}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except Exception as e:
+            conn.rollback()
+            self.log.error(f"Batch insertion failed after 5 attempts. Trying single inserts. Error: {e}")
+            # 如果批量插入失败,则逐条插入
+            rowcount = 0
+            for args in args_list:
+                self.insert_one(query, args)
+                rowcount += 1
+            self.log.debug(f"Batch insertion failed. Inserted {rowcount} rows individually.")
+        finally:
+            cursor.close()
+            self._close_connection(conn)
+
+    def update_one(self, query, args):
+        """
+        执行单条更新语句
+        :param query: 更新语句
+        :param args: 更新参数
+        """
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        return self._execute(query, args, commit=True)
+
+    def update_all(self, query, args_list):
+        """
+        执行批量更新语句,如果失败则逐条更新
+        :param query: 更新语句
+        :param args_list: 更新参数列表
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self._get_connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql update_all , SQL: {query}, Rows: {cursor.rowcount}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except Exception as e:
+            conn.rollback()
+            self.log.error(f"Error executing query: {e}")
+            # 如果批量更新失败,则逐条更新
+            rowcount = 0
+            for args in args_list:
+                self.update_one(query, args)
+                rowcount += 1
+            self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')
+
+        finally:
+            cursor.close()
+            self._close_connection(conn)

+ 10 - 0
bgs_spider/requirements.txt

@@ -0,0 +1,10 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.0
+loguru==0.7.2
+parsel==1.9.1
+PyMySQL==1.1.1
+PyYAML==6.0.1
+Requests==2.32.3
+user_agent==0.1.10
+tenacity
+schedule