1
0

crud_card.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633
  1. from typing import Optional, List, Dict, Any
  2. from datetime import date
  3. from mysql.connector.pooling import PooledMySQLConnection
  4. import json
  5. from datetime import datetime
  6. import copy
  7. import io
  8. import os
  9. from app.core.config import settings
  10. from app.core.minio_client import minio_client
  11. from app.utils.scheme import CardImageResponse, CardType, SortBy, SortOrder, ImageType
  12. from app.utils.card_score_calculate import calculate_scores_from_images, SCORE_SOURCE_IMAGE_TYPES
  13. from app.core.logger import get_logger
  14. logger = get_logger(__name__)
  15. THUMBNAIL_MAX_SIZE = (320, 320)
  16. # 定义灰度图固定的 detection_json 结构
  17. EMPTY_DETECTION_JSON = {
  18. "result": {
  19. "center_result": {},
  20. "defect_result": {
  21. "defects": []
  22. }
  23. }
  24. }
  25. def _normalize_storage_rel_path(path: Optional[str]) -> Optional[str]:
  26. if not path:
  27. return None
  28. value = str(path)
  29. if value.startswith(settings.DATA_HOST_URL):
  30. value = value.replace(settings.DATA_HOST_URL, "", 1)
  31. return "/" + value.lstrip("/\\")
  32. def _build_thumbnail_rel_path(image_path: Optional[str]) -> Optional[str]:
  33. rel_path = _normalize_storage_rel_path(image_path)
  34. if not rel_path:
  35. return None
  36. base_name = os.path.basename(rel_path)
  37. stem, _ = os.path.splitext(base_name)
  38. if not stem:
  39. return None
  40. return f"/Thumbnail/{stem}_320.jpg"
  41. def _get_or_create_thumbnail_url(image_path: Optional[str]) -> Optional[str]:
  42. rel_path = _normalize_storage_rel_path(image_path)
  43. thumbnail_rel_path = _build_thumbnail_rel_path(image_path)
  44. if not rel_path or not thumbnail_rel_path:
  45. return None
  46. source_object_name = f"{settings.MINIO_BASE_PREFIX}{rel_path}"
  47. thumbnail_object_name = f"{settings.MINIO_BASE_PREFIX}{thumbnail_rel_path}"
  48. try:
  49. minio_client.stat_object(settings.MINIO_BUCKET, thumbnail_object_name)
  50. return settings.get_full_url(thumbnail_rel_path)
  51. except Exception:
  52. pass
  53. try:
  54. from PIL import Image
  55. response = minio_client.get_object(settings.MINIO_BUCKET, source_object_name)
  56. image_bytes = response.read()
  57. response.close()
  58. response.release_conn()
  59. with Image.open(io.BytesIO(image_bytes)) as img:
  60. if img.mode not in ("RGB", "L"):
  61. img = img.convert("RGB")
  62. img.thumbnail(THUMBNAIL_MAX_SIZE)
  63. out_bytes = io.BytesIO()
  64. img.save(out_bytes, format="JPEG", quality=85)
  65. out_bytes.seek(0)
  66. minio_client.put_object(
  67. settings.MINIO_BUCKET,
  68. thumbnail_object_name,
  69. out_bytes,
  70. len(out_bytes.getvalue()),
  71. content_type="image/jpeg"
  72. )
  73. return settings.get_full_url(thumbnail_rel_path)
  74. except Exception as e:
  75. logger.warning("生成缩略图失败: source=%s error=%s", source_object_name, e)
  76. return None
  77. def update_card_scores_and_status(db_conn: PooledMySQLConnection, card_id: int):
  78. """
  79. 更新cards表中的分数和状态。注意:只基于 card_images 表(主4张图)计算。
  80. """
  81. with db_conn.cursor(dictionary=True) as cursor:
  82. # 1. 获取所有关联图片 (主表)
  83. query_images = f"SELECT * FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id = %s"
  84. cursor.execute(query_images, (card_id,))
  85. image_records = cursor.fetchall()
  86. images = [
  87. CardImageResponse.model_validate(row)
  88. for row in image_records
  89. if row.get("image_type") in SCORE_SOURCE_IMAGE_TYPES
  90. ]
  91. # 2. 计算分数和状态
  92. scores_data = calculate_scores_from_images(images)
  93. # 3. 更新 cards 表
  94. query_update_card = (
  95. f"UPDATE {settings.DB_CARD_TABLE_NAME} SET "
  96. "detection_score = %s, modified_score = %s, is_edited = %s, updated_at = NOW() "
  97. "WHERE id = %s"
  98. )
  99. params = (
  100. scores_data["detection_score"],
  101. scores_data["modified_score"],
  102. scores_data["is_edited"],
  103. card_id,
  104. )
  105. cursor.execute(query_update_card, params)
  106. db_conn.commit()
  107. def _parse_json_value(value: Any) -> Optional[Dict[str, Any]]:
  108. if value is None:
  109. return None
  110. if isinstance(value, str):
  111. try:
  112. return json.loads(value)
  113. except json.JSONDecodeError:
  114. return None
  115. return value if isinstance(value, dict) else None
  116. def _has_center_box_shapes(center_result: Any) -> bool:
  117. if not isinstance(center_result, dict):
  118. return False
  119. box_result = center_result.get("box_result", {})
  120. if not isinstance(box_result, dict):
  121. return False
  122. inner_shapes = box_result.get("inner_box", {}).get("shapes", [])
  123. outer_shapes = box_result.get("outer_box", {}).get("shapes", [])
  124. return bool(inner_shapes or outer_shapes)
  125. def _extract_center_result(json_data: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
  126. if not json_data:
  127. return None
  128. center_result = json_data.get("result", {}).get("center_result")
  129. if _has_center_box_shapes(center_result):
  130. return copy.deepcopy(center_result)
  131. return None
  132. def _apply_center_result(json_data: Optional[Dict[str, Any]], center_result: Dict[str, Any]) -> Dict[str, Any]:
  133. payload = copy.deepcopy(json_data) if isinstance(json_data, dict) else copy.deepcopy(EMPTY_DETECTION_JSON)
  134. payload.setdefault("result", {})["center_result"] = copy.deepcopy(center_result)
  135. return payload
  136. def _share_side_center_results(images: List[CardImageResponse]) -> None:
  137. """同面各图共享 center_result(优先 fusion/ring,其次 stripe)。"""
  138. side_priority_types = {
  139. "front": [
  140. ImageType.front_fusion.value,
  141. ImageType.front_ring.value,
  142. ImageType.front_coaxial.value,
  143. ImageType.front_stripe1.value,
  144. ImageType.front_stripe2.value,
  145. ImageType.front_stripe3.value,
  146. ImageType.front_stripe4.value,
  147. ],
  148. "back": [
  149. ImageType.back_fusion.value,
  150. ImageType.back_ring.value,
  151. ImageType.back_coaxial.value,
  152. ImageType.back_stripe1.value,
  153. ImageType.back_stripe2.value,
  154. ImageType.back_stripe3.value,
  155. ImageType.back_stripe4.value,
  156. ],
  157. }
  158. for side, priority_types in side_priority_types.items():
  159. side_center = None
  160. for image_type in priority_types:
  161. for img in images:
  162. if img.image_type != image_type:
  163. continue
  164. for json_field in ("modified_json", "detection_json"):
  165. source = _extract_center_result(getattr(img, json_field, None))
  166. if source:
  167. side_center = source
  168. break
  169. if side_center:
  170. break
  171. if side_center:
  172. break
  173. if not side_center:
  174. continue
  175. prefix = f"{side}_"
  176. for img in images:
  177. if not img.image_type.startswith(prefix):
  178. continue
  179. if img.image_type.endswith("_gray"):
  180. continue
  181. img.detection_json = _apply_center_result(img.detection_json, side_center)
  182. if img.modified_json:
  183. img.modified_json = _apply_center_result(img.modified_json, side_center)
  184. def _construct_gray_image_json(gray_type: str, ring_image_data: Optional[Dict[str, Any]]) -> Dict[str, Any]:
  185. """
  186. 内部辅助:构建辅助图(灰度图/融合图)的 modified_json
  187. gray_type: front_gray / back_gray / front_fusion / back_fusion
  188. ring_image_data: 对应的 ring 图的数据库原始字典数据 (包含 detection_json/modified_json 字段)
  189. """
  190. if not ring_image_data:
  191. return None
  192. # 获取 Ring 图最新的 JSON (优先取 modified, 没有则取 detection)
  193. source_json = ring_image_data.get('modified_json')
  194. if not source_json:
  195. source_json = ring_image_data.get('detection_json')
  196. # 解析 JSON 字符串
  197. if isinstance(source_json, str):
  198. try:
  199. source_json = json.loads(source_json)
  200. except:
  201. return None
  202. if not source_json:
  203. return None
  204. # 开始筛选 defects
  205. defects = source_json.get("result", {}).get("defect_result", {}).get("defects", [])
  206. filtered_defects = []
  207. for defect in defects:
  208. # 判断当前缺陷是否属于当前这张辅助图
  209. # 如果是融合图,就找 fusion_id;如果是灰度图,就找 gray_id
  210. is_fusion = gray_type in ("front_fusion", "back_fusion")
  211. key_to_check = "fusion_id" if is_fusion else "gray_id"
  212. if key_to_check in defect:
  213. filtered_defects.append(defect)
  214. # 即使没有缺陷,也最好返回一个空的结构,不要返回 None
  215. # 不然前端可能会觉得没这个字段或者解析报错
  216. gray_modified_json = copy.deepcopy(EMPTY_DETECTION_JSON)
  217. gray_modified_json["result"]["defect_result"]["defects"] = filtered_defects
  218. center_result = source_json.get("result", {}).get("center_result")
  219. if _has_center_box_shapes(center_result):
  220. gray_modified_json["result"]["center_result"] = copy.deepcopy(center_result)
  221. # 还可以把 Ring 图的宽高带过来,防止前端报错
  222. gray_modified_json["result"]["imageHeight"] = source_json.get("result", {}).get("imageHeight", 0)
  223. gray_modified_json["result"]["imageWidth"] = source_json.get("result", {}).get("imageWidth", 0)
  224. return gray_modified_json
  225. def get_card_with_details(db_conn: PooledMySQLConnection, card_id: int) -> Optional[Dict[str, Any]]:
  226. """获取单个卡牌的完整信息,包含主图和灰度图。"""
  227. with db_conn.cursor(dictionary=True) as cursor:
  228. # 1. 获取卡牌信息
  229. query_card = f"SELECT * FROM {settings.DB_CARD_TABLE_NAME} WHERE id = %s"
  230. cursor.execute(query_card, (card_id,))
  231. card_data = cursor.fetchone()
  232. if not card_data:
  233. return None
  234. # 2. 获取主图片 (Card Images)
  235. query_images = f"SELECT * FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id = %s"
  236. cursor.execute(query_images, (card_id,))
  237. main_image_records = cursor.fetchall()
  238. # 3. 获取灰度图片 (Gray Images)
  239. query_gray = f"SELECT * FROM {settings.DB_GRAY_IMAGE_TABLE_NAME} WHERE card_id = %s"
  240. cursor.execute(query_gray, (card_id,))
  241. gray_image_records = cursor.fetchall()
  242. # 4. 寻找 Ring 图的数据,用于辅助构建灰度图 JSON
  243. # 建立映射: ImageType -> Record Dict
  244. main_images_map = {rec['image_type']: rec for rec in main_image_records}
  245. final_images_list = []
  246. # 处理主图片,补充全路径
  247. for row in main_image_records:
  248. row['thumbnail_path'] = _get_or_create_thumbnail_url(row.get('image_path'))
  249. row['image_path'] = settings.get_full_url(row.get('image_path'))
  250. row['detection_image_path'] = settings.get_full_url(row.get('detection_image_path'))
  251. row['modified_image_path'] = settings.get_full_url(row.get('modified_image_path'))
  252. final_images_list.append(CardImageResponse.model_validate(row))
  253. # 处理 card_gray_images:真灰度图 + ring/stripe 等辅助图(导入时可能落在此表)
  254. for row in gray_image_records:
  255. g_type = row['image_type']
  256. # 主表已有同类型时跳过,避免 images 列表重复
  257. if g_type in main_images_map:
  258. continue
  259. fusion_type = None
  260. if g_type.startswith("front_") and g_type != ImageType.front_gray.value:
  261. fusion_type = ImageType.front_fusion.value
  262. elif g_type.startswith("back_") and g_type != ImageType.back_gray.value:
  263. fusion_type = ImageType.back_fusion.value
  264. if g_type in (ImageType.front_gray.value, ImageType.back_gray.value):
  265. target_ring_type = (
  266. ImageType.front_ring.value
  267. if g_type == ImageType.front_gray.value
  268. else ImageType.back_ring.value
  269. )
  270. virtual_detection_json = copy.deepcopy(EMPTY_DETECTION_JSON)
  271. ring_data = main_images_map.get(target_ring_type)
  272. virtual_modified_json = _construct_gray_image_json(g_type, ring_data)
  273. ring_detection = _parse_json_value(ring_data.get("detection_json") if ring_data else None)
  274. center_result = _extract_center_result(ring_detection)
  275. if center_result:
  276. virtual_detection_json = _apply_center_result(virtual_detection_json, center_result)
  277. elif fusion_type and fusion_type in main_images_map:
  278. # ring / stripe 等在辅助表时,JSON 与融合图共用
  279. fusion_row = main_images_map[fusion_type]
  280. virtual_detection_json = fusion_row.get("detection_json") or copy.deepcopy(EMPTY_DETECTION_JSON)
  281. if isinstance(virtual_detection_json, str):
  282. virtual_detection_json = json.loads(virtual_detection_json)
  283. virtual_modified_json = fusion_row.get("modified_json")
  284. if isinstance(virtual_modified_json, str):
  285. virtual_modified_json = json.loads(virtual_modified_json)
  286. else:
  287. virtual_detection_json = copy.deepcopy(EMPTY_DETECTION_JSON)
  288. virtual_modified_json = None
  289. gray_image_dict = {
  290. "id": row['id'],
  291. "card_id": row['card_id'],
  292. "image_type": g_type,
  293. "image_path": settings.get_full_url(row['image_path']),
  294. "thumbnail_path": _get_or_create_thumbnail_url(row.get('image_path')),
  295. "created_at": row['created_at'],
  296. "updated_at": row['updated_at'],
  297. "detection_json": virtual_detection_json,
  298. "modified_json": virtual_modified_json,
  299. "image_name": None,
  300. "detection_image_path": None,
  301. "modified_image_path": None,
  302. "is_edited": False,
  303. }
  304. final_images_list.append(CardImageResponse.model_validate(gray_image_dict))
  305. _share_side_center_results(final_images_list)
  306. # 5. 获取分数详情:fusion/ring/coaxial 参与算分(每面仅用一份 JSON,不重复扣分)
  307. main_images_objs = [
  308. img for img in final_images_list
  309. if img.image_type in SCORE_SOURCE_IMAGE_TYPES
  310. ]
  311. score_details = calculate_scores_from_images(main_images_objs)
  312. # 6. 对图片列表进行自定义排序(14 类:每面 fusion / gray / ring / stripe1-4)
  313. sort_priority = {
  314. ImageType.front_fusion.value: 0,
  315. ImageType.back_fusion.value: 1,
  316. ImageType.front_gray.value: 2,
  317. ImageType.back_gray.value: 3,
  318. ImageType.front_ring.value: 4,
  319. ImageType.back_ring.value: 5,
  320. ImageType.front_stripe1.value: 6,
  321. ImageType.front_stripe2.value: 7,
  322. ImageType.front_stripe3.value: 8,
  323. ImageType.front_stripe4.value: 9,
  324. ImageType.back_stripe1.value: 10,
  325. ImageType.back_stripe2.value: 11,
  326. ImageType.back_stripe3.value: 12,
  327. ImageType.back_stripe4.value: 13,
  328. ImageType.front_coaxial.value: 14,
  329. ImageType.back_coaxial.value: 15,
  330. }
  331. final_images_list.sort(key=lambda x: sort_priority.get(x.image_type, 999))
  332. card_data.update({
  333. "images": final_images_list, # 包含所有图片
  334. "detection_score": score_details["detection_score"],
  335. "modified_score": score_details["modified_score"],
  336. "detection_score_detail": score_details["detection_score_detail"],
  337. "modified_score_detail": score_details["modified_score_detail"]
  338. })
  339. return card_data
  340. def get_card_list_with_images(
  341. db_conn: PooledMySQLConnection,
  342. card_id: Optional[int],
  343. card_name: Optional[str],
  344. card_type: Optional[CardType],
  345. is_edited: Optional[bool],
  346. min_detect_score: Optional[float],
  347. max_detect_score: Optional[float],
  348. min_mod_score: Optional[float],
  349. max_mod_score: Optional[float],
  350. created_start: Optional[date],
  351. created_end: Optional[date],
  352. updated_start: Optional[date],
  353. updated_end: Optional[date],
  354. sort_by: SortBy,
  355. sort_order: SortOrder,
  356. skip: int,
  357. limit: int
  358. ) -> List[Dict[str, Any]]:
  359. # 此函数逻辑主要是列表展示,不需要复杂的 JSON 构造,
  360. # 只需要把灰度图的路径也带出来即可。
  361. with db_conn.cursor(dictionary=True) as cursor:
  362. query = f"SELECT * FROM {settings.DB_CARD_TABLE_NAME}"
  363. conditions = []
  364. params = []
  365. if card_id is not None: conditions.append("id = %s"); params.append(card_id)
  366. if card_name: conditions.append("card_name LIKE %s"); params.append(f"%{card_name}%")
  367. if card_type: conditions.append("card_type = %s"); params.append(card_type.value)
  368. if is_edited is not None: conditions.append("is_edited = %s"); params.append(is_edited)
  369. if min_detect_score is not None: conditions.append("detection_score >= %s"); params.append(min_detect_score)
  370. if max_detect_score is not None: conditions.append("detection_score <= %s"); params.append(max_detect_score)
  371. if min_mod_score is not None: conditions.append("modified_score >= %s"); params.append(min_mod_score)
  372. if max_mod_score is not None: conditions.append("modified_score <= %s"); params.append(max_mod_score)
  373. if created_start: conditions.append("DATE(created_at) >= %s"); params.append(created_start)
  374. if created_end: conditions.append("DATE(created_at) <= %s"); params.append(created_end)
  375. if updated_start: conditions.append("DATE(updated_at) >= %s"); params.append(updated_start)
  376. if updated_end: conditions.append("DATE(updated_at) <= %s"); params.append(updated_end)
  377. if conditions: query += " WHERE " + " AND ".join(conditions)
  378. query += f" ORDER BY {sort_by.value} {sort_order.value}, id DESC"
  379. query += " LIMIT %s OFFSET %s"
  380. params.extend([limit, skip])
  381. cursor.execute(query, tuple(params))
  382. cards = cursor.fetchall()
  383. if not cards:
  384. return []
  385. card_ids = [card['id'] for card in cards]
  386. format_strings = ','.join(['%s'] * len(card_ids))
  387. # 1. 查询主图片
  388. image_query = (
  389. f"SELECT id, card_id, image_type, image_path, detection_image_path, modified_image_path "
  390. f"FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id IN ({format_strings})"
  391. )
  392. cursor.execute(image_query, tuple(card_ids))
  393. images = cursor.fetchall()
  394. # 2. 查询灰度图片
  395. gray_query = (
  396. f"SELECT id, card_id, image_type, image_path "
  397. f"FROM {settings.DB_GRAY_IMAGE_TABLE_NAME} WHERE card_id IN ({format_strings})"
  398. )
  399. cursor.execute(gray_query, tuple(card_ids))
  400. gray_images = cursor.fetchall()
  401. # 分组
  402. images_by_card_id = {}
  403. for img in images:
  404. cid = img['card_id']
  405. if cid not in images_by_card_id: images_by_card_id[cid] = []
  406. images_by_card_id[cid].append(img)
  407. for g_img in gray_images:
  408. cid = g_img['card_id']
  409. if cid not in images_by_card_id: images_by_card_id[cid] = []
  410. # 补齐字段结构以便前端统一处理
  411. g_img['detection_image_path'] = None
  412. g_img['modified_image_path'] = None
  413. images_by_card_id[cid].append(g_img)
  414. # 附加到卡牌
  415. for card in cards:
  416. card['image_path_list'] = {}
  417. card['detection_image_path_list'] = {}
  418. card['modified_image_path_list'] = {}
  419. related_images = images_by_card_id.get(card['id'], [])
  420. for image_data in related_images:
  421. image_type = image_data['image_type']
  422. if image_type:
  423. card['image_path_list'][image_type] = settings.get_full_url(image_data.get('image_path'))
  424. card['detection_image_path_list'][image_type] = settings.get_full_url(
  425. image_data.get('detection_image_path'))
  426. card['modified_image_path_list'][image_type] = settings.get_full_url(
  427. image_data.get('modified_image_path'))
  428. return cards
  429. def get_card_list_and_count(
  430. db_conn: PooledMySQLConnection,
  431. card_id: Optional[int],
  432. cardNo: Optional[str],
  433. card_name: Optional[str],
  434. card_type: Optional[CardType],
  435. is_edited: Optional[bool],
  436. review_state: Optional[int],
  437. min_detect_score: Optional[float],
  438. max_detect_score: Optional[float],
  439. min_mod_score: Optional[float],
  440. max_mod_score: Optional[float],
  441. created_start: Optional[date],
  442. created_end: Optional[date],
  443. updated_start: Optional[date],
  444. updated_end: Optional[date],
  445. sort_by: SortBy,
  446. sort_order: SortOrder,
  447. skip: int,
  448. limit: int,
  449. permission_user_id: Optional[str] = None,
  450. bound_user_id: Optional[str] = None
  451. ) -> Dict[str, Any]:
  452. with db_conn.cursor(dictionary=True) as cursor:
  453. conditions = []
  454. params = []
  455. if card_id is not None: conditions.append("id = %s"); params.append(card_id)
  456. if cardNo: conditions.append("cardNo LIKE %s"); params.append(f"%{cardNo}%")
  457. if card_name: conditions.append("card_name LIKE %s"); params.append(f"%{card_name}%")
  458. if card_type: conditions.append("card_type = %s"); params.append(card_type.value)
  459. if is_edited is not None: conditions.append("is_edited = %s"); params.append(is_edited)
  460. if review_state is not None: conditions.append("review_state = %s"); params.append(review_state)
  461. if min_detect_score is not None: conditions.append("detection_score >= %s"); params.append(min_detect_score)
  462. if max_detect_score is not None: conditions.append("detection_score <= %s"); params.append(max_detect_score)
  463. if min_mod_score is not None: conditions.append("modified_score >= %s"); params.append(min_mod_score)
  464. if max_mod_score is not None: conditions.append("modified_score <= %s"); params.append(max_mod_score)
  465. if created_start: conditions.append("DATE(created_at) >= %s"); params.append(created_start)
  466. if created_end: conditions.append("DATE(created_at) <= %s"); params.append(created_end)
  467. if updated_start: conditions.append("DATE(updated_at) >= %s"); params.append(updated_start)
  468. if updated_end: conditions.append("DATE(updated_at) <= %s"); params.append(updated_end)
  469. if permission_user_id is not None:
  470. conditions.append(
  471. f"id IN (SELECT card_id FROM `{settings.DB_USER_CARD_TABLE_NAME}` WHERE user_id = %s)"
  472. )
  473. params.append(permission_user_id)
  474. where_clause = ""
  475. if conditions: where_clause = " WHERE " + " AND ".join(conditions)
  476. # Count
  477. count_query = f"SELECT COUNT(*) as total FROM {settings.DB_CARD_TABLE_NAME}" + where_clause
  478. cursor.execute(count_query, tuple(params))
  479. total_count = cursor.fetchone()['total']
  480. # List
  481. data_query = f"SELECT * FROM {settings.DB_CARD_TABLE_NAME}" + where_clause
  482. data_query += f" ORDER BY {sort_by.value} {sort_order.value}, id DESC"
  483. data_query += " LIMIT %s OFFSET %s"
  484. data_params = params.copy()
  485. data_params.extend([limit, skip])
  486. cursor.execute(data_query, tuple(data_params))
  487. cards = cursor.fetchall()
  488. if cards:
  489. card_ids = [card['id'] for card in cards]
  490. format_strings = ','.join(['%s'] * len(card_ids))
  491. bound_card_ids = set()
  492. if bound_user_id is not None:
  493. # 只标记当前页卡片是否绑定到指定外部用户,不影响原有列表筛选逻辑。
  494. bound_query = (
  495. f"SELECT card_id FROM `{settings.DB_USER_CARD_TABLE_NAME}` "
  496. f"WHERE user_id = %s AND card_id IN ({format_strings})"
  497. )
  498. cursor.execute(bound_query, tuple([bound_user_id] + card_ids))
  499. bound_card_ids = {row['card_id'] for row in cursor.fetchall()}
  500. # 主图
  501. image_query = (
  502. f"SELECT id, card_id, image_type, image_path, detection_image_path, modified_image_path "
  503. f"FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id IN ({format_strings})"
  504. )
  505. cursor.execute(image_query, tuple(card_ids))
  506. images = cursor.fetchall()
  507. # [NEW] 灰度图
  508. gray_query = (
  509. f"SELECT id, card_id, image_type, image_path "
  510. f"FROM {settings.DB_GRAY_IMAGE_TABLE_NAME} WHERE card_id IN ({format_strings})"
  511. )
  512. cursor.execute(gray_query, tuple(card_ids))
  513. gray_images = cursor.fetchall()
  514. images_by_card_id = {}
  515. for img in images:
  516. cid = img['card_id']
  517. if cid not in images_by_card_id: images_by_card_id[cid] = []
  518. images_by_card_id[cid].append(img)
  519. # 混入灰度图
  520. for g_img in gray_images:
  521. cid = g_img['card_id']
  522. if cid not in images_by_card_id: images_by_card_id[cid] = []
  523. g_img['detection_image_path'] = None
  524. g_img['modified_image_path'] = None
  525. images_by_card_id[cid].append(g_img)
  526. for card in cards:
  527. card['is_bound'] = card['id'] in bound_card_ids
  528. card['image_path_list'] = {}
  529. card['detection_image_path_list'] = {}
  530. card['modified_image_path_list'] = {}
  531. related_images = images_by_card_id.get(card['id'], [])
  532. for image_data in related_images:
  533. image_type = image_data['image_type']
  534. if image_type:
  535. card['image_path_list'][image_type] = settings.get_full_url(image_data.get('image_path'))
  536. card['detection_image_path_list'][image_type] = settings.get_full_url(
  537. image_data.get('detection_image_path'))
  538. card['modified_image_path_list'][image_type] = settings.get_full_url(
  539. image_data.get('modified_image_path'))
  540. return {
  541. "total": total_count,
  542. "list": cards
  543. }