update bgs spider 2.2.2

charley, 1 day ago
parent commit 807b641948
1 changed file with 72 additions and 43 deletions

+ 72 - 43
bgs_spider/bgs_new_daily_spider.py

@@ -50,6 +50,26 @@ def get_proxys(log):
         log.error(f"Error getting proxy: {e}")
         raise e
 
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_proxys1(log):
+    # 已购买账户  北美
+    # http_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36927"
+    # https_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36927"
+    http_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36931"
+    https_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36931"
+
+    # url = "https://ifconfig.me"
+    try:
+        proxySettings = {
+            "http": http_proxy,
+            "https": https_proxy,
+        }
+        return proxySettings
+    except Exception as e:
+        log.error(f"Error getting proxy: {e}")
+        raise e
+
+
 
 def save_data(mysql_pool, info):
     """
@@ -78,8 +98,8 @@ def get_data(log, bgs_id, mysql_pool):
         "category": "BGS",
         "serialNumber": str(bgs_id)
     }
-    # response = requests.get(url, headers=headers, params=params, proxies=get_proxys(log), timeout=5)
-    response = requests.get(url, headers=headers, params=params, timeout=5)
+    response = requests.get(url, headers=headers, params=params, proxies=get_proxys1(log), timeout=5)
+    # response = requests.get(url, headers=headers, params=params, timeout=5)
     if response.status_code == 404:
         # No data ("No Record Found"): set state to 3
         log.warning(f"No Record Found for {bgs_id}")
@@ -136,23 +156,23 @@ def get_data(log, bgs_id, mysql_pool):
             mysql_pool.update_one("UPDATE bgs_task SET state=1 WHERE auth_code=%s", (bgs_id,))
 
 
-def process_urls(log, ids, mysql_pool, batch_size=1000, max_workers=5):
-    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
-        for i in range(0, len(ids), batch_size):
-            # print(i)
-            batch = ids[i:i + batch_size]
-            # print(batch)
-            try:
-                futures_to_urls = {executor.submit(get_data, log, url, mysql_pool): url for url in batch}
-                for future in concurrent.futures.as_completed(futures_to_urls):
-                    url = futures_to_urls[future]
-                    try:
-                        future.result()
-                        log.debug(f"处理 {url} 成功")
-                    except Exception as exc:
-                        log.debug(f"处理 {url} 出错: {exc}")
-            except Exception as e:
-                log.error(f"提交任务失败: {e}")
+# def process_urls(log, ids, mysql_pool, batch_size=1000, max_workers=5):
+#     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+#         for i in range(0, len(ids), batch_size):
+#             # print(i)
+#             batch = ids[i:i + batch_size]
+#             # print(batch)
+#             try:
+#                 futures_to_urls = {executor.submit(get_data, log, url, mysql_pool): url for url in batch}
+#                 for future in concurrent.futures.as_completed(futures_to_urls):
+#                     url = futures_to_urls[future]
+#                     try:
+#                         future.result()
+#                         log.debug(f"处理 {url} 成功")
+#                     except Exception as exc:
+#                         log.debug(f"处理 {url} 出错: {exc}")
+#             except Exception as e:
+#                 log.error(f"提交任务失败: {e}")
 
 
 @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
@@ -165,33 +185,41 @@ def bgs_main(log):
             log.error("数据库连接失败")
             raise Exception("数据库连接失败")
 
-        max_bgs_id = sql_pool.select_one("SELECT MAX(number) AS max_number FROM beckett_bgs_record")
-        # print(max_bgs_id_list)
-        max_bgs_id = max_bgs_id[0]
-        log.info(f"max_bgs_id 从 {max_bgs_id} 开始爬取.........................")
-        # bgs_id_list = [i for i in range(max_bgs_id, max_bgs_id + 3001)]
+        # max_bgs_id = sql_pool.select_one("SELECT MAX(number) AS max_number FROM beckett_bgs_record")
+        # # print(max_bgs_id_list)
+        # max_bgs_id = max_bgs_id[0]
+        # log.info(f"max_bgs_id 从 {max_bgs_id} 开始爬取.........................")
+        # # bgs_id_list = [i for i in range(max_bgs_id, max_bgs_id + 3001)]
 
         """
         2026/01/20  change: +3000, -3000,
         2026/02/02  change: +20000, -100000,
+        2026/02/02 18:00  change: IDs above 15,000,000 run single-threaded in a continuous loop
         """
-        sql_bgs_id_list = sql_pool.select_all(
-            "SELECT auth_code FROM bgs_task WHERE state != 1 AND auth_code BETWEEN %s AND %s LIMIT 120000",
-            (max_bgs_id - 100000, max_bgs_id + 20000))
-
         # sql_bgs_id_list = sql_pool.select_all(
-        #     "SELECT auth_code FROM bgs_task WHERE state!=1 AND auth_code > 18239485 LIMIT 5000")
-            # "SELECT auth_code FROM bgs_task WHERE state!=1 AND auth_code > 17990000 LIMIT 5000"
-        sql_bgs_id_list = [bid[0] for bid in sql_bgs_id_list]
-        # for bid in sql_bgs_id_list:
-        try:
-            process_urls(log, sql_bgs_id_list, sql_pool, batch_size=1000,
-                         max_workers=3)  # 根据需要调整batch_size和max_workers
-            # get_data(bid, mysql_pool)
-        except Exception as e:
-            log.error('process urls: ', e)
-
-        # time.sleep(5)
+        #     "SELECT auth_code FROM bgs_task WHERE state != 1 AND auth_code BETWEEN %s AND %s LIMIT 120000",
+        #     (max_bgs_id - 100000, max_bgs_id + 20000))
+
+        while True:
+            sql_bgs_id_list = sql_pool.select_all(
+                "SELECT auth_code FROM bgs_task WHERE state!=1 AND auth_code > 15000000 LIMIT 5000")
+                # "SELECT auth_code FROM bgs_task WHERE state!=1 AND auth_code > 17990000 LIMIT 5000"
+            sql_bgs_id_list = [bid[0] for bid in sql_bgs_id_list]
+
+            if not sql_bgs_id_list:
+                log.info("没有需要处理的数据,等待1小时后再试.........................")
+                time.sleep(3600)
+                continue
+
+            for bid in sql_bgs_id_list:
+                try:
+                    # process_urls(log, sql_bgs_id_list, sql_pool, batch_size=1000,
+                    #              max_workers=3)  # adjust batch_size and max_workers as needed
+                    get_data(log, bid, sql_pool)
+                except Exception as e:
+                    log.error(f"process urls: {e}")
+
+            # time.sleep(5)
 
     except Exception as e:
         log.error(e)
@@ -206,13 +234,14 @@ def schedule_task():
     # Run the task once immediately
     bgs_main(logger)
 
-    # Set up the scheduled task
-    schedule.every().day.at("03:01").do(bgs_main, logger)
+    # Set up the scheduled task - now run every Monday
+    schedule.every().monday.at("03:01").do(bgs_main, logger)
     while True:
         schedule.run_pending()
         time.sleep(1)
 
 
 if __name__ == '__main__':
-    schedule_task()
+    # schedule_task()
     # get_data('1000743')
+    bgs_main(logger)
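
For reference, both retry decorators touched by this commit (get_proxys1 with 5 attempts at 1-second intervals, bgs_main with 100 attempts at 1-hour intervals) use tenacity's retry / stop_after_attempt / wait_fixed pattern. Below is a minimal, self-contained sketch of that pattern; the after_log callback here is a hypothetical stand-in, since the project's own after_log is defined elsewhere in bgs_new_daily_spider.py and does not appear in this diff.

# Minimal sketch of the tenacity retry pattern used in this file.
# NOTE: this after_log is a hypothetical stand-in; the project's real
# callback is defined outside this diff.
import logging

from tenacity import RetryError, retry, stop_after_attempt, wait_fixed

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("bgs_spider")


def after_log(retry_state):
    # tenacity calls this with a RetryCallState after every attempt.
    logger.info("attempt %d finished", retry_state.attempt_number)


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def flaky():
    raise RuntimeError("simulated failure")  # retried 5 times, 1 s apart


try:
    flaky()
except RetryError:
    # Without reraise=True, tenacity wraps the final exception in RetryError.
    logger.info("gave up after 5 attempts")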
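
The other functional change is that get_data now sends its request through proxies=get_proxys1(log) instead of connecting directly. A minimal sketch of how requests consumes that mapping follows, using placeholder credentials rather than the project's real endpoint; https://ifconfig.me is the echo service already referenced in a commented-out line above.

# Minimal sketch of the proxies mapping that requests consumes.
# The proxy URL below is a placeholder, not the project's real
# endpoint or credentials.
import requests

proxies = {
    "http": "http://user:password@proxy.example.com:8080",
    "https": "http://user:password@proxy.example.com:8080",
}

# Both schemes are routed through the proxy; the 5-second timeout
# matches the value get_data() uses.
response = requests.get("https://ifconfig.me", proxies=proxies, timeout=5)
print(response.status_code, response.text)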