kgj_kapai_spider.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/2/8 13:11
  5. import json
  6. import random
  7. import time
  8. import base64
  9. import schedule
  10. import urllib3
  11. import requests
  12. from typing import Dict
  13. from loguru import logger
  14. from urllib.parse import quote
  15. from Crypto.Cipher import AES
  16. from Crypto.Util.Padding import unpad
  17. from datetime import datetime, timedelta
  18. from mysq_pool import MySQLConnectionPool
  19. from tenacity import retry, stop_after_attempt, wait_fixed
  20. from request_live_detail import get_live_detail
# Suppress urllib3's InsecureRequestWarning noise (requests go through a proxy
# tunnel, so certificate warnings would otherwise flood the log).
urllib3.disable_warnings()
# Reconfigure loguru: drop the default stderr sink, then write daily-rotated
# files under ./kapai_logs/ (rotate at midnight, keep 14 days, DEBUG level).
logger.remove()
logger.add("./kapai_logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="14 day")
# Upper bound on product-list pages crawled per keyword.
MAX_PAGE = 100
# HEADERS = settings.KGJ_HEADERS
  28. def after_log(retry_state):
  29. """
  30. retry 回调
  31. :param retry_state: RetryCallState 对象
  32. """
  33. log = retry_state.args[0] # 获取传入的 logger
  34. if retry_state.outcome.failed:
  35. log.warning(
  36. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  37. else:
  38. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  39. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  40. def get_proxys(log):
  41. """
  42. 获取代理
  43. :return: 代理
  44. """
  45. # tunnel = "h991.kdltps.com:15818"
  46. # kdl_username = "t12136177769785"
  47. # kdl_password = "ety9bdi8"
  48. tunnel = "x371.kdltps.com:15818"
  49. kdl_username = "t13753103189895"
  50. kdl_password = "o0yefv6z"
  51. try:
  52. proxies = {
  53. "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
  54. "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
  55. }
  56. return proxies
  57. except Exception as e:
  58. log.error(f"Error getting proxy: {e}")
  59. raise e
  60. def decimal_to_percent(decimal_value):
  61. if not decimal_value:
  62. return "0%"
  63. # 将小数转换为百分比并保留一位小数
  64. percentage = decimal_value * 100
  65. # 使用格式化去除末尾多余的零
  66. return f"{percentage:g}%" # 'g' 格式会自动去掉不必要的零
  67. def decimal_to_percent_with_dist(decimal_value):
  68. if not decimal_value:
  69. return "0%"
  70. # 将小数转换为百分比
  71. percentage = decimal_value * 100
  72. # 使用格式化字符串,最多保留两位小数,去掉不必要的零
  73. formatted_percentage = f"{percentage:.2f}".rstrip('0').rstrip('.')
  74. return f"{formatted_percentage}%"
  75. def convert_seconds_to_hours_minutes(total_seconds: int):
  76. # 计算完整的小时数
  77. hours = total_seconds // 3600
  78. remaining_seconds_after_hours = total_seconds % 3600
  79. # 计算完整的分钟数
  80. minutes = remaining_seconds_after_hours // 60
  81. result = f"{hours}小时{minutes}分" if hours > 0 else f"{minutes}分" if minutes > 0 else "0分"
  82. return result
  83. def get_date(offset):
  84. """
  85. 获取指定偏移量的日期,格式为 YYYYMMDD。
  86. 参数:
  87. offset (int): 日期偏移量,0 表示今天,-1 表示昨天,-6 表示6天前,-7 表示7天前
  88. 返回:
  89. str: 格式为 YYYYMMDD 的日期字符串
  90. """
  91. today = datetime.today()
  92. target_date = today + timedelta(days=offset)
  93. return target_date.strftime('%Y%m%d')
  94. def decrypt_data(log, par_url, encrypted_data) -> Dict[str, str]:
  95. """
  96. 解密数据
  97. :param log:
  98. :param par_url:
  99. :param encrypted_data:
  100. :return:
  101. """
  102. log.info("开始解密数据 ->->->->->->->->->->->->->->->->->->->->->")
  103. if not isinstance(par_url, str):
  104. return {}
  105. # 对应原js中的str函数
  106. def transform_str(input_str):
  107. encoded = quote(input_str)
  108. return base64.b64encode(encoded.encode()).decode()
  109. str_result = transform_str(par_url) * 3
  110. org_key = str_result[:16]
  111. org_iv = str_result[12:28]
  112. # 使用Crypto库解析key和iv
  113. ikey = org_key.encode('utf-8')
  114. iiv = org_iv.encode('utf-8')
  115. # 解密
  116. cipher = AES.new(ikey, AES.MODE_CBC, iiv)
  117. decrypted_text = unpad(cipher.decrypt(base64.b64decode(encrypted_data)), AES.block_size).decode('utf-8')
  118. decrypted_text = json.loads(decrypted_text)
  119. # print(decrypted_text)
  120. return decrypted_text
def save_product_list(sql_pool, info_list):
    """
    Bulk-insert product-list rows into kgj_kapai_product_list_record.

    :param sql_pool: DB connection pool
    :param info_list: list of 16-tuples matching the column list below
    """
    sql = """
    INSERT INTO kgj_kapai_product_list_record (product_id, title, price, price_str, cos_ratio, sales, live_sales, video_sales, other_sales, live_ratio, video_ratio, other_ratio, shop_id, shop_name, shop_cover, keyword)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    sql_pool.insert_all(sql, info_list)
def save_live_detail(sql_pool, info_list):
    """
    Bulk-insert live-detail rows into kgj_kapai_live_detail_record.

    :param sql_pool: DB connection pool
    :param info_list: list of 29-tuples matching the column list below
    """
    sql = """
    INSERT INTO kgj_kapai_live_detail_record (room_id, live_create_time, live_finish_time, duration, watch_users, avg_users, peak_users, through, exposed_num, stay_duration, new_fans_count, inc_fans_clubs, turn_ratio, interaction_ratio, gmv, sales, atv, explain_duration, sku_count, uv, cvr, rpm, promotion_id, product_id, product_title, product_cover, product_sales, product_putaway_time, product_sold_out_time)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    sql_pool.insert_all(sql, info_list)
  138. def update_linked_live_state(sql_pool, sql_id):
  139. """
  140. 更新 kgj_linked_live_record 状态
  141. :param sql_pool: 数据库连接池对象
  142. :param sql_id: sql_id
  143. """
  144. sql = f"UPDATE kgj_kapai_linked_live_record SET live_detail_state = 1 WHERE id = %s"
  145. sql_pool.update_one(sql, (sql_id,))
  146. def parse_product_list(log, resp_json: dict, sql_pool, keyword, stop_paging):
  147. log.info("开始解析 product_list 数据......................")
  148. items = resp_json.get("items", [])
  149. info_list = []
  150. for item in items:
  151. sales = item.get("stat", {}).get("sales")
  152. # 检查 sales 是否为 0 或 '0'
  153. if sales == 0 or sales == '0':
  154. stop_paging[0] = True # 设置停止翻页的标志
  155. log.info("已到达销量为0的页码,停止当前产品列表的翻页..............")
  156. break # 停止解析当前页面的产品列表
  157. product_id = item.get("product_id")
  158. # 20250523 删除根据 product_id 去重
  159. # if product_id in sql_product_id_list:
  160. # log.debug(f"{product_id} 已存在,跳过..............")
  161. # continue
  162. title = item.get("title")
  163. # 如果标题中 包含["卡夹", "卡砖", "卡膜", "鼠标垫"]中的任意一个,则将keyword设置为 "周边"
  164. if any(keyword in title for keyword in ["卡夹", "卡砖", "卡膜", "鼠标垫"]):
  165. log.debug(f"{title} 包含关键字,为周边产品..............")
  166. keyword = "周边"
  167. price = item.get("price")
  168. price_str = item.get("price_str")
  169. cos_ratio = item.get("cos_ratio")
  170. live_sales = item.get("stat", {}).get("live_sales")
  171. video_sales = item.get("stat", {}).get("video_sales")
  172. other_sales = item.get("stat", {}).get("other_sales")
  173. live_ratio = item.get("market_type", {}).get("live_ratio")
  174. live_ratio = decimal_to_percent(live_ratio) if live_ratio else "0%"
  175. video_ratio = item.get("market_type", {}).get("video_ratio")
  176. video_ratio = decimal_to_percent(video_ratio) if video_ratio else "0%"
  177. other_ratio = item.get("market_type", {}).get("other_ratio")
  178. other_ratio = decimal_to_percent(other_ratio) if other_ratio else "0%"
  179. shop_id = item.get("shop_id")
  180. shop_name = item.get("shop_name")
  181. shop_cover = item.get("shop_cover")
  182. info = (product_id, title, price, price_str, cos_ratio, sales, live_sales, video_sales, other_sales, live_ratio,
  183. video_ratio, other_ratio, shop_id, shop_name, shop_cover, keyword)
  184. info_list.append(info)
  185. if info_list:
  186. log.info(f"解析到 {len(info_list)} 条数据......................")
  187. save_product_list(sql_pool, info_list)
  188. else:
  189. log.info("没有解析到数据......................")
  190. # save_product_list(sql_pool, info_list)
  191. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  192. def get_kgj_product_one_page(log, page, keyword, sql_pool, stop_paging, headers):
  193. url = "https://service.kaogujia.com/api/sku/search"
  194. params = {
  195. "limit": "50",
  196. "page": str(page),
  197. "sort_field": "sales",
  198. "sort": "0"
  199. }
  200. data = {
  201. "period": 7,
  202. "keyword": keyword
  203. }
  204. response = requests.post(url, headers=headers, params=params, json=data)
  205. # print(response.text)
  206. # print(response)
  207. response.raise_for_status()
  208. resp_json = response.json()
  209. if resp_json:
  210. if resp_json.get("code") == 200:
  211. enc_data = resp_json.get("data")
  212. par_url = '/api/sku/search'
  213. dec_data = decrypt_data(log, par_url, enc_data)
  214. # print(dec_data)
  215. parse_product_list(log, dec_data, sql_pool, keyword, stop_paging)
  216. else:
  217. log.warning(f"Error get_kgj_product_one_page: {resp_json.get('message')}")
  218. else:
  219. log.warning(f"Error resp_json")
  220. def get_kgj_product_list(log, keyword, sql_pool, headers):
  221. stop_paging = [False] # 使用列表来存储标志,以便在函数间传递
  222. for page in range(1, MAX_PAGE + 1):
  223. try:
  224. log.info(f"Getting kgj product list page {page}, keyword:{keyword}")
  225. get_kgj_product_one_page(log, page, keyword, sql_pool, stop_paging, headers)
  226. except Exception as e:
  227. log.error(f"Error getting kgj product list: {e}")
  228. time.sleep(random.randint(4, 6))
  229. continue
  230. time.sleep(random.randint(4, 6))
  231. if stop_paging[0]: # 检查停止翻页的标志
  232. log.info("停止翻页,因为 sales 为 0 或 '0'")
  233. break
def save_product_overview(sql_pool, info_list):
    """
    Bulk-insert daily product-overview rows into kgj_kapai_product_overview_record.

    :param sql_pool: DB connection pool
    :param info_list: list of 12-tuples matching the column list below
    """
    sql = """
    INSERT INTO kgj_kapai_product_overview_record (product_id, date_all, sales_str, live_sales_str, video_sales_str, other_sales_str, users_str, lives_str, videos_str, live_ratio, video_ratio, other_ratio)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    sql_pool.insert_all(sql, info_list)
  240. def update_state(sql_pool, product_id, state_str, state_int):
  241. """
  242. 更新 kgj_product_list_record 状态
  243. :param sql_pool: 数据库连接池对象
  244. :param product_id: 产品ID
  245. :param state_str: 状态字段名称
  246. :param state_int: 状态值
  247. """
  248. sql = f"UPDATE kgj_kapai_product_list_record SET {state_str} = %s WHERE product_id = %s"
  249. sql_pool.update_one(sql, (state_int, product_id))
  250. def get_product_overview_percent(log, product_id, headers):
  251. log.info(f"Getting kgj product overview percent, product_id:{product_id}")
  252. url = "https://service.kaogujia.com/api/sku/overview/dist"
  253. data = {
  254. "product_id": product_id,
  255. "from_dt": get_date(-7),
  256. "to_dt": get_date(-1)
  257. }
  258. response = requests.post(url, headers=headers, json=data)
  259. response.raise_for_status()
  260. resp_json = response.json()
  261. if resp_json:
  262. if resp_json.get("code") == 200:
  263. enc_data = resp_json.get("data")
  264. par_url = '/api/sku/overview/dist'
  265. dec_data = decrypt_data(log, par_url, enc_data)
  266. # print(dec_data)
  267. sales_list = dec_data.get("sales")
  268. live_ratio = "0%"
  269. video_ratio = "0%"
  270. other_ratio = "0%"
  271. for sales in sales_list:
  272. if sales.get("name") == "直播带货":
  273. live_ratio = sales.get("percent")
  274. live_ratio = decimal_to_percent_with_dist(live_ratio) if live_ratio else "0%"
  275. if sales.get("name") == "视频带货":
  276. video_ratio = sales.get("percent")
  277. video_ratio = decimal_to_percent_with_dist(video_ratio) if video_ratio else "0%"
  278. if sales.get("name") == "商品卡":
  279. other_ratio = sales.get("percent")
  280. other_ratio = decimal_to_percent_with_dist(other_ratio) if other_ratio else "1%"
  281. return live_ratio, video_ratio, other_ratio
  282. else:
  283. log.warning(f"Error get_kgj_product_overview: {resp_json.get('message')}")
  284. # update_state(sql_pool, product_id, "product_state", 2)
  285. return "0%", "0%", "0%"
  286. else:
  287. log.warning(f"Error get_kgj_product_overview resp_json")
  288. # update_state(sql_pool, product_id, "product_state", 2)
  289. return "0%", "0%", "0%"
  290. def parse_product_overview(log, sql_pool, resp_json: dict, product_id, headers):
  291. log.info("开始解析 product_overview 数据......................")
  292. trend_list = resp_json.get("trend_list", [])
  293. if not trend_list:
  294. log.debug(f"parse_product_overview trend_list is empty")
  295. update_state(sql_pool, product_id, "product_state", 2)
  296. else:
  297. # 获取商品概览页的占比信息
  298. live_ratio, video_ratio, other_ratio = get_product_overview_percent(log, product_id, headers)
  299. info_list = []
  300. for trend in trend_list:
  301. date_all = trend.get("date_all")
  302. sales_str = trend.get("sales_str")
  303. live_sales_str = trend.get("live_sales_str")
  304. video_sales_str = trend.get("video_sales_str")
  305. other_sales_str = trend.get("other_sales_str")
  306. users_str = trend.get("users_str")
  307. lives_str = trend.get("lives_str")
  308. videos_str = trend.get("videos_str")
  309. info = (
  310. product_id, date_all, sales_str, live_sales_str, video_sales_str, other_sales_str, users_str, lives_str,
  311. videos_str, live_ratio, video_ratio, other_ratio)
  312. info_list.append(info)
  313. save_product_overview(sql_pool, info_list)
  314. # sql = "UPDATE kgj_product_list_record SET product_state = 2 WHERE product_id = %s"
  315. update_state(sql_pool, product_id, "product_state", 1)
  316. def get_kgj_product_overview(log, sql_pool, product_id, headers):
  317. url = f"https://service.kaogujia.com/api/sku/trend/{product_id}"
  318. params = {
  319. "begin": get_date(-7),
  320. # "begin": "20250115",
  321. # "end": "20250121"
  322. "end": get_date(-1)
  323. }
  324. response = requests.get(url, headers=headers, params=params)
  325. # print(response.text)
  326. response.raise_for_status()
  327. resp_json = response.json()
  328. if resp_json:
  329. if resp_json.get("code") == 200:
  330. enc_data = resp_json.get("data")
  331. par_url = f'/api/sku/trend/{product_id}'
  332. dec_data = decrypt_data(log, par_url, enc_data)
  333. # print(dec_data)
  334. parse_product_overview(log, sql_pool, dec_data, product_id, headers)
  335. else:
  336. log.warning(f"Error get_kgj_product_overview: {resp_json.get('message')}")
  337. update_state(sql_pool, product_id, "product_state", 2)
  338. else:
  339. log.warning(f"Error get_kgj_product_overview resp_json")
  340. update_state(sql_pool, product_id, "product_state", 2)
def save_linded_live(sql_pool, info_list):
    """
    Bulk-insert linked-live rows into kgj_kapai_linked_live_record.

    NOTE(review): "linded" is a typo for "linked"; the name is kept because
    callers reference it.

    :param sql_pool: DB connection pool
    :param info_list: list of 14-tuples matching the column list below
    """
    sql = """
    INSERT INTO kgj_kapai_linked_live_record (product_id, title, duration_str, pub_time_str, uid, nick_name, fans_count, price, price_str, sales, gmv, pi, room_id, date_code)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    sql_pool.insert_all(sql, info_list)
  347. def parse_linked_live(log, dec_data, sql_pool, product_id):
  348. log.info("开始解析 linked_live 数据......................")
  349. items = dec_data.get("items", [])
  350. if not items:
  351. log.debug(f"parse_linked_live items is empty")
  352. update_state(sql_pool, product_id, "live_state", 2)
  353. else:
  354. info_list = []
  355. for item in items:
  356. title = item.get("title")
  357. duration = item.get("duration")
  358. duration_str = convert_seconds_to_hours_minutes(duration)
  359. pub_time = item.get("pub_time")
  360. pub_time_str = datetime.fromtimestamp(pub_time).strftime('%Y-%m-%d %H:%M:%S') if pub_time else ""
  361. uid = item.get("uid")
  362. nick_name = item.get("nick_name")
  363. fans_count = item.get("fans_count")
  364. price = item.get("price")
  365. price_str = item.get("price_str")
  366. sales = item.get("sales")
  367. gmv = item.get("gmv")
  368. pi = item.get("pi")
  369. room_id = item.get("room_id")
  370. date_code = item.get("date_code")
  371. info = (
  372. product_id, title, duration_str, pub_time_str, uid, nick_name, fans_count, price, price_str, sales, gmv,
  373. pi, room_id, date_code)
  374. info_list.append(info)
  375. save_linded_live(sql_pool, info_list)
  376. update_state(sql_pool, product_id, "live_state", 1)
  377. def get_linked_live(log, sql_pool, product_id, headers):
  378. url = "https://service.kaogujia.com/api/sku/live/list"
  379. params = {
  380. "limit": "10",
  381. "page": "1",
  382. "sort_field": "gmv",
  383. "sort": "0"
  384. }
  385. data = {
  386. "keyword": "",
  387. "min_time": get_date(-6),
  388. "max_time": get_date(0),
  389. "product_id": product_id
  390. }
  391. response = requests.post(url, headers=headers, params=params, json=data)
  392. # print(response.text)
  393. response.raise_for_status()
  394. resp_json = response.json()
  395. if resp_json:
  396. if resp_json.get("code") == 200:
  397. enc_data = resp_json.get("data")
  398. par_url = '/api/sku/live/list'
  399. dec_data = decrypt_data(log, par_url, enc_data)
  400. # print(dec_data)
  401. parse_linked_live(log, dec_data, sql_pool, product_id)
  402. else:
  403. log.warning(f"Error get_linked_live: {resp_json.get('message')}")
  404. update_state(sql_pool, product_id, "live_state", 2)
  405. else:
  406. log.warning(f"Error get_linked_live resp_json")
  407. update_state(sql_pool, product_id, "live_state", 2)
@retry(stop=stop_after_attempt(500), wait=wait_fixed(600), after=after_log)
def kgj_kapai_main(log):
    """
    Run one full crawl: product list -> per-product overview -> linked lives
    -> per-live detail. A top-level failure is retried by tenacity (up to 500
    attempts, 10 minutes apart).

    :param log: logger (also consumed by the after_log retry callback)
    :raises Exception: when the DB pool cannot be created
    """
    log.info("开始运行 kgj_kapai_main 爬虫任务............................................................")
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool:
        # NOTE(review): a freshly constructed object is normally truthy; this
        # only fires if MySQLConnectionPool defines __bool__ — confirm in
        # mysq_pool.
        log.error("数据库连接失败")
        raise Exception("数据库连接失败")
    # The API auth token is maintained outside this spider and read from the
    # kgj_token table on every run.
    kgj_token = sql_pool.select_one("SELECT token FROM kgj_token")
    # Headers for the "195" account.
    KGJ_HEADERS = {
        "accept": "*/*",
        "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
        "authorization": kgj_token[0],
        "content-type": "application/json",
        "origin": "https://www.kaogujia.com",
        "priority": "u=1, i",
        "referer": "https://www.kaogujia.com/",
        "sec-ch-ua": "\"Not(A:Brand\";v=\"99\", \"Google Chrome\";v=\"133\", \"Chromium\";v=\"133\"",
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": "\"Windows\"",
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-site",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
        "version_code": "3.1"
    }
    try:
        keyword = "卡牌"
        # Stage 1: crawl the keyword's product list pages.
        log.info("开始获取 product_list 数据............................................................")
        # sql_product_id_list = sql_pool.select_all("SELECT DISTINCT product_id FROM kgj_kapai_product_list_record")
        # sql_product_id_list = [item[0] for item in sql_product_id_list]
        try:
            get_kgj_product_list(log, keyword, sql_pool, KGJ_HEADERS)
        except Exception as e:
            log.error(f"Error main -> getting kgj product list: {e}")
        # sql_product_id_list.clear()
        log.info("product_list 数据获取完成............................................................")
        time.sleep(5)
        # Stage 2: overview/trend for every product not yet processed
        # (product_state = 0). Per-product failures are logged and skipped.
        log.info("开始获取 product_overview 数据............................................................")
        product_id_list_for_product = sql_pool.select_all(
            "SELECT product_id FROM kgj_kapai_product_list_record WHERE product_state = 0")
        product_id_list = [item[0] for item in product_id_list_for_product]
        for product_id in product_id_list:
            try:
                log.info(f"开始获取 product_id: {product_id} 的 product_overview 数据............................")
                get_kgj_product_overview(log, sql_pool, product_id, KGJ_HEADERS)
            except Exception as e:
                log.error(f"Error main -> getting kgj product overview: {e}")
                time.sleep(random.randint(4, 6))
                continue
            time.sleep(random.randint(4, 6))
        log.info("product_overview 数据获取完成............................................................")
        time.sleep(5)
        # Stage 3: lives linked to every product not yet processed
        # (live_state = 0).
        log.info("开始获取 linked_live 数据............................................................")
        product_id_list_for_live = sql_pool.select_all(
            "SELECT product_id FROM kgj_kapai_product_list_record WHERE live_state = 0")
        product_id_list = [item[0] for item in product_id_list_for_live]
        for product_id in product_id_list:
            try:
                log.info(f"开始获取 product_id: {product_id} 的 linked_live 数据............................")
                get_linked_live(log, sql_pool, product_id, KGJ_HEADERS)
            except Exception as e:
                log.error(f"Error main -> getting kgj linked_live: {e}")
                time.sleep(random.randint(4, 6))
                continue
            time.sleep(random.randint(4, 6))
        log.info("linked_live 数据获取完成............................................................")
        time.sleep(5)
        # Stage 4: detail for every linked live not yet processed
        # (live_detail_state = 0) that has uid/room_id/date_code.
        log.info("开始获取 live_detail 数据............................................................")
        sql_room_id_list = sql_pool.select_all(
            "SELECT id, uid, room_id, date_code FROM kgj_kapai_linked_live_record WHERE live_detail_state = 0 and uid is not null and room_id is not null and date_code is not null")
        sql_room_id_list = [item for item in sql_room_id_list]
        if sql_room_id_list:
            for sql_info in sql_room_id_list:
                try:
                    # sql_info = (id, uid, room_id, date_code)
                    log.info(f"开始获取 room_id: {sql_info[2]} 的 live_detail 数据............................")
                    live_detail_info_list = get_live_detail(log, sql_info, KGJ_HEADERS)
                    save_live_detail(sql_pool, live_detail_info_list)
                    update_linked_live_state(sql_pool, sql_info[0])
                except Exception as e:
                    # Row is NOT flagged on failure, so it is retried next run.
                    log.error(f"Error main -> getting kgj live_detail: {e}")
                    # update_linked_live_state(sql_pool, sql_info[0])
                    time.sleep(random.randint(4, 6))
                    continue
                time.sleep(random.randint(4, 6))
        log.info("live_detail 数据获取完成............................................................")
    except Exception as e:
        log.error(e)
    finally:
        log.info("爬虫程序 kgj_kapai_main 运行结束,等待下一轮的采集任务.............")
  498. def schedule_task():
  499. """
  500. 设置定时任务
  501. """
  502. # 立即运行一次任务
  503. # kgj_kapai_main(logger)
  504. # 设置定时任务 考古加 -> 卡牌 一周一次 卡牌类的周三跑 抓取时间比较久 和其他几个类错开时间
  505. schedule.every().wednesday.at("01:01").do(kgj_kapai_main, logger)
  506. while True:
  507. schedule.run_pending()
  508. time.sleep(1)
  509. if __name__ == '__main__':
  510. schedule_task()