# Sliding-window extraction script: splits a transcript into overlapping
# chunks, sends each chunk to a workflow API, and saves the de-duplicated hits.
import json
import os
import re
import sys

import requests
# ================= Configuration =================
# 1. API settings
# Each value can be overridden through an environment variable; the literal is
# only the fallback, so existing runs keep working without any setup.
API_URL = os.environ.get("WORKFLOW_API_URL", "http://100.64.0.8/v1/workflows/run")
# NOTE(review): this fallback is a credential committed to source control.
# Prefer supplying WORKFLOW_API_KEY from the environment and rotating this key.
# The value is sent verbatim as the Authorization header, so it must include
# the "Bearer " prefix.
API_KEY = os.environ.get("WORKFLOW_API_KEY", "Bearer app-qR46FHcfLyKz2kb0tiiRfV50")
USER_ID = os.environ.get("WORKFLOW_USER_ID", "abc-123")
# 2. Sliding-window settings
CHUNK_SIZE = 15000  # number of characters sent to the AI per request
# Characters shared by two consecutive windows, so that a sentence cut at a
# window boundary still appears whole in one of them (200-500 suggested).
OVERLAP_SIZE = 500
# =================================================
def send(text_chunk, timeout=(10, 300)):
    """Send one text chunk to the workflow API and return the AI's result.

    Args:
        text_chunk: Text placed in the ``question`` input of the request.
        timeout: ``(connect, read)`` timeout in seconds handed to
            ``requests.post``. The read timeout is generous because the
            request is made in ``blocking`` response mode.

    Returns:
        The value stored at ``data -> outputs -> result`` in the JSON
        response. If the request fails, or the response is not JSON or lacks
        that path, the string ``"[]"`` is returned instead so the caller can
        treat the failure as "no hits" and keep going.
    """
    headers = {
        "Authorization": API_KEY,
        "Content-Type": "application/json",
    }
    payload = {
        "inputs": {"question": text_chunk},
        "response_mode": "blocking",
        "user": USER_ID,
    }
    try:
        # Without a timeout a stalled server would hang the whole run.
        response = requests.post(API_URL, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
        return response.json()['data']['outputs']['result']
    except requests.exceptions.RequestException as e:
        print(f"❌ 请求发生错误: {e}")
    except (ValueError, KeyError, TypeError) as e:
        # ValueError: the body is not JSON. KeyError / TypeError: the JSON
        # does not have the expected data/outputs/result structure.
        print(f"❌ 响应格式异常: {e!r}")
    # Fall back to an empty JSON list so one bad chunk does not abort the run.
    return "[]"
def split_text_with_overlap(text, chunk_size, overlap):
    """Yield consecutive slices of *text* produced by a sliding window.

    Each window is at most ``chunk_size`` characters long and starts
    ``chunk_size - overlap`` characters after the previous one, so neighbouring
    windows share ``overlap`` characters. Iteration stops with the first
    window that reaches the end of the text; an empty text yields nothing.

    Args:
        text: The string to split.
        chunk_size: Maximum length of each window; must be positive.
        overlap: Number of characters shared by consecutive windows; must
            satisfy ``0 <= overlap < chunk_size``.

    Raises:
        ValueError: If the sizes would make the window stand still or move
            backwards. Because this is a generator, the error surfaces when
            iteration starts, not when the function is called.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if not 0 <= overlap < chunk_size:
        # A step of zero or less would never advance and loop forever.
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < chunk_size, got {overlap}"
        )

    step = chunk_size - overlap
    text_len = len(text)
    for start in range(0, text_len, step):
        end = start + chunk_size
        # Slicing clamps `end` to the text length for the final window.
        yield text[start:end]
        if end >= text_len:
            # This window already covers the tail; a further one would only
            # repeat part of the overlap.
            break
def parse_ai_json(json_str):
    """Clean and parse the AI's JSON answer into a list of hits.

    Args:
        json_str: The AI output. Normally a string, optionally wrapped in
            Markdown code fences. A value that is already a list or dict is
            accepted as-is instead of being run through the string cleaning.

    Returns:
        Always a list. A JSON array is returned unchanged and a single JSON
        object is wrapped in a one-element list. Empty input, unparsable
        text, and any other JSON value (number, string, null, ...) give
        ``[]``, so callers can safely ``extend`` with the result.
    """
    if not json_str:
        return []

    if isinstance(json_str, str):
        # 1. Strip Markdown fence markers (```json ... ```) if present.
        cleaned_str = re.sub(r"```json\s*", "", json_str)
        cleaned_str = re.sub(r"```\s*", "", cleaned_str)
        cleaned_str = cleaned_str.strip()
        # 2. Try to parse what is left.
        try:
            parsed = json.loads(cleaned_str)
        except json.JSONDecodeError:
            print(f"⚠️ 解析 JSON 失败,AI 返回原始内容: {cleaned_str[:100]}...")
            return []
    else:
        # Already-decoded value: skip the regex cleaning, which needs a str.
        parsed = json_str

    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, dict):
        # Treat a lone object as a single hit.
        return [parsed]
    # Scalars cannot be interpreted as hits; report and ignore them.
    print(f"⚠️ AI 返回的 JSON 不是列表: {str(parsed)[:100]}...")
    return []
def deduplicate_results(all_hits):
    """Drop repeated hits, keeping the first occurrence of each.

    Two hits count as duplicates when both their ``time`` and
    ``card_name_en`` values are equal. This removes the double detections
    that the overlapping region of two sliding windows can produce.

    Args:
        all_hits: Iterable of hit dicts. Entries that are not dicts are
            skipped, since they carry neither key.

    Returns:
        A new list with the unique hits in their original order.
    """
    seen = set()
    unique_hits = []
    for hit in all_hits:
        if not isinstance(hit, dict):
            # Malformed entry (e.g. a bare string from the AI): ignore it
            # instead of failing on `.get`.
            continue
        # Identity of a hit for de-duplication purposes.
        key = (hit.get('time'), hit.get('card_name_en'))
        try:
            hash(key)
        except TypeError:
            # A list/dict value makes the tuple unhashable; fall back to its
            # textual form so such hits can still be compared.
            key = repr(key)
        if key not in seen:
            seen.add(key)
            unique_hits.append(hit)
    return unique_hits
if __name__ == '__main__':
    # 1. Read the transcript. A path given as the first command-line argument
    #    takes precedence; otherwise the built-in default is used.
    default_path = r"C:\Code\ML\Project\untitled10\Audio\temp\transcripts\vortexcards.txt"
    text_path = sys.argv[1] if len(sys.argv) > 1 else default_path
    try:
        with open(text_path, "r", encoding="utf-8") as f:
            full_text = f.read()
    except FileNotFoundError:
        print(f"❌ 找不到文件: {text_path}")
        # Exit with a non-zero status so callers can detect the failure; the
        # bare exit() helper comes from `site` and reports success.
        sys.exit(1)
    print(f"📄 原文总长度: {len(full_text)} 字符")

    all_extracted_cards = []

    # 2. Sliding-window processing. The chunks are materialised up front
    #    because the total count is shown in the progress messages.
    chunks = list(split_text_with_overlap(full_text, CHUNK_SIZE, OVERLAP_SIZE))
    total_chunks = len(chunks)
    print(f"✂️ 将分割为 {total_chunks} 个片段进行处理 (Size: {CHUNK_SIZE}, Overlap: {OVERLAP_SIZE})...\n")
    for index, chunk in enumerate(chunks, start=1):
        print(f"⏳ 正在处理第 {index}/{total_chunks} 个片段 (长度 {len(chunk)})...")
        # Send the chunk, then parse whatever the AI answered.
        ai_response_str = send(chunk)
        chunk_hits = parse_ai_json(ai_response_str)
        if chunk_hits:
            print(f" ✅ 第 {index} 段识别到 {len(chunk_hits)} 张卡片")
            all_extracted_cards.extend(chunk_hits)
        else:
            print(f" ⚪ 第 {index} 段未发现目标")

    # 3. De-duplicate: because the windows overlap, the same card may have
    #    been recognised in two neighbouring chunks.
    final_results = deduplicate_results(all_extracted_cards)

    # 4. Report the final result.
    print("\n" + "=" * 30)
    print(f"🎉 处理完成! 共发现 {len(final_results)} 个高光时刻 (已去重)")
    print("=" * 30)
    # Print the result as JSON.
    print(json.dumps(final_results, indent=2, ensure_ascii=False))

    # Save the result next to the working directory as "<input stem>.json".
    file_name = os.path.splitext(os.path.basename(text_path))[0]
    with open(f"{file_name}.json", "w", encoding="utf-8") as f:
        json.dump(final_results, f, indent=2, ensure_ascii=False)