福袋补充.py 17 KB

# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/6/30 17:59
import time
from datetime import datetime
import utils
import random
import inspect
import pymysql
import schedule
from loguru import logger
from mysql_pool import MySQLConnectionPool
from tenacity import retry, stop_after_attempt, wait_fixed

logger.remove()
logger.add("./logs/bag_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 days")
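
# Note: the helpers below are assumed from how this script uses them, not from
# their definitions (which live in utils.py and mysql_pool.py):
#   - utils.request_get_data(log, url, params) -> parsed JSON dict, or a falsy value on failure
#   - utils.transform_ms(log, ms_timestamp)    -> "YYYY-MM-DD HH:MM:SS" string
#   - MySQLConnectionPool: check_pool_health(), select_one(query, args),
#     select_all(query), insert_one_or_dict(table, data),
#     insert_many(table, data_list), update_one_or_dict(table, data, condition)
# The `schedule` import is currently unused here; see the sketch at the end of
# the file for one way it could drive qd_lb_main.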


def after_log(retry_state):
    """
    Retry callback invoked by tenacity after each attempt.
    :param retry_state: RetryCallState object
    """
    # Check that args exists and is not empty; the decorated functions below
    # pass the logger as their first positional argument.
    if retry_state.args and len(retry_state.args) > 0:
        log = retry_state.args[0]  # use the logger passed to the decorated function
    else:
        log = logger  # fall back to the global logger
    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} failed")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} succeeded")


def get_luckybag_list(log, sql_pool, sql_luckybag_list):
    page = 1
    max_page = 500
    total_count = 0
    while page <= max_page:
        len_item, totalCount = get_luckybag_one_page(log, page, sql_pool, sql_luckybag_list)
        if len_item < 10:
            log.debug(f"--------------- page {page}, len_item: {len_item} totalCount: {totalCount} ---------------")
            break
        total_count += len_item
        if total_count >= int(totalCount):
            log.debug(f"total_count: {total_count} totalCount: {totalCount}")
            break
        page += 1
        time.sleep(random.uniform(0.1, 1))


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_luckybag_one_page(log, page, sql_pool, sql_luckybag_list):
    log.debug(f" {inspect.currentframe().f_code.co_name} for page: {page}.....")
    url = "https://api.qiandao.cn/c2c-web/v1/luckybag/list"
    params = {
        "limit": "10",
        # "offset": "0",
        "offset": str((page - 1) * 10),
        "hostId": "228946998408215136",
        "orderBy": "updatedAt.desc",
        "filterFollow": "false",
        "tagIds": "1429578",
        "typeIds": "1405184,1132249,1000375,49695,1443553,1002041"
    }
    resp_json = utils.request_get_data(log, url, params)
    if not resp_json:
        log.error("get_luckybag_one_page error")
        raise RuntimeError("get_luckybag_one_page error")
    rows = resp_json.get("data", {}).get("rows", [])
    total_count = resp_json.get("data", {}).get("count", 0)
    try:
        parse_luckybag_data(log, rows, sql_pool, sql_luckybag_list)
    except Exception as e:
        log.error(f"parse_luckybag_data error: {e}")
    return len(rows), total_count
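
# The list endpoint is assumed (based on the parsing above) to respond with a
# payload shaped roughly like the sketch below; only the fields actually read
# by parse_luckybag_data are listed:
#
# {
#     "data": {
#         "count": 123,                      # total number of lucky bags
#         "rows": [
#             {
#                 "id": "...", "name": "...", "price": ..., "iceBreakingPrice": ...,
#                 "sellerId": "...", "sellerName": "...",
#                 "remainCount": ..., "totalCount": ...,
#                 "createdAt": 1719800000000, "updatedAt": 1719800000000
#             }
#         ]
#     }
# }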


def parse_luckybag_data(log, items, sql_pool, sql_luckybag_list):
    # info_list = []
    for item in items:
        luckybag_id = item.get("id")
        if luckybag_id in sql_luckybag_list:
            log.info(f"luckybag_id: {luckybag_id} already exists, skipping.......")
            continue
        name = item.get("name")
        original_price = item.get("price")
        ice_breaking_price = item.get("iceBreakingPrice")
        seller_id = item.get("sellerId")
        seller_name = item.get("sellerName")
        remain_count = item.get("remainCount")  # remaining quantity
        total_count = item.get("totalCount")  # total quantity
        createdAt = item.get("createdAt")
        created_at = utils.transform_ms(log, createdAt)
        updatedAt = item.get("updatedAt")
        updated_at = utils.transform_ms(log, updatedAt)
        info_dict = {
            "luckybag_id": luckybag_id,
            "luckybag_name": name,
            "original_price": original_price,
            "ice_breaking_price": ice_breaking_price,
            "seller_id": seller_id,
            "seller_name": seller_name,
            "remain_count": remain_count,
            "total_count": total_count,
            "created_at": created_at,
            "updated_at": updated_at
        }
        # print(info_dict)
        # info_list.append(info_dict)
        try:
            sql_pool.insert_one_or_dict(
                table="qiandao_luckybag_list_record",
                data=info_dict
            )
        except Exception as e:
            log.error(f"{inspect.currentframe().f_code.co_name} insert failed: {e}")
        sql_luckybag_list.append(luckybag_id)
    # if info_list:
    #     try:
    #         sql_pool.insert_many(table="qiandao_luckybag_list_record", data_list=info_list)
    #     except pymysql.err.IntegrityError as e:
    #         if "Duplicate entry" in str(e):
    #             logger.warning("Duplicate luckybag_id found, skipping insert")
    #         else:
    #             raise e


# -------------------------------------------------------------------------------------------------------------------
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_luckybag_detail(log, luckyBagId: str):
    """
    Fetch lucky bag detail.
    :param luckyBagId: luckyBagId
    :param log: logger object
    """
    url = "https://api.qiandao.cn/c2c-web/v1/luckybag"
    params = {
        # "luckyBagId": "876352952506283967"
        "luckyBagId": luckyBagId
    }
    resp_json = utils.request_get_data(log, url, params)
    if not resp_json:
        log.error("get_luckybag_detail error")
        raise RuntimeError("get_luckybag_detail error")
    return resp_json
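
# The detail endpoint response is assumed (based on the three parse_* functions
# below) to contain, under "data":
#   - "basicInfo": {"sellEndTime": ..., "status": ...}
#   - "products":  [{"id", "name", "image", "referencePrice", "spuId",
#                    "specValues": [{"name", "value"}], "remainCount", "totalCount"}]
#   - "records":   [{"productId", "serialNumber", "orderId", "orderCreatedAt",
#                    "buyerNickname"}]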


def parse_luckybag_detail_basic_info_data(log, resp_json, sql_pool, luckyBagId):
    try:
        basic_info = resp_json.get("data", {}).get("basicInfo", {})
        sell_end_time = basic_info.get("sellEndTime")
        sell_end_time = utils.transform_ms(log, sell_end_time)
        # moderate_status = basic_info.get("moderateStatus")
        status = basic_info.get("status")
        # Update the qiandao_luckybag_list_record table
        update_dict = {
            "sell_end_time": sell_end_time,
            # "moderate_status": moderate_status,
            "status": status
        }
        # print(f"update_dict: {update_dict}")
        sql_pool.update_one_or_dict(
            table="qiandao_luckybag_list_record",
            data=update_dict,
            condition={
                "luckybag_id": luckyBagId
            }
        )
    except Exception as e:
        log.error(f"{inspect.currentframe().f_code.co_name} for basic_info error: {e}")


def parse_luckybag_detail_product_list_data(log, resp_json, sql_pool, luckybag_id):
    try:
        product_list = resp_json.get("data", {}).get("products", [])
        # info_product_list = []
        for product in product_list:
            product_id = product.get("id")
            product_name = product.get("name")
            image = product.get("image")
            reference_price = product.get("referencePrice")  # reference price
            spu_id = product.get("spuId")
            spec_list = product.get("specValues", [{}])
            spec_name_list = [spec.get("name") for spec in spec_list]
            spec_name = "".join(spec_name_list)  # spec name
            spec_value_list = [spec.get("value") for spec in spec_list]
            spec_value = "".join(spec_value_list)  # spec value
            remain_count = product.get("remainCount")  # remaining quantity
            total_count = product.get("totalCount")  # total quantity
            product_dict = {
                "luckybag_id": luckybag_id,
                "product_id": product_id,
                "product_name": product_name,
                "image": image,
                "reference_price": reference_price,
                "spu_id": spu_id,
                "spec_name": spec_name,
                "spec_value": spec_value,
                "remain_count": remain_count,
                "total_count": total_count
            }
            # print(f"product_dict: {product_dict}")
            try:
                sql_pool.insert_one_or_dict(table="qiandao_luckybag_products_record", data=product_dict)
                sql_pool.update_one_or_dict(
                    table="qiandao_luckybag_list_record",
                    data={
                        "detail_state": 1
                    },
                    condition={
                        "luckybag_id": luckybag_id
                    }
                )
            except Exception as e:
                log.error(f"{inspect.currentframe().f_code.co_name} for product_list error: {e}")
            # info_product_list.append(product_dict)
        # if info_product_list:
        #     try:
        #         sql_pool.insert_many(table="qiandao_luckybag_products_record", data_list=info_product_list)
        #         sql_pool.update_one_or_dict(
        #             table="qiandao_luckybag_list_record",
        #             data={
        #                 "detail_state": 1
        #             },
        #             condition={
        #                 "luckybag_id": luckybag_id
        #             }
        #         )
        #     except pymysql.err.IntegrityError as e:
        #         if "Duplicate entry" in str(e):
        #             logger.warning("Duplicate product_id found, skipping insert")
        #         else:
        #             raise e
    except Exception as e:
        log.error(f"{inspect.currentframe().f_code.co_name} for product_list error: {e}")


def parse_luckybag_detail_order_list_data(log, resp_json, sql_pool, luckybag_id):
    try:
        order_list = []
        # First query the latest order time already stored for this lucky bag
        max_luckybag_time = sql_pool.select_one(
            query="SELECT MAX(order_created_at) FROM qiandao_luckybag_orders_record WHERE luckybag_id = %s",
            args=(luckybag_id,))
        max_luckybag_time = max_luckybag_time[0] if max_luckybag_time else None
        records = resp_json.get("data", {}).get("records", [])
        for record in records:
            product_id = record.get("productId")
            # product_name = record.get("productName")
            serial_number = record.get("serialNumber")  # serial number
            order_id = record.get("orderId")
            trading_time = record.get("orderCreatedAt")
            trading_time_str = utils.transform_ms(log, trading_time)
            if not trading_time_str:
                log.debug(f"Invalid trading time: {trading_time_str}")
                continue  # skip invalid times
            # string -> datetime
            trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
            # Assumes records are returned newest-first, so stop at the first
            # record that is not newer than what is already stored.
            if max_luckybag_time and trading_time <= max_luckybag_time:
                log.debug(f"trading_time: {trading_time_str} <= max_luckybag_time: {max_luckybag_time}, skipping old data")
                break
            buyerNickname = record.get("buyerNickname")
            order_dict = {
                "luckybag_id": luckybag_id,
                "product_id": product_id,
                "order_id": order_id,
                "serial_number": serial_number,
                "order_created_at": trading_time_str,
                "buyer_nickname": buyerNickname
            }
            # print(f"order_dict: {order_dict}")
            # try:
            #     sql_pool.insert_one_or_dict(table="qiandao_luckybag_orders_record", data=order_dict)
            #     sql_pool.update_one_or_dict(
            #         table="qiandao_luckybag_list_record",
            #         data={
            #             "order_state": 1
            #         },
            #         condition={
            #             "luckybag_id": luckybag_id
            #         }
            #     )
            # except Exception as e:
            #     log.error(f"{inspect.currentframe().f_code.co_name} for order_list error: {e}")
            order_list.append(order_dict)
        if order_list:
            try:
                sql_pool.insert_many(table="qiandao_luckybag_orders_record", data_list=order_list)
                sql_pool.update_one_or_dict(
                    table="qiandao_luckybag_list_record",
                    data={
                        "order_state": 1
                    },
                    condition={
                        "luckybag_id": luckybag_id
                    }
                )
            except pymysql.err.IntegrityError as e:
                if "Duplicate entry" in str(e):
                    logger.warning("Duplicate order_id found, skipping insert")
                else:
                    # raise e
                    log.warning(f"{str(e)[:200]}")
                    sql_pool.update_one_or_dict(
                        table="qiandao_luckybag_list_record",
                        data={
                            "order_state": 2
                        },
                        condition={
                            "luckybag_id": luckybag_id
                        }
                    )
    except Exception as e:
        log.error(f"{inspect.currentframe().f_code.co_name} for order_list error: {e}")


def get_detail_main(log, sql_pool):
    """
    1. Update rows in qiandao_luckybag_list_record where detail_state = 0
       (inserting the product list data shares the same state flag)
    2. Update rows in qiandao_luckybag_list_record where order_state = 0 and status is not 'ON_SALE'
    :param log: logger object
    :param sql_pool: MySQLConnectionPool object
    """
    sql_detail_list = sql_pool.select_all("SELECT luckybag_id FROM qiandao_luckybag_list_record WHERE order_state = 0")
    sql_detail_list = [item[0] for item in sql_detail_list]
    for luckyBagId in sql_detail_list:
        try:
            resp_json = get_luckybag_detail(log, luckyBagId)
        except Exception as e:
            log.error(f"{inspect.currentframe().f_code.co_name} for sql_detail_list: {luckyBagId}, error: {e}")
            resp_json = {}
        # parse_luckybag_detail_basic_info_data(log, resp_json, sql_pool, luckyBagId)
        # parse_luckybag_detail_product_list_data(log, resp_json, sql_pool, luckyBagId)
        parse_luckybag_detail_order_list_data(log, resp_json, sql_pool, luckyBagId)
    # ---------------------------------------------------------------------------------------
    # Order info: run this again after the status field has been analysed
    # sql_order_list = sql_pool.select_all("SELECT luckybag_id FROM qiandao_luckybag_list_record WHERE order_state = 0 AND status != 'ON_SALE'")
    # sql_order_list = [item[0] for item in sql_order_list]
    #
    # for luckyBagId2 in sql_order_list:
    #     try:
    #         resp_json = get_luckybag_detail(log, luckyBagId2)
    #     except Exception as e:
    #         log.error(f"{inspect.currentframe().f_code.co_name} for sql_order_list: {luckyBagId2}, error: {e}")
    #         resp_json = {}
    #
    #     parse_luckybag_detail_order_list_data(log, resp_json, sql_pool, luckyBagId2)


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def qd_lb_main(log):
    """
    Main entry point.
    :param log: logger object
    """
    log.info(
        f'Starting crawler task {inspect.currentframe().f_code.co_name} ....................................................')
    # Configure the MySQL connection pool
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("Database connection pool is unhealthy")
        raise RuntimeError("Database connection pool is unhealthy")
    try:
        # Fetch the lucky bag list
        log.info("-------------------------------------- Start fetching the lucky bag list --------------------------------------")
        # sql_luckybag_list = sql_pool.select_all("SELECT luckybag_id FROM qiandao_luckybag_list_record")
        # sql_luckybag_list = [item[0] for item in sql_luckybag_list]
        # try:
        #     get_luckybag_list(log, sql_pool, sql_luckybag_list)
        # except Exception as e:
        #     log.error(f"Error fetching get_luckybag_list: {e}")
        #
        # sql_luckybag_list.clear()
        #
        # time.sleep(5)
        # Fetch lucky bag details
        log.info("-------------------------------------- Start fetching lucky bag details --------------------------------------")
        try:
            get_detail_main(log, sql_pool)
        except Exception as e:
            log.error(f"Error fetching get_detail_main: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'Crawler {inspect.currentframe().f_code.co_name} finished; waiting for the next collection round............')
        # EmailSender().send(subject="[Qiandao - crawler notice] Today's task is complete",
        #                    content="Data collection and processing are complete; please check the results.\n\n ------ from the Python crawler system.")


if __name__ == '__main__':
    qd_lb_main(logger)
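
# The `schedule` module is imported above but not used in this file. A minimal
# sketch of how it could drive qd_lb_main once a day is shown below, commented
# out so it does not change the current behaviour; the 09:00 run time and the
# 30-second polling interval are arbitrary examples, not taken from the
# original script:
#
# if __name__ == '__main__':
#     schedule.every().day.at("09:00").do(qd_lb_main, logger)
#     while True:
#         schedule.run_pending()
#         time.sleep(30)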