# zj_new_daily_spider.py (10 KB)
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/6/9 15:56
  5. import time
  6. import json
  7. import inspect
  8. import requests
  9. import schedule
  10. import hashlib
  11. import base64
  12. import user_agent
  13. from loguru import logger
  14. from datetime import datetime
  15. from Crypto.Cipher import AES
  16. from Crypto.Util.Padding import unpad
  17. from mysql_pool import MySQLConnectionPool
  18. from tenacity import retry, stop_after_attempt, wait_fixed
  19. logger.remove()
  20. logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
  21. format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
  22. level="DEBUG", retention="7 day")
  23. RESPONSE_KEY = b"3bd48ea5e910b195843941351be7cbae" # 16字节AES密钥(UTF8编码)
  24. REQUEST_KEY = "1ba48ea2e910b666843941351be7cbad"
  25. def after_log(retry_state):
  26. """
  27. retry 回调
  28. :param retry_state: RetryCallState 对象
  29. """
  30. # 检查 args 是否存在且不为空
  31. if retry_state.args and len(retry_state.args) > 0:
  32. log = retry_state.args[0] # 获取传入的 logger
  33. else:
  34. log = logger # 使用全局 logger
  35. if retry_state.outcome.failed:
  36. log.warning(
  37. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  38. else:
  39. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  40. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  41. def get_proxys(log):
  42. """
  43. 获取代理
  44. :return: 代理
  45. """
  46. tunnel = "x371.kdltps.com:15818"
  47. kdl_username = "t13753103189895"
  48. kdl_password = "o0yefv6z"
  49. try:
  50. proxies = {
  51. "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
  52. "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
  53. }
  54. return proxies
  55. except Exception as e:
  56. log.error(f"Error getting proxy: {e}")
  57. raise e
  58. def make_sign():
  59. """生成sign: MD5(REQUEST_KEY + 当前时间戳秒)"""
  60. now = datetime.now()
  61. dt_str = now.strftime("%Y-%m-%d %H:%M:%S")
  62. # JS: new Date(o).getTime() / 1e3 — 取秒级时间戳
  63. timestamp = int(datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S").timestamp())
  64. raw = REQUEST_KEY + str(timestamp)
  65. return hashlib.md5(raw.encode()).hexdigest(), dt_str
  66. def decrypt_response(data_b64, iv_hex):
  67. """AES-CBC解密响应数据"""
  68. iv = iv_hex.encode("utf-8") # JS: CryptoJS.enc.Utf8.parse(iv) — 直接UTF8编码
  69. cipher = AES.new(RESPONSE_KEY, AES.MODE_CBC, iv)
  70. ciphertext = base64.b64decode(data_b64)
  71. plaintext = unpad(cipher.decrypt(ciphertext), AES.block_size)
  72. return json.loads(plaintext.decode("utf-8"))
  73. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  74. def get_request_one_page(log, rating_no) -> dict:
  75. """
  76. 获取单页数据
  77. :param log: logger
  78. :param rating_no: 证书编号
  79. :return: dict
  80. """
  81. sign, dt_str = make_sign()
  82. headers = {
  83. "accept": "*/*",
  84. "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
  85. "content-type": "application/json;charset=UTF-8",
  86. "datetime": dt_str,
  87. "origin": "https://www.zhongjianjiantong.com",
  88. "referer": "https://www.zhongjianjiantong.com/web/index.html",
  89. "sign": sign,
  90. "user-agent": user_agent.generate_user_agent()
  91. }
  92. url = "https://www.zhongjianjiantong.com/Api/OrderRatingGoods/detail"
  93. data = {
  94. "rating_no": rating_no
  95. }
  96. with requests.Session() as session:
  97. response = session.post(url, headers=headers, json=data, proxies=get_proxys(log), timeout=5)
  98. # print(response.text)
  99. response.raise_for_status()
  100. result = response.json()
  101. if result["code"] == 200 and result.get("iv"):
  102. decrypted = decrypt_response(result["data"], result["iv"])
  103. return decrypted
  104. else:
  105. return result
  106. def parse_data(resp_json, sql_pool):
  107. """
  108. 解析数据
  109. :param resp_json: 响应数据
  110. :param sql_pool: 数据库连接池
  111. """
  112. card_id = resp_json.get('obj_order_rating_goods', {}).get('id')
  113. order_no = resp_json.get('obj_order_rating_goods', {}).get('order_no')
  114. tag_no = resp_json.get('obj_order_rating_goods', {}).get('tag_no') # 标签号/查询的号码
  115. images = resp_json.get('obj_order_rating_goods', []).get('images')
  116. card_create_time = resp_json.get('obj_order_rating_goods', {}).get('create_time')
  117. card_update_time = resp_json.get('obj_order_rating_goods', {}).get('update_time')
  118. score = resp_json.get('obj_order_rating_goods', {}).get('score') # 中检评分
  119. corners = resp_json.get('obj_order_rating_goods', {}).get('obj_detail', {}).get('corners') # 四角
  120. eoges = resp_json.get('obj_order_rating_goods', {}).get('obj_detail', {}).get('eoges') # 边缘
  121. surface = resp_json.get('obj_order_rating_goods', {}).get('obj_detail', {}).get('surface') # 表面
  122. centering = resp_json.get('obj_order_rating_goods', {}).get('obj_detail', {}).get('centering') # 居中
  123. colour = resp_json.get('obj_order_rating_goods', {}).get('obj_detail', {}).get('colour') # 颜色
  124. repair = resp_json.get('obj_order_rating_goods', {}).get('obj_detail', {}).get('repair') # 修复
  125. rating_no = resp_json.get('obj_order_rating_goods', {}).get('rating_no') # 证书编号
  126. obj_brand_title = resp_json.get('obj_order_rating_goods', {}).get('obj_brand', {}).get(
  127. 'title') # 商品品牌
  128. obj_detail_spxl = resp_json.get('obj_order_rating_goods', {}).get('obj_detail', {}).get(
  129. 'spxl') # 商品系列
  130. obj_detail_spmc = resp_json.get('obj_order_rating_goods', {}).get('obj_detail', {}).get(
  131. 'spmc') # 商品名称
  132. obj_detail_fxnf = resp_json.get('obj_order_rating_goods', {}).get('obj_detail', {}).get(
  133. 'fxnf') # 发行年份
  134. obj_detail_yy = resp_json.get('obj_order_rating_goods', {}).get('obj_detail', {}).get('yy') # 语言
  135. obj_detail_spbh = resp_json.get('obj_order_rating_goods', {}).get('obj_detail', {}).get(
  136. 'spbh') # 商品编号
  137. info = (
  138. card_id, order_no, tag_no, images, card_create_time, card_update_time, score, corners, eoges, surface,
  139. centering,
  140. colour, repair, rating_no, obj_brand_title, obj_detail_spxl, obj_detail_spmc, obj_detail_fxnf, obj_detail_yy,
  141. obj_detail_spbh)
  142. # print(info)
  143. sql = """
  144. INSERT INTO zhongjian_record (card_id, order_no, tag_no, images, card_create_time, card_update_time, score, corners, eoges, surface, centering, colour, repair, rating_no, obj_brand_title, obj_detail_spxl, obj_detail_spmc, obj_detail_fxnf, obj_detail_yy, obj_detail_spbh)
  145. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  146. """
  147. sql_pool.insert_one(sql, info)
  148. def loop_rating_no(log, sql_pool, sql_ra_no_list):
  149. """
  150. 循环处理每个证书编号
  151. :param log: logger
  152. :param sql_pool: 数据库连接池
  153. :param sql_ra_no_list: 证书编号列表
  154. """
  155. # sql_ra_no_list = sql_pool.select_all('select tag_no from zhongjian_task where state = 0 limit 10000')
  156. # sql_ra_no_list = [i[0] for i in sql_ra_no_list]
  157. for rating_no_ in sql_ra_no_list:
  158. log.info(f"{rating_no_} is start ......................................")
  159. try:
  160. resp_json = get_request_one_page(log, rating_no_)
  161. if resp_json and resp_json.get('code') == 200:
  162. # print(resp_json)
  163. parse_data(resp_json, sql_pool)
  164. sql_pool.update_one('update zhongjian_task set state = 1 where tag_no = %s', (rating_no_,))
  165. elif resp_json and resp_json.get('code') == 400:
  166. log.warning(f"{rating_no_} is not exist ......................................")
  167. sql_pool.update_one('update zhongjian_task set state = 2 where tag_no = %s', (rating_no_,))
  168. else:
  169. log.warning(f"other warning, please check ......................................")
  170. sql_pool.update_one('update zhongjian_task set state = 3 where tag_no = %s', (rating_no_,))
  171. except Exception as e:
  172. log.warning(f"{inspect.currentframe().f_code.co_name} error: {e}")
  173. sql_pool.update_one('update zhongjian_task set state = 3 where tag_no = %s', (rating_no_,))
  174. continue
  175. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  176. def zhongjian_main(log):
  177. """
  178. 主函数
  179. :param log:
  180. """
  181. log.info(
  182. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  183. # 配置 MySQL 连接池
  184. sql_pool = MySQLConnectionPool(log=log)
  185. if not sql_pool:
  186. log.error("MySQL数据库连接失败")
  187. raise Exception("MySQL数据库连接失败")
  188. try:
  189. # while True:
  190. # sql_ra_no_list = sql_pool.select_all('select tag_no from zhongjian_task where state = 0 limit 10000')
  191. sql_ra_no_list = sql_pool.select_all(
  192. "select tag_no from zhongjian_task where tag_no like '529%' and state = 0 limit 50000")
  193. # sql_ra_no_list = sql_pool.select_all("select tag_no from zhongjian_task where tag_no > '519354131' and state != 1 limit 10000")
  194. sql_ra_no_list = [i[0] for i in sql_ra_no_list]
  195. if not sql_ra_no_list:
  196. log.info(f'没有需要处理的数据,等待下一轮处理........................................................')
  197. # break
  198. return
  199. try:
  200. loop_rating_no(log, sql_pool, sql_ra_no_list)
  201. except Exception as e:
  202. log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
  203. except Exception as e:
  204. log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
  205. finally:
  206. log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
  207. def schedule_task():
  208. """
  209. 爬虫模块 定时任务 的启动文件
  210. """
  211. # 立即运行一次任务
  212. zhongjian_main(log=logger)
  213. # 设置定时任务
  214. # schedule.every(30).days.at("00:01").do(zhongjian_main, log=logger)
  215. schedule.every().days.at("05:00").do(zhongjian_main, log=logger)
  216. while True:
  217. schedule.run_pending()
  218. time.sleep(1)
  219. if __name__ == '__main__':
  220. schedule_task()