Ver código fonte

update 6.13.1

lei.chen 6 meses atrás
pai
commit
fae6264064

+ 44 - 0
ins_img_video_spider/README.md

@@ -0,0 +1,44 @@
```text
+1. [主程序] 调用 get_userPosts('fanatics') 创建生成器
+2. [生成器激活] 进入 get_userPosts 函数:
+   ├─ 2.1 初始化分页参数 continuations = [{'count':'12'}]
+   ├─ 2.2 构建初始 URL:https://i.instagram.com/api/v1/feed/user/fanatics/username/
+   └─ → 暂停等待迭代
+
+3. [首次迭代] 主程序执行 for item_ in items:
+   ├─ 3.1 get_userPosts 恢复执行:
+   │   ├─ 弹出分页参数 {'count':'12'}
+   │   ├─ 发送 API 请求获取第一页数据
+   │   └─ → 若失败:产出错误信息 → 结束
+   │
+   ├─ 3.2 处理响应:
+   │   ├─ 检查 resp.user 是否存在
+   │   ├─ → 不存在:产出错误 → 结束
+   │   └─ → 存在:
+   │       ├─ 提取 _items = resp.items
+   │       ├─ 检查 more_available:
+   │       │   ├─ 是:添加 {'count':'12', 'max_id':next_max_id} 到 continuations
+   │       │   └─ 更新 temp 为用户ID
+   │       └─ 执行 yield from extract_post(_items)
+   │
+   └─ 3.3 extract_post 开始处理:
+       ├─ 遍历每个 post:
+       │   ├─ 创建基础 item 字典
+       │   ├─ 根据 media_type 添加媒体URL:
+       │   │   ├─ 类型1(单图):item.photo = 图片URL
+       │   │   ├─ 类型2(视频):item.video = 视频URL
+       │   │   └─ 类型8(轮播):item.photo=[多图], item.video=首视频
+       │   └─ 产出 item → 主程序打印
+       └─ 全部处理完后 → 返回 get_userPosts
+
+4. [后续迭代] 主程序继续循环:
+   ├─ 4.1 检查 continuations:
+   │   ├─ 为空 → 结束迭代
+   │   └─ 非空 → 重复步骤3.1-3.3获取下一页
+   │
+   └─ 4.2 自动分页示例:
+       ├─ 第一页:max_id=null → 获取前12条
+       ├─ 第二页:max_id=xxx → 获取下12条
+       └─ 直到 more_available=False
+
+```

+ 74 - 0
ins_img_video_spider/YamlLoader.py

@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+#
+import os, re
+import yaml
+
+regex = re.compile(r'^\$\{(?P<ENV>[A-Z_\-]+\:)?(?P<VAL>[\w\.]+)\}$')
+
class YamlConfig:
    """Wrapper around a parsed YAML node with Spring-style placeholder support.

    ``${ENV_VAR:default}`` resolves to ``os.getenv('ENV_VAR', 'default')``;
    ``${default}`` resolves to the literal default; any other value is
    returned as the raw configured entry.
    """

    # Placeholder pattern kept on the class so instances are self-contained.
    _PATTERN = re.compile(r'^\$\{(?P<ENV>[A-Z_\-]+\:)?(?P<VAL>[\w\.]+)\}$')

    def __init__(self, config):
        # Raw parsed YAML node (dict / scalar / None).
        self.config = config

    def get(self, key: str):
        """Return the sub-section *key* wrapped in a new YamlConfig."""
        return YamlConfig(self.config.get(key))

    def _resolve(self, key: str):
        """Resolve *key*; return the env/default string for placeholder
        entries, or the raw entry unchanged."""
        raw = self.config[key]
        match = self._PATTERN.match(raw) if isinstance(raw, str) else None
        if match is None:
            # Not a ${...} placeholder (or not a string) -> raw value.
            return raw
        group = match.groupdict()
        if group['ENV'] is not None:
            env = group['ENV'][:-1]  # strip trailing ':'
            return os.getenv(env, group['VAL'])
        # BUG FIX: a plain ``${default}`` placeholder previously resolved
        # to None / 0 / False instead of the embedded default.
        return group['VAL']

    def getValueAsString(self, key: str):
        """Resolve *key* as a string (placeholders expanded)."""
        return self._resolve(key)

    def getValueAsInt(self, key: str):
        """Resolve *key* and coerce to int; raises ValueError on bad input
        instead of silently returning 0."""
        return int(self._resolve(key))

    def getValueAsBool(self, key: str, env: str = None):
        """Resolve *key* as a boolean.

        BUG FIX: the old ``bool(os.getenv(...))`` treated any non-empty
        string — including ``'false'`` — as True; parse the text instead.
        """
        value = self._resolve(key)
        if isinstance(value, bool):
            return value
        return str(value).strip().lower() in ('1', 'true', 'yes', 'on')
+    
def readYaml(path: str = 'application.yml', profile: str = None) -> YamlConfig:
    """Load *path* and, when *profile* is given, overlay
    ``<name>-<profile>.<ext>`` on top of it.

    :param path: base YAML file
    :param profile: optional profile suffix (e.g. ``'dev'``)
    :return: YamlConfig wrapping the merged mapping (empty if no file exists)
    """
    # BUG FIX: ``conf`` was previously unbound -> NameError when *path*
    # did not exist; ``or {}`` also guards against an empty YAML file,
    # for which yaml.load returns None.
    conf = {}
    if os.path.exists(path):
        with open(path, encoding='utf-8') as fd:
            conf = yaml.load(fd, Loader=yaml.FullLoader) or {}

    if profile is not None:
        # os.path.splitext is robust against dots in directory names,
        # unlike the previous ``path.split('.')``.
        root, ext = os.path.splitext(path)
        profiledYaml = f'{root}-{profile}{ext}'
        if os.path.exists(profiledYaml):
            with open(profiledYaml, encoding='utf-8') as fd:
                conf.update(yaml.load(fd, Loader=yaml.FullLoader) or {})

    return YamlConfig(conf)
+
+# res = readYaml()
+# mysqlConf = res.get('mysql')
+# print(mysqlConf)
+
+# print(res.getValueAsString("host"))
+# mysqlYaml = mysqlConf.getValueAsString("host")
+# print(mysqlYaml)
+# host = mysqlYaml.get("host").split(':')[-1][:-1]
+# port = mysqlYaml.get("port").split(':')[-1][:-1]
+# username = mysqlYaml.get("username").split(':')[-1][:-1]
+# password = mysqlYaml.get("password").split(':')[-1][:-1]
+# mysql_db = mysqlYaml.get("db").split(':')[-1][:-1]
+# print(host,port,username,password)

+ 11 - 0
ins_img_video_spider/application.yml

@@ -0,0 +1,11 @@
+mysql:
+  host: ${MYSQL_HOST:100.64.0.23}
  port: ${MYSQL_PORT:3306}
+  username: ${MYSQL_USERNAME:crawler}
+  password: ${MYSQL_PASSWORD:Pass2022}
+  db: ${MYSQL_DATABASE:crawler}
+
fluent:
  host: ${FLUENT_HOST:192.168.66.152}
  port: ${FLUENT_PORT:24225}
  appname: ${FLUENT_APPNAME:psa_spider.log}

+ 256 - 0
ins_img_video_spider/ins_history_spider.py

@@ -0,0 +1,256 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/5/15 13:19
+import inspect
+import random
+import time
+
+import schedule
+from loguru import logger
+from datetime import datetime
+# from tls_client import Session
+from curl_cffi import Session
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+from mysql_pool import MySQLConnectionPool
+
# Instagram accounts whose feeds this spider scrapes.
USER_NAME_LIST = ['hobbysbestcards']  # list of target user names

# WARNING(security/review): hard-coded session cookie — it embeds a live
# sessionid that will expire and should come from config/env, not source.
cookie = r'ig_did=8D2CD910-0CBD-41CD-A5B4-9EB7E2F8BC91; ps_l=1; ps_n=1; datr=0aYZaGecXnDrIALr4HPo5O0h; mid=aBmm0QALAAFOBiNIagQ4prL9V4Zg; dpr=1.5; csrftoken=1Eeolr1d8t3VMjwNQIeMMQx9JTlyUsGu; sessionid=50762414324%3Af7LRzwBjb06Q7U%3A6%3AAYfUreTnqm7V_o3Pvqt0Tej1vwMQDGjOKw_Zm8TOqA; ds_user_id=50762414324; rur="RVA\05450762414324\0541778817145:01f75b26510d73b461bb75b0f907b2ec268507a83f95fb7c5a8571ced3b614c68af0d6b5"; wd=1707x247'

# Regex alternation for scraping app_id / claim / csrf_token / LSD tokens
# out of page source (currently unused in this module — TODO confirm).
PARAMS = r'("app_id":\s*"[^"]+")|("claim":\s*"[^"]+")|("csrf_token":\s*"[^"]+")|(["LSD",[],{"token":\s*"[^"]+")'

# session = Session(client_identifier="chrome_120", random_tls_extension_order=True)
session = Session()

# Daily-rotated file log, DEBUG level, kept for 7 days.
logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")

MAX_PAGE = 3000  # maximum number of feed pages to fetch per user
+
+
+def after_log(retry_state):
+    """
+    retry 回调
+    :param retry_state: RetryCallState 对象
+    """
+    # 检查 args 是否存在且不为空
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]  # 获取传入的 logger
+    else:
+        log = logger  # 使用全局 logger
+
+    if retry_state.outcome.failed:
+        log.warning(
+            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """Build the KDL tunnel proxy mapping for HTTP requests.

    :param log: logger (kept for interface / retry-hook compatibility)
    :return: dict with 'http' and 'https' proxy URLs
    """
    # WARNING(security/review): proxy credentials are hard-coded; move
    # them to config/env.
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    # Plain string/dict construction cannot raise, so the previous
    # try/except (and the retry path it fed) was dead code.
    proxy_url = f"http://{kdl_username}:{kdl_password}@{tunnel}/"
    return {"http": proxy_url, "https": proxy_url}
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(15), after=after_log)
def ajax_request(log, url: str, params=None):
    """
    GET wrapper around the module-level curl_cffi session; retried up to
    5 times (15 s apart) on any HTTP or parsing failure.

    :param log: logger object
    :param url: api url
    :param params: api params (query-string dict)
    :return: decoded JSON object
    :raises: re-raises any request/HTTP/JSON error so @retry can fire
    """
    try:
        # Browser-like headers plus the module-level session cookie;
        # the x-ig-* values mimic the Instagram web client.
        headers = {
            'sec-fetch-mode': 'cors',
            'referer': 'https://www.instagram.com/',
            'x-ig-app-id': '936619743392459',
            'sec-fetch-site': 'same-site',
            'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
            'x-asbd-id': '198387',
            'accept': '*/*',
            'sec-ch-ua': 'Chromium";v="104", " Not A;Brand";v="99", "Google Chrome";v="104"',
            'sec-ch-ua-mobile': '?0',
            'x-ig-www-claim': 'hmac.AR11qy__GsvLpiS4wKBygLGdxs2DxJB1esTkBw7b2QFaHH2d',
            'authority': 'i.instagram.com',
            'sec-ch-ua-platform': 'Windows"',
            'x-instagram-ajax': '1006400593',
            'sec-fetch-dest': 'empty',
            'user-agent': 'Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.102 Safari/537.36',
            'cookie': cookie
        }
        resp = session.get(url, headers=headers, params=params)
        # print(resp.text)
        resp.raise_for_status()  # treat non-2xx as failure so @retry fires

        return resp.json()
    except Exception as e:
        log.error(f"Request failed: {e}")
        raise
+
+
def get_userPosts(log, userName: str, sql_uid_list: list):
    """Yield all posts of *userName*, page by page (generator).

    :param log: logger object
    :param userName: Instagram user name
    :param sql_uid_list: uids already stored in DB (used to skip duplicates)
    :return: generator of post dicts (or an error string for unknown users)
    """
    page = 1
    continuations = [{
        'count': '12',
    }]
    # The first request addresses the feed by user name; once the user id
    # is known, subsequent pages address it by numeric id (see below).
    temp = userName + '/username/'
    while continuations:
        continuation = continuations.pop()
        log.info(f"The page number currently requested is: {page}.........")
        url = 'https://i.instagram.com/api/v1/feed/user' + f'/{temp}'
        resp = ajax_request(log, url, params=continuation)

        if not resp:
            # BUG FIX: previously fell through and crashed on resp.get(...);
            # now the failed page is skipped as the log message promises.
            log.error("API请求失败,跳过当前分页")
            continue

        time.sleep(random.uniform(3, 5))  # throttle to look human
        page += 1

        if page > MAX_PAGE:
            log.info(f"The page number currently requested is: {page}.........")
            break

        # No such user (or cookie expired / account private).
        if not resp.get('user'):
            log.warning(f"checking cookie or unknown/private User: {userName}")
            yield 'checking cookie or unknown/private User: {}'.format(userName)
        else:
            _items = resp.get('items', [])
            if resp.get('more_available'):
                # Queue the next page and switch to id-based addressing.
                continuations.append({'count': '12', 'max_id': resp.get('next_max_id')})
                user = resp.get('user')
                temp = user.get('pk_id') if user.get('pk_id') else user.get('pk')
            yield from extract_post(log, _items, userName, sql_uid_list)
+
+
def extract_post(log, posts, user_name: str, sql_uid_list: list):
    """Yield one normalized item dict per Instagram post.

    :param log: logger object
    :param posts: raw post dicts from the feed API
    :param user_name: owner account name
    :param sql_uid_list: uids already stored; matching posts are skipped
    :return: generator of dicts ready for DB insertion
    """
    if not posts:  # handles None or empty list
        log.debug("No posts found.")
        return
    for post in posts:
        caption = post.get('caption')

        # Prefer the caption timestamp; fall back to taken_at.
        created_at_stamp = caption.get('created_at') if caption else post.get('taken_at')
        created_at = datetime.fromtimestamp(created_at_stamp).strftime("%Y-%m-%d %H:%M:%S")

        uid = post.get('code')
        if uid in sql_uid_list:
            log.info(f"uid:{uid} has been processed, skipping................")
            continue
        item = {
            'user_name': user_name,
            'uid': uid,
            'pid': post.get('pk'),
            'pk_id': post.get('id'),
            'comment_count': post.get('comment_count'),
            'like_count': post.get('like_count'),
            'title': caption.get('text') if caption else None,
            'created_at': created_at
        }
        # Other media types can be added here.
        types = post.get('media_type')

        if types == 8:  # carousel: several images, possibly a video
            try:
                # BUG FIX: iterate the carousel entries themselves; the old
                # code re-read the top-level post image once per entry,
                # producing N copies of the same URL.
                imgs_list = [media.get('image_versions2', {}).get('candidates', [{}])[0].get('url')
                             for media in post.get('carousel_media')]
            except Exception as e:
                log.warning(f"imgs_list processing post: {e}")
                imgs_list = [post.get('image_versions2', {}).get('candidates', [{}])[0].get('url')]
            # Drop missing URLs so ','.join cannot crash on None.
            imgs_list = [u for u in imgs_list if u]
            imgs = ','.join(imgs_list) if imgs_list else None

            item.update({
                'imgs_url': imgs,
                'video_url': post.get('carousel_media', [{}])[0].get('video_versions', [{}])[0].get('url')
            })
        elif types == 2:  # single video
            item.update({
                'imgs_url': None,
                'video_url': post.get('video_versions', [{}])[0].get('url')
            })
        elif types == 1:  # single image
            item.update({
                'imgs_url': post.get('image_versions2', {}).get('candidates', [{}])[0].get('url'),
                'video_url': None
            })
        yield item
+
+
@retry(stop=stop_after_attempt(50), wait=wait_fixed(1800), after=after_log)
def ins_posts_main(log):
    """
    Entry point: scrape every account in USER_NAME_LIST and insert any
    new posts into the instagram_posts_record table.

    :param log: logger object

    NOTE(review): the broad ``except`` below swallows all scraping and DB
    errors, so the @retry decorator above effectively only fires for the
    pool-health RuntimeError raised before the try block — confirm this
    is intended.
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool and fail fast if it is unhealthy.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        for user_name in USER_NAME_LIST:
            log.info(
                f'-------------------------------- 开始爬取用户 {user_name} 的所有帖子 --------------------------------')
            # Load already-stored uids so duplicate posts are skipped.
            sql_uid_list = sql_pool.select_all('select uid from instagram_posts_record where user_name = %s',
                                               (user_name,))
            sql_uid_list = [_[0] for _ in sql_uid_list]
            log.debug(f'查询到 uid 列表sql_uid_list的长度为: {len(sql_uid_list)}')
            items_ = get_userPosts(log, user_name, sql_uid_list)
            for item_ in items_:
                # print(item_)
                sql_pool.insert_one_or_dict('instagram_posts_record', item_)
            sql_uid_list.clear()
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
+
+
if __name__ == '__main__':
    # Run the history scraper once when executed directly.
    ins_posts_main(log=logger)

+ 270 - 0
ins_img_video_spider/ins_posts_spider.py

@@ -0,0 +1,270 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.8.10
+# Date   : 2025/4/2 18:57
+import inspect
+import random
+import time
+
+import schedule
+from loguru import logger
+from datetime import datetime
+# from tls_client import Session
+from curl_cffi import Session
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+from mysql_pool import MySQLConnectionPool
+
# Instagram accounts whose feeds this spider scrapes.
USER_NAME_LIST = ['fanatics', 'hobbysbestcards']  # list of target user names

# WARNING(security/review): hard-coded session cookie — it embeds a live
# sessionid that will expire and should come from config/env, not source.
cookie = r'ig_did=8D2CD910-0CBD-41CD-A5B4-9EB7E2F8BC91; ps_l=1; ps_n=1; datr=0aYZaGecXnDrIALr4HPo5O0h; mid=aBmm0QALAAFOBiNIagQ4prL9V4Zg; dpr=1.5; csrftoken=1Eeolr1d8t3VMjwNQIeMMQx9JTlyUsGu; sessionid=50762414324%3Af7LRzwBjb06Q7U%3A6%3AAYfUreTnqm7V_o3Pvqt0Tej1vwMQDGjOKw_Zm8TOqA; ds_user_id=50762414324; rur="RVA\05450762414324\0541778817145:01f75b26510d73b461bb75b0f907b2ec268507a83f95fb7c5a8571ced3b614c68af0d6b5"; wd=1707x247'

# Regex alternation for scraping app_id / claim / csrf_token / LSD tokens
# out of page source (currently unused in this module — TODO confirm).
PARAMS = r'("app_id":\s*"[^"]+")|("claim":\s*"[^"]+")|("csrf_token":\s*"[^"]+")|(["LSD",[],{"token":\s*"[^"]+")'

# session = Session(client_identifier="chrome_120", random_tls_extension_order=True)
session = Session()

# Daily-rotated file log, DEBUG level, kept for 7 days.
logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")

MAX_PAGE = 3  # maximum number of feed pages to fetch per user
+
+
+def after_log(retry_state):
+    """
+    retry 回调
+    :param retry_state: RetryCallState 对象
+    """
+    # 检查 args 是否存在且不为空
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]  # 获取传入的 logger
+    else:
+        log = logger  # 使用全局 logger
+
+    if retry_state.outcome.failed:
+        log.warning(
+            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """Build the KDL tunnel proxy mapping for HTTP requests.

    :param log: logger (kept for interface / retry-hook compatibility)
    :return: dict with 'http' and 'https' proxy URLs
    """
    # WARNING(security/review): proxy credentials are hard-coded; move
    # them to config/env.
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    # Plain string/dict construction cannot raise, so the previous
    # try/except (and the retry path it fed) was dead code.
    proxy_url = f"http://{kdl_username}:{kdl_password}@{tunnel}/"
    return {"http": proxy_url, "https": proxy_url}
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(15), after=after_log)
def ajax_request(log, url: str, params=None):
    """
    GET wrapper around the module-level curl_cffi session; retried up to
    5 times (15 s apart) on any HTTP or parsing failure.

    :param log: logger object
    :param url: api url
    :param params: api params (query-string dict)
    :return: decoded JSON object
    :raises: re-raises any request/HTTP/JSON error so @retry can fire
    """
    try:
        # Browser-like headers plus the module-level session cookie;
        # the x-ig-* values mimic the Instagram web client.
        headers = {
            'sec-fetch-mode': 'cors',
            'referer': 'https://www.instagram.com/',
            'x-ig-app-id': '936619743392459',
            'sec-fetch-site': 'same-site',
            'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
            'x-asbd-id': '198387',
            'accept': '*/*',
            'sec-ch-ua': 'Chromium";v="104", " Not A;Brand";v="99", "Google Chrome";v="104"',
            'sec-ch-ua-mobile': '?0',
            'x-ig-www-claim': 'hmac.AR11qy__GsvLpiS4wKBygLGdxs2DxJB1esTkBw7b2QFaHH2d',
            'authority': 'i.instagram.com',
            'sec-ch-ua-platform': 'Windows"',
            'x-instagram-ajax': '1006400593',
            'sec-fetch-dest': 'empty',
            'user-agent': 'Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.102 Safari/537.36',
            'cookie': cookie
        }
        resp = session.get(url, headers=headers, params=params)
        # print(resp.text)
        resp.raise_for_status()  # treat non-2xx as failure so @retry fires

        return resp.json()
    except Exception as e:
        log.error(f"Request failed: {e}")
        raise
+
+
def get_userPosts(log, userName: str, sql_uid_list: list):
    """Yield all posts of *userName*, page by page (generator).

    :param log: logger object
    :param userName: Instagram user name
    :param sql_uid_list: uids already stored in DB (used to skip duplicates)
    :return: generator of post dicts (or an error string for unknown users)
    """
    page = 1
    continuations = [{
        'count': '12',
    }]
    # The first request addresses the feed by user name; once the user id
    # is known, subsequent pages address it by numeric id (see below).
    temp = userName + '/username/'
    while continuations:
        continuation = continuations.pop()
        log.info(f"The page number currently requested is: {page}.........")
        url = 'https://i.instagram.com/api/v1/feed/user' + f'/{temp}'
        resp = ajax_request(log, url, params=continuation)

        if not resp:
            # BUG FIX: previously fell through and crashed on resp.get(...);
            # now the failed page is skipped as the log message promises.
            log.error("API请求失败,跳过当前分页")
            continue

        time.sleep(random.uniform(5, 8))  # throttle to look human
        page += 1

        if page > MAX_PAGE:
            log.info(f"The page number currently requested is: {page}.........")
            break

        # No such user (or cookie expired / account private).
        if not resp.get('user'):
            log.warning(f"checking cookie or unknown/private User: {userName}")
            yield 'checking cookie or unknown/private User: {}'.format(userName)
        else:
            _items = resp.get('items', [])
            if resp.get('more_available'):
                # Queue the next page and switch to id-based addressing.
                continuations.append({'count': '12', 'max_id': resp.get('next_max_id')})
                user = resp.get('user')
                temp = user.get('pk_id') if user.get('pk_id') else user.get('pk')
            yield from extract_post(log, _items, userName, sql_uid_list)
+
+
def extract_post(log, posts, user_name: str, sql_uid_list: list):
    """Yield one normalized item dict per Instagram post.

    :param log: logger object
    :param posts: raw post dicts from the feed API
    :param user_name: owner account name
    :param sql_uid_list: uids already stored; matching posts are skipped
    :return: generator of dicts ready for DB insertion
    """
    if not posts:  # handles None or empty list
        log.debug("No posts found.")
        return
    for post in posts:
        caption = post.get('caption')

        # Prefer the caption timestamp; fall back to taken_at.
        created_at_stamp = caption.get('created_at') if caption else post.get('taken_at')
        created_at = datetime.fromtimestamp(created_at_stamp).strftime("%Y-%m-%d %H:%M:%S")

        uid = post.get('code')
        if uid in sql_uid_list:
            log.info(f"uid:{uid} has been processed, skipping................")
            continue
        item = {
            'user_name': user_name,
            'uid': uid,
            'pid': post.get('pk'),
            'pk_id': post.get('id'),
            'comment_count': post.get('comment_count'),
            'like_count': post.get('like_count'),
            'title': caption.get('text') if caption else None,
            'created_at': created_at
        }
        # Other media types can be added here.
        types = post.get('media_type')

        if types == 8:  # carousel: several images, possibly a video
            try:
                # BUG FIX: iterate the carousel entries themselves; the old
                # code re-read the top-level post image once per entry,
                # producing N copies of the same URL.
                imgs_list = [media.get('image_versions2', {}).get('candidates', [{}])[0].get('url')
                             for media in post.get('carousel_media')]
            except Exception as e:
                log.warning(f"imgs_list processing post: {e}")
                imgs_list = [post.get('image_versions2', {}).get('candidates', [{}])[0].get('url')]
            # Drop missing URLs so ','.join cannot crash on None.
            imgs_list = [u for u in imgs_list if u]
            imgs = ','.join(imgs_list) if imgs_list else None

            item.update({
                'imgs_url': imgs,
                'video_url': post.get('carousel_media', [{}])[0].get('video_versions', [{}])[0].get('url')
            })
        elif types == 2:  # single video
            item.update({
                'imgs_url': None,
                'video_url': post.get('video_versions', [{}])[0].get('url')
            })
        elif types == 1:  # single image
            item.update({
                'imgs_url': post.get('image_versions2', {}).get('candidates', [{}])[0].get('url'),
                'video_url': None
            })
        yield item
+
+
@retry(stop=stop_after_attempt(50), wait=wait_fixed(1800), after=after_log)
def ins_posts_main(log):
    """
    Entry point: scrape every account in USER_NAME_LIST and insert any
    new posts into the instagram_posts_record table.

    :param log: logger object

    NOTE(review): the broad ``except`` below swallows all scraping and DB
    errors, so the @retry decorator above effectively only fires for the
    pool-health RuntimeError raised before the try block — confirm this
    is intended.
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool and fail fast if it is unhealthy.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        for user_name in USER_NAME_LIST:
            log.info(
                f'-------------------------------- 开始爬取用户 {user_name} 的所有帖子 --------------------------------')
            # Load already-stored uids so duplicate posts are skipped.
            sql_uid_list = sql_pool.select_all('select uid from instagram_posts_record where user_name = %s',
                                               (user_name,))
            sql_uid_list = [_[0] for _ in sql_uid_list]
            log.debug(f'查询到 uid 列表sql_uid_list的长度为: {len(sql_uid_list)}')
            items_ = get_userPosts(log, user_name, sql_uid_list)
            for item_ in items_:
                # print(item_)
                sql_pool.insert_one_or_dict('instagram_posts_record', item_)
            sql_uid_list.clear()
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
+
+
def schedule_task():
    """
    Register the weekly scraping job (Monday 06:00) and run the
    scheduler loop forever.
    """
    # ins_posts_main(log=logger)

    # schedule.every().day.at("05:00").do(ins_posts_main, log=logger)
    schedule.every().monday.at("06:00").do(ins_posts_main, log=logger)

    while True:
        schedule.run_pending()
        time.sleep(1)  # poll pending jobs once per second
+
+
if __name__ == '__main__':
    # Start the weekly schedule loop when executed directly.
    schedule_task()

+ 290 - 0
ins_img_video_spider/mysql_pool.py

@@ -0,0 +1,290 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/3/25 14:14
+import re
+
+import pymysql
+import YamlLoader
+from loguru import logger
+from dbutils.pooled_db import PooledDB
+
# Load MySQL connection settings from application.yml via YamlLoader;
# every value supports ${ENV_VAR:default} placeholders.
yaml = YamlLoader.readYaml()
mysqlYaml = yaml.get("mysql")
sql_host = mysqlYaml.getValueAsString("host")
sql_port = mysqlYaml.getValueAsInt("port")
sql_user = mysqlYaml.getValueAsString("username")
sql_password = mysqlYaml.getValueAsString("password")
sql_db = mysqlYaml.getValueAsString("db")
+
+
class MySQLConnectionPool:
    """
    MySQL connection pool built on DBUtils' PooledDB over PyMySQL.

    Provides select/insert/update helpers; batch operations fall back to
    row-by-row execution when the batch statement fails.
    """

    def __init__(self, mincached=4, maxcached=5, maxconnections=10, log=None):
        """
        Initialize the pool.
        :param mincached: connections created at startup (0 = none)
        :param maxcached: max idle connections kept (0 or None = unlimited)
        :param maxconnections: max total connections (0 or None = unlimited)
        :param log: custom logger (defaults to loguru's module logger)
        """
        # Use loguru's logger unless a custom one was passed in.
        self.log = log or logger
        self.pool = PooledDB(
            creator=pymysql,
            mincached=mincached,
            maxcached=maxcached,
            maxconnections=maxconnections,
            blocking=True,  # block and wait when the pool is exhausted (False would raise instead)
            host=sql_host,
            port=sql_port,
            user=sql_user,
            password=sql_password,
            database=sql_db,
            ping=0  # liveness-check mode (0 = never, 1 = before query, 2 = before every execute)
        )

    def _execute(self, query, args=None, commit=False):
        """
        Execute one SQL statement on a pooled connection.
        :param query: SQL statement
        :param args: statement parameters
        :param commit: commit the transaction after executing
        :return: the cursor (already closed; see note)

        NOTE(review): the cursor is returned after both ``with`` blocks
        exit, so it is closed and its connection is back in the pool;
        callers' fetchone()/fetchall() work only because PyMySQL's default
        cursor buffers all rows client-side — confirm before changing the
        cursor class.
        """
        try:
            with self.pool.connection() as conn:
                with conn.cursor() as cursor:
                    cursor.execute(query, args)
                    if commit:
                        conn.commit()
                    self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
                    return cursor
        except Exception as e:
            # NOTE(review): if pool.connection() itself raised, ``conn`` is
            # unbound here and this rollback raises NameError — verify.
            if commit:
                conn.rollback()
            self.log.error(f"Error executing query: {e}, Query: {query}, Args: {args}")
            raise e

    def select_one(self, query, args=None):
        """
        Run a query and return a single row.
        :param query: SELECT statement
        :param args: query parameters
        :return: one row tuple, or None
        """
        cursor = self._execute(query, args)
        return cursor.fetchone()

    def select_all(self, query, args=None):
        """
        Run a query and return all rows.
        :param query: SELECT statement
        :param args: query parameters
        :return: tuple of row tuples
        """
        cursor = self._execute(query, args)
        return cursor.fetchall()

    def insert_one(self, query, args):
        """
        Execute a single INSERT statement.
        :param query: INSERT statement
        :param args: statement parameters
        :return: lastrowid of the inserted row
        """
        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        cursor = self._execute(query, args, commit=True)
        return cursor.lastrowid  # id of the inserted row

    def insert_all(self, query, args_list):
        """
        Batch INSERT; on failure falls back to inserting rows one by one.
        :param query: INSERT statement
        :param args_list: list of parameter tuples
        """
        conn = None
        cursor = None
        try:
            conn = self.pool.connection()
            cursor = conn.cursor()
            cursor.executemany(query, args_list)
            conn.commit()
            self.log.debug(f"sql insert_all, SQL: {query}, Rows: {len(args_list)}")
            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        except Exception as e:
            # NOTE(review): conn is still None here if pool.connection()
            # failed, making this rollback an AttributeError — verify.
            conn.rollback()
            self.log.error(f"Batch insertion failed after 5 attempts. Trying single inserts. Error: {e}")
            # Batch failed: degrade to row-by-row inserts.
            rowcount = 0
            for args in args_list:
                self.insert_one(query, args)
                rowcount += 1
            self.log.debug(f"Batch insertion failed. Inserted {rowcount} rows individually.")
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()

    def insert_one_or_dict(self, table=None, data=None, query=None, args=None, commit=True):
        """
        Single insert supporting either a dict or a raw SQL statement.
        :param table: table name (required for dict insert)
        :param data: dict of {column: value}
        :param query: raw SQL statement (mutually exclusive with data)
        :param args: SQL parameters (required when query is used)
        :param commit: auto-commit after executing
        :return: lastrowid of the inserted row
        """
        if data is not None:
            if not isinstance(data, dict):
                raise ValueError("Data must be a dictionary")

            # Identifiers are validated to prevent SQL injection via
            # table/column names; values go through %s placeholders.
            keys = ', '.join([self._safe_identifier(k) for k in data.keys()])
            values = ', '.join(['%s'] * len(data))
            query = f"INSERT INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
            args = tuple(data.values())
        elif query is None:
            raise ValueError("Either data or query must be provided")

        cursor = self._execute(query, args, commit)
        self.log.info(f"sql insert_one_or_dict, Table: {table}, Rows: {cursor.rowcount}")
        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one_or_dict 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        return cursor.lastrowid

    def insert_many(self, table=None, data_list=None, query=None, args_list=None, batch_size=500, commit=True):
        """
        Batch insert supporting either a list of dicts or raw SQL.
        :param table: table name (required for dict insert)
        :param data_list: list of {column: value} dicts
        :param query: raw SQL statement (mutually exclusive with data_list)
        :param args_list: list of parameter tuples (required when query is used)
        :param batch_size: rows per executemany chunk
        :param commit: auto-commit after each chunk
        :return: number of affected rows
        """
        if data_list is not None:
            if not data_list or not isinstance(data_list[0], dict):
                raise ValueError("Data_list must be a non-empty list of dictionaries")

            # Column order is taken from the first dict; all dicts are
            # assumed to share the same keys — TODO confirm with callers.
            keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
            values = ', '.join(['%s'] * len(data_list[0]))
            query = f"INSERT INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
            args_list = [tuple(d.values()) for d in data_list]
        elif query is None:
            raise ValueError("Either data_list or query must be provided")

        total = 0
        for i in range(0, len(args_list), batch_size):
            batch = args_list[i:i + batch_size]
            try:
                with self.pool.connection() as conn:
                    with conn.cursor() as cursor:
                        cursor.executemany(query, batch)
                        if commit:
                            conn.commit()
                        total += cursor.rowcount
                        # self.log.debug(f"sql insert_many_or_dict, SQL: {query}, Rows: {cursor.rowcount}")
            except Exception as e:
                # NOTE(review): ``conn`` may be unbound here if acquiring
                # the connection failed — verify.
                if commit:
                    conn.rollback()
                self.log.error(f"Batch insert failed: {e}")
                # Degrade the failed chunk to row-by-row inserts.
                for args in batch:
                    try:
                        self.insert_one_or_dict(table=None, query=query, args=args, commit=commit)
                        total += 1
                    except Exception as e2:
                        self.log.error(f"Single insert failed: {e2}")
                        continue
        self.log.info(f"sql insert_many_or_dict, Table: {table}, Total Rows: {total}")
        return total

    def insert_too_many(self, query, args_list, batch_size=1000):
        """
        Chunked batch insert for very large inputs (100k+ rows); each
        failed chunk degrades to row-by-row inserts.
        :param query: INSERT statement
        :param args_list: list of parameter tuples
        :param batch_size: rows per chunk
        """
        for i in range(0, len(args_list), batch_size):
            batch = args_list[i:i + batch_size]
            try:
                with self.pool.connection() as conn:
                    with conn.cursor() as cursor:
                        cursor.executemany(query, batch)
                        conn.commit()
            except Exception as e:
                self.log.error(f"insert_too_many error. Trying single insert. Error: {e}")
                # Degrade the current chunk to single-row inserts.
                for args in batch:
                    self.insert_one(query, args)

    def update_one(self, query, args):
        """
        Execute a single UPDATE statement.
        :param query: UPDATE statement
        :param args: statement parameters
        :return: the cursor from _execute
        """
        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        return self._execute(query, args, commit=True)

    def update_all(self, query, args_list):
        """
        Batch UPDATE; on failure falls back to updating rows one by one.
        :param query: UPDATE statement
        :param args_list: list of parameter tuples
        """
        conn = None
        cursor = None
        try:
            conn = self.pool.connection()
            cursor = conn.cursor()
            cursor.executemany(query, args_list)
            conn.commit()
            self.log.debug(f"sql update_all, SQL: {query}, Rows: {len(args_list)}")
            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        except Exception as e:
            # NOTE(review): conn is None here if pool.connection() failed,
            # making this rollback an AttributeError — verify.
            conn.rollback()
            self.log.error(f"Error executing query: {e}")
            # Batch failed: degrade to row-by-row updates.
            rowcount = 0
            for args in args_list:
                self.update_one(query, args)
                rowcount += 1
            self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()

    def check_pool_health(self):
        """
        Verify the pool can hand out a live connection.

        Usage:
            sql_pool = MySQLConnectionPool(log=log)
            if not sql_pool.check_pool_health():
                log.error("数据库连接池异常")
                raise RuntimeError("数据库连接池异常")

        :return: True when a pooled connection answers a ping, else False
        """
        try:
            with self.pool.connection() as conn:
                conn.ping(reconnect=True)
                return True
        except Exception as e:
            self.log.error(f"Connection pool health check failed: {e}")
            return False

    @staticmethod
    def _safe_identifier(name):
        """Validate a SQL identifier (table/column name) to block injection."""
        if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
            raise ValueError(f"Invalid SQL identifier: {name}")
        return name

+ 8 - 0
ins_img_video_spider/requirements.txt

@@ -0,0 +1,8 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+curl_cffi==0.11.1
+DBUtils==3.1.0
+loguru==0.7.3
+PyMySQL==1.1.1
+PyYAML==6.0.2
+schedule==1.2.2
+tenacity==9.0.0