jhs_sold_spider.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/3/17 10:37
  5. import inspect
  6. import requests
  7. import user_agent
  8. from loguru import logger
  9. from datetime import datetime
  10. from mysq_pool import MySQLConnectionPool
  11. from tenacity import retry, stop_after_attempt, wait_fixed
  12. """
  13. com.jihuanshe
  14. """
  15. logger.remove()
  16. logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
  17. format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
  18. level="DEBUG", retention="7 day")
  19. def after_log(retry_state):
  20. """
  21. retry 回调
  22. :param retry_state: RetryCallState 对象
  23. """
  24. # 检查 args 是否存在且不为空
  25. if retry_state.args and len(retry_state.args) > 0:
  26. log = retry_state.args[0] # 获取传入的 logger
  27. else:
  28. log = logger # 使用全局 logger
  29. if retry_state.outcome.failed:
  30. log.warning(
  31. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  32. else:
  33. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  34. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  35. def get_proxys(log):
  36. """
  37. 获取代理
  38. :return: 代理
  39. """
  40. tunnel = "x371.kdltps.com:15818"
  41. kdl_username = "t13753103189895"
  42. kdl_password = "o0yefv6z"
  43. try:
  44. proxies = {
  45. "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
  46. "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
  47. }
  48. return proxies
  49. except Exception as e:
  50. log.error(f"Error getting proxy: {e}")
  51. raise e
  52. def save_product_data(sql_pool, product_info: tuple):
  53. """
  54. 保存 product 数据
  55. :param sql_pool: MySQL连接池对象
  56. :param product_info: 要保存的数据
  57. """
  58. sql = """
  59. INSERT INTO jhs_product_record (seller_user_id, seller_username, product_id, app_id, nonce_str, signature, auction_product_name, auction_product_images, game_key, language_text, authenticator_name, grading, starting_price, max_bid_price, status, auction_product_start_time, auction_product_end_time)
  60. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
  61. sql_pool.insert_one(sql, product_info)
  62. def save_biddings_data(sql_pool, biddings_list: list):
  63. """
  64. 保存 biddings 数据
  65. :param sql_pool: MySQL连接池对象
  66. :param biddings_list: 要保存的数据 -> list
  67. """
  68. sql = """
  69. INSERT INTO jhs_biddings_record (product_id, username, bid_price, bid_status, created_at)
  70. VALUES (%s, %s, %s, %s, %s)"""
  71. sql_pool.insert_all(sql, biddings_list)
  72. def save_shop_data(sql_pool, shop_info: tuple):
  73. """
  74. 保存 product 数据
  75. :param sql_pool: MySQL连接池对象
  76. :param shop_info: 要保存的数据
  77. """
  78. sql = """
  79. INSERT INTO jhs_shop_record (seller_user_id, seller_username, follower_count, success_order_user_count, seller_credit_rank_image_url)
  80. VALUES (%s, %s, %s, %s, %s)"""
  81. sql_pool.insert_one(sql, shop_info)
  82. def parse_data(log, resp_json, sql_product_id, sql_pool, sql_shop_id_list):
  83. """
  84. 解析数据, 将解析后的数据, 分成三个表存储 product
  85. biddings
  86. shop
  87. :param log: logger对象
  88. :param resp_json: resp_json
  89. :param sql_product_id: sql_product_id
  90. :param sql_pool: MySQL连接池对象
  91. :param sql_shop_id_list: sql_shop_id_list
  92. """
  93. sql_id = sql_product_id[0]
  94. pid = sql_product_id[1]
  95. status = resp_json.get('status')
  96. if status not in ['ongoing', 'pending']:
  97. """
  98. 获取 shop 信息
  99. """
  100. seller_user_id = resp_json.get('seller_user_id')
  101. # print(type(seller_user_id))
  102. seller_username = resp_json.get('seller_username')
  103. # print(seller_user_id)
  104. # # 查询商家id在不在数据库中
  105. # sql_exists_flag = """SELECT EXISTS (SELECT 1 FROM jhs_shop_record WHERE seller_user_id = %s) AS exists_flag"""
  106. # exists_flag = sql_pool.select_one(sql_exists_flag, (seller_user_id,))
  107. # exists_flag = exists_flag[0]
  108. # if exists_flag == 1:
  109. if str(seller_user_id) in sql_shop_id_list:
  110. log.info(
  111. f"----------------- The seller_user_id {seller_user_id} is already in the database, Not need save -----------------")
  112. else:
  113. follower_count = resp_json.get('follower_count')
  114. success_order_user_count = resp_json.get('success_order_user_count')
  115. seller_credit_rank_image_url = resp_json.get('seller_credit_rank_image_url')
  116. shop_info = (
  117. seller_user_id, seller_username, follower_count, success_order_user_count, seller_credit_rank_image_url
  118. )
  119. # print(shop_info)
  120. try:
  121. save_shop_data(sql_pool, shop_info)
  122. sql_shop_id_list.append(seller_user_id)
  123. except Exception as e:
  124. if "Duplicate entry" in str(e):
  125. logger.warning(f"存在重复的 seller_user_id{seller_user_id},跳过插入....")
  126. else:
  127. logger.error(f"保存数据 seller_user_id 时出错: {str(e)}")
  128. """
  129. 获取 product 信息
  130. """
  131. app_id = resp_json.get('appId')
  132. nonce_str = resp_json.get('nonceStr')
  133. signature = resp_json.get('signature')
  134. # auction_product_id = resp_json.get('auction_product_id')
  135. auction_product_name = resp_json.get('auction_product_name')
  136. auction_product_images_list = resp_json.get('auction_product_images', [])
  137. if auction_product_images_list:
  138. auction_product_images = ','.join(auction_product_images_list)
  139. else:
  140. auction_product_images = None
  141. game_key = resp_json.get('game_key')
  142. language_text = resp_json.get('language_text')
  143. authenticator_name = resp_json.get('authenticator_name')
  144. grading = resp_json.get('grading')
  145. starting_price = resp_json.get('starting_price')
  146. max_bid_price = resp_json.get('max_bid_price')
  147. auction_product_start_timestamp = resp_json.get('auction_product_start_timestamp')
  148. auction_product_start_time = datetime.fromtimestamp(auction_product_start_timestamp).strftime(
  149. '%Y-%m-%d %H:%M:%S') if auction_product_start_timestamp else None
  150. auction_product_end_timestamp = resp_json.get('auction_product_end_timestamp')
  151. auction_product_end_time = datetime.fromtimestamp(auction_product_end_timestamp).strftime(
  152. '%Y-%m-%d %H:%M:%S') if auction_product_end_timestamp else None
  153. product_info = (
  154. seller_user_id, seller_username, pid, app_id, nonce_str, signature, auction_product_name,
  155. auction_product_images, game_key, language_text, authenticator_name, grading, starting_price, max_bid_price,
  156. status, auction_product_start_time, auction_product_end_time
  157. )
  158. # print(product_info)
  159. try:
  160. save_product_data(sql_pool, product_info)
  161. sql_pool.update_one("update jhs_task set task_state = 1 where id = %s", (sql_id,))
  162. except Exception as e:
  163. if "Duplicate entry" in str(e):
  164. logger.warning(f"存在重复的 pid{pid},跳过插入....")
  165. else:
  166. logger.error(f"保存数据时出错: {str(e)}")
  167. return
  168. """
  169. 获取 biddings 信息
  170. """
  171. biddings = resp_json.get('biddings', [])
  172. # print(biddings)
  173. # 创建一个字典来存储每个用户的最高出价记录
  174. highest_bids = {}
  175. for record in biddings:
  176. username = record['username']
  177. bid_price = float(record['bid_price']) # 将出价转换为浮点数以便比较
  178. # 如果用户不在字典中,或者当前出价高于已存储的最高出价,则更新记录
  179. if username not in highest_bids or bid_price > float(highest_bids[username]['bid_price']):
  180. highest_bids[username] = record
  181. bids_list = list(highest_bids.values())
  182. biddings_list = [
  183. (pid, record['username'], record['bid_price'], record['bid_status'], record['created_at'])
  184. for record in bids_list
  185. ]
  186. # print(biddings_list)
  187. if biddings_list:
  188. save_biddings_data(sql_pool, biddings_list)
  189. else:
  190. log.info(f"................ No biddings found for product {pid}, Not need save ................")
  191. else:
  192. log.info(f"................ The product {pid} is ongoing or pending, Not need parse ................")
  193. sql_pool.update_one("update jhs_task set task_state = 4 where id = %s", (sql_id,))
  194. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  195. def get_resp(log, sql_product_id, sql_pool, sql_shop_id_list):
  196. """
  197. 获取 response 响应
  198. :param log: logger对象
  199. :param sql_product_id: sql_product_id
  200. :param sql_pool: MySQL连接池对象
  201. :param sql_shop_id_list: sql_shop_id_list
  202. """
  203. sql_id = sql_product_id[0]
  204. pid = sql_product_id[1]
  205. headers = {
  206. "accept": "application/json, text/plain, */*",
  207. "referer": "https://www.jihuanshe.com/",
  208. "user-agent": user_agent.generate_user_agent()
  209. }
  210. url = "https://api.jihuanshe.com/api/market/share/auction-product"
  211. params = {
  212. "auction_product_id": pid,
  213. "url": f"https://www.jihuanshe.com/app/auction?auctionProductId={pid}"
  214. }
  215. response = requests.get(url, headers=headers, params=params, timeout=5, proxies=get_proxys(log))
  216. # print(response.json())
  217. # print(response)
  218. resp_json = response.json()
  219. if resp_json:
  220. if resp_json.get("code") == 440:
  221. log.debug(f"< 抱歉,该竞价商品已被移除, pid:{pid} >")
  222. sql_pool.update_one("update jhs_task set task_state = 2 where id = %s", (sql_id,))
  223. else:
  224. try:
  225. parse_data(log, resp_json, sql_product_id, sql_pool, sql_shop_id_list)
  226. # sql_pool.update_one("update jhs_task set task_state = 1 where id = %s", (sql_id,))
  227. except Exception as e:
  228. log.error(f"get_resp call parse_data() sql_product_id:{sql_product_id} 获取失败, error:{e}")
  229. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  230. def jhs_main(log):
  231. """
  232. 主函数
  233. :param log: logger对象
  234. """
  235. log.info(
  236. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  237. # 配置 MySQL 连接池
  238. sql_pool = MySQLConnectionPool(log=log)
  239. if not sql_pool:
  240. log.error("MySQL数据库连接失败")
  241. raise Exception("MySQL数据库连接失败")
  242. try:
  243. # while True:
  244. # product_id_list = sql_pool.select_all(
  245. # "SELECT id, product_id FROM jhs_task WHERE task_state = 0 AND id <= 420107 ORDER BY product_id DESC")
  246. # 补充了17511条
  247. product_id_list = sql_pool.select_all(
  248. "SELECT jt.id ,jt.product_id FROM jhs_task jt LEFT JOIN jhs_product_record jpr ON jt.product_id = jpr.product_id WHERE jt.task_state = 1 AND jpr.product_id IS NULL")
  249. product_id_list = [keyword for keyword in product_id_list]
  250. sql_shop_id_list = sql_pool.select_all("SELECT seller_user_id FROM jhs_shop_record")
  251. sql_shop_id_list = [keyword[0] for keyword in sql_shop_id_list]
  252. # if not product_id_list:
  253. # log.info(f'没有待采集商品,等待十分钟再查询采集任务............')
  254. # time.sleep(600)
  255. # continue
  256. for sql_product_id in product_id_list:
  257. try:
  258. get_resp(log, sql_product_id, sql_pool, sql_shop_id_list)
  259. except Exception as e:
  260. log.error(f"Loop sql_product_id:{sql_product_id} 获取失败, error:{e}")
  261. sql_pool.update_one("update jhs_task set task_state = 3 where id = %s", (sql_product_id[0],))
  262. except Exception as e:
  263. log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
  264. finally:
  265. log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
  266. if __name__ == '__main__':
  267. jhs_main(logger)
  268. # get_resp(logger, (1, 7), MySQLConnectionPool(log=logger))