# zc_history_spider.py (19 KB)
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2026/2/27 11:22
  5. import time
  6. import inspect
  7. import requests
  8. import user_agent
  9. from loguru import logger
  10. from crypto_utils import CryptoHelper
  11. from mysql_pool import MySQLConnectionPool
  12. from tenacity import retry, stop_after_attempt, wait_fixed
  13. logger.remove()
  14. logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
  15. format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
  16. level="DEBUG", retention="7 day")
  17. # 基础配置
  18. BASE_URL = "https://cashier.yqszpay.com"
  19. PAGE_SIZE = 10
  20. headers = {
  21. "User-Agent": user_agent.generate_user_agent(),
  22. "Connection": "Keep-Alive",
  23. "Accept-Encoding": "gzip",
  24. "Content-Type": "application/json",
  25. "channelNo": "88888888",
  26. "pageSize": str(PAGE_SIZE),
  27. # "pageNum": 1,
  28. "version": "1.9.9.82537"
  29. }
  30. def after_log(retry_state):
  31. """
  32. retry 回调
  33. :param retry_state: RetryCallState 对象
  34. """
  35. # 检查 args 是否存在且不为空
  36. if retry_state.args and len(retry_state.args) > 0:
  37. log = retry_state.args[0] # 获取传入的 logger
  38. else:
  39. log = logger # 使用全局 logger
  40. if retry_state.outcome.failed:
  41. log.warning(
  42. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  43. else:
  44. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  45. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  46. def get_proxys(log):
  47. """
  48. 获取代理配置
  49. :param log: 日志对象
  50. :return: 代理字典
  51. """
  52. tunnel = "x371.kdltps.com:15818"
  53. kdl_username = "t13753103189895"
  54. kdl_password = "o0yefv6z"
  55. try:
  56. proxies = {
  57. "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
  58. "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
  59. }
  60. return proxies
  61. except Exception as e:
  62. log.error(f"Error getting proxy: {e}")
  63. raise e
  64. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  65. def make_encrypted_post_request(log, url: str, request_data: dict, extra_headers: dict = None):
  66. """
  67. 通用加密POST请求函数(带重试机制)
  68. :param log: 日志对象
  69. :param url: 请求URL
  70. :param request_data: 请求数据字典(会被加密)
  71. :param extra_headers: 额外的请求头
  72. :return: 解密后的响应数据,失败返回None
  73. """
  74. request_headers = headers.copy()
  75. if extra_headers:
  76. request_headers.update(extra_headers)
  77. log.debug(f"Request URL: {url}, Data: {request_data}")
  78. encrypted_body = CryptoHelper.encrypt_request_data(request_data)
  79. # print(request_headers)
  80. response = requests.post(url, headers=request_headers, json=encrypted_body, timeout=30)
  81. # response.raise_for_status()
  82. if response.status_code == 200:
  83. response_json = response.json()
  84. # log.debug(f"Raw response: {response_json}")
  85. if 'data' in response_json:
  86. decrypted = CryptoHelper.decrypt_response_data(response_json)
  87. # log.debug(f"Decrypted response: {decrypted}")
  88. return decrypted
  89. return response_json
  90. else:
  91. log.error(f"请求失败: {response.status_code}, Response: {response.text}")
  92. return None
  93. def get_shop_single_page(log, page_num, page_size=PAGE_SIZE):
  94. """
  95. 获取商户列表(支持翻页)
  96. :param log: 日志对象
  97. :param page_num: 页码
  98. :param page_size: 每页条数
  99. """
  100. log.debug(f"Getting shop list, page: {page_num}")
  101. url = f"{BASE_URL}/zc-api/merchant/getMerMyList"
  102. request_data = {'pageNum': page_num, 'pageSize': page_size}
  103. try:
  104. resp = make_encrypted_post_request(log, url, request_data, extra_headers={"pageNum": str(page_num)})
  105. except Exception as e:
  106. log.error(f"Error getting shop list: {e}")
  107. resp = None
  108. return resp
  109. def get_sold_single_page(log, mer_no, page_num):
  110. """
  111. 获取商品列表(支持翻页)
  112. :param log: 日志对象
  113. :param mer_no: 商户编号
  114. :param page_num: 页码
  115. """
  116. log.info(f"Getting sold items for mer_no: {mer_no}, page: {page_num}")
  117. url = f"{BASE_URL}/zc-api/act/actProduct/getActList"
  118. request_data = {
  119. 'merNo': mer_no,
  120. 'pageNum': page_num,
  121. 'pageSize': PAGE_SIZE,
  122. 'queryType': 1
  123. }
  124. return make_encrypted_post_request(log, url, request_data, extra_headers={"pageNum": str(page_num)})
  125. def get_player_single_page(log, act_id, token, page_num, page_size=PAGE_SIZE):
  126. """
  127. 获取玩家列表(支持翻页)
  128. :param log: 日志对象
  129. :param act_id: 活动ID
  130. :param token: Authorization token
  131. :param page_num: 页码
  132. :param page_size: 每页条数
  133. """
  134. log.debug(f"Getting player list for act_id: {act_id}, page: {page_num}")
  135. # url = f"{BASE_URL}/zc-api/act/actOrder/getActOrderPublicDetails"
  136. url = f"{BASE_URL}/zc-api/act/actOrder/getUserDetailsCount"
  137. # request_data = {'actId': act_id, 'pageNum': page_num, 'pageSize': page_size}
  138. request_data = {"actId": act_id, "pageNum": page_num, "pageSize": page_size}
  139. return make_encrypted_post_request(
  140. log, url, request_data,
  141. extra_headers={"Authorization": token, "pageNum": str(page_num)}
  142. )
  143. def parse_shop_data(log, items, sql_pool):
  144. """
  145. 解析商户数据
  146. :param log: 日志对象
  147. :param items: 商户列表
  148. :param sql_pool: MySQL连接池
  149. :return: 解析后的数据列表
  150. """
  151. log.debug(f"Parsing shop data...........")
  152. info_list = []
  153. for item in items:
  154. # log.debug(f"Processing shop item: {item}")
  155. shop_id = item.get('merNo')
  156. shop_name = item.get('merName')
  157. sold_number = item.get('spell_number')
  158. # link_man = item.get('linkMan')
  159. # user_id = item.get('userId')
  160. fans = item.get('attentionNumber')
  161. data_dict = {
  162. 'shop_id': shop_id,
  163. 'shop_name': shop_name,
  164. 'sold_number': sold_number,
  165. 'fans': fans
  166. }
  167. log.debug(f"Parsed shop data: {data_dict}")
  168. info_list.append(data_dict)
  169. # 保存/更新 根据shop_id判断 是否存在,存在则更新,不存在则插入
  170. sql = "INSERT INTO zc_shop_record (shop_id, shop_name, sold_number, fans) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE shop_name=VALUES(shop_name), sold_number=VALUES(sold_number), fans=VALUES(fans)"
  171. # 将字典列表转换为元组列表
  172. args_list = [tuple(d.values()) for d in info_list]
  173. sql_pool.insert_many(query=sql, args_list=args_list)
  174. @retry(stop=stop_after_attempt(3), wait=wait_fixed(1), after=after_log)
  175. def get_video(log, token, pid):
  176. """
  177. 获取活动视频信息
  178. :param log: 日志对象
  179. :param token: Authorization token
  180. :param pid: 活动ID
  181. :return: (live_id, open_time, close_time, video_url)
  182. """
  183. url = "https://cashier.yqszpay.com/zc-api/live/actLive/getMerLiveInfo"
  184. request_data = {'actId': pid}
  185. log.debug(f"获取视频信息,actId: {pid}")
  186. resp_data = make_encrypted_post_request(
  187. log, url, request_data,
  188. extra_headers={"Authorization": token}
  189. )
  190. # log.debug(f"视频响应: {resp_data}")
  191. live_id = resp_data.get('live', {}).get('liveId')
  192. live_open_time = resp_data.get('live', {}).get('openTime')
  193. live_close_time = resp_data.get('live', {}).get('closeTime')
  194. video_url = resp_data.get('live', {}).get('videoUrl')
  195. return live_id, live_open_time, live_close_time, video_url
  196. def parse_sold_data(log, token, items, sql_pool, shop_name):
  197. """
  198. 解析商品数据
  199. :param log: 日志对象
  200. :param token: Authorization token
  201. :param items: 商品列表
  202. :param sql_pool: MySQL连接池
  203. :param shop_name: 商户名称
  204. :return: 解析后的数据列表
  205. """
  206. info_list = []
  207. for item in items:
  208. # log.debug(f"Processing sold item: {item}")
  209. shop_id = item.get('merNo') # 商户编号
  210. pid = item.get('id')
  211. act_day = item.get('actDay') # 活动天数
  212. act_logo = item.get('actLogo')
  213. act_name = item.get('actName') # 活动名称
  214. act_no = item.get('actNo') # 活动编号
  215. act_status = item.get('actStatus') # 活动状态
  216. startDate = item.get('startDate') # 开始时间
  217. endDate = item.get('complete_date') # 结束时间
  218. storageId = item.get('storageId') # 存储ID
  219. storageName = item.get('storageName') # 存储名称
  220. unitPrice = item.get('unitPrice') # 单价
  221. sumPrice = item.get('sumPrice') # 总价
  222. reality_price = item.get('realityPrice') # 实际价格
  223. packageNumber = item.get('packageNumber') # 包配置
  224. schedule = item.get('schedule') # 库存
  225. live_id, live_open_time, live_close_time, video_url = get_video(log, token, pid)
  226. data_dict = {
  227. 'shop_id': shop_id,
  228. 'shop_name': shop_name,
  229. 'pid': pid,
  230. 'act_day': act_day,
  231. 'act_img': act_logo,
  232. 'act_name': act_name,
  233. 'act_no': act_no,
  234. 'act_status': act_status,
  235. 'start_date': startDate,
  236. 'end_date': endDate,
  237. 'storage_id': storageId,
  238. 'storage_name': storageName,
  239. 'unit_price': unitPrice,
  240. 'sum_price': sumPrice,
  241. 'reality_price': reality_price,
  242. 'package_number': packageNumber,
  243. 'schedule': schedule,
  244. 'live_id': live_id,
  245. 'live_open_time': live_open_time,
  246. 'live_close_time': live_close_time,
  247. 'video_url': video_url
  248. }
  249. # log.debug(f"Parsed sold data: {data_dict}")
  250. # { 'live_close_time': None, 'video_url': None}
  251. info_list.append(data_dict)
  252. # 保存数据
  253. sql_pool.insert_many(table='zc_product_record', data_list=info_list, ignore=True)
  254. def parse_player_data_bak(log, items, sql_pool):
  255. """
  256. 解析玩家数据
  257. :param log: 日志对象
  258. :param items: 玩家列表
  259. :param sql_pool: MySQL连接池
  260. :return: 解析后的数据列表
  261. """
  262. log.debug(f"Parsing player data...........")
  263. info_list = []
  264. for item in items:
  265. # log.debug(f"Processing player item: {item}")
  266. pid = item.get('actId')
  267. player_id = item.get('id')
  268. order_id = item.get('orderId')
  269. secret_name = item.get('secretName')
  270. add_time = item.get('addTime')
  271. user_id = item.get('userId')
  272. user_name = item.get('user_name')
  273. data_dict = {
  274. 'pid': pid,
  275. 'player_id': player_id,
  276. 'order_id': order_id,
  277. 'secret_name': secret_name,
  278. 'add_time': add_time,
  279. 'user_id': user_id,
  280. 'user_name': user_name
  281. }
  282. # log.debug(f"Parsed player data: {data_dict}")
  283. info_list.append(data_dict)
  284. # 保存数据
  285. sql_pool.insert_many(table='zc_player_record', data_list=info_list, ignore=True)
  286. def parse_player_data(log, items, sql_pool):
  287. """
  288. 解析玩家数据
  289. :param log: 日志对象
  290. :param items: 玩家列表
  291. :param sql_pool: MySQL连接池
  292. :return: 解析后的数据列表
  293. """
  294. log.debug(f"Parsing player data...........")
  295. info_list = []
  296. for item in items:
  297. # log.debug(f"Processing player item: {item}")
  298. pid = item.get('actId')
  299. give_number = item.get('giveNumber') # 份数
  300. user_id = item.get('userId')
  301. user_name = item.get('userName')
  302. data_dict = {
  303. 'pid': pid,
  304. 'give_number': give_number,
  305. 'user_id': user_id,
  306. 'user_name': user_name
  307. }
  308. # log.debug(f"Parsed player data: {data_dict}")
  309. info_list.append(data_dict)
  310. # 保存数据
  311. sql_pool.insert_many(table='zc_player_record', data_list=info_list, ignore=True)
  312. def get_shop_list(log, sql_pool):
  313. """
  314. 商户列表翻页生成器
  315. :param log: 日志对象
  316. :param sql_pool: MySQL连接池
  317. """
  318. page_num = 1
  319. total = 0
  320. while page_num <= 100:
  321. result = get_shop_single_page(log, page_num, PAGE_SIZE)
  322. # print(result)
  323. if result is None:
  324. log.error(f"第 {page_num} 页请求失败,停止翻页")
  325. break
  326. data_list = result.get('rows', [])
  327. parse_shop_data(log, data_list, sql_pool)
  328. # 获取总条数(第一页时获取)
  329. if total is None and 'total' in result:
  330. total = result['total']
  331. log.info(f"总记录数: {total}")
  332. # 检查是否有数据
  333. if len(data_list) == 0:
  334. log.info(f"第 {page_num} 页无数据,停止翻页")
  335. break
  336. # 根据total判断是否超出范围
  337. if total is not None and (page_num - 1) * PAGE_SIZE >= total:
  338. log.info(f"已遍历完所有数据,停止翻页")
  339. break
  340. log.info(f"第 {page_num} 页查询完成,本页条数: {len(data_list)}")
  341. page_num += 1
  342. def get_sold_list(log, shop_id, shop_name, token, sql_pool):
  343. """
  344. 商品列表翻页生成器
  345. :param log: 日志对象
  346. :param shop_id: shop_id
  347. :param shop_name: 商户名称
  348. :param token: Authorization token
  349. :param sql_pool: MySQL连接池
  350. """
  351. page_num = 1
  352. max_pages = 1000
  353. while page_num <= max_pages:
  354. result = get_sold_single_page(log, shop_id, page_num)
  355. # print(result)
  356. if result is None:
  357. log.error(f"第 {page_num} 页请求失败,停止翻页")
  358. break
  359. data_list = result.get('rows', [])
  360. parse_sold_data(log, token, data_list, sql_pool, shop_name)
  361. # 检查是否有数据
  362. if len(data_list) < 10:
  363. log.info(f"第 {page_num} 页无数据,停止翻页")
  364. break
  365. log.info(f"第 {page_num} 页查询完成,本页条数: {len(data_list)}")
  366. page_num += 1
  367. def get_player_list(log, act_id, token, sql_pool):
  368. """
  369. 玩家列表翻页生成器
  370. :param log: 日志对象
  371. :param act_id: 活动ID
  372. :param token: Authorization token
  373. :param sql_pool: MySQL连接池
  374. :return: has_data (True: 有数据, False: 无数据)
  375. """
  376. page_num = 1
  377. max_pages = 100
  378. has_data = False
  379. while page_num <= max_pages:
  380. result = get_player_single_page(log, act_id, token, page_num)
  381. if result is None:
  382. log.error(f"第 {page_num} 页请求失败,停止翻页")
  383. break
  384. data_list = result.get('rows', [])
  385. # 如果有数据才解析
  386. if len(data_list) > 0:
  387. has_data = True
  388. parse_player_data(log, data_list, sql_pool)
  389. # 检查是否有数据
  390. if len(data_list) < 10:
  391. log.info(f"第 {page_num} 页无数据,停止翻页")
  392. break
  393. log.info(f"第 {page_num} 页查询完成,本页条数: {len(data_list)}")
  394. page_num += 1
  395. return has_data
  396. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  397. def zc_main(log):
  398. """
  399. 主函数
  400. :param log: logger对象
  401. """
  402. log.info(
  403. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  404. # 配置 MySQL 连接池
  405. sql_pool = MySQLConnectionPool(log=log)
  406. if not sql_pool.check_pool_health():
  407. log.error("数据库连接池异常")
  408. raise RuntimeError("数据库连接池异常")
  409. try:
  410. # 获取 token
  411. token_row = sql_pool.select_one("SELECT token FROM zc_token WHERE id = 1")
  412. if not token_row:
  413. log.error("未查询到 token")
  414. return
  415. token = token_row[0]
  416. # player test
  417. # has_data = get_player_list(log, 1800, token, sql_pool)
  418. # 获取shop data
  419. # try:
  420. # get_shop_list(logger, sql_pool)
  421. # except Exception as e:
  422. # log.error(f'iterate_shop_list error: {e}')
  423. #
  424. # time.sleep(5)
  425. #
  426. # # 获取sold data - 遍历所有商户
  427. try:
  428. # 从 shop 表查询所有 merNo
  429. mer_no_rows = sql_pool.select_all("SELECT shop_id FROM zc_shop_record WHERE sold_number != 0")
  430. log.info(f"查询到 {len(mer_no_rows)} 个商户编号: {mer_no_rows}")
  431. for shop_id, shop_name in mer_no_rows:
  432. log.info(f"开始爬取商户 {shop_id}, {shop_name} 的商品数据")
  433. get_sold_list(log, shop_id, shop_name, token, sql_pool)
  434. except Exception as e:
  435. log.error(f'get_sold_list error: {e}')
  436. #
  437. # time.sleep(5)
  438. # 获取player data - 遍历所有活动
  439. try:
  440. # 从 sold 表查询所有 actId
  441. act_id_rows = sql_pool.select_all("SELECT pid FROM zc_product_record WHERE player_state != 1")
  442. act_id_list = [row[0] for row in act_id_rows] if act_id_rows else []
  443. log.info(f"查询到 {len(act_id_list)} 个活动ID")
  444. for act_id in act_id_list:
  445. try:
  446. # 先将当前 pid 的状态改为 1,表示开始查询
  447. sql_pool.update_one("UPDATE zc_product_record SET player_state = 1 WHERE pid = %s", (act_id,))
  448. log.info(f"将 pid: {act_id} 的状态更新为 1(开始查询)")
  449. log.info(f"开始爬取pid: {act_id} 的玩家数据")
  450. has_data = get_player_list(log, act_id, token, sql_pool)
  451. # 根据是否有数据更新状态
  452. if has_data:
  453. log.info(f"pid: {act_id} 查询到数据,状态保持为 1")
  454. else:
  455. log.info(f"pid: {act_id} 没有数据,状态更新为 2")
  456. sql_pool.update_one("UPDATE zc_product_record SET player_state = 2 WHERE pid = %s", (act_id,))
  457. except Exception as pid_error:
  458. # 如果查询失败,将状态改为 3
  459. log.error(f"pid: {act_id} 查询失败,错误: {pid_error}")
  460. try:
  461. sql_pool.update_one("UPDATE zc_product_record SET player_state = 3 WHERE pid = %s", (act_id,))
  462. log.info(f"已将 pid: {act_id} 的状态更新为 3(查询异常)")
  463. except Exception as update_error:
  464. log.error(f"更新 pid: {act_id} 状态失败: {update_error}")
  465. except Exception as e:
  466. log.error(f'iterate_player_list error: {e}')
  467. except Exception as e:
  468. log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
  469. finally:
  470. log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
  471. if __name__ == '__main__':
  472. # 获取单页数据
  473. # result = get_sold_single_page(logger, 'ZC10264451', page_num=1, page_size=20)
  474. # result = get_shop_single_page(logger, page_num=1, page_size=10)
  475. # result = get_player_single_page(logger, 1520, 'your_token', page_num=1, page_size=10)
  476. # print("单页数据:", result)
  477. zc_main(logger)
  478. # get_vodeo(logger, 'your_token', 1726)
  479. # sql_pool = MySQLConnectionPool(log=logger)
  480. # get_shop_list(logger, sql_pool)
  481. # get_shop_single_page(logger, 1, 10)