# 闪购历史已售.py
# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/6/30 19:15
  9. import time
  10. import pymysql
  11. import utils
  12. import random
  13. import inspect
  14. import datetime
  15. import schedule
  16. from loguru import logger
  17. from mysql_pool import MySQLConnectionPool
  18. from tenacity import retry, stop_after_attempt, wait_fixed
  19. logger.remove()
  20. logger.add("./logs/sg_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
  21. format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
  22. level="DEBUG", retention="7 day")
  23. CRAWL_CATEGORY = '卡牌'
  24. def after_log(retry_state):
  25. """
  26. retry 回调
  27. :param retry_state: RetryCallState 对象
  28. """
  29. # 检查 args 是否存在且不为空
  30. if retry_state.args and len(retry_state.args) > 0:
  31. log = retry_state.args[0] # 获取传入的 logger
  32. else:
  33. log = logger # 使用全局 logger
  34. if retry_state.outcome.failed:
  35. log.warning(
  36. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  37. else:
  38. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  39. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  40. def get_category_list(log, sql_pool, sql_p_list):
  41. """
  42. 获取类别列表
  43. :param log:
  44. :param sql_pool:
  45. :param sql_p_list:
  46. """
  47. log.debug("Getting category list............")
  48. url = "https://api.qiandao.com/c2c-web/v1/stock-order/bargin-v3"
  49. data = {
  50. "tagIds": [
  51. 1422849,
  52. 1132249,
  53. 1622975,
  54. 1583218,
  55. 1706130,
  56. 1113003,
  57. 1000375,
  58. 1572324,
  59. 1436291,
  60. 1560387,
  61. 1276355,
  62. 1423075,
  63. 1583179,
  64. 42474,
  65. 324,
  66. 1446179,
  67. 1533642,
  68. 1542944,
  69. 1568717,
  70. 1572231,
  71. 1532176,
  72. 1506156,
  73. 1568683,
  74. 1541805,
  75. 1277151,
  76. 1529427,
  77. 1514792,
  78. 1515959,
  79. 1541749
  80. ],
  81. "withStockCount": 4
  82. }
  83. resp_json = utils.request_post_data(log, url, data)
  84. if not resp_json:
  85. log.error("get_category_list error")
  86. raise RuntimeError("get_category_list error")
  87. parse_category_list(log, resp_json, sql_pool, sql_p_list)
  88. def parse_category_list(log, resp_json, sql_pool, sql_p_list):
  89. """
  90. 解析类别列表数据
  91. :param log:
  92. :param resp_json:
  93. :param sql_pool:
  94. :param sql_p_list:
  95. """
  96. items = resp_json.get("data", [])
  97. for item in items:
  98. tag_id = item.get("tagId")
  99. if tag_id:
  100. try:
  101. get_product_list(log, tag_id, sql_pool, sql_p_list)
  102. except Exception as e:
  103. log.error(f'parse_category_list to get_product_list error: {e}')
  104. else:
  105. log.error("tag_id is empty")
  106. def get_product_list(log, tag_id, sql_pool, sql_p_list):
  107. """
  108. 获取商品列表
  109. :param log:
  110. :param tag_id:
  111. :param sql_pool:
  112. :param sql_p_list:
  113. """
  114. log.debug(f" {inspect.currentframe().f_code.co_name} for tag_id:{tag_id}.....")
  115. page = 1
  116. max_page = 500
  117. total_count = 0
  118. while page <= max_page:
  119. len_item, totalCount = get_product_one_page(log, tag_id, page, sql_pool, sql_p_list)
  120. if len_item < 10:
  121. log.debug(f"--------------- page {page}, len_item: {len_item} totalCount: {totalCount} ---------------")
  122. break
  123. total_count += len_item
  124. if total_count >= int(totalCount):
  125. log.debug(f"total_count: {total_count} totalCount: {totalCount}")
  126. break
  127. page += 1
  128. # time.sleep(random.uniform(0.1, 1))
  129. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  130. def get_product_one_page(log, tag_id, page_num, sql_pool, sql_p_list):
  131. """
  132. 获取产品 单页数据
  133. :param log:
  134. :param tag_id:
  135. :param page_num:
  136. :param sql_pool:
  137. :param sql_p_list:
  138. :return:
  139. """
  140. log.debug(f"Getting product list for tag_id: {tag_id} page_num: {page_num}............")
  141. url = "https://api.qiandao.com/c2c-web/v1/stock-order/stock-order-zone"
  142. data = {
  143. # "tagId": "1132249",
  144. "tagId": tag_id,
  145. "limit": 10,
  146. "offset": (page_num - 1) * 10,
  147. "isWished": False,
  148. "sort": "SELL_COUNT_DESC"
  149. }
  150. resp_json = utils.request_post_data(log, url, data)
  151. if not resp_json:
  152. log.error("get_product_one_page error")
  153. raise RuntimeError("get_product_one_page error")
  154. items = resp_json.get("data", {}).get("items", [])
  155. totalCount = resp_json.get("data", {}).get("totalCount", 0)
  156. parse_product_data(log, items, tag_id, sql_pool, sql_p_list)
  157. return len(items), totalCount
  158. def parse_product_data(log, items, tag_id, sql_pool, sql_p_list):
  159. """
  160. 解析产品数据
  161. :param log:
  162. :param items:
  163. :param tag_id:
  164. :param sql_pool:
  165. :param sql_p_list:
  166. """
  167. # info_list = []
  168. for item in items:
  169. spu_id = item.get("spuId", "")
  170. if spu_id in sql_p_list:
  171. log.info(f"spu_id:{spu_id} is exist, skip.......")
  172. continue
  173. spu_name = item.get("name", "")
  174. tag_name = item.get("tagName", "")
  175. mega_spu_name = item.get("megaSpuName", "")
  176. data_dict = {
  177. "tag_id": tag_id,
  178. "tag_name": tag_name,
  179. "spu_id": spu_id,
  180. "spu_name": spu_name,
  181. "mega_spu_name": mega_spu_name,
  182. }
  183. # print(data_dict)
  184. # info_list.append(data_dict)
  185. try:
  186. sql_pool.insert_one_or_dict(table="qiandao_sg_category_record", data=data_dict)
  187. except pymysql.err.IntegrityError as e:
  188. if "Duplicate entry" in str(e):
  189. log.warning("存在重复的 spu_id,跳过插入")
  190. else:
  191. # raise e
  192. log.warning(f"{str(e)[:200]}")
  193. sql_p_list.append(spu_id)
  194. # if info_list:
  195. # try:
  196. # sql_pool.insert_many(table="qiandao_sg_category_record", data_list=info_list)
  197. # except Exception as e:
  198. # log.error(f" {inspect.currentframe().f_code.co_name} 插入失败:", e)
  199. # -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  200. def get_sg_history_list(log, sql_pool, sql_spu_id):
  201. """
  202. 获取商品历史交易记录
  203. :param log:
  204. :param sql_pool:
  205. :param sql_spu_id:
  206. :return:
  207. """
  208. page = 1
  209. max_page = 50
  210. # 获取该 spu 最后一条的时间(最新交易时间)
  211. max_trading_time = sql_pool.select_one(
  212. f"SELECT MAX(trading_time) FROM qiandao_sg_sold_record WHERE spu_id = '{sql_spu_id}'"
  213. )
  214. max_trading_time = max_trading_time[0] if max_trading_time else None
  215. while page <= max_page:
  216. items = get_sg_history_sold_one_page(log, sql_spu_id, page)
  217. if not items:
  218. log.debug(f'--------------- page {page} has no items, break ---------------')
  219. break
  220. has_new_data = parse_sold_data(log, items, sql_pool, sql_spu_id, max_trading_time)
  221. if not has_new_data:
  222. log.debug(f'--------------- page {page} has no newer data, break ---------------')
  223. break
  224. if len(items) < 20:
  225. log.debug(f'--------------- page {page} has less than 20 items, break ---------------')
  226. break
  227. page += 1
  228. # time.sleep(random.uniform(0.1, 1))
  229. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  230. def get_sg_history_sold_one_page(log, spu_id, page):
  231. """
  232. 获取商品历史交易记录单页数据
  233. :param log:
  234. :param spu_id:
  235. :param page:
  236. :return:
  237. """
  238. log.debug(f"get_sg_history_sold_one_page: 开始获取第{page}页.............")
  239. url = "https://api.qiandao.com/c2c-web/v1/common/get-spu-trading-history-list"
  240. data = {
  241. # "spuId": "654833013418492160",
  242. "spuId": spu_id,
  243. "skuId": "0",
  244. # "tradingType": "EXCHANGE",
  245. "tradingType": "",
  246. "limit": "20",
  247. "offset": f"{(page - 1) * 20}"
  248. }
  249. resp_json = utils.request_post_data(log, url, data, proxy=False)
  250. if not resp_json:
  251. log.error("get_sg_history_sold_one_page error")
  252. raise RuntimeError("get_sg_history_sold_one_page error")
  253. items = resp_json.get("data", {}).get("items", [])
  254. return items
  255. def parse_sold_data(log, items, sql_pool, spu_id, max_trading_time):
  256. """
  257. 解析商品历史交易记录数据
  258. :param log:
  259. :param items:
  260. :param sql_pool:
  261. :param spu_id:
  262. :param max_trading_time:
  263. :return:
  264. """
  265. try:
  266. info_list = []
  267. has_new_data = False # 是否有新数据
  268. for item in items:
  269. tradingTime = item.get("tradingTime", "")
  270. trading_time_str = utils.transform_ms(log, tradingTime)
  271. if not trading_time_str:
  272. continue
  273. # 将 trading_time_str 转换为 datetime.datetime
  274. try:
  275. trading_dt = datetime.datetime.strptime(trading_time_str, '%Y-%m-%d %H:%M:%S')
  276. except Exception as e:
  277. log.error(f"Error parsing trading_time: {e}")
  278. continue
  279. # 如果 max_trading_time 为 None,全部保留
  280. if max_trading_time is None:
  281. # log.debug('max_trading_time is None')
  282. has_new_data = True
  283. elif trading_dt > max_trading_time:
  284. log.debug(f'trading_dt: {trading_dt} > max_trading_time: {max_trading_time}, 保留')
  285. has_new_data = True
  286. else:
  287. log.debug(f'trading_dt: {trading_dt} <= max_trading_time: {max_trading_time}, 跳过旧数据')
  288. break # 跳过旧数据
  289. buyer_name = item.get("buyerName", "")
  290. product_name = item.get("productName", "")
  291. trading_volume = item.get("tradingVolume", "")
  292. trading_amount = item.get("tradingAmount", "")
  293. is_acquisition_ = item.get("isAcquisition", "")
  294. is_acquisition = 1 if is_acquisition_ else 0
  295. tradingType = item.get("tradingType") # 没有闪电标的是: "C2C", 有闪电标的是: "EXCHANGE"
  296. data_dict = {
  297. "spu_id": spu_id,
  298. "buyer_name": buyer_name,
  299. "product_name": product_name,
  300. "trading_volume": trading_volume,
  301. "trading_amount": trading_amount,
  302. "is_acquisition": is_acquisition,
  303. "trading_time": trading_time_str,
  304. "trading_type": tradingType,
  305. "crawl_category": CRAWL_CATEGORY
  306. }
  307. # print(data_dict)
  308. info_list.append(data_dict)
  309. # try:
  310. # sql_pool.insert_one_or_dict(table="qiandao_sg_sold_record", data=data_dict)
  311. # except Exception as e:
  312. # log.error(f" {inspect.currentframe().f_code.co_name} 插入失败:", e)
  313. if info_list:
  314. sql_pool.insert_many(table="qiandao_sg_sold_record", data_list=info_list)
  315. return has_new_data # 返回是否有新数据
  316. except Exception as e:
  317. log.error(f" {inspect.currentframe().f_code.co_name} error: {e}")
  318. return False
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def qd_sg_main(log):
    """
    Main entry: crawl the "sold" history for every flash-sale spu on record.

    Retries up to 100 times with a one-hour wait, so after a failure the
    job effectively re-runs hourly.

    :param log: logger object
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
    # Set up the MySQL connection pool and bail out early if it is unhealthy.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")
    try:
        # Fetch flash-sale "sold" records for every known spu.
        log.info("-------------------------------------- 开始获取 闪购 已售 --------------------------------------")
        try:
            sql_spu_id_list = sql_pool.select_all(
                f"SELECT spu_id FROM qiandao_sg_category_record WHERE crawl_category = '{CRAWL_CATEGORY}'")
            sql_spu_id_list = [item[0] for item in sql_spu_id_list]
            for sql_spu_id in sql_spu_id_list:
                log.info(f"开始获取:{sql_spu_id} 已售商品")
                # One spu failing must not abort the whole sweep.
                try:
                    get_sg_history_list(log, sql_pool, sql_spu_id)
                except Exception as e:
                    log.error(f"Error fetching get_sold_list for sql_spu_id:{sql_spu_id}, {e}")
            # NOTE(review): clear() placed after the loop (frees the list);
            # the extracted source's indentation was ambiguous — confirm it
            # was not meant to sit inside the loop (which would truncate it).
            sql_spu_id_list.clear()
        except Exception as e:
            log.error(f"Error fetching sql_shop_id_list: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
        # EmailSender().send(subject="【千岛 - 爬虫通知】今日任务已完成",
        #                    content="数据采集和处理已全部完成,请查收结果。\n\n ------ 来自 Python 爬虫系统。")
if __name__ == '__main__':
    # Script entry point: run the crawler with the module-level loguru logger.
    qd_sg_main(log=logger)