# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/3/17 17:22
"""Scrape the Jihuanshe (com.jihuanshe) best-seller ranking through the Android UI.

The script drives the app with Airtest/Poco, walks every category/language
pair listed in ``category_list``, parses each ranking row from the accessibility
text of its button and stores the rows in the ``jhs_bestseller_record`` table.
"""
import inspect
import time

import schedule
from airtest.core.api import *
from poco.drivers.android.uiautomation import AndroidUiautomationPoco
from loguru import logger
from mysql_pool import MySQLConnectionPool
from tenacity import retry, stop_after_attempt, wait_fixed

# from poco.exceptions import PocoNoSuchNodeException

logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}", level="DEBUG", retention="7 day")

PACKAGE_NAME = 'com.jihuanshe'

# Upper bound on scroll/parse rounds per ranking list.
MAX_PAGES = 30
# The ranking is considered fully scraped once this rank has been seen.
LAST_RANK = '100'

# Original author's note: in Airtest, init_device detects and connects to an
# available device automatically (optionally narrowed by serial / device type),
# whereas connect_device needs the target device to be specified explicitly.
init_device("Android")

stop_app(PACKAGE_NAME)
# time.sleep(11)
start_app(PACKAGE_NAME)

# If ADB cannot find the device, try restarting the ADB server:
#   D:\投屏\QtScrcpy-win-x64-v3.3.1\adb.exe kill-server

# One single-key dict per category: {category name: [languages to scrape]}.
# The position in this list is used as the index of the category button in
# the in-app drop-down menu.
category_list = [
    {'游戏王': ['日文', '简中']},
    {'宝可梦': ['简中', '日文', '英文']},
    {'航海王': ['简中', '日文']},
    {'符文战场': ['简中']},
    {'WS黑白双翼': ['日文', '简中']},
    {'数码宝贝': ['简中']},
    {'迪士尼 洛卡纳': ['简中']},
    {'携站之境UA': ['简中', '日文']},
    {'VG卡片战斗先导者': ['简中', '日文']},
    {'高达GCG': ['简中']},
    {'影之诗': ['简中']}
]

poco = AndroidUiautomationPoco(use_airtest_input=True, screenshot_each_action=False)
window_width, window_height = poco.get_screen_size()

# Sentinel returned by _field_text when the view does not exist, so that a
# missing view can be told apart from an existing view whose text is None.
_MISSING = object()


def scrape_index():
    """Dismiss start-up ads (up to three) and open the ranking entry on the home page."""
    # Give the app time to reach its home page.
    time.sleep(10)

    # An ad overlay may be shown; close it if present.
    close_button = poco(f'{PACKAGE_NAME}:id/ivClose')
    for _ in range(3):
        if not close_button.exists():
            logger.debug("广告元素未找到, 退出")
            break
        logger.warning("广告元素已找到")
        close_button.click()
        time.sleep(1)

    entry = poco(f'{PACKAGE_NAME}:id/imageIv').wait(60)
    entry.click()


def after_log(retry_state):
    """Tenacity ``after`` callback that logs the outcome of an attempt.

    :param retry_state: tenacity RetryCallState of the attempt that just ran.
        Its first positional argument, when present, is used as the logger;
        otherwise the module-level logger is used.
    """
    log = retry_state.args[0] if retry_state.args else logger
    fn_name = retry_state.fn.__name__
    if retry_state.outcome.failed:
        log.warning(f"Function '{fn_name}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {retry_state.attempt_number} succeeded")


def _field_text(element, view_id):
    """Return the ``text`` of the descendant ``<package>:id/<view_id>``, or ``_MISSING`` if absent."""
    node = element.offspring(f'{PACKAGE_NAME}:id/{view_id}')
    return node.attr('text') if node.exists() else _MISSING


def scrape_detail(element):
    """Read one ranking row from its child views.

    :param element: Poco proxy of the row container.
    :return: dict with keys ``title``, ``tv_sales_data_num``, ``tv_number``,
        ``tv_rarity``, ``tv_cur_sales_num``, ``tv_last_sales_num``, ``tv_price``
        and ``tv_rank``; ``None`` when a mandatory view is missing or any error
        occurs (the error is logged, never raised).
    """
    try:
        title = _field_text(element, 'tvName')
        if title is _MISSING:
            logger.warning("title元素未找到")
            return None

        # Total sales: fall back to tvSalesData and strip its unit/label text.
        sales_total = _field_text(element, 'tvSalesDataNum')
        if sales_total is _MISSING:
            logger.warning("tvSalesDataNum元素未找到")
            fallback = _field_text(element, 'tvSalesData')
            if fallback is _MISSING:
                sales_total = None
            else:
                # str.replace needs a str replacement; the previous int 0
                # raised TypeError and made this fallback always fail.
                sales_total = fallback.replace('张', '').replace('销量', '')
                logger.debug('tvSalesDataNum元素未找到,使用tvSalesData元素')

        # Mandatory fields, read in the same order as the result dict.
        mandatory = {}
        for key, view_id in (
                ('tv_number', 'tvNumber'),
                ('tv_rarity', 'tvRarity'),
                ('tv_cur_sales_num', 'tvCurSalesNum'),
                ('tv_last_sales_num', 'tvLastSalesNum'),
                ('tv_price', 'tvPrice'),
        ):
            text = _field_text(element, view_id)
            if text is _MISSING:
                logger.warning(f"{view_id}元素未找到")
                return None
            if view_id == 'tvCurSalesNum':
                text = text.replace('张', '')
            mandatory[key] = text

        # Rank: rows without tvRank are looked up through rankIv instead
        # (per the log text, the top three).
        rank = _field_text(element, 'tvRank')
        if rank is _MISSING:
            logger.warning("tvRank元素未找到, 查找rankIv元素(前三名)")
            rank = _field_text(element, 'rankIv')
            if rank is _MISSING:
                rank = None
            else:
                logger.debug('tvRank元素未找到,使用rankIv元素(前三名)')
                logger.debug(rank)

        return {
            'title': title,
            'tv_sales_data_num': sales_total,
            **mandatory,
            'tv_rank': rank,
        }
    except Exception as e:
        logger.error(f"抓取详情时发生错误: {e}")
        return None


def parse_sales_data(raw_text):
    """Parse the newline-separated text of one ranking row.

    Args:
        raw_text (str): raw row text, one field per line.

    Returns:
        dict: keys ``tv_rank``, ``title``, ``tv_number``, ``tv_rarity``,
        ``tv_cur_sales_num``, ``tv_last_sales_num``, ``tv_price`` and
        ``tv_sales_data_num`` (all strings).

    Raises:
        ValueError: if fewer than 11 fields remain after normalisation.
    """
    fields = raw_text.split("\n")

    # Rows without a numeric rank in first position get an empty rank so the
    # remaining fields keep their positions.
    if not fields[0].isdigit():
        fields.insert(0, '')

    # When the growth label is immediately followed by the price label, the
    # growth value is missing: insert '-' as its placeholder.
    if len(fields) >= 8 and fields[6] == "本周期销量涨跌幅" and fields[7] == "本周期平均集换价":
        fields.insert(7, '-')

    # Validate only after normalisation, otherwise rows that the placeholder
    # above is meant to repair would be rejected as too short.
    if len(fields) < 11:
        raise ValueError("输入数据不完整")

    return {
        'tv_rank': fields[0],
        'title': fields[1],
        'tv_number': fields[2],
        'tv_rarity': fields[3],
        'tv_cur_sales_num': fields[5].replace('张', ''),
        'tv_last_sales_num': fields[7],
        'tv_price': fields[9].replace('¥', ''),
        'tv_sales_data_num': fields[10],
    }


def scroll_up():
    """Swipe from 90% to 70% of the screen height over one second to advance the list."""
    # swipe((window_width * 0.5, window_height * 0.8),
    #       vector=[0, -0.5], duration=1)
    swipe(
        v1=(0.5, 0.9),  # start point (lower)
        v2=(0.5, 0.7),  # end point (higher)
        duration=1.0,  # swipe duration in seconds
    )


def is_page_changed(old_elements, new_elements):
    """Tell whether the list moved, by comparing the title of the last row.

    :param old_elements: rows captured before scrolling.
    :param new_elements: rows captured after scrolling.
    :return: ``False`` only when both last rows have a title and the titles are
        equal; ``True`` in every other case (including empty inputs).
    """
    if not old_elements or not new_elements:
        return True

    old_title_element = old_elements[-1].offspring(f'{PACKAGE_NAME}:id/tvName')
    new_title_element = new_elements[-1].offspring(f'{PACKAGE_NAME}:id/tvName')
    if not (old_title_element.exists() and new_title_element.exists()):
        logger.warning("title元素未找到")
        return True

    old_title = old_title_element.attr('text')
    new_title = new_title_element.attr('text')
    logger.debug(f'old_title: {old_title}, new_title: {new_title}')
    return old_title != new_title


def _category_panel():
    """Build a fresh Poco query for the view that hosts the category drop-down."""
    return (
        poco("android:id/content")
        .child("android.widget.FrameLayout")
        .child("android.widget.FrameLayout")
        .child("android.view.View")
        .child("android.view.View")
        .child("android.view.View")
    )


def _category_buttons():
    """Build a fresh Poco query for the buttons inside the category drop-down list."""
    return _category_panel().child("android.widget.ScrollView").child("android.widget.Button")


def _select_category(ca_idx, category):
    """Click the category at position ``ca_idx`` in the opened drop-down list."""
    if ca_idx > 8:
        # Later entries are reached by swiping the list first; after the
        # swipe the wanted button is addressed at index ca_idx - 3.
        logger.debug('滑动下拉菜单')
        _category_buttons()[7].swipe([0.0035, -0.3152])
        time.sleep(2)
        _category_buttons()[ca_idx - 3].click()
    else:
        _category_buttons()[ca_idx].click()
    logger.info(f'点击 集换榜分类 -> {category}')


def _scrape_ranking(category, language):
    """Scroll through the currently displayed ranking and return the parsed rows.

    Scrolling stops when rank ``LAST_RANK`` has been parsed or after
    ``MAX_PAGES`` rounds. Each row is tagged with category, language and date.
    """
    rows = []
    for page_count in range(1, MAX_PAGES + 1):
        if page_count == 1:
            logger.debug("第一页开始翻页")
            swipes = 1
        else:
            logger.debug(f"{page_count}翻页.........")
            swipes = 4
        for _ in range(swipes):
            scroll_up()

        reached_last_rank = False
        buttons = poco('android.widget.ScrollView').offspring('android.widget.Button')
        for button in buttons:
            label = button.child()
            if not label.exists():
                continue

            raw_text = label.attr('name')
            if not raw_text:
                # A missing/empty name cannot be parsed (and `in` on None
                # would raise TypeError), so skip the row.
                logger.warning("未获取到数据")
                continue
            if '加入心愿单' in raw_text:
                logger.debug('<加入心愿单> 字样在tag_element_data, 跳过')
                continue

            try:
                element_data = parse_sales_data(raw_text)
            except ValueError as e:
                logger.error(f"数据解析错误: {e}")
                continue
            logger.info(element_data)

            # tv_rank is only used to detect the end of the list; it is not stored.
            row = {k: v for k, v in element_data.items() if k != 'tv_rank'}
            row.update({
                'category': category,
                'language': language,
                'crawler_date': time.strftime("%Y-%m-%d", time.localtime())
            })
            logger.debug(f'scraped data: {row}')
            rows.append(row)

            if element_data.get('tv_rank') == LAST_RANK:
                logger.success(
                    f'已获取第 {page_count} 页数据, 第 {element_data["tv_rank"]} 名, 停止翻页....')
                reached_last_rank = True
                break

        if reached_last_rank:
            logger.debug(f'已获取 {len(rows)} 条数据, 停止翻页....')
            break
        time.sleep(1)
    else:
        logger.warning("已达到最大翻页次数,停止翻页")
    return rows


def _dedupe_rows(rows):
    """Return ``rows`` without exact duplicates, keeping first-seen order."""
    seen = set()
    unique_rows = []
    for row in rows:
        fingerprint = frozenset(row.items())
        if fingerprint not in seen:
            seen.add(fingerprint)
            unique_rows.append(row)
    return unique_rows


def get_data(sql_pool):
    """Scrape every category/language ranking and insert the rows into MySQL.

    :param sql_pool: MySQL connection pool object; its ``insert_many`` method
        is called once per category/language pair that produced rows.
    """
    scrape_index()

    for ca_idx, category_value in enumerate(category_list):
        category, lang_list = next(iter(category_value.items()))
        logger.debug(
            f'当前分类索引: {ca_idx}, 分类名称: {category}, 语言列表: {lang_list} >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')

        for language in lang_list:
            # Open the category drop-down.
            _category_panel().child("android.widget.Button")[1].click()

            # Pick the category from the drop-down list.
            if _category_panel().child("android.widget.ScrollView").exists():
                _select_category(ca_idx, category)
            else:
                # NOTE(review): the flow continues with whatever category is
                # currently displayed - confirm this is intended.
                logger.warning("tag_bang_list元素未找到")

            # Pick the language tab.
            logger.info(f'当前语言名称: {language} >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
            poco(language).click()

            # Switch to the best-seller tab (matched by screenshot template).
            touch(Template(r"tpl1761026966680.png", record_pos=(0.048, -0.781), resolution=(1080, 2340)))
            time.sleep(2)

            data_list = _scrape_ranking(category, language)
            logger.success(
                '---------------------------------------------------------------------------------------------')

            # Overlapping scroll windows yield the same row several times.
            data_list = _dedupe_rows(data_list)
            if not data_list:
                logger.warning(f'{category} - {language} 未获取到数据, 跳过入库')
                continue
            sql_pool.insert_many(table='jhs_bestseller_record', data_list=data_list, ignore=True)


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def jhs_app_bestseller_main(log):
    """Entry point: create the MySQL pool and run one scraping pass.

    :param log: logger object used for all messages of this run.
    :raises Exception: when the MySQL pool object is falsy.
    """
    task_name = inspect.currentframe().f_code.co_name
    log.info(
        f'开始运行 {task_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool:
        log.error("MySQL数据库连接失败")
        raise Exception("MySQL数据库连接失败")

    try:
        get_data(sql_pool)
        # stop_app(PACKAGE_NAME)
    except Exception as e:
        # NOTE(review): scraping errors are logged and swallowed here, so the
        # @retry decorator only re-runs on the pool failure above - confirm
        # this best-effort behaviour is intended.
        log.error(f'{task_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {task_name} 运行结束,等待下一轮的采集任务............')


if __name__ == '__main__':
    # schedule_task()
    jhs_app_bestseller_main(logger)