auto_import.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. import os
  2. import json
  3. import asyncio
  4. import aiohttp
  5. from typing import Dict, Any, Tuple, Optional, Union
  6. from fastapi import APIRouter, HTTPException, Request, UploadFile, File, Form
  7. from app.core.config import settings
  8. from app.core.logger import get_logger
  9. from app.utils.scheme import CardType, IMAGE_TYPE_TO_SCORE_TYPE
  10. logger = get_logger(__name__)
  11. router = APIRouter()
  12. IMPORT_REQUEST_TIMEOUT = aiohttp.ClientTimeout(
  13. total=600,
  14. connect=10,
  15. sock_connect=10,
  16. sock_read=480,
  17. )
  18. # --- 内部辅助函数 ---
  19. async def call_api_with_bytes(
  20. session: aiohttp.ClientSession,
  21. url: str,
  22. file_bytes: bytes,
  23. filename: str,
  24. params: Dict[str, Any] = None,
  25. form_fields: Dict[str, Any] = None,
  26. file_field_name: str = 'file'
  27. ) -> Tuple[int, bytes]:
  28. """通用的文件上传API调用函数 (接收字节数据)"""
  29. form_data = aiohttp.FormData()
  30. if form_fields:
  31. for key, value in form_fields.items():
  32. form_data.add_field(key, str(value))
  33. # 直接将内存中的字节数据添加到表单
  34. form_data.add_field(
  35. file_field_name,
  36. file_bytes,
  37. filename=filename or 'image.jpg',
  38. content_type='image/jpeg'
  39. )
  40. try:
  41. async with session.post(url, data=form_data, params=params) as response:
  42. response_content = await response.read()
  43. if not response.ok:
  44. logger.error(
  45. f"[API Error] {url} -> Status: {response.status}, Msg: {response_content.decode('utf-8', errors='ignore')[:100]}")
  46. return response.status, response_content
  47. except Exception as e:
  48. logger.error(f"[Conn Error] {url} -> {e}")
  49. raise e
  50. async def process_main_image(
  51. session: aiohttp.ClientSession,
  52. file_bytes: bytes,
  53. filename: str,
  54. image_type: str,
  55. is_reflect_card: str
  56. ) -> Dict[str, Any]:
  57. """调用推理服务,处理主图片"""
  58. score_type = IMAGE_TYPE_TO_SCORE_TYPE.get(image_type)
  59. if not score_type:
  60. raise HTTPException(status_code=400, detail=f"不支持的主图类型: {image_type}")
  61. logger.info(f"处理主图: image_type={image_type}, score_type={score_type} -> {filename}")
  62. inference_base_url = settings.SCORE_UPDATE_SERVER_URL
  63. # 1. 获取转正后的图片
  64. rectify_url = f"{inference_base_url}/api/card_inference/card_rectify_and_center"
  65. rectify_status, rectified_image_bytes = await call_api_with_bytes(
  66. session, url=rectify_url, file_bytes=file_bytes, filename=filename
  67. )
  68. if rectify_status >= 300:
  69. raise HTTPException(status_code=500, detail=f"图片转正失败: {image_type}")
  70. # 2. 获取分数JSON
  71. score_url = f"{inference_base_url}/api/card_score/score_inference"
  72. score_params = {"score_type": score_type, "is_reflect_card": is_reflect_card}
  73. score_status, score_json_bytes = await call_api_with_bytes(
  74. session, url=score_url, file_bytes=file_bytes, filename=filename, params=score_params
  75. )
  76. if score_status >= 300:
  77. raise HTTPException(status_code=500, detail=f"推理分数失败: {image_type}")
  78. return {
  79. "image_type": image_type,
  80. "rectified_image": rectified_image_bytes,
  81. "score_json": json.loads(score_json_bytes)
  82. }
  83. async def create_card_record(session: aiohttp.ClientSession, base_url: str, card_name: str, cardNo: Optional[str],
  84. card_type: CardType) -> int:
  85. """调用自身服务创建新的卡牌记录"""
  86. url = f"{base_url}{settings.API_PREFIX}/cards/created"
  87. params = {'card_name': card_name, 'card_type': card_type.value}
  88. if cardNo:
  89. params['cardNo'] = cardNo
  90. async with session.post(url, params=params) as response:
  91. if response.status == 201:
  92. data = await response.json()
  93. return data.get('id')
  94. else:
  95. text = await response.text()
  96. raise HTTPException(status_code=response.status, detail=f"创建卡牌记录失败: {text}")
  97. async def upload_main_image(session: aiohttp.ClientSession, base_url: str, card_id: int,
  98. processed_data: Dict[str, Any]):
  99. """将处理后的主图和JSON上传到自身服务"""
  100. image_type = processed_data['image_type']
  101. url = f"{base_url}{settings.API_PREFIX}/images/insert/{card_id}"
  102. form_data = aiohttp.FormData()
  103. form_data.add_field('image_type', image_type)
  104. form_data.add_field('json_data_str', json.dumps(processed_data['score_json'], ensure_ascii=False))
  105. form_data.add_field(
  106. 'image',
  107. processed_data['rectified_image'],
  108. filename=f'{image_type}_rectified.jpg',
  109. content_type='image/jpeg'
  110. )
  111. async with session.post(url, data=form_data) as response:
  112. if response.status != 201:
  113. logger.error(f"[主图上传失败] {image_type} code={response.status}")
  114. raise HTTPException(status_code=500, detail=f"主图保存失败: {image_type}")
  115. async def upload_gray_image(session: aiohttp.ClientSession, base_url: str, card_id: int, image_type: str,
  116. file_bytes: bytes, filename: str):
  117. """将灰度图源文件上传到自身服务"""
  118. url = f"{base_url}{settings.API_PREFIX}/images/insert/gray/{card_id}"
  119. form_fields = {'image_type': image_type}
  120. status, _ = await call_api_with_bytes(
  121. session, url=url, file_bytes=file_bytes, filename=filename, form_fields=form_fields, file_field_name='image'
  122. )
  123. if status != 201:
  124. logger.error(f"[灰度图上传失败] {image_type} code={status}")
  125. raise HTTPException(status_code=500, detail=f"灰度图保存失败: {image_type}")
  126. # --- 暴露的API接口 ---
  127. @router.post("/process_and_import", summary="自动化处理并导入卡牌数据[相机后台调用]")
  128. async def auto_import_script_api(
  129. request: Request,
  130. card_name: str = Form(..., description="卡牌名称"),
  131. cardNo: Optional[str] = Form(None, description="卡牌编号"),
  132. card_type: CardType = Form(CardType.pokemon, description="卡牌类型"),
  133. is_reflect_card: bool = Form(True, description="是否是反光卡"),
  134. strict_mode: bool = Form(False, description="如果为True,必须提供所有4张主图"),
  135. front_ring: Union[UploadFile, str, None] = File(None, description="正面环光图文件"),
  136. front_coaxial: Union[UploadFile, str, None] = File(None, description="正面同轴光图文件"),
  137. back_ring: Union[UploadFile, str, None] = File(None, description="背面环光图文件"),
  138. back_coaxial: Union[UploadFile, str, None] = File(None, description="背面同轴光图文件"),
  139. front_gray: Union[UploadFile, str, None] = File(None, description="正面灰度图文件"),
  140. back_gray: Union[UploadFile, str, None] = File(None, description="背面灰度图文件")
  141. ):
  142. local_base_url = str(request.base_url).rstrip('/')
  143. main_inputs = {
  144. "front_ring": front_ring,
  145. "front_coaxial": front_coaxial,
  146. "back_ring": back_ring,
  147. "back_coaxial": back_coaxial
  148. }
  149. gray_inputs = {
  150. "front_gray": front_gray,
  151. "back_gray": back_gray
  152. }
  153. valid_main_files = {
  154. k: v for k, v in main_inputs.items()
  155. if (v is not None) and v.filename
  156. }
  157. valid_gray_files = {
  158. k: v for k, v in gray_inputs.items()
  159. if (v is not None) and v.filename
  160. }
  161. provided_main_count = len(valid_main_files)
  162. if strict_mode and provided_main_count != 4:
  163. raise HTTPException(status_code=400, detail=f"严格模式开启,必须提供所有4张主图。")
  164. if not strict_mode and provided_main_count == 0 and not valid_gray_files:
  165. raise HTTPException(status_code=400, detail="未提供任何图片文件,无法创建。")
  166. logger.info(
  167. "auto_import_script_api start card_name=%s cardNo=%s card_type=%s is_reflect_card=%s strict_mode=%s "
  168. "main_types=%s gray_types=%s",
  169. card_name,
  170. cardNo or "",
  171. card_type.value,
  172. is_reflect_card,
  173. strict_mode,
  174. ",".join(valid_main_files.keys()) or "none",
  175. ",".join(valid_gray_files.keys()) or "none",
  176. )
  177. is_reflect_str = "true" if is_reflect_card else "false"
  178. connector = aiohttp.TCPConnector(limit=20, force_close=True)
  179. async with aiohttp.ClientSession(timeout=IMPORT_REQUEST_TIMEOUT, connector=connector) as session:
  180. try:
  181. main_bytes_data = {k: (await v.read(), v.filename) for k, v in valid_main_files.items()}
  182. gray_bytes_data = {k: (await v.read(), v.filename) for k, v in valid_gray_files.items()}
  183. logger.info(
  184. "auto_import_script_api files_loaded card_name=%s main_count=%s gray_count=%s",
  185. card_name,
  186. len(main_bytes_data),
  187. len(gray_bytes_data),
  188. )
  189. processed_results = []
  190. for img_type, (f_bytes, f_name) in main_bytes_data.items():
  191. if len(f_bytes) == 0:
  192. raise HTTPException(status_code=400, detail=f"图片文件 {f_name} 内容为空")
  193. res = await process_main_image(session, f_bytes, f_name, img_type, is_reflect_str)
  194. processed_results.append(res)
  195. card_id = await create_card_record(
  196. session, local_base_url, card_name, cardNo, card_type
  197. )
  198. logger.info(
  199. "auto_import_script_api card_created card_name=%s card_id=%s processed_main_count=%s gray_count=%s",
  200. card_name,
  201. card_id,
  202. len(processed_results),
  203. len(gray_bytes_data),
  204. )
  205. # ---------- 修改点:safe_upload_task 调用 + gather ----------
  206. upload_tasks = []
  207. for res in processed_results:
  208. upload_tasks.append(upload_main_image(session, local_base_url, card_id, res))
  209. for img_type, (f_bytes, f_name) in gray_bytes_data.items():
  210. upload_tasks.append(upload_gray_image(session, local_base_url, card_id, img_type, f_bytes, f_name))
  211. if upload_tasks:
  212. await asyncio.gather(*upload_tasks)
  213. # ------------------------------------------------------------
  214. logger.info(
  215. "auto_import_script_api finished card_name=%s card_id=%s upload_task_count=%s",
  216. card_name,
  217. card_id,
  218. len(upload_tasks),
  219. )
  220. return {
  221. "message": "导入成功",
  222. "card_id": card_id,
  223. "card_name": card_name,
  224. "cardNo": cardNo
  225. }
  226. except HTTPException as e:
  227. logger.error(
  228. "auto_import_script_api failed_http card_name=%s detail=%s status_code=%s",
  229. card_name,
  230. e.detail,
  231. e.status_code,
  232. )
  233. raise
  234. except Exception as e:
  235. logger.exception("auto_import_script_api failed_unexpected card_name=%s", card_name)
  236. raise HTTPException(status_code=500, detail=f"自动化处理异常: {str(e)}")
  237. @router.post("/process_and_import_url", summary="通过URL自动化处理并导入卡牌数据")
  238. async def auto_import_url_script_api(
  239. request: Request,
  240. card_name: str = Form(..., description="卡牌名称"),
  241. cardNo: Optional[str] = Form(None, description="卡牌编号"),
  242. card_type: CardType = Form(CardType.pokemon, description="卡牌类型"),
  243. is_reflect_card: bool = Form(True, description="是否是反光卡"),
  244. strict_mode: bool = Form(False, description="如果为True,必须提供所有4张主图URL"),
  245. front_ring: Optional[str] = Form(None, description="正面环光图URL"),
  246. front_coaxial: Optional[str] = Form(None, description="正面同轴光图URL"),
  247. back_ring: Optional[str] = Form(None, description="背面环光图URL"),
  248. back_coaxial: Optional[str] = Form(None, description="背面同轴光图URL"),
  249. front_gray: Optional[str] = Form(None, description="正面灰度图URL"),
  250. back_gray: Optional[str] = Form(None, description="背面灰度图URL")
  251. ):
  252. logger.info(f"--- 开始URL导入任务:auto_import_url_script_api: {card_name} ---")
  253. local_base_url = str(request.base_url).rstrip('/')
  254. main_inputs = {
  255. "front_ring": front_ring, "front_coaxial": front_coaxial,
  256. "back_ring": back_ring, "back_coaxial": back_coaxial
  257. }
  258. gray_inputs = {
  259. "front_gray": front_gray, "back_gray": back_gray
  260. }
  261. valid_main_urls = {k: v for k, v in main_inputs.items() if v and v.strip()}
  262. valid_gray_urls = {k: v for k, v in gray_inputs.items() if v and v.strip()}
  263. provided_main_count = len(valid_main_urls)
  264. if strict_mode and provided_main_count != 4:
  265. raise HTTPException(status_code=400, detail="严格模式开启,必须提供所有4张主图URL。")
  266. if not strict_mode and provided_main_count == 0 and not valid_gray_urls:
  267. raise HTTPException(status_code=400, detail="未提供任何图片URL,无法创建。")
  268. is_reflect_str = "true" if is_reflect_card else "false"
  269. connector = aiohttp.TCPConnector(limit=20, force_close=True)
  270. async with aiohttp.ClientSession(timeout=IMPORT_REQUEST_TIMEOUT, connector=connector) as session:
  271. try:
  272. logger.info(f"--- 开始URL自动导入任务: {card_name} ---")
  273. async def fetch_image(img_key: str, img_url: str):
  274. try:
  275. async with session.get(img_url) as resp:
  276. if resp.status != 200:
  277. raise HTTPException(status_code=400, detail=f"下载图片失败: {img_key} -> {resp.status}")
  278. file_bytes = await resp.read()
  279. filename = img_url.split('/')[-1].split('?')[0]
  280. if not filename or '.' not in filename:
  281. filename = f"{img_key}.jpg"
  282. return img_key, (file_bytes, filename)
  283. except Exception as e:
  284. if isinstance(e, HTTPException): raise e
  285. raise HTTPException(status_code=400, detail=f"访问图片URL异常: {img_key} -> {str(e)}")
  286. fetch_tasks = [fetch_image(k, url) for k, url in valid_main_urls.items()]
  287. fetch_tasks += [fetch_image(k, url) for k, url in valid_gray_urls.items()]
  288. downloaded_files = await asyncio.gather(*fetch_tasks)
  289. main_bytes_data = {}
  290. gray_bytes_data = {}
  291. for key, data in downloaded_files:
  292. if key in valid_main_urls:
  293. main_bytes_data[key] = data
  294. else:
  295. gray_bytes_data[key] = data
  296. processed_results = []
  297. for img_type, (f_bytes, f_name) in main_bytes_data.items():
  298. if len(f_bytes) == 0:
  299. raise HTTPException(status_code=400, detail=f"图片文件 {f_name} 内容为空")
  300. res = await process_main_image(session, f_bytes, f_name, img_type, is_reflect_str)
  301. processed_results.append(res)
  302. card_id = await create_card_record(session, local_base_url, card_name, cardNo, card_type)
  303. logger.info(f"URL导入卡片记录创建成功,ID: {card_id}")
  304. # ---------- 修改点:safe_upload_task 调用 + gather ----------
  305. upload_tasks = []
  306. for res in processed_results:
  307. upload_tasks.append(upload_main_image(session, local_base_url, card_id, res))
  308. for img_type, (f_bytes, f_name) in gray_bytes_data.items():
  309. upload_tasks.append(upload_gray_image(session, local_base_url, card_id, img_type, f_bytes, f_name))
  310. if upload_tasks:
  311. await asyncio.gather(*upload_tasks)
  312. # ------------------------------------------------------------
  313. logger.info(f"--- URL自动导入流程结束, Card ID: {card_id} ---")
  314. return {
  315. "message": "URL导入成功",
  316. "card_id": card_id,
  317. "card_name": card_name,
  318. "cardNo": cardNo
  319. }
  320. except HTTPException:
  321. raise
  322. except Exception as e:
  323. logger.error(f"[URL导入流程终止] 发生异常: {e}")
  324. raise HTTPException(status_code=500, detail=f"自动化处理异常: {str(e)}")