gbca_new_daily_spider.py 10 KB


  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/6/9 18:39
  5. import time
  6. import pytz
  7. import inspect
  8. import requests
  9. import schedule
  10. import user_agent
  11. from loguru import logger
  12. from datetime import datetime
  13. from mysql_pool import MySQLConnectionPool
  14. from tenacity import retry, stop_after_attempt, wait_fixed
  15. logger.remove()
  16. logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
  17. format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
  18. level="DEBUG", retention="15 day")
  19. def after_log(retry_state):
  20. """
  21. retry 回调
  22. :param retry_state: RetryCallState 对象
  23. """
  24. # 检查 args 是否存在且不为空
  25. if retry_state.args and len(retry_state.args) > 0:
  26. log = retry_state.args[0] # 获取传入的 logger
  27. else:
  28. log = logger # 使用全局 logger
  29. if retry_state.outcome.failed:
  30. log.warning(
  31. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  32. else:
  33. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  34. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  35. def get_proxys(log):
  36. """
  37. 获取代理
  38. :return: 代理
  39. """
  40. tunnel = "x371.kdltps.com:15818"
  41. kdl_username = "t13753103189895"
  42. kdl_password = "o0yefv6z"
  43. try:
  44. proxies = {
  45. "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
  46. "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
  47. }
  48. return proxies
  49. except Exception as e:
  50. log.error(f"Error getting proxy: {e}")
  51. raise e
  52. def save_data(sql_pool, info):
  53. """
  54. 保存数据
  55. :param sql_pool: sql连接池对象
  56. :param info: 保存的数据 -> tuple
  57. """
  58. sql = """
  59. INSERT INTO gbca_record (rating_code, front_img, back_img, company_short_name, goods_name, goods_score_name, year, publisher, brand, sub_brand, card_no, middle_score, border_score, card_angle_score, surface_score, sign_score, issue_limit, category2, company_id, create_time, update_time, order_code, card_id)
  60. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
  61. sql_pool.insert_one(sql, info)
  62. def transfer_ts(timestamp_ms) -> str:
  63. """
  64. 转换时间戳 -> 1615975247000
  65. :param timestamp_ms:
  66. :return: ret_ts -> str
  67. """
  68. # 将毫秒转换为秒
  69. timestamp_s = timestamp_ms / 1000.0
  70. # 创建 UTC 时间
  71. utc_dt = datetime.fromtimestamp(timestamp_s, pytz.utc)
  72. # 需要转换到特定时区(例如 'Asia/Shanghai')
  73. shanghai_tz = pytz.timezone('Asia/Shanghai')
  74. shanghai_dt = utc_dt.astimezone(shanghai_tz)
  75. ret_ts = shanghai_dt.strftime('%Y-%m-%d %H:%M:%S')
  76. return ret_ts
  77. def parse_resp(log, resp, rating_code, sql_pool):
  78. """
  79. 解析响应
  80. :param log: logger对象
  81. :param resp: 响应
  82. :param rating_code: 评级编号
  83. :param sql_pool: sql连接池对象
  84. """
  85. if resp.get("errorCode") == 0:
  86. data = resp.get("data")
  87. img_list = data.get("imgList")
  88. if len(img_list) == 2:
  89. front_img = img_list[0].get("self")
  90. back_img = img_list[1].get("self")
  91. elif len(img_list) == 1:
  92. front_img = img_list[0].get("self")
  93. back_img = None
  94. else:
  95. log.warning(f"{inspect.currentframe().f_code.co_name} -> No img_list:{img_list}")
  96. front_img = None
  97. back_img = None
  98. company_short_name = data.get("companyShortName") # 评级公司简称
  99. goods_name = data.get("goodsName") # 名称
  100. goods_score_name = data.get("goodsScoreName") # 分数
  101. category2 = data.get("category2")
  102. company_id = data.get("companyId")
  103. create_time = data.get("createTime")
  104. create_time = transfer_ts(create_time) if create_time else None
  105. update_time = data.get("updateTime")
  106. update_time = transfer_ts(update_time) if update_time else None
  107. order_code = data.get("orderCode")
  108. card_id = data.get("id")
  109. attr_year = data.get("attr", [])
  110. attr_mapping = {
  111. '年份': 'year',
  112. '发行商': 'publisher',
  113. '卡片系列名称': 'brand',
  114. '子系列名称': 'sub_brand',
  115. '卡片编码': 'card_no',
  116. '居中分数': 'middle_score',
  117. '边框分数': 'border_score',
  118. '卡角分数': 'card_angle_score',
  119. '表面分数': 'surface_score',
  120. '签字分数': 'sign_score',
  121. '限发数': 'issue_limit'
  122. }
  123. res = {}
  124. for a in attr_year:
  125. for key, var_name in attr_mapping.items():
  126. if key in a:
  127. try:
  128. res[var_name] = a.replace(f'{key}:', '').strip()
  129. except Exception as e:
  130. log.error(f"Error parsing {key} from {a}: {e}")
  131. break
  132. else:
  133. # 循环遍历完所有键值对后都没有找到与 a 匹配的 key(即没有通过 break 提前退出),则会执行 else 子句内的代码
  134. log.warning(f"{inspect.currentframe().f_code.co_name} -> a:{a}")
  135. # 统计
  136. # countNum = data.get("countNum")
  137. # countRemark = data.get("countRemark")
  138. # if countNum:
  139. # statistics = f"{goods_score_name} 数量:{countNum}"
  140. # if countRemark:
  141. # statistics = f"{goods_score_name} 数量:{countNum}({countRemark})"
  142. # else:
  143. # statistics = f"{goods_score_name}:{countRemark}"
  144. info = (
  145. rating_code, front_img, back_img, company_short_name, goods_name, goods_score_name, res.get("year"),
  146. res.get("publisher"), res.get("brand"), res.get("sub_brand"), res.get("card_no"), res.get("middle_score"),
  147. res.get("border_score"), res.get("card_angle_score"), res.get("surface_score"), res.get("sign_score"),
  148. res.get("issue_limit"), category2, company_id, create_time, update_time, order_code, card_id)
  149. # print(info)
  150. save_data(sql_pool, info)
  151. # 更新 task 表状态
  152. sql_pool.update_one('update gbca_task set state = 1 where keyword = %s', (rating_code,))
  153. else:
  154. log.debug(
  155. f"{inspect.currentframe().f_code.co_name} rating_code:{rating_code} -> errorCode:{resp.get('errorCode')}, msg:{resp.get('msg')}")
  156. if resp.get('errorCode') == -1 or resp.get('errorCode') == '-1':
  157. log.warning(f"{inspect.currentframe().f_code.co_name} -> 无数据........")
  158. sql_pool.update_one('update gbca_task set state = 2 where keyword = %s', (rating_code,))
  159. else:
  160. log.warning(f"{inspect.currentframe().f_code.co_name} -> 获取数据失败........")
  161. sql_pool.update_one('update gbca_task set state = 3 where keyword = %s', (rating_code,))
  162. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  163. def get_resp(log, rating_code, sql_pool):
  164. """
  165. 获取卡片信息
  166. :param log: logger对象
  167. :param rating_code: 评级编号
  168. :param sql_pool: sql连接池对象
  169. :return:
  170. """
  171. log.info(f"{inspect.currentframe().f_code.co_name} rating_code:{rating_code}")
  172. headers = {
  173. "Accept": "application/json, text/plain, */*",
  174. "Content-Type": "application/json;charset=UTF-8",
  175. "Referer": "https://www.gongbocoins.com/",
  176. "User-Agent": user_agent.generate_user_agent(),
  177. "lang": "en"
  178. }
  179. url = "https://wapi.gongbocoins.com/gbca/orderCoin/getWebsiteRatingInfo"
  180. data = {
  181. "ratingCode": rating_code
  182. }
  183. response = requests.post(url, headers=headers, json=data, proxies=get_proxys(log), timeout=10)
  184. # print(response.json())
  185. # print(response)
  186. response.raise_for_status()
  187. resp_json = response.json()
  188. if resp_json:
  189. parse_resp(log, resp_json, rating_code, sql_pool)
  190. else:
  191. log.warning(f"{inspect.currentframe().f_code.co_name} -> response:{response.status_code}")
  192. sql_pool.update_one('update gbca_task set state = 3 where keyword = %s', (rating_code,))
  193. def process_code_list(log, sql_pool, code_list):
  194. for rating_code in code_list:
  195. try:
  196. get_resp(log, rating_code, sql_pool)
  197. except Exception as ce:
  198. log.error(f"{inspect.currentframe().f_code.co_name} -> error: {ce}")
  199. sql_pool.update_one('update gbca_task set state = 3 where keyword = %s', (rating_code,))
  200. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  201. def gbca_main(log):
  202. """
  203. 主函数
  204. :param log: logger对象
  205. """
  206. log.info(
  207. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  208. # 配置 MySQL 连接池
  209. sql_pool = MySQLConnectionPool(log=log)
  210. if not sql_pool:
  211. log.error("MySQL数据库连接失败")
  212. raise Exception("MySQL数据库连接失败")
  213. try:
  214. while True:
  215. sql_code_list = sql_pool.select_all("select keyword from gbca_task where state = 0 limit 10000")
  216. sql_code_list = [i[0] for i in sql_code_list]
  217. if not sql_code_list:
  218. log.debug(f"{inspect.currentframe().f_code.co_name} -> No len sql_code_list")
  219. break
  220. try:
  221. process_code_list(log, sql_pool, sql_code_list)
  222. except Exception as e:
  223. log.error(f"{inspect.currentframe().f_code.co_name} -> error: {e}")
  224. except Exception as e:
  225. log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
  226. finally:
  227. log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
  228. def schedule_task():
  229. """
  230. 爬虫模块的启动文件
  231. """
  232. # 立即运行一次任务
  233. gbca_main(log=logger)
  234. # 设置定时任务
  235. schedule.every(30).days.at("00:01").do(gbca_main, log=logger)
  236. while True:
  237. schedule.run_pending()
  238. time.sleep(1)
  239. if __name__ == '__main__':
  240. schedule_task()