import json
import os
import re
import sys

# ================= Configuration =================
# 1. API settings
# NOTE(review): hard-coded bearer token checked into source — move to an
# environment variable or secret store before sharing this file.
API_URL = "http://100.64.0.8/v1/workflows/run"
API_KEY = "Bearer app-qR46FHcfLyKz2kb0tiiRfV50"
USER_ID = "abc-123"

# 2. Sliding-window settings
CHUNK_SIZE = 15000  # characters sent to the AI per request
OVERLAP_SIZE = 500  # overlap between consecutive windows, to avoid cutting
                    # a key sentence at a boundary (200-500 recommended)
# =================================================


def send(text_chunk):
    """Send one text chunk to the AI workflow endpoint.

    Args:
        text_chunk: the piece of transcript text to analyze.

    Returns:
        The AI's raw text result, or the string "[]" on any request or
        response-shape failure so the calling loop can keep going.
    """
    # Imported lazily so the pure text-processing helpers in this module
    # can be imported and tested without the HTTP dependency installed.
    import requests

    headers = {
        "Authorization": API_KEY,
        "Content-Type": "application/json",
    }
    payload = {
        "inputs": {"question": text_chunk},
        "response_mode": "blocking",
        "user": USER_ID,
    }
    try:
        # Fix: without a timeout a stalled server hangs the whole run forever.
        response = requests.post(API_URL, headers=headers, json=payload, timeout=300)
        response.raise_for_status()
        # Fix: an unexpected response shape used to raise KeyError/TypeError
        # (and a non-JSON body ValueError) past the except clause and crash
        # the run — now they all fall into the same "return empty" path.
        return response.json()["data"]["outputs"]["result"]
    except (requests.exceptions.RequestException, KeyError, TypeError, ValueError) as e:
        print(f"❌ 请求发生错误: {e}")
        return "[]"  # empty JSON list string keeps downstream parsing alive


def split_text_with_overlap(text, chunk_size, overlap):
    """Yield successive windows of *text*, each sharing *overlap* chars
    with the previous one (sliding-window split for long transcripts).

    Args:
        text: the full text to split.
        chunk_size: maximum length of each yielded window.
        overlap: number of trailing characters repeated at the start of
            the next window.

    Raises:
        ValueError: if the parameters would make no forward progress
            (the original code looped forever when overlap >= chunk_size).
    """
    if chunk_size <= 0 or overlap < 0 or overlap >= chunk_size:
        raise ValueError("require chunk_size > 0 and 0 <= overlap < chunk_size")
    step = chunk_size - overlap
    text_len = len(text)
    start = 0
    while start < text_len:
        # Clamp the final window to the end of the text.
        end = min(start + chunk_size, text_len)
        yield text[start:end]
        if end == text_len:
            break
        start += step


def parse_ai_json(json_str):
    """Strip Markdown code fences from the AI's reply and parse it as JSON.

    Returns [] for empty input or unparseable JSON instead of raising,
    so one bad chunk cannot abort the whole run.
    """
    if not json_str:
        return []
    # Remove possible Markdown fences (```json ... ```).
    cleaned_str = re.sub(r"```json\s*", "", json_str)
    cleaned_str = re.sub(r"```\s*", "", cleaned_str)
    cleaned_str = cleaned_str.strip()
    try:
        return json.loads(cleaned_str)
    except json.JSONDecodeError:
        print(f"⚠️ 解析 JSON 失败,AI 返回原始内容: {cleaned_str[:100]}...")
        return []


def deduplicate_results(all_hits):
    """Drop duplicate hits: same timestamp + same English card name.

    Overlapping windows mean the same card can be recognized in two
    adjacent chunks; the first occurrence wins and order is preserved.
    """
    seen = set()
    unique_hits = []
    for hit in all_hits:
        key = (hit.get('time'), hit.get('card_name_en'))
        if key not in seen:
            seen.add(key)
            unique_hits.append(hit)
    return unique_hits


if __name__ == '__main__':
    # 1. Read the transcript file
    text_path = r"C:\Code\ML\Project\untitled10\Audio\temp\transcripts\vortexcards.txt"
    try:
        with open(text_path, "r", encoding="utf-8") as f:
            full_text = f.read()
    except FileNotFoundError:
        print(f"❌ 找不到文件: {text_path}")
        # Fix: exit() is the site/REPL helper; sys.exit with a non-zero
        # status is the correct way to fail from a script.
        sys.exit(1)

    print(f"📄 原文总长度: {len(full_text)} 字符")
    all_extracted_cards = []

    # 2. Sliding-window processing
    chunks = list(split_text_with_overlap(full_text, CHUNK_SIZE, OVERLAP_SIZE))
    total_chunks = len(chunks)
    print(f"✂️ 将分割为 {total_chunks} 个片段进行处理 (Size: {CHUNK_SIZE}, Overlap: {OVERLAP_SIZE})...\n")

    for i, chunk in enumerate(chunks):
        print(f"⏳ 正在处理第 {i + 1}/{total_chunks} 个片段 (长度 {len(chunk)})...")
        ai_response_str = send(chunk)
        chunk_hits = parse_ai_json(ai_response_str)
        if chunk_hits:
            print(f" ✅ 第 {i + 1} 段识别到 {len(chunk_hits)} 张卡片")
            all_extracted_cards.extend(chunk_hits)
        else:
            print(f" ⚪ 第 {i + 1} 段未发现目标")

    # 3. Deduplicate (overlapping windows may report the same card twice)
    final_results = deduplicate_results(all_extracted_cards)

    # 4. Report and save
    print("\n" + "=" * 30)
    print(f"🎉 处理完成! 共发现 {len(final_results)} 个高光时刻 (已去重)")
    print("=" * 30)
    print(json.dumps(final_results, indent=2, ensure_ascii=False))

    # Save next to the working directory, named after the transcript file.
    file_name = os.path.splitext(os.path.split(text_path)[-1])[0]
    with open(f"{file_name}.json", "w", encoding="utf-8") as f:
        json.dump(final_results, f, indent=2, ensure_ascii=False)