浏览代码

add 6.12.1

lei.chen 6 月之前
父节点
当前提交
ed471c3058

+ 19 - 0
weikajia_spider/README.md

@@ -0,0 +1,19 @@
+## 1. 微卡家  每日更新爬虫任务
+
+- **3个爬虫文件**
+
+​		[weika_bidding_spider.py](https://git.hobbystocks.cn/xian/weikajia_spider/src/master/weika_bidding_spider.py)
+
+​		[weika_change_card_by_id_spider.py](https://git.hobbystocks.cn/xian/weikajia_spider/src/master/weika_change_card_by_id_spider.py)
+
+​		[weika_sold_spider.py](https://git.hobbystocks.cn/xian/weikajia_spider/src/master/weika_sold_spider.py)
+
+- **集成启动文件**
+
+​		[weika_spider.py](https://git.hobbystocks.cn/xian/weikajia_spider/src/master/weika_spider.py)
+
+```python
+# 启动命令
+python weika_spider.py
+```
+

+ 74 - 0
weikajia_spider/YamlLoader.py

@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+#
+import os, re
+import yaml
+
+regex = re.compile(r'^\$\{(?P<ENV>[A-Z_\-]+\:)?(?P<VAL>[\w\.]+)\}$')
+
class YamlConfig:
    """Thin wrapper over a parsed YAML mapping with Spring-style
    ``${ENV_VAR:default}`` placeholder resolution."""

    # Matches "${ENV_VAR:default}" or "${default}"; the ENV group keeps the
    # trailing ':' which is stripped before the os.environ lookup.
    _ENV_PATTERN = re.compile(r'^\$\{(?P<ENV>[A-Z_\-]+\:)?(?P<VAL>[\w\.]+)\}$')

    def __init__(self, config):
        self.config = config

    def get(self, key: str):
        """Return the sub-mapping under *key*, wrapped in a new YamlConfig."""
        return YamlConfig(self.config.get(key))

    def _resolve(self, key: str):
        """Resolve the raw value stored at *key*.

        For a ``${VAR:default}`` string, return the environment override
        (or the default).  For a ``${value}`` string without an env name,
        return the literal value (the original returned None/0/False here,
        silently losing the value).  Anything that is not a placeholder
        string passes through unchanged.
        """
        raw = self.config[key]
        try:
            match = self._ENV_PATTERN.match(raw)
        except TypeError:
            # Non-string YAML scalars (int, bool, None) pass through.
            return raw
        if match is None:
            return raw
        group = match.groupdict()
        if group['ENV'] is not None:
            env = group['ENV'][:-1]  # drop the trailing ':'
            return os.getenv(env, group['VAL'])
        return group['VAL']

    def getValueAsString(self, key: str):
        """Return the resolved value for *key* (string for placeholders)."""
        return self._resolve(key)

    def getValueAsInt(self, key: str):
        """Return the resolved value for *key* coerced to int."""
        return int(self._resolve(key))

    def getValueAsBool(self, key: str, env: str = None):
        """Return the resolved value for *key* coerced to bool.

        fix: the original used bool() on the string, so "${X:false}"
        evaluated True; common falsy spellings are now parsed properly.
        """
        value = self._resolve(key)
        if isinstance(value, str):
            return value.strip().lower() in ('1', 'true', 'yes', 'on')
        return bool(value)
+    
def readYaml(path: str = 'application.yml', profile: str = None) -> YamlConfig:
    """Load *path* and, when *profile* is given, overlay the values from
    ``<name>-<profile>.<ext>`` on top of the base configuration.

    :param path: base YAML file (default ``application.yml``)
    :param profile: optional profile suffix, e.g. ``dev`` -> ``application-dev.yml``
    :return: YamlConfig wrapping the merged mapping (empty when no file
        exists — the original raised NameError because ``conf`` was unbound)
    """
    conf = {}  # fix: was unbound when `path` did not exist
    if os.path.exists(path):
        with open(path) as fd:
            conf = yaml.load(fd, Loader=yaml.FullLoader) or {}

    if profile is not None:
        # fix: os.path.splitext is safe for paths whose directories
        # contain dots, unlike the original str.split('.') on the path.
        root, ext = os.path.splitext(path)
        profiledYaml = f'{root}-{profile}{ext}'
        if os.path.exists(profiledYaml):
            with open(profiledYaml) as fd:
                conf.update(yaml.load(fd, Loader=yaml.FullLoader) or {})

    return YamlConfig(conf)
+
+# res = readYaml()
+# mysqlConf = res.get('mysql')
+# print(mysqlConf)
+
+# print(res.getValueAsString("host"))
+# mysqlYaml = mysqlConf.getValueAsString("host")
+# print(mysqlYaml)
+# host = mysqlYaml.get("host").split(':')[-1][:-1]
+# port = mysqlYaml.get("port").split(':')[-1][:-1]
+# username = mysqlYaml.get("username").split(':')[-1][:-1]
+# password = mysqlYaml.get("password").split(':')[-1][:-1]
+# mysql_db = mysqlYaml.get("db").split(':')[-1][:-1]
+# print(host,port,username,password)

+ 6 - 0
weikajia_spider/application.yml

@@ -0,0 +1,6 @@
+mysql:
+  host: ${MYSQL_HOST:100.64.0.25}
+  port: ${MYSQL_PROT:3306}
+  username: ${MYSQL_USERNAME:crawler}
+  password: ${MYSQL_PASSWORD:Pass2022}
+  db: ${MYSQL_DATABASE:crawler}

+ 40 - 0
weikajia_spider/clean_logs.py

@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+# Author  : Charley
+# Python  : 3.8.10
+# Date: 2024-09-29 10:54
+import os
+from loguru import logger
+from datetime import datetime, timedelta
+
def clean_logs():
    """Delete per-day ``YYYYMMDD.log`` files older than two days from the
    three spider log folders under the current working directory."""
    # The daily log folders written by the three spiders.
    log_folders = ['bidding_logs', 'change_logs', 'sold_logs']

    # Keep today and yesterday; anything dated before this is removed.
    today = datetime.now().date()
    cutoff_date = today - timedelta(days=2)

    for folder in log_folders:
        folder_path = os.path.join(os.getcwd(), folder)

        if not os.path.exists(folder_path):
            logger.warning(f"警告: 文件夹 {folder} 不存在.")
            continue

        for filename in os.listdir(folder_path):
            if filename.endswith('.log'):
                file_date_str = filename.split('.')[0]  # strip the .log suffix
                try:
                    file_date = datetime.strptime(file_date_str, '%Y%m%d').date()

                    # Delete files dated before the cutoff.
                    if file_date < cutoff_date:
                        file_path = os.path.join(folder_path, filename)
                        os.remove(file_path)
                        logger.info(f"已删除过期日志文件: {file_path}")
                except ValueError:
                    # fix: the original message did not say WHICH file failed
                    logger.error(f"无法解析文件名中的日期: {filename}")

    logger.info("日志清理完成 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")

+ 154 - 0
weikajia_spider/fill_data.py

@@ -0,0 +1,154 @@
+# -*- coding: utf-8 -*-
+# Author  : Charley
+# Python  : 3.8.10
+# Date: 2024-09-24 10:50
+import requests
+from loguru import logger
+from retrying import retry
+from mysq_pool import MySQLConnectionPool
+
# Base endpoint for all weikajia API calls in this module.
base_url = "https://api.weikajia.com"
def save_data(sql_pool,info):
    """Backfill the user/agent columns of one weikajia_bidding row.

    :param sql_pool: MySQLConnectionPool instance.
    :param info: parameter tuple matching the 15 %s placeholders below
        (14 SET values in declaration order, then the row id for WHERE).
    """
    logger.debug("正在写入数据库............")
    # NOTE(review): `following=%s` appears TWICE in this SET list — one of
    # the two was probably meant to be a different column; confirm against
    # the weikajia_bidding schema.  Callers must bind 15 parameters and
    # repeat the `following` value to match.
    sql = """
            UPDATE weikajia_bidding
            SET nickName=%s,
                avatarOss=%s,
                following=%s,
                following=%s,
                voteRate=%s,
                level=%s,
                introduceSign=%s,
                certifyStatus=%s,
                ipRegion=%s,
                blueVflag=%s,
                shopVflag=%s,
                credit=%s,
                agentLevel=%s,
                agentId=%s
            WHERE id=%s
               """
    sql_pool.update_one(sql, info)
+
+
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_action(auctionId,headers):
    """Fetch the auction detail and return its agentUserInfo dict with the
    auction-level agentId merged in.

    :param auctionId: auction item id to query
    :param headers: request headers (including auth token)
    :return: agentUserInfo dict
    :raises Exception: (re-tried 3x by @retry) when resultCode != 200
    """
    logger.debug(f'正在查询auctionId为: {auctionId}的信息..............')
    url = f"{base_url}/api/v2/auction/detail"
    params = {
        "auctionId": auctionId
    }
    response = requests.get(url, headers=headers, params=params, timeout=5)
    payload = response.json()  # fix: parse the body once instead of three times
    if payload["resultCode"] == 200:
        agentUserInfo = payload["data"].get("agentUserInfo")
        agentUserInfo["agentId"] = payload["data"].get("agentId")
        return agentUserInfo
    logger.debug("get_action 请求失败,重试中...........")
    raise Exception("请求失败")
+
+
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_cabinet(cabinetId,headers):
    """Fetch the cabinet (card) detail and return selected fields.

    :param cabinetId: cabinet id to query
    :param headers: request headers (including auth token)
    :return: dict of the card attributes used downstream
    :raises Exception: (re-tried 3x by @retry) when resultCode != 200
    """
    logger.debug(f'正在查询cabinetId为: {cabinetId}的信息..............')
    url = f"{base_url}/api/v2/cabinet/detail"
    params = {
        "cabinetId": cabinetId
    }
    response = requests.get(url, headers=headers, params=params, timeout=5)
    payload = response.json()  # fix: parse the body once instead of per-access
    if payload["resultCode"] != 200:
        logger.debug("get_cabinet 请求失败,重试中...........")
        raise Exception("请求失败")
    data = payload["data"]
    # Only the columns the bidding/sold tables care about.
    return {
        "rmbPrice": data.get("rmbPrice"), "brand": data.get("brand"),
        "status": data.get("status"), "switchSt": data.get("switchSt"),
        "cardNo": data.get("cardNo"), "barcodeId": data.get("barcodeId"),
        "year": data.get("year"), "grade": data.get("grade"),
        "setName": data.get("setName"), "player": data.get("player"),
        "onSaleExpireTs": data.get("onSaleExpireTs"),
        "authenticNumber": data.get("authenticNumber"),
    }
+
def fill_main():
    """Find weikajia_bidding rows whose user/agent details are all NULL and
    backfill them from the auction/cabinet detail APIs."""
    try:
        sql_pool = MySQLConnectionPool(log=logger)
        if not sql_pool:
            logger.error("数据库连接失败")
            raise Exception("数据库连接失败")
        token = sql_pool.select_one("select token from wkj_token")
        headers = {
            "appVersion": "1.6.5",
            "osVersion": "9",
            "deviceModel": "M2007J22C",
            "appVersionCode": "168",
            "deviceBrand": "xiaomi",
            "platform": "android",
            "token": token[0],
            "user-agent": "Mozilla/5.0 (Linux; Android 9; M2007J22C Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/92.0.4515.131 Mobile Safari/537.36",
            "Content-Type": "application/json",
            "Connection": "Keep-Alive"
        }

        sql = """
            SELECT id,cabinetId, auctionItemId 
            FROM weikajia_bidding 
            WHERE nickName IS NULL 
              AND avatarOss IS NULL 
              AND following IS NULL 
              AND voteRate IS NULL 
              AND level IS NULL 
              AND introduceSign IS NULL 
              AND certifyStatus IS NULL 
              AND ipRegion IS NULL 
              AND blueVflag IS NULL 
              AND shopVflag IS NULL 
              AND credit IS NULL 
              AND agentLevel IS NULL 
              AND agentId IS NULL 
              AND rmbPrice IS NULL 
              AND brand IS NULL 
              AND status IS NULL 
              AND switchSt IS NULL 
              AND cardNo IS NULL 
              AND printNo IS NULL 
              AND printNoS IS NULL 
              AND sportId IS NULL 
              AND barcodeId IS NULL 
              AND year IS NULL 
              AND grade IS NULL 
              AND setName IS NULL 
              AND playerIds IS NULL 
              AND player IS NULL 
              AND onSaleExpireTs IS NULL 
              AND authenticNumber IS NULL;
        """
        for sql_id, cabinetId, auctionId in sql_pool.select_all(sql):
            try:
                act_dict = get_action(auctionId, headers)
                cab_dict = get_cabinet(cabinetId, headers)
                # fix: the original passed the two dicts straight through as
                # (act_dict, cab_dict, sql_id) — pymysql cannot bind a dict
                # to a %s placeholder.  Build the scalar tuple in the column
                # order of save_data's UPDATE (whose SET list repeats
                # `following`, so it is bound twice here on purpose).
                info = (
                    act_dict.get("nickName"),
                    act_dict.get("avatarOss"),
                    act_dict.get("following"),
                    act_dict.get("following"),
                    act_dict.get("voteRate"),
                    act_dict.get("level"),
                    act_dict.get("introduceSign"),
                    act_dict.get("certifyStatus"),
                    act_dict.get("ipRegion"),
                    act_dict.get("blueVflag"),
                    act_dict.get("shopVflag"),
                    act_dict.get("credit"),
                    act_dict.get("agentLevel"),
                    act_dict.get("agentId"),
                    sql_id,
                )
                save_data(sql_pool, info)
                # NOTE(review): cab_dict is fetched but never persisted —
                # save_data's UPDATE has no cabinet columns even though the
                # SELECT filters on them; confirm whether those fields
                # should be written too.
                _ = cab_dict
                # time.sleep(random.randint(3, 5))
            except Exception as e:
                logger.error(f'出错, {e}')
    except Exception as e:
        logger.error(e)
    finally:
        logger.info("爬虫程序运行结束,等待下一轮的采集任务.............")

+ 191 - 0
weikajia_spider/mysq_pool.py

@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-
+# Author  : Charley
+# Python  : 3.8.10
+# Date: 2024-08-05 19:42
+import pymysql
+import YamlLoader
+from loguru import logger
+from retrying import retry
+from dbutils.pooled_db import PooledDB
+
# Resolve MySQL connection settings once at import time from
# application.yml; each value supports a ${ENV_VAR:default} override
# (see YamlLoader).
# NOTE(review): the name `yaml` shadows the PyYAML module name used inside
# YamlLoader — harmless here, but confusing; consider renaming.
yaml = YamlLoader.readYaml()
mysqlYaml = yaml.get("mysql")
sql_host = mysqlYaml.getValueAsString("host")
sql_port = mysqlYaml.getValueAsInt("port")
sql_user = mysqlYaml.getValueAsString("username")
sql_password = mysqlYaml.getValueAsString("password")
sql_db = mysqlYaml.getValueAsString("db")
+
+
class MySQLConnectionPool:
    """
    MySQL connection pool built on DBUtils' PooledDB + PyMySQL.

    Connection parameters come from the module-level values resolved from
    application.yml at import time.
    """

    def __init__(self, mincached=4, maxcached=5, maxconnections=10, log=None):
        """
        Initialise the pool.
        :param mincached: connections opened at startup (0 = create none)
        :param maxcached: max idle connections kept in the pool (0/None = unbounded)
        :param maxconnections: max total connections (0/None = unbounded)
        :param log: optional logger; defaults to the module's loguru logger
        """
        # Use the loguru logger unless the caller supplied one.
        self.log = log or logger
        self.pool = PooledDB(
            creator=pymysql,
            mincached=mincached,
            maxcached=maxcached,
            maxconnections=maxconnections,
            blocking=True,  # block and wait when the pool is exhausted instead of raising
            host=sql_host,
            port=sql_port,
            user=sql_user,
            password=sql_password,
            database=sql_db
        )

    @retry(stop_max_attempt_number=100, wait_fixed=600000)
    def _get_connection(self):
        """
        Check a connection out of the pool.
        Retries up to 100 times, waiting 10 minutes (600 000 ms) between
        attempts, so a dead database blocks the caller for a long time.
        :return: a pooled connection
        """
        try:
            return self.pool.connection()
        except Exception as e:
            self.log.error(f"Failed to get connection from pool: {e}, wait 10 mins retry")
            raise e

    @staticmethod
    def _close_connection(conn):
        """
        Return a connection to the pool.  PooledDB's close() releases the
        connection back to the pool; it does not disconnect.
        :param conn: connection to release; None is ignored
        """
        if conn:
            conn.close()

    @retry(stop_max_attempt_number=5, wait_fixed=1000)
    def _execute(self, query, args=None, commit=False):
        """
        Run one SQL statement on a freshly checked-out connection.
        :param query: SQL text with %s placeholders
        :param args: placeholder parameters
        :param commit: commit after executing (for writes)
        :return: the cursor (already closed — see note in `finally`)
        """
        conn = None
        cursor = None
        try:
            conn = self._get_connection()
            cursor = conn.cursor()
            cursor.execute(query, args)
            if commit:
                conn.commit()
            self.log.debug(f"sql _execute , Query: {query}, Rows: {cursor.rowcount}")
            return cursor
        except Exception as e:
            # NOTE(review): rollback only runs when commit=False (reads);
            # a failed write (commit=True) is never rolled back here —
            # confirm whether this condition is inverted.
            if conn and not commit:
                conn.rollback()
            self.log.error(f"Error executing query: {e}")
            raise e
        finally:
            # NOTE(review): the cursor is closed BEFORE callers fetch from
            # it (select_one/select_all).  This appears to rely on PyMySQL's
            # buffered cursor keeping its row buffer after close() — verify
            # before switching to an unbuffered/SSCursor class.
            if cursor:
                cursor.close()
            self._close_connection(conn)

    def select_one(self, query, args=None):
        """
        Run a query and return the first row (or None).
        :param query: SQL text
        :param args: parameters
        :return: first result row
        """
        cursor = self._execute(query, args)
        return cursor.fetchone()

    def select_all(self, query, args=None):
        """
        Run a query and return all rows.
        :param query: SQL text
        :param args: parameters
        :return: tuple of result rows
        """
        cursor = self._execute(query, args)
        return cursor.fetchall()

    def insert_one(self, query, args):
        """
        Execute a single INSERT (committed).
        :param query: INSERT statement
        :param args: parameters
        """
        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        return self._execute(query, args, commit=True)

    def insert_all(self, query, args_list):
        """
        Batch INSERT; falls back to row-by-row inserts if the batch fails.
        :param query: INSERT statement
        :param args_list: list of parameter tuples
        """
        conn = None
        cursor = None
        try:
            conn = self._get_connection()
            cursor = conn.cursor()
            cursor.executemany(query, args_list)
            conn.commit()
            self.log.debug(f"sql insert_all , SQL: {query}, Rows: {cursor.rowcount}")
            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        except Exception as e:
            # NOTE(review): if _get_connection() itself failed, conn is None
            # and this rollback raises AttributeError (likewise the bare
            # cursor.close() in finally) — consider guarding both.
            conn.rollback()
            self.log.error(f"Batch insertion failed after 5 attempts. Trying single inserts. Error: {e}")
            # Fall back to inserting the rows one at a time.
            rowcount = 0
            for args in args_list:
                self.insert_one(query, args)
                rowcount += 1
            self.log.debug(f"Batch insertion failed. Inserted {rowcount} rows individually.")
        finally:
            cursor.close()
            self._close_connection(conn)

    def update_one(self, query, args):
        """
        Execute a single UPDATE (committed).
        :param query: UPDATE statement
        :param args: parameters
        """
        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        return self._execute(query, args, commit=True)

    def update_all(self, query, args_list):
        """
        Batch UPDATE; falls back to row-by-row updates if the batch fails.
        :param query: UPDATE statement
        :param args_list: list of parameter tuples
        """
        conn = None
        cursor = None
        try:
            conn = self._get_connection()
            cursor = conn.cursor()
            cursor.executemany(query, args_list)
            conn.commit()
            self.log.debug(f"sql update_all , SQL: {query}, Rows: {cursor.rowcount}")
            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        except Exception as e:
            # NOTE(review): same None-connection hazard as insert_all.
            conn.rollback()
            self.log.error(f"Error executing query: {e}")
            # Fall back to updating the rows one at a time.
            rowcount = 0
            for args in args_list:
                self.update_one(query, args)
                rowcount += 1
            self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')

        finally:
            cursor.close()
            self._close_connection(conn)

+ 8 - 0
weikajia_spider/requirements.txt

@@ -0,0 +1,8 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.0
+loguru==0.7.2
+PyMySQL==1.1.1
+PyYAML==6.0.1
+requests==2.32.3
+retrying==1.3.4
+schedule==1.2.2

+ 151 - 0
weikajia_spider/weika_bidding_spider.py

@@ -0,0 +1,151 @@
+# -*- coding: utf-8 -*-
+# Author  : Charley
+# Python  : 3.8.10
+# Date: 2024-09-11 14:17
+import random
+import time
+import requests
+from retrying import retry
+from datetime import datetime
+from mysq_pool import MySQLConnectionPool
+
+
def save_data(sql_pool, info):
    """Persist one scraped batch of auction listings.

    Delegates to the pool's batch insert; ``info`` is a sequence of
    9-tuples in the column order named in the INSERT statement.
    """
    insert_sql = "INSERT INTO weikajia_bidding (cabinetId, imgs, title, price, lastBidPrice, auctionItemId, auctionStart, auctionEnd, currBidIndex) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)"
    sql_pool.insert_all(insert_sql, info)
+
+
def transform(timestamp):
    """Render a Unix timestamp as a local-time 'YYYY-MM-DD HH:MM:SS' string."""
    return datetime.fromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')
+
+
@retry(stop_max_attempt_number=3, wait_fixed=10000)
def get_list_page(headers, logger):
    """Query page 1 of the in-progress auction search just to learn the
    total row count.

    :param headers: request headers
    :param logger: logger instance
    :return: total number of auction rows
    :raises Exception: (re-tried 3x) on a bad resultCode or missing total
    """
    url = "https://api.weikajia.com/search/searchAuctionItem"

    data = {
        "page": 1,
        "pageSize": 10,
        "hideLoading": True,
        "ascSort": "desc",
        "sortType": "auction_start",
        "orderStatus": "1"
    }
    response = requests.post(url, headers=headers, json=data, timeout=5)
    payload = response.json()  # fix: parse the body once instead of three times
    if payload["resultCode"] != 200:
        logger.debug("get_list_page resultCode 请求失败,重试中...........")
        raise Exception("请求失败")
    total = payload.get('data').get('total')
    if total:
        return total
    # NOTE(review): total == 0 is treated as a failure and retried —
    # confirm an empty result set is genuinely unexpected here.
    logger.debug("get_list_page total 请求失败,重试中...........")
    raise Exception("get_list_page请求失败,重试中...........")
+
+
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_list(sql_pool, pp, headers, logger):
    """
    Fetch one page of in-progress auctions and persist the batch.
    :param logger: logger instance
    :param sql_pool: MySQLConnectionPool
    :param pp: 1-based page number
    :param headers: request headers
    :return: None
    :raises Exception: (re-tried 3x) when resultCode != 200
    """
    url = "https://api.weikajia.com/search/searchAuctionItem"
    data = {
        "page": int(pp),
        "pageSize": 10,
        "hideLoading": True,
        "ascSort": "desc",
        "sortType": "auction_start",
        "orderStatus": "1"
    }
    response = requests.post(url, headers=headers, json=data, timeout=5)
    payload = response.json()  # fix: parse the body once instead of per-access
    if payload["resultCode"] != 200:
        logger.debug("请求失败,重试中...........")
        raise Exception("请求失败")
    logger.debug(f'第{pp}页请求成功..............')
    cardCabinet = payload.get('data', {}).get('cardCabinet', [])
    if not cardCabinet:
        return
    info_list = []
    for item in cardCabinet:
        # Column order must match save_data's INSERT statement; the two
        # auction timestamps are converted to 'YYYY-MM-DD HH:MM:SS'.
        info_list.append((
            item.get("cabinetId"),
            item.get("imgs"),
            item.get("title"),
            item.get("price"),
            item.get("lastBidPrice"),
            item.get("auctionItemId"),
            transform(item.get("auctionStart")),
            transform(item.get("auctionEnd")),
            item.get("currBidIndex"),
        ))
    save_data(sql_pool, info_list)
+
+
@retry(stop_max_attempt_number=100, wait_fixed=3600000)
def bidding_main(log):
    """Crawl every page of in-progress auctions and bulk-insert them.

    NOTE(review): the body catches all exceptions itself, so the hourly
    @retry (100 attempts, 3 600 000 ms apart) only fires if the handler
    code itself raises — confirm that is intended.

    :param log: logger instance (loguru-compatible)
    """
    try:
        log.info("开始运行 bidding_spider 爬虫任务............................................................")
        sql_pool = MySQLConnectionPool(log=log)
        # NOTE(review): a freshly constructed pool object is always truthy,
        # so this guard can never trigger; a connection failure surfaces
        # later from _get_connection instead.
        if not sql_pool:
            log.error("数据库连接失败")
            raise Exception("数据库连接失败")
        # token = sql_pool.select_one("select token from wkj_token")
        headers = {
            "appVersion": "1.6.5",
            "osVersion": "9",
            "deviceModel": "M2007J22C",
            "appVersionCode": "168",
            "deviceBrand": "xiaomi",
            "platform": "android",
            # "token": token[0],
            "user-agent": "Mozilla/5.0 (Linux; Android 9; M2007J22C Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/92.0.4515.131 Mobile Safari/537.36",
            "Content-Type": "application/json",
            "Connection": "Keep-Alive"
        }
        total = get_list_page(headers, log)
        pages = (total + 9) // 10  # ceiling division: 10 items per page
        log.info(
            f'----------------------------------------总条数为{total}条, 总页码为{pages}页----------------------------------------')
        for i in range(1, pages + 1):
            log.debug(f'正在爬取第{i}页..............')
            try:
                get_list(sql_pool, i, headers, log)
                time.sleep(random.randint(3, 5))
            except Exception as e:
                log.error(f'第{i}页出错, {e}')
    except Exception as e:
        log.error(e)
    finally:
        log.info("爬虫程序运行结束,等待下一轮的采集任务.............")

if __name__ == '__main__':
    from loguru import logger
    bidding_main(logger)

+ 103 - 0
weikajia_spider/weika_change_card_by_id_spider.py

@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+# Author  : Charley
+# Python  : 3.8.10
+# Date: 2024-09-14 10:07
+import random
+import time
+import requests
+from retrying import retry
+from mysq_pool import MySQLConnectionPool
+
# Number of new cabinet IDs probed per daily run.
increases_daily_id = 1500


def save_data(sql_pool, info):
    """Insert one cabinet-detail record into weikajia_change_card_by_id.

    ``info`` is an 11-tuple matching the column order in the INSERT below.
    """
    insert_sql = "INSERT INTO weikajia_change_card_by_id(cabinetId, imgs, nickName, onSaleExpireTs, player, playerIds, rmbPrice, title, userId, status, switchSt) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    sql_pool.insert_one(insert_sql, info)
+
+
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_data(sql_pool, headers, ii, log):
    """Fetch cabinet detail for ID *ii* and store it, unless the record is
    the API's empty placeholder (onSaleExpireTs == '0001-01-01 00:00:00').

    :param sql_pool: MySQLConnectionPool
    :param headers: request headers (including auth token)
    :param ii: cabinet id to probe
    :param log: logger instance
    :raises Exception: (re-tried 3x) when resultCode != 200
    """
    url = "https://api.weikajia.com/api/v2/cabinet/detail"
    log.info(f"开始处理 ID 为{ii}的数据 ->->->->->->->->->->->->->->->->->->->->")
    params = {
        "cabinetId": f"{ii}"
    }
    response = requests.get(url, headers=headers, params=params, timeout=5)
    time.sleep(random.randint(1, 3))  # polite delay between detail calls
    payload = response.json()  # fix: parse the body once instead of per-access
    if payload["resultCode"] != 200:
        log.debug("请求失败,重试中...........")
        raise Exception("请求失败")
    log.debug(f'ID 为{ii}的数据请求成功..............')
    resp_data = payload.get('data', {})
    onSaleExpireTs = resp_data.get('onSaleExpireTs')
    if onSaleExpireTs != '0001-01-01 00:00:00':
        # Column order must match save_data's INSERT statement.
        info = (
            resp_data.get('cabinetId'),
            resp_data.get('imgs'),
            resp_data.get('nickName'),
            onSaleExpireTs,
            resp_data.get('player'),
            resp_data.get('playerIds'),
            resp_data.get('rmbPrice'),
            resp_data.get('title'),
            resp_data.get('userId'),
            resp_data.get('status'),
            resp_data.get('switchSt'),
        )
        save_data(sql_pool, info)
    else:
        log.debug(f'ID:{ii}的后台数据为空..............')
+
+
@retry(stop_max_attempt_number=100, wait_fixed=3600000)
def run(log):
    """Probe the next ``increases_daily_id`` cabinet IDs after the largest
    one already stored and persist every non-empty detail record.

    :param log: logger instance (loguru-compatible)
    """
    try:
        log.info(
            "开始运行 change_card_by_id_spider 爬虫任务............................................................")
        sql_pool = MySQLConnectionPool(log=log)
        if not sql_pool:
            log.error("数据库连接失败")
            raise Exception("数据库连接失败")
        token = sql_pool.select_one("select token from wkj_token")
        headers = {
            "appVersion": "1.6.5",
            "osVersion": "9",
            "deviceModel": "M2007J22C",
            "appVersionCode": "168",
            "deviceBrand": "xiaomi",
            "platform": "android",
            "token": token[0],
            "user-agent": "Mozilla/5.0 (Linux; Android 9; M2007J22C Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/92.0.4515.131 Mobile Safari/537.36",
            "Content-Type": "application/json",
            "Connection": "Keep-Alive"
        }
        max_sql = "SELECT MAX(cabinetId) FROM weikajia_change_card_by_id"
        max_row = sql_pool.select_one(max_sql)
        # fix: MAX() over an empty table yields NULL; the original crashed
        # on None + 1.  Fall back to 0 so the first run starts from ID 1.
        max_cab_id = (max_row[0] if max_row and max_row[0] is not None else 0) + 1
        log.info(
            f"max_cab_id 为{max_cab_id - 1}, 从{max_cab_id}开始查询数据 ->->->->->->->->->->->->->->->->->->->->")
        for ii in range(max_cab_id, max_cab_id + increases_daily_id):
            try:
                get_data(sql_pool, headers, ii, log)
            except Exception as e:
                log.error(f'get_data error: {e}')
    except Exception as e:
        log.error(e)
    finally:
        log.info("爬虫程序运行结束,等待下一轮的采集任务.............")
+
+
def change_card_main(log):
    """Entry point used by the aggregate scheduler; delegates to run()."""
    run(log)

if __name__ == '__main__':
    from loguru import logger
    change_card_main(logger)

+ 316 - 0
weikajia_spider/weika_sold_spider.py

@@ -0,0 +1,316 @@
+# -*- coding: utf-8 -*-
+# Author  : Charley
+# Python  : 3.8.10
+# Date: 2024-09-30 13:29
+import json
+import random
+import time
+import requests
+from retrying import retry
+from datetime import datetime
+from mysq_pool import MySQLConnectionPool
+
# logger.remove()
# logger.add("sold_logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
#            level="DEBUG", retention="1 day")


# Default request headers impersonating the Android app client.
# NOTE(review): get_bid() writes the auth token into this shared dict at
# runtime, so `headers` is module-global mutable state — not safe if this
# module is ever used from multiple threads.
headers = {
    "appVersion": "1.6.5",
    "osVersion": "9",
    "deviceModel": "M2007J22C",
    "appVersionCode": "168",
    "deviceBrand": "xiaomi",
    "platform": "android",
    "token": "",
    "user-agent": "Mozilla/5.0 (Linux; Android 9; M2007J22C Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/92.0.4515.131 Mobile Safari/537.36",
    "Content-Type": "application/json",
    "Connection": "Keep-Alive"
}
# Base endpoint for all weikajia API calls in this module.
base_url = "https://api.weikajia.com"
+
+
def transform_timestamp(timestamp):
    """
    Render a Unix timestamp as a local-time '%Y-%m-%d %H:%M:%S' string.
    :param timestamp: Unix timestamp (int or numeric string)
    :return: formatted time string
    """
    return datetime.fromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')
+
+
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_action(log, auctionId):
    """Fetch the auction detail and return agentUserInfo (with the
    auction-level agentId merged in), or {} when the payload is unusable.

    NOTE(review): the failure branch returns {} instead of raising, so the
    @retry decorator never fires for a bad resultCode — kept as-is to
    preserve behavior.

    :param log: logger instance
    :param auctionId: auction item id to query
    :return: agentUserInfo dict, or {} on failure
    """
    log.debug(f'正在查询auctionId为: {auctionId}的信息..............')
    url = f"{base_url}/api/v2/auction/detail"
    params = {
        "auctionId": auctionId
    }
    response = requests.get(url, headers=headers, params=params, timeout=5)
    payload = response.json()  # fix: parse the body once instead of per-access
    if payload["resultCode"] == 200:
        try:
            agentUserInfo = payload["data"].get("agentUserInfo")
            agentUserInfo["agentId"] = payload["data"].get("agentId")
            return agentUserInfo
        except Exception as e:
            log.error(f"get_action agentUserInfo , error: {e}")
            return {}
    log.debug("get_action 请求失败,重试中...........")
    return {}
+
+
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_cabinet(log, cabinetId):
    """Fetch the cabinet (card) detail and return selected fields.

    :param log: logger instance
    :param cabinetId: cabinet id to query
    :return: dict of the card attributes used downstream
    :raises Exception: (re-tried 3x) when resultCode != 200
    """
    log.debug(f'正在查询cabinetId为: {cabinetId}的信息..............')
    url = f"{base_url}/api/v2/cabinet/detail"
    params = {
        "cabinetId": cabinetId
    }
    response = requests.get(url, headers=headers, params=params, timeout=5)
    payload = response.json()  # fix: parse the body once instead of per-access
    if payload["resultCode"] != 200:
        log.debug("get_cabinet 请求失败,重试中...........")
        raise Exception("请求失败")
    data = payload["data"]
    # Only the columns the weikajia_sold table cares about.
    return {
        "rmbPrice": data.get("rmbPrice"), "brand": data.get("brand"),
        "status": data.get("status"), "switchSt": data.get("switchSt"),
        "cardNo": data.get("cardNo"), "barcodeId": data.get("barcodeId"),
        "year": data.get("year"), "grade": data.get("grade"),
        "setName": data.get("setName"), "player": data.get("player"),
        "onSaleExpireTs": data.get("onSaleExpireTs"),
        "authenticNumber": data.get("authenticNumber"),
    }
+
+
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_sold_xhr_page(log):
    """Query page 1 of the sold-auction search just to learn the total
    row count.

    :param log: logger instance
    :return: total number of sold rows
    :raises Exception: (re-tried 3x) when the count is missing or zero
    """
    log.info("开始获取总页数....")
    url = f"{base_url}/search/searchAuctionItem"
    data = {
        "page": 1,
        "pageSize": 10,
        "orderStatus": "3",
        "sortType": "auction_end",
        "ascSort": "desc"
    }
    # fix: timeout added for consistency with the other API calls here.
    response = requests.post(url, headers=headers, json=data, timeout=5)
    total = response.json()['data']['total']
    if total:
        return total
    log.error("get_sold_xhr_page, error")
    raise Exception("获取get_sold_xhr_page数据失败")
+
+
def fetch_all_pages(log):
    """Generator yielding the 10-item 'cardCabinet' list for each page of
    sold auctions (most recently ended first).

    :param log: logger instance
    :yield: list of cardCabinet dicts for one page
    """
    log.info("开始获取所有页数据....")
    total = get_sold_xhr_page(log)
    pages = (total + 9) // 10  # ceiling division: 10 items per page
    log.info(f"一共有{total}条已售数据, 总页数: {pages}..................................")
    for page in range(1, pages + 1):
        data = {
            "page": page,
            "pageSize": 10,
            "orderStatus": "3",
            "sortType": "auction_end",
            "ascSort": "desc"
        }
        # fix: timeout added for consistency with the other API calls here.
        response = requests.post(f"{base_url}/search/searchAuctionItem", headers=headers, json=data, timeout=5)
        yield response.json()['data']['cardCabinet']
        time.sleep(1)  # gentle rate limiting between pages
+
+
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_bid(log, aid, page, token):
    """
    Fetch one page (10 records) of the bid history for auction *aid*.
    :param token: auth token row from wkj_token; token[0] is the value
    :param log: logger instance
    :param aid: auctionItemId
    :param page: 1-based page number
    :return: recordList for that page
    :raises Exception: (re-tried 3x) on HTTP failure or an empty recordList
    """
    url = f"{base_url}/api/v2/auction/record"
    params = {
        "auctionItemId": aid,
        "pageNumber": str(page),
        "pageSize": "10"
    }
    log.debug(f'正在获取竞价相关第{page}页的数据..................')
    # NOTE(review): this mutates the MODULE-LEVEL headers dict; every later
    # request in this module then carries this token.
    headers["token"] = token[0]
    response = requests.get(url, headers=headers, params=params)
    # print(f'get_bid: {response.json()}')
    if response.status_code != 200:
        log.error(f"请求失败,状态码: {response.status_code}")
        raise Exception("请求失败")
    time.sleep(1)
    recordList = response.json()['data']['recordList']
    if recordList:
        return recordList
    else:
        # An empty list is treated as a failure so @retry re-requests it.
        log.error(f"get_bid, error")
        raise Exception("获取get_bid数据失败")
+
+
def get_bid_list(log, aid, bidIndex, token):
    """Collect the bid history for auction *aid* and reduce it to each
    bidder's most recent bid.

    :param log: logger instance
    :param aid: auctionItemId
    :param bidIndex: total number of bid records (drives paging, 10/page)
    :param token: auth token row, forwarded to get_bid
    :return: list of {'nickName', 'bidPrice', 'bidTime'} dicts, one per
        bidder, with bidTime formatted as 'YYYY-MM-DD HH:MM:SS'
    """
    log.info(f"开始获取第{aid}的get_bid_list数据, 一共{bidIndex}条")
    pages = (bidIndex + 9) // 10  # ceiling division: 10 records per page
    resp_list = []
    for page in range(1, int(pages) + 1):
        try:
            resp_list.extend(get_bid(log, aid, page, token))
        except Exception as e:
            # Stop paging on the first hard failure; pages fetched so far
            # are still processed below.
            log.error(f"recordList  get_bid error: {e}")
            break

    # Keep only each user's latest bid (largest bidTime).
    latest_bids = {}
    for bid in resp_list:
        nick_name = bid['nickName']
        if nick_name not in latest_bids or bid['bidTime'] > latest_bids[nick_name]['bidTime']:
            latest_bids[nick_name] = {'nickName': nick_name, 'bidPrice': bid['bidPrice'], 'bidTime': bid['bidTime']}

    return [
        {'nickName': i['nickName'], 'bidPrice': i['bidPrice'], 'bidTime': transform_timestamp(i['bidTime'])}
        for i in latest_bids.values()
    ]
+
+
def save_data(sql_pool, info):
    """
    Persist one crawled auction record into the weikajia_sold table.

    :param sql_pool: MySQL connection-pool wrapper exposing insert_one()
    :param info: value tuple matching the column order of the INSERT below
    :return: None
    """
    insert_sql = """INSERT INTO weikajia_sold(cabinetId, auctionItemId, imgs, title, bidIndex, price, lastBidPrice, auctionStart, auctionEnd, bid_list, nickName, following, certifyStatus, ipRegion, credit, agentLevel, agentId, rmbPrice, brand, status, switchSt, cardNo, barcodeId, year, grade, setName, player, onSaleExpireTs, authenticNumber) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
    sql_pool.insert_one(insert_sql, info)
+
+
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def run(sql_pool, log, token):
    """
    Main crawl loop for the sold-item spider.

    Walks every result page, skips cabinets already stored in
    weikajia_sold, enriches each new auction with its bid history and
    detail-page data, and inserts one row per new cabinet.

    NOTE(review): the broad try/except below swallows every exception, so
    the @retry decorator on this function can never actually fire — run()
    always returns normally. Confirm whether retries are intended here.

    :param sql_pool: MySQL connection-pool wrapper
    :param log: logger instance
    :param token: row tuple from wkj_token, forwarded to get_bid_list
    :return: None
    """
    try:
        log.info("开始运行 sold_spider 爬虫任务............................................................")
        # token = sql_pool.select_one("select token from wkj_token")
        # headers["token"] = token[0]
        # Pre-load every cabinetId already stored so known items are skipped.
        sql_cabinetId_list = sql_pool.select_all("select cabinetId from weikajia_sold")
        cabinetId_list = [i[0] for i in sql_cabinetId_list]
        # print(cabinetId_list)
        for page_data in fetch_all_pages(log):
            # info_list = []
            for auction in page_data:
                aid = auction.get('auctionItemId')
                cabinetId = auction.get("cabinetId")

                # Skip cabinets that are already in the database.
                if cabinetId in cabinetId_list:
                    log.info(
                        f"{cabinetId}已存在,跳过............................................................")
                    continue
                else:
                    # Remember it immediately so duplicates within this crawl
                    # run are also skipped.
                    cabinetId_list.append(cabinetId)

                    imgs = auction.get("imgs")
                    title = auction.get("title")
                    bidIndex = auction.get("currBidIndex")
                    price = auction.get("price")
                    lastBidPrice = auction.get("lastBidPrice")
                    auctionStart_ = auction.get("auctionStart")
                    auctionStart = transform_timestamp(auctionStart_)
                    auctionEnd_ = auction.get("auctionEnd")
                    auctionEnd = transform_timestamp(auctionEnd_)
                    bid_list = get_bid_list(log, aid, bidIndex, token)

                    # Fetch detail-page data (seller + card cabinet info).
                    act_dict = get_action(log, aid)
                    # time.sleep(random.randint(5, 10))
                    cab_dict = get_cabinet(log, cabinetId)
                    # follows = act_dict.get("follows")
                    if not bid_list:
                        bid_list = []
                    # Column order must match the INSERT in save_data().
                    info = (
                        cabinetId, aid, imgs, title, bidIndex, price, lastBidPrice, auctionStart, auctionEnd,
                        json.dumps(bid_list, ensure_ascii=False), act_dict.get("nickName"),
                        act_dict.get("following"), act_dict.get("certifyStatus"), act_dict.get("ipRegion"),
                        act_dict.get("credit"), act_dict.get("agentLevel"), act_dict.get("agentId"),
                        cab_dict.get("rmbPrice"), cab_dict.get("brand"), cab_dict.get("status"),
                        cab_dict.get("switchSt"), cab_dict.get("cardNo"), cab_dict.get("barcodeId"),
                        cab_dict.get("year"), cab_dict.get("grade"), cab_dict.get("setName"),
                        cab_dict.get("player"), cab_dict.get("onSaleExpireTs"), cab_dict.get("authenticNumber")
                    )
                    # info_list.append(info)

                    # Save each new record as soon as it is assembled.
                    # logger.info(info)
                    save_data(sql_pool, info)
                    time.sleep(random.randint(1, 3))
        cabinetId_list.clear()
    except Exception as e:
        log.error(f'Error: {e}')
    finally:
        log.info("爬虫程序运行结束,等待下一轮的采集任务.............")
+
+
@retry(stop_max_attempt_number=100, wait_fixed=3600000)
def sold_main(log):
    """
    Entry point: build the DB pool, load the auth token and start one crawl.

    Retried up to 100 times, one hour apart, whenever it raises.
    """
    pool = MySQLConnectionPool(log=log)
    if pool:
        auth_token = pool.select_one("select token from wkj_token")
        run(pool, log, auth_token)
    else:
        log.error("数据库连接失败")
        raise Exception("数据库连接失败")
+
+
if __name__ == '__main__':
    # Standalone run: use a plain loguru logger instead of the scheduler's.
    from loguru import logger
    sold_main(log=logger)

+ 57 - 0
weikajia_spider/weika_spider.py

@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+# Author  : Charley
+# Python  : 3.8.10
+# Date: 2024-09-13 10:46
+import time
+import schedule
+import threading
+from loguru import logger
+from weika_bidding_spider import bidding_main
+from weika_change_card_by_id_spider import change_card_main
+from weika_sold_spider import sold_main
+
logger.remove()  # drop loguru's default stderr sink
# Daily log files rotated at midnight; only the most recent day is retained.
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="1 day")
+
+
def run_threaded(job_func, *args, **kwargs):
    """
    Run the given function in a freshly started thread.

    :param job_func: target callable
    :param args: positional arguments forwarded to job_func
    :param kwargs: keyword arguments forwarded to job_func
    :return: the started threading.Thread, so callers may join() or inspect
             it (previously the thread object was discarded)
    """
    job_thread = threading.Thread(target=job_func, args=args, kwargs=kwargs)
    job_thread.start()
    return job_thread
+
+
def schedule_task():
    """
    Register the three daily spider jobs and block forever dispatching them.

    Each job runs in its own thread at a fixed time of day:
    bidding at 08:01, card-change at 00:01, sold at 10:01.
    """
    daily_jobs = [
        ("08:01", bidding_main),
        ("00:01", change_card_main),
        ("10:01", sold_main),
    ]
    for at_time, job in daily_jobs:
        schedule.every().day.at(at_time).do(run_threaded, job, log=logger)

    # Poll the scheduler once per second, forever.
    while True:
        schedule.run_pending()
        time.sleep(1)
+
+
if __name__ == '__main__':
    # Blocks forever; intended to run as a long-lived scheduler process.
    schedule_task()