||
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/3/17 17:22
- import inspect
- import schedule
- from airtest.core.api import *
- from poco.drivers.android.uiautomation import AndroidUiautomationPoco
- from loguru import logger
- from mysql_pool import MySQLConnectionPool
- from tenacity import retry, stop_after_attempt, wait_fixed
- # from poco.exceptions import PocoNoSuchNodeException
# Drop the handlers loguru currently has and log to a per-day file instead
# (rotated at midnight, kept for 7 days).
logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")

# Android package name of the app being automated; also used as the prefix of
# the resource ids queried below.
PACKAGE_NAME = 'com.jihuanshe'

init_device("Android")
# In Airtest, init_device and connect_device are both used to initialise a device.
#
# init_device initialises the device connection and automatically detects and
# connects to an available device. Arguments such as the device serial number
# or the device type can be passed to choose which device to connect to.
#
# connect_device connects to a device manually. The device must be specified
# through arguments such as its serial number or device type.
#
# In short: init_device auto-detects and connects to an available device, while
# connect_device requires the target device to be specified by hand.

# Restart the app so the script starts from a freshly launched instance.
stop_app(PACKAGE_NAME)
# time.sleep(11)
start_app(PACKAGE_NAME)

# The bare string below is a note, not executed code: if ADB cannot find the
# device, try restarting the ADB server with the command it contains.
"""
# 如果ADB无法找到设备,尝试重启ADB服务
D:\投屏\QtScrcpy-win-x64-v3.3.1\adb.exe kill-server
"""
# Ranking categories to crawl. Each entry is a single-key dict mapping the
# category name to the language tabs to visit. The position of an entry in this
# list is used by get_data as the index of the category button to tap.
category_list = [
    {'游戏王': ['日文', '简中']},
    {'宝可梦': ['简中', '日文', '英文']},
    {'航海王': ['简中', '日文']},
    {'符文战场': ['简中']},
    {'WS黑白双翼': ['日文', '简中']},
    {'数码宝贝': ['简中']},
    {'迪士尼 洛卡纳': ['简中']},
    {'携站之境UA': ['简中', '日文']},
    {'VG卡片战斗先导者': ['简中', '日文']},
    {'高达GCG': ['简中']},
    {'影之诗': ['简中']},
]
# Poco driver used for all UI queries in this script; created with Airtest
# input enabled and per-action screenshots disabled.
poco = AndroidUiautomationPoco(
    use_airtest_input=True, screenshot_each_action=False)
# Screen size as reported by Poco. NOTE(review): in the code shown here these
# two names are only referenced from commented-out code in scroll_up.
window_width, window_height = poco.get_screen_size()
def scrape_index():
    """
    Get from app launch to the ranking page.

    Waits 10 seconds for the home page, closes the advert overlay (checked at
    most three times), then waits up to 60 seconds for the ``imageIv`` element
    and taps it.
    """
    # Give the home page time to load.
    time.sleep(10)

    # An advert may be shown on top of the home page; close it if present.
    close_button = poco(f'{PACKAGE_NAME}:id/ivClose')
    checks_left = 3
    while checks_left > 0:
        checks_left -= 1
        if not close_button.exists():
            logger.debug("广告元素未找到, 退出")
            break
        logger.warning("广告元素已找到")
        close_button.click()
        time.sleep(1)

    entry_icon = poco(f'{PACKAGE_NAME}:id/imageIv').wait(60)
    entry_icon.click()
def after_log(retry_state):
    """
    Retry callback that logs the result of the attempt that just finished.

    :param retry_state: RetryCallState object
    """
    # The first positional argument of the wrapped call is taken to be the
    # logger; when the call had no positional arguments use the global logger.
    log = retry_state.args[0] if retry_state.args else logger

    fn_name = retry_state.fn.__name__
    attempt = retry_state.attempt_number
    if not retry_state.outcome.failed:
        log.info(f"Function '{fn_name}', Attempt {attempt} succeeded")
        return
    log.warning(f"Function '{fn_name}', Attempt {attempt} Times")
def scrape_detail(element):
    """
    Read the fields of one ranking entry from its descendant widgets.

    :param element: Poco UI proxy of a single ranking entry
    :return: dict with the keys ``title``, ``tv_sales_data_num``, ``tv_number``,
        ``tv_rarity``, ``tv_cur_sales_num``, ``tv_last_sales_num``, ``tv_price``
        and ``tv_rank``; ``None`` when a required widget is missing or any
        exception is raised while reading (the exception is logged).
    """

    def read_text(res_id):
        """Return ``(found, text)`` for the descendant widget with this resource id."""
        node = element.offspring(f'{PACKAGE_NAME}:id/{res_id}')
        if node.exists():
            return True, node.attr('text')
        return False, None

    try:
        # Title
        found, title = read_text('tvName')
        if not found:
            logger.warning("title元素未找到")
            return None

        # Sales figure (optional): fall back to tvSalesData and strip its labels.
        found, sales_total = read_text('tvSalesDataNum')
        if not found:
            logger.warning("tvSalesDataNum元素未找到")
            found, sales_total = read_text('tvSalesData')
            if found:
                # The replacement must be a string: the previous code passed the
                # int 0, which made str.replace raise TypeError, so this
                # fallback always ended in the except branch below.
                sales_total = sales_total.replace('张', '').replace('销量', '')
                logger.debug(f'tvSalesDataNum元素未找到,使用tvSalesData元素')
            else:
                sales_total = None

        # Card number
        found, number = read_text('tvNumber')
        if not found:
            logger.warning("tvNumber元素未找到")
            return None

        # Rarity
        found, rarity = read_text('tvRarity')
        if not found:
            logger.warning("tvRarity元素未找到")
            return None

        # Sales of the previous period (unit suffix removed)
        found, cur_sales = read_text('tvCurSalesNum')
        if not found:
            logger.warning("tvCurSalesNum元素未找到")
            return None
        cur_sales = cur_sales.replace('张', '')

        # Sales change of the current period
        found, last_sales = read_text('tvLastSalesNum')
        if not found:
            logger.warning("tvLastSalesNum元素未找到")
            return None

        # Average price of the current period
        found, price = read_text('tvPrice')
        if not found:
            logger.warning("tvPrice元素未找到")
            return None

        # Rank (optional): the top three entries use rankIv instead of tvRank.
        found, rank = read_text('tvRank')
        if not found:
            logger.warning("tvRank元素未找到, 查找rankIv元素(前三名)")
            found, rank = read_text('rankIv')
            if found:
                logger.debug(f'tvRank元素未找到,使用rankIv元素(前三名)')
                logger.debug(rank)
            else:
                rank = None

        return {
            'title': title,
            'tv_sales_data_num': sales_total,
            'tv_number': number,
            'tv_rarity': rarity,
            'tv_cur_sales_num': cur_sales,
            'tv_last_sales_num': last_sales,
            'tv_price': price,
            'tv_rank': rank
        }
    except Exception as e:
        logger.error(f"抓取详情时发生错误: {e}")
        return None
def parse_sales_data(raw_text):
    """
    Parse the newline-separated text of one ranking entry.

    After normalisation the fields are read by position: 0 rank, 1 title,
    2 card number, 3 rarity, 5 previous-period sales, 7 sales change,
    9 average price, 10 sales figure.

    Args:
        raw_text (str): raw text of the entry, one field per line

    Returns:
        dict: parsed fields (``tv_rank``, ``title``, ``tv_number``,
        ``tv_rarity``, ``tv_cur_sales_num``, ``tv_last_sales_num``,
        ``tv_price``, ``tv_sales_data_num``), all strings

    Raises:
        ValueError: if ``raw_text`` is not a string or has too few fields
    """
    if not isinstance(raw_text, str):
        # Raise the same error the caller already handles instead of failing
        # with AttributeError on ``None``.
        raise ValueError("输入数据不完整")

    raw_list = raw_text.split("\n")

    # Entries without a numeric rank line get an empty rank so that the
    # remaining fields keep their positions.
    if not raw_list[0].isdigit():
        raw_list.insert(0, '')

    # When the sales-change value is missing, its label is directly followed by
    # the price label; insert '-' as the value. This has to happen before the
    # length check, otherwise such an entry is rejected as incomplete before it
    # can be repaired.
    if len(raw_list) >= 8 and raw_list[6] == "本周期销量涨跌幅" and raw_list[7] == "本周期平均集换价":
        raw_list.insert(7, '-')

    if len(raw_list) < 11:
        raise ValueError("输入数据不完整")

    return {
        'tv_rank': raw_list[0],
        'title': raw_list[1],
        'tv_number': raw_list[2],
        'tv_rarity': raw_list[3],
        'tv_cur_sales_num': raw_list[5].replace('张', ''),
        'tv_last_sales_num': raw_list[7],
        'tv_price': raw_list[9].replace('¥', ''),
        'tv_sales_data_num': raw_list[10]
    }
def scroll_up():
    """
    Perform one swipe from (0.5, 0.9) to (0.5, 0.7) lasting one second.

    NOTE(review): the coordinates look like fractions of the screen size
    (bottom towards top, i.e. the list moves up) - confirm that the installed
    Airtest version treats values below 1 as relative positions.
    """
    start_point = (0.5, 0.9)
    end_point = (0.5, 0.7)
    swipe(v1=start_point, v2=end_point, duration=1.0)
def is_page_changed(old_elements, new_elements):
    """
    Check whether the list moved on to a new page.

    Only the title of the last element of each collection is compared.

    :param old_elements: elements captured before scrolling
    :param new_elements: elements captured after scrolling
    :return: False when both last titles were found and are equal; True in every
        other case (either collection empty, a title widget missing, or the
        titles differ)
    """
    if not (old_elements and new_elements):
        return True

    title_id = f'{PACKAGE_NAME}:id/tvName'
    old_node = old_elements[-1].offspring(title_id)
    new_node = new_elements[-1].offspring(title_id)
    if not (old_node.exists() and new_node.exists()):
        logger.warning("title元素未找到")
        return True

    old_title = old_node.attr('text')
    new_title = new_node.attr('text')
    logger.debug(f'old_title: {old_title}, new_title: {new_title}')
    if old_title == new_title:
        # Same last title: the page did not change.
        logger.debug(f'old_title: {old_title}, new_title: {new_title}')
        return False
    return True
def _content_root():
    """Build a fresh Poco query for the view that holds the drop-down button and the category list."""
    return poco("android:id/content").child("android.widget.FrameLayout").child(
        "android.widget.FrameLayout").child("android.view.View").child(
        "android.view.View").child("android.view.View")


def _category_buttons():
    """Build a fresh Poco query for the buttons inside the category list."""
    return _content_root().child("android.widget.ScrollView").child("android.widget.Button")


def _select_category(ca_idx, category):
    """Open the category drop-down and tap entry ``ca_idx``; return False when the list is not shown."""
    _content_root().child("android.widget.Button")[1].click()

    if not _content_root().child("android.widget.ScrollView").exists():
        logger.warning("tag_bang_list元素未找到")
        return False

    if ca_idx > 8:
        # Entries past index 8 need the list to be scrolled first; after the
        # swipe the wanted entry is addressed three positions earlier.
        logger.debug('滑动下拉菜单')
        _category_buttons()[7].swipe([0.0035, -0.3152])
        time.sleep(2)
        _category_buttons()[ca_idx - 3].click()
    else:
        _category_buttons()[ca_idx].click()
    logger.info(f'点击 集换榜分类 -> {category}')
    return True


def _collect_rows(category, language, max_pages=30):
    """Scroll through the displayed ranking and return its parsed rows (stops at rank 100 or ``max_pages``)."""
    rows = []
    for page_count in range(1, max_pages + 1):
        if page_count == 1:
            logger.debug("第一页开始翻页")
            scroll_up()
        else:
            logger.debug(f"{page_count}翻页.........")
            for _ in range(4):
                scroll_up()

        buttons = poco('android.widget.ScrollView').offspring('android.widget.Button')
        for element in buttons:
            label = element.child()
            if not label.exists():
                continue
            raw_text = label.attr('name')
            if not isinstance(raw_text, str):
                # A missing name used to raise TypeError on the ``in`` test
                # below and abort the whole crawl.
                logger.debug('元素name属性为空, 跳过')
                continue
            if '加入心愿单' in raw_text:
                logger.debug('<加入心愿单> 字样在tag_element_data, 跳过')
                continue
            try:
                element_data = parse_sales_data(raw_text)
            except ValueError as e:
                logger.error(f"数据解析错误: {e}")
                continue
            logger.info(element_data)

            # The rank is only used to detect the end of the list; it is not stored.
            row = {k: v for k, v in element_data.items() if k != 'tv_rank'}
            row.update({
                'category': category,
                'language': language,
                'crawler_date': time.strftime("%Y-%m-%d", time.localtime())
            })
            logger.debug(f'scraped data: {row}')
            rows.append(row)

            if element_data.get('tv_rank') == '100':
                logger.success(
                    f'已获取第 {page_count} 页数据, 第 {element_data["tv_rank"]} 名, 停止翻页....')
                logger.debug(f'已获取 {len(rows)} 条数据, 停止翻页....')
                return rows
        time.sleep(1)

    logger.warning("已达到最大翻页次数,停止翻页")
    return rows


def _dedupe_rows(rows):
    """Return ``rows`` without exact duplicates, keeping the first occurrence and the original order."""
    seen = set()
    unique_rows = []
    for row in rows:
        key = frozenset(row.items())
        if key not in seen:
            seen.add(key)
            unique_rows.append(row)
    return unique_rows


def get_data(sql_pool):
    """
    Crawl every category/language ranking in ``category_list`` and store the rows.

    For each combination the category is chosen in the drop-down, the language
    tab and the best-seller tab are tapped, the list is scrolled and parsed,
    and the de-duplicated rows are inserted into ``jhs_bestseller_record``.

    :param sql_pool: MySQL connection pool object; its ``insert_many`` is called
        once per category/language that produced rows
    """
    scrape_index()
    for ca_idx, category_value in enumerate(category_list):
        # Each entry is a single-key dict: {category name: [languages]}.
        category, lang_list = next(iter(category_value.items()))
        logger.debug(
            f'当前分类索引: {ca_idx}, 分类名称: {category}, 语言列表: {lang_list} >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        for language in lang_list:
            if not _select_category(ca_idx, category):
                # Nothing was selected; scraping now would store whatever list
                # is currently displayed under this category/language.
                continue

            # Language tab
            logger.info(f'当前语言名称: {language} >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
            poco(language).click()

            # Best-seller tab, located by image template.
            touch(Template(r"tpl1761026966680.png", record_pos=(0.048, -0.781), resolution=(1080, 2340)))
            time.sleep(2)

            data_list = _collect_rows(category, language)
            logger.success(
                '---------------------------------------------------------------------------------------------')

            # Overlapping scrolls read the same entry more than once.
            data_list = _dedupe_rows(data_list)
            if not data_list:
                logger.warning("未获取到数据")
                continue
            sql_pool.insert_many(table='jhs_bestseller_record', data_list=data_list, ignore=True)
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def jhs_app_bestseller_main(log):
    """
    Main entry point of the crawler.

    Creates the MySQL connection pool and runs ``get_data``. Exceptions raised
    by ``get_data`` are logged and not re-raised, so the retry decorator
    (up to 100 attempts, 3600 seconds apart) only takes effect when an
    exception escapes before that point, e.g. while creating the pool.

    :param log: logger object
    """
    task_name = inspect.currentframe().f_code.co_name
    log.info(
        f'开始运行 {task_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool:
        log.error("MySQL数据库连接失败")
        raise Exception("MySQL数据库连接失败")

    try:
        get_data(sql_pool)
    except Exception as exc:
        log.error(f'{task_name} error: {exc}')
    finally:
        log.info(f'爬虫程序 {task_name} 运行结束,等待下一轮的采集任务............')
if __name__ == '__main__':
    # schedule_task()
    # Run the crawler once, passing the module-level logger as its log object.
    jhs_app_bestseller_main(logger)
|