split_text_with_overlap.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. import requests
  2. import json
  3. import re
  4. import os
  # ================= Configuration =================
  # 1. API settings.
  #    Each value may be overridden through an environment variable so the
  #    endpoint and credential do not have to be edited in (or committed to)
  #    the source file. The literals below are only fallbacks.
  # NOTE(review): the fallback API key is a credential stored in source;
  # consider rotating it and supplying it only through the environment.
  API_URL = os.environ.get("WORKFLOW_API_URL", "http://100.64.0.8/v1/workflows/run")
  API_KEY = os.environ.get("WORKFLOW_API_KEY", "Bearer app-qR46FHcfLyKz2kb0tiiRfV50")
  USER_ID = os.environ.get("WORKFLOW_USER_ID", "abc-123")
  # 2. Sliding-window settings.
  CHUNK_SIZE = 15000  # number of characters sent to the AI per request
  OVERLAP_SIZE = 500  # characters shared by consecutive windows, so key sentences are not cut in half (200-500 suggested)
  # =================================================
  def send(text_chunk, timeout=300):
      """
      Send a single text chunk to the AI endpoint and return its text result.

      Args:
          text_chunk: Text placed in the ``question`` input of the request.
          timeout: Seconds passed to ``requests.post`` as its ``timeout``.
              Without a timeout a stalled server would block the script
              indefinitely.

      Returns:
          The value found at ``['data']['outputs']['result']`` in the JSON
          response. If the request fails, or the response is not JSON of that
          shape, the string ``"[]"`` is returned so the caller can carry on
          with the remaining chunks.
      """
      headers = {
          "Authorization": API_KEY,
          "Content-Type": "application/json",
      }
      payload = {
          "inputs": {
              "question": text_chunk,
          },
          "response_mode": "blocking",
          "user": USER_ID,
      }
      try:
          response = requests.post(API_URL, headers=headers, json=payload, timeout=timeout)
          response.raise_for_status()
          # Extract the plain-text result produced by the AI.
          return response.json()['data']['outputs']['result']
      except requests.exceptions.RequestException as e:
          print(f"❌ 请求发生错误: {e}")
      except (KeyError, TypeError, ValueError) as e:
          # Body was not JSON, or did not have the data/outputs/result shape.
          print(f"❌ 响应格式异常: {e!r}")
      # On any failure return an empty-list string so the program does not crash.
      return "[]"
  def split_text_with_overlap(text, chunk_size, overlap):
      """
      Generator: split a long text into overlapping sliding-window chunks.

      Every chunk holds at most ``chunk_size`` characters and starts
      ``chunk_size - overlap`` characters after the previous one, so
      consecutive chunks share ``overlap`` characters. The final chunk ends
      exactly at the end of ``text``; an empty ``text`` yields nothing.

      Args:
          text: The text to split.
          chunk_size: Maximum number of characters per chunk; must be > 0.
          overlap: Characters shared between neighbouring chunks; must satisfy
              ``0 <= overlap < chunk_size``.

      Raises:
          ValueError: If the sizes are invalid. A non-positive step would keep
              the window from advancing and loop forever. Because this is a
              generator, the error surfaces when iteration starts.
      """
      if chunk_size <= 0:
          raise ValueError(f"chunk_size must be positive, got {chunk_size}")
      if not 0 <= overlap < chunk_size:
          raise ValueError(
              f"overlap must satisfy 0 <= overlap < chunk_size, got {overlap}"
          )
      step = chunk_size - overlap
      text_len = len(text)
      start = 0
      while start < text_len:
          # The last window is clamped to the end of the text.
          end = min(start + chunk_size, text_len)
          yield text[start:end]
          # Stop once the end is reached; otherwise the next window would
          # only repeat the tail that was already emitted.
          if end == text_len:
              break
          start += step
  def parse_ai_json(json_str):
      """
      Clean up and parse the JSON returned by the AI into a list.

      Markdown code-fence markers (three backticks, optionally followed by
      ``json`` in any letter case) are removed before parsing.

      Args:
          json_str: The raw AI output. Normally a string; an already-decoded
              list or dict is accepted as well.

      Returns:
          Always a list. A JSON array is returned as is, a single JSON object
          is wrapped in a one-element list, and empty input, unparsable text
          or any other JSON value gives ``[]``.
      """
      if not json_str:
          return []
      if isinstance(json_str, str):
          # 1. Strip Markdown fence markers that may wrap the payload.
          cleaned_str = re.sub(r"```(?:json)?\s*", "", json_str, flags=re.IGNORECASE)
          cleaned_str = cleaned_str.strip()
          # 2. Try to parse.
          try:
              parsed = json.loads(cleaned_str)
          except json.JSONDecodeError:
              print(f"⚠️ 解析 JSON 失败,AI 返回原始内容: {cleaned_str[:100]}...")
              return []
      else:
          # Already decoded by the caller; only the shape needs normalising.
          parsed = json_str
      # 3. Normalise to a list, because the caller uses len() and list.extend().
      if isinstance(parsed, list):
          return parsed
      if isinstance(parsed, dict):
          return [parsed]
      print(f"⚠️ AI 返回的 JSON 不是列表或对象: {type(parsed).__name__}")
      return []
  def deduplicate_results(all_hits):
      """
      Simple de-duplication of hits.

      Two hits count as duplicates when both their ``time`` and their
      ``card_name_en`` values are equal (overlapping windows can report the
      same hit twice). The first occurrence is kept, and the relative order
      of the kept hits is unchanged.
      """
      first_by_key = {}
      for entry in all_hits:
          identity = (entry.get('time'), entry.get('card_name_en'))
          # setdefault stores only the first entry seen for each identity;
          # the dict's insertion order preserves first-seen ordering.
          first_by_key.setdefault(identity, entry)
      return list(first_by_key.values())
  if __name__ == '__main__':
      # Script entry point: read the transcript, send it to the AI chunk by
      # chunk, merge and de-duplicate the hits, then print and save them.

      # 1. Read the input file.
      text_path = r"C:\Code\ML\Project\untitled10\Audio\temp\transcripts\vortexcards.txt"
      try:
          with open(text_path, "r", encoding="utf-8") as f:
              full_text = f.read()
      except FileNotFoundError:
          print(f"❌ 找不到文件: {text_path}")
          # exit() is the interactive helper installed by the site module and
          # would report status 0; SystemExit(1) signals failure to the shell.
          raise SystemExit(1)
      print(f"📄 原文总长度: {len(full_text)} 字符")
      all_extracted_cards = []

      # 2. Sliding-window processing. The chunks are materialised as a list
      #    because the total count is shown in the progress messages.
      chunks = list(split_text_with_overlap(full_text, CHUNK_SIZE, OVERLAP_SIZE))
      total_chunks = len(chunks)
      print(f"✂️ 将分割为 {total_chunks} 个片段进行处理 (Size: {CHUNK_SIZE}, Overlap: {OVERLAP_SIZE})...\n")
      for index, chunk in enumerate(chunks, start=1):
          print(f"⏳ 正在处理第 {index}/{total_chunks} 个片段 (长度 {len(chunk)})...")
          # Send the request.
          ai_response_str = send(chunk)
          # Parse the result.
          chunk_hits = parse_ai_json(ai_response_str)
          if chunk_hits:
              print(f" ✅ 第 {index} 段识别到 {len(chunk_hits)} 张卡片")
              all_extracted_cards.extend(chunk_hits)
          else:
              print(f" ⚪ 第 {index} 段未发现目标")

      # 3. De-duplicate: because the windows overlap, the same card may have
      #    been recognised in two neighbouring chunks.
      final_results = deduplicate_results(all_extracted_cards)

      # 4. Report the final result.
      print("\n" + "=" * 30)
      print(f"🎉 处理完成! 共发现 {len(final_results)} 个高光时刻 (已去重)")
      print("=" * 30)
      # Print the result as JSON.
      print(json.dumps(final_results, indent=2, ensure_ascii=False))
      # Save next to the current working directory as "<input stem>.json".
      file_name = os.path.splitext(os.path.basename(text_path))[0]
      with open(f"{file_name}.json", "w", encoding="utf-8") as f:
          json.dump(final_results, f, indent=2, ensure_ascii=False)