gbca_new_daily_spider.py 10 KB


  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/6/9 18:39
  5. import time
  6. import pytz
  7. import inspect
  8. import requests
  9. import schedule
  10. import user_agent
  11. from loguru import logger
  12. from datetime import datetime
  13. from mysql_pool import MySQLConnectionPool
  14. from tenacity import retry, stop_after_attempt, wait_fixed
  15. """
  16. 20260116 重启
  17. """
  18. logger.remove()
  19. logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
  20. format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
  21. level="DEBUG", retention="15 day")
  22. def after_log(retry_state):
  23. """
  24. retry 回调
  25. :param retry_state: RetryCallState 对象
  26. """
  27. # 检查 args 是否存在且不为空
  28. if retry_state.args and len(retry_state.args) > 0:
  29. log = retry_state.args[0] # 获取传入的 logger
  30. else:
  31. log = logger # 使用全局 logger
  32. if retry_state.outcome.failed:
  33. log.warning(
  34. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  35. else:
  36. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  37. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  38. def get_proxys(log):
  39. """
  40. 获取代理
  41. :return: 代理
  42. """
  43. tunnel = "x371.kdltps.com:15818"
  44. kdl_username = "t13753103189895"
  45. kdl_password = "o0yefv6z"
  46. try:
  47. proxies = {
  48. "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
  49. "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
  50. }
  51. return proxies
  52. except Exception as e:
  53. log.error(f"Error getting proxy: {e}")
  54. raise e
  55. def save_data(sql_pool, info):
  56. """
  57. 保存数据
  58. :param sql_pool: sql连接池对象
  59. :param info: 保存的数据 -> tuple
  60. """
  61. sql = """
  62. INSERT INTO gbca_record (rating_code, front_img, back_img, company_short_name, goods_name, goods_score_name, year, publisher, brand, sub_brand, card_no, middle_score, border_score, card_angle_score, surface_score, sign_score, issue_limit, category2, company_id, create_time, update_time, order_code, card_id)
  63. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
  64. sql_pool.insert_one(sql, info)
  65. def transfer_ts(timestamp_ms) -> str:
  66. """
  67. 转换时间戳 -> 1615975247000
  68. :param timestamp_ms:
  69. :return: ret_ts -> str
  70. """
  71. # 将毫秒转换为秒
  72. timestamp_s = timestamp_ms / 1000.0
  73. # 创建 UTC 时间
  74. utc_dt = datetime.fromtimestamp(timestamp_s, pytz.utc)
  75. # 需要转换到特定时区(例如 'Asia/Shanghai')
  76. shanghai_tz = pytz.timezone('Asia/Shanghai')
  77. shanghai_dt = utc_dt.astimezone(shanghai_tz)
  78. ret_ts = shanghai_dt.strftime('%Y-%m-%d %H:%M:%S')
  79. return ret_ts
  80. def parse_resp(log, resp, rating_code, sql_pool):
  81. """
  82. 解析响应
  83. :param log: logger对象
  84. :param resp: 响应
  85. :param rating_code: 评级编号
  86. :param sql_pool: sql连接池对象
  87. """
  88. if resp.get("errorCode") == 0:
  89. data = resp.get("data")
  90. img_list = data.get("imgList")
  91. if len(img_list) == 2:
  92. front_img = img_list[0].get("self")
  93. back_img = img_list[1].get("self")
  94. elif len(img_list) == 1:
  95. front_img = img_list[0].get("self")
  96. back_img = None
  97. else:
  98. log.warning(f"{inspect.currentframe().f_code.co_name} -> No img_list:{img_list}")
  99. front_img = None
  100. back_img = None
  101. company_short_name = data.get("companyShortName") # 评级公司简称
  102. goods_name = data.get("goodsName") # 名称
  103. goods_score_name = data.get("goodsScoreName") # 分数
  104. category2 = data.get("category2")
  105. company_id = data.get("companyId")
  106. create_time = data.get("createTime")
  107. create_time = transfer_ts(create_time) if create_time else None
  108. update_time = data.get("updateTime")
  109. update_time = transfer_ts(update_time) if update_time else None
  110. order_code = data.get("orderCode")
  111. card_id = data.get("id")
  112. attr_year = data.get("attr", [])
  113. attr_mapping = {
  114. '年份': 'year',
  115. '发行商': 'publisher',
  116. '卡片系列名称': 'brand',
  117. '子系列名称': 'sub_brand',
  118. '卡片编码': 'card_no',
  119. '居中分数': 'middle_score',
  120. '边框分数': 'border_score',
  121. '卡角分数': 'card_angle_score',
  122. '表面分数': 'surface_score',
  123. '签字分数': 'sign_score',
  124. '限发数': 'issue_limit'
  125. }
  126. res = {}
  127. for a in attr_year:
  128. for key, var_name in attr_mapping.items():
  129. if key in a:
  130. try:
  131. res[var_name] = a.replace(f'{key}:', '').strip()
  132. except Exception as e:
  133. log.error(f"Error parsing {key} from {a}: {e}")
  134. break
  135. else:
  136. # 循环遍历完所有键值对后都没有找到与 a 匹配的 key(即没有通过 break 提前退出),则会执行 else 子句内的代码
  137. log.warning(f"{inspect.currentframe().f_code.co_name} -> a:{a}")
  138. # 统计
  139. # countNum = data.get("countNum")
  140. # countRemark = data.get("countRemark")
  141. # if countNum:
  142. # statistics = f"{goods_score_name} 数量:{countNum}"
  143. # if countRemark:
  144. # statistics = f"{goods_score_name} 数量:{countNum}({countRemark})"
  145. # else:
  146. # statistics = f"{goods_score_name}:{countRemark}"
  147. info = (
  148. rating_code, front_img, back_img, company_short_name, goods_name, goods_score_name, res.get("year"),
  149. res.get("publisher"), res.get("brand"), res.get("sub_brand"), res.get("card_no"), res.get("middle_score"),
  150. res.get("border_score"), res.get("card_angle_score"), res.get("surface_score"), res.get("sign_score"),
  151. res.get("issue_limit"), category2, company_id, create_time, update_time, order_code, card_id)
  152. # print(info)
  153. save_data(sql_pool, info)
  154. # 更新 task 表状态
  155. sql_pool.update_one('update gbca_task set state = 1 where keyword = %s', (rating_code,))
  156. else:
  157. log.debug(
  158. f"{inspect.currentframe().f_code.co_name} rating_code:{rating_code} -> errorCode:{resp.get('errorCode')}, msg:{resp.get('msg')}")
  159. if resp.get('errorCode') == -1 or resp.get('errorCode') == '-1':
  160. log.warning(f"{inspect.currentframe().f_code.co_name} -> 无数据........")
  161. sql_pool.update_one('update gbca_task set state = 2 where keyword = %s', (rating_code,))
  162. else:
  163. log.warning(f"{inspect.currentframe().f_code.co_name} -> 获取数据失败........")
  164. sql_pool.update_one('update gbca_task set state = 3 where keyword = %s', (rating_code,))
  165. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  166. def get_resp(log, rating_code, sql_pool):
  167. """
  168. 获取卡片信息
  169. :param log: logger对象
  170. :param rating_code: 评级编号
  171. :param sql_pool: sql连接池对象
  172. :return:
  173. """
  174. log.info(f"{inspect.currentframe().f_code.co_name} rating_code:{rating_code}")
  175. headers = {
  176. "Accept": "application/json, text/plain, */*",
  177. "Content-Type": "application/json;charset=UTF-8",
  178. "Referer": "https://www.gongbocoins.com/",
  179. "User-Agent": user_agent.generate_user_agent(),
  180. "lang": "en"
  181. }
  182. url = "https://wapi.gongbocoins.com/gbca/orderCoin/getWebsiteRatingInfo"
  183. data = {
  184. "ratingCode": rating_code
  185. }
  186. response = requests.post(url, headers=headers, json=data, proxies=get_proxys(log), timeout=10)
  187. # print(response.json())
  188. # print(response)
  189. response.raise_for_status()
  190. resp_json = response.json()
  191. if resp_json:
  192. parse_resp(log, resp_json, rating_code, sql_pool)
  193. else:
  194. log.warning(f"{inspect.currentframe().f_code.co_name} -> response:{response.status_code}")
  195. sql_pool.update_one('update gbca_task set state = 3 where keyword = %s', (rating_code,))
  196. def process_code_list(log, sql_pool, code_list):
  197. for rating_code in code_list:
  198. try:
  199. get_resp(log, rating_code, sql_pool)
  200. except Exception as ce:
  201. log.error(f"{inspect.currentframe().f_code.co_name} -> error: {ce}")
  202. sql_pool.update_one('update gbca_task set state = 3 where keyword = %s', (rating_code,))
  203. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  204. def gbca_main(log):
  205. """
  206. 主函数
  207. :param log: logger对象
  208. """
  209. log.info(
  210. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  211. # 配置 MySQL 连接池
  212. sql_pool = MySQLConnectionPool(log=log)
  213. if not sql_pool:
  214. log.error("MySQL数据库连接失败")
  215. raise Exception("MySQL数据库连接失败")
  216. try:
  217. while True:
  218. sql_code_list = sql_pool.select_all("select keyword from gbca_task where state != 1 limit 10000")
  219. sql_code_list = [i[0] for i in sql_code_list]
  220. if not sql_code_list:
  221. log.debug(f"{inspect.currentframe().f_code.co_name} -> No len sql_code_list")
  222. break
  223. try:
  224. process_code_list(log, sql_pool, sql_code_list)
  225. except Exception as e:
  226. log.error(f"{inspect.currentframe().f_code.co_name} -> error: {e}")
  227. except Exception as e:
  228. log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
  229. finally:
  230. log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
  231. def schedule_task():
  232. """
  233. 爬虫模块的启动文件
  234. """
  235. # 立即运行一次任务
  236. gbca_main(log=logger)
  237. # 设置定时任务
  238. schedule.every(30).days.at("00:01").do(gbca_main, log=logger)
  239. while True:
  240. schedule.run_pending()
  241. time.sleep(1)
  242. if __name__ == '__main__':
  243. schedule_task()