|
|
@@ -0,0 +1,160 @@
|
|
|
+import requests
|
|
|
+import json
|
|
|
+import re
|
|
|
+import os
|
|
|
+
|
|
|
# ================= Configuration =================
# 1. API settings
# Each value may be overridden through an environment variable so the endpoint
# and credential do not have to be edited in source; the literals below are
# only the fallbacks used when the variable is unset.
# NOTE(review): a real-looking API key is committed here -- consider rotating
# it and supplying it only through WORKFLOW_API_KEY.
API_URL = os.environ.get("WORKFLOW_API_URL", "http://100.64.0.8/v1/workflows/run")
# Full value of the Authorization header, including the "Bearer " prefix.
API_KEY = os.environ.get("WORKFLOW_API_KEY", "Bearer app-qR46FHcfLyKz2kb0tiiRfV50")
USER_ID = os.environ.get("WORKFLOW_USER_ID", "abc-123")

# 2. Sliding-window settings
CHUNK_SIZE = 15000  # number of characters sent to the AI per request
# Characters shared by consecutive windows, so a key sentence is not cut in
# half at a window boundary (200-500 suggested).
OVERLAP_SIZE = 500
# =================================================
|
|
|
+
|
|
|
def send(text_chunk, timeout=(10, 600)):
    """Send one text chunk to the workflow API and return its result.

    The chunk is posted as the ``question`` input with
    ``response_mode="blocking"``, so the call waits for the complete answer.

    Args:
        text_chunk: Text to place in the ``question`` input.
        timeout: Forwarded to ``requests.post``. Either a single number of
            seconds or a ``(connect, read)`` tuple. Without a timeout a
            stalled server would block the script forever.

    Returns:
        The value stored at ``data -> outputs -> result`` in the JSON
        response. If the request fails, or the response body does not have
        that shape, the string ``"[]"`` is returned so the caller can keep
        processing the remaining chunks.
    """
    headers = {
        "Authorization": API_KEY,
        "Content-Type": "application/json",
    }
    payload = {
        "inputs": {
            "question": text_chunk
        },
        "response_mode": "blocking",
        "user": USER_ID,
    }

    try:
        response = requests.post(
            API_URL, headers=headers, json=payload, timeout=timeout
        )
        response.raise_for_status()
        # Extract the AI's text answer from the response envelope.
        return response.json()['data']['outputs']['result']
    except requests.exceptions.RequestException as e:
        print(f"❌ 请求发生错误: {e}")
    except (ValueError, KeyError, TypeError) as e:
        # ValueError: body is not JSON (older requests versions raise it
        # directly); KeyError/TypeError: JSON without data/outputs/result.
        print(f"❌ 响应格式异常: {e!r}")

    # Fall back to an empty JSON list so one bad chunk does not abort the run.
    return "[]"
|
|
|
+
|
|
|
+
|
|
|
def split_text_with_overlap(text, chunk_size, overlap):
    """Yield consecutive windows of ``text`` that overlap each other.

    Each window holds at most ``chunk_size`` characters and starts
    ``chunk_size - overlap`` characters after the previous one. Iteration
    stops as soon as a window reaches the end of ``text``; an empty ``text``
    yields nothing.

    Args:
        text: The string (or any sliceable sequence) to split.
        chunk_size: Maximum length of each window; must be positive.
        overlap: Number of characters shared by adjacent windows; must
            satisfy ``0 <= overlap < chunk_size``.

    Yields:
        Slices of ``text`` in order.

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range. Because
            this is a generator, the error surfaces on the first iteration.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    # overlap >= chunk_size would make the step zero or negative (the window
    # never advances); a negative overlap would silently skip text.
    if not 0 <= overlap < chunk_size:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < chunk_size, "
            f"got overlap={overlap}, chunk_size={chunk_size}"
        )

    text_len = len(text)
    step = chunk_size - overlap

    for start in range(0, text_len, step):
        # Clamp the final window to the end of the text.
        end = min(start + chunk_size, text_len)
        yield text[start:end]

        # The tail has been emitted; a further window would only repeat part
        # of it.
        if end == text_len:
            break
|
|
|
+
|
|
|
+
|
|
|
def parse_ai_json(json_str):
    """Clean up and parse the JSON text returned by the AI into a list.

    Handling steps:
      * Falsy input (``None``, ``""``) gives ``[]``.
      * Input that is already a ``list`` or ``dict`` is used without parsing.
      * Markdown code fences (three backticks, optionally followed by
        ``json`` in any letter case) are stripped before parsing.
      * If the whole text is not valid JSON, the span from the first ``[`` to
        the last ``]`` is tried, which covers replies that wrap the array in
        explanatory prose.

    Args:
        json_str: The AI reply; normally a string.

    Returns:
        Always a list. A parsed list is returned unchanged, a single JSON
        object is wrapped in a one-element list, and anything else (scalars,
        ``null``, unparseable text) gives ``[]``.
    """
    if not json_str:
        return []

    if isinstance(json_str, (list, dict)):
        # The reply was already decoded upstream; nothing to clean or parse.
        parsed = json_str
    else:
        # 1. Remove Markdown fence markers such as ```json ... ```.
        cleaned_str = re.sub(
            r"```(?:json)?\s*", "", str(json_str), flags=re.IGNORECASE
        ).strip()

        # 2. Parse the whole text, then fall back to the embedded array.
        try:
            parsed = json.loads(cleaned_str)
        except json.JSONDecodeError:
            parsed = None
            first = cleaned_str.find("[")
            last = cleaned_str.rfind("]")
            if 0 <= first < last:
                try:
                    parsed = json.loads(cleaned_str[first:last + 1])
                except json.JSONDecodeError:
                    parsed = None
            if parsed is None:
                print(f"⚠️ 解析 JSON 失败,AI 返回原始内容: {cleaned_str[:100]}...")
                return []

    # 3. Normalise the shape so callers can always extend() with the result.
    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, dict):
        return [parsed]
    print(f"⚠️ AI 返回的 JSON 不是列表: {str(parsed)[:100]}...")
    return []
|
|
|
+
|
|
|
+
|
|
|
def deduplicate_results(all_hits):
    """Drop repeated hits while keeping first-seen order.

    Two hits count as the same when both their ``'time'`` and
    ``'card_name_en'`` values are equal (a missing key is treated as
    ``None``). Overlapping sliding windows can report the same hit twice;
    only the first occurrence is kept.

    Args:
        all_hits: Iterable of dict-like hits exposing ``.get``.

    Returns:
        A new list with the first hit for each ``(time, card_name_en)`` pair.
    """
    # A dict keeps insertion order, so it records both the keys already seen
    # and the order in which their first hits arrived.
    first_by_identity = {}
    for entry in all_hits:
        identity = (entry.get('time'), entry.get('card_name_en'))
        first_by_identity.setdefault(identity, entry)
    return list(first_by_identity.values())
|
|
|
+
|
|
|
+
|
|
|
if __name__ == '__main__':
    # Script entry point: read a transcript, send it to the AI in overlapping
    # windows, merge and de-duplicate the hits, then print them and save
    # them as JSON in the current working directory.
    import sys

    # 1. Read the transcript. An optional first command-line argument
    #    overrides the built-in default path.
    default_path = r"C:\Code\ML\Project\untitled10\Audio\temp\transcripts\vortexcards.txt"
    text_path = sys.argv[1] if len(sys.argv) > 1 else default_path
    try:
        with open(text_path, "r", encoding="utf-8") as f:
            full_text = f.read()
    except FileNotFoundError:
        print(f"❌ 找不到文件: {text_path}")
        # Non-zero status so shells and schedulers can see the failure;
        # sys.exit is used because the bare exit() helper is only injected
        # by the site module.
        sys.exit(1)

    print(f"📄 原文总长度: {len(full_text)} 字符")

    all_extracted_cards = []

    # 2. Sliding-window processing. The chunks are materialised up front so
    #    the total count is known for the progress messages.
    chunks = list(split_text_with_overlap(full_text, CHUNK_SIZE, OVERLAP_SIZE))
    total_chunks = len(chunks)

    print(f"✂️ 将分割为 {total_chunks} 个片段进行处理 (Size: {CHUNK_SIZE}, Overlap: {OVERLAP_SIZE})...\n")

    for i, chunk in enumerate(chunks):
        print(f"⏳ 正在处理第 {i + 1}/{total_chunks} 个片段 (长度 {len(chunk)})...")

        # Send the request.
        ai_response_str = send(chunk)

        # Parse the reply.
        chunk_hits = parse_ai_json(ai_response_str)

        if chunk_hits:
            print(f"   ✅ 第 {i + 1} 段识别到 {len(chunk_hits)} 张卡片")
            all_extracted_cards.extend(chunk_hits)
        else:
            print(f"   ⚪ 第 {i + 1} 段未发现目标")

    # 3. De-duplicate: with overlapping windows the same card may be
    #    recognised in two neighbouring chunks.
    final_results = deduplicate_results(all_extracted_cards)

    # 4. Report the final result.
    print("\n" + "=" * 30)
    print(f"🎉 处理完成! 共发现 {len(final_results)} 个高光时刻 (已去重)")
    print("=" * 30)

    # Print the result as JSON.
    print(json.dumps(final_results, indent=2, ensure_ascii=False))

    # Save next to the current working directory as "<transcript stem>.json".
    file_name = os.path.splitext(os.path.basename(text_path))[0]
    with open(f"{file_name}.json", "w", encoding="utf-8") as f:
        json.dump(final_results, f, indent=2, ensure_ascii=False)
|