crud_card.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. from typing import Optional, List, Dict, Any
  2. from datetime import date
  3. from mysql.connector.pooling import PooledMySQLConnection
  4. import json
  5. from datetime import datetime
  6. import copy
  7. from app.core.config import settings
  8. from app.utils.scheme import CardImageResponse, CardType, SortBy, SortOrder, ImageType
  9. from app.utils.card_score_calculate import calculate_scores_from_images
  10. from app.core.logger import get_logger
# Module-level logger obtained from the project's get_logger helper,
# keyed by this module's __name__.
logger = get_logger(__name__)
  12. # 定义灰度图固定的 detection_json 结构
  13. EMPTY_DETECTION_JSON = {
  14. "result": {
  15. "center_result": {},
  16. "defect_result": {
  17. "defects": []
  18. }
  19. }
  20. }
  21. def update_card_scores_and_status(db_conn: PooledMySQLConnection, card_id: int):
  22. """
  23. 更新cards表中的分数和状态。注意:只基于 card_images 表(主4张图)计算。
  24. """
  25. with db_conn.cursor(dictionary=True) as cursor:
  26. # 1. 获取所有关联图片 (主表)
  27. query_images = f"SELECT * FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id = %s"
  28. cursor.execute(query_images, (card_id,))
  29. image_records = cursor.fetchall()
  30. images = [CardImageResponse.model_validate(row) for row in image_records]
  31. # 2. 计算分数和状态
  32. scores_data = calculate_scores_from_images(images)
  33. # 3. 更新 cards 表
  34. query_update_card = (
  35. f"UPDATE {settings.DB_CARD_TABLE_NAME} SET "
  36. "detection_score = %s, modified_score = %s, is_edited = %s, updated_at = %s "
  37. "WHERE id = %s"
  38. )
  39. params = (
  40. scores_data["detection_score"],
  41. scores_data["modified_score"],
  42. scores_data["is_edited"],
  43. datetime.now(),
  44. card_id,
  45. )
  46. cursor.execute(query_update_card, params)
  47. db_conn.commit()
  48. def _construct_gray_image_json(gray_type: ImageType, ring_image_data: Optional[Dict[str, Any]]) -> Dict[str, Any]:
  49. """
  50. 内部辅助:构建灰度图的 modified_json
  51. gray_type: front_gray 或 back_gray
  52. ring_image_data: 对应的 ring 图的数据库原始字典数据 (包含 detection_json/modified_json 字段)
  53. """
  54. if not ring_image_data:
  55. return None
  56. # 获取 Ring 图最新的 JSON (优先取 modified, 没有则取 detection)
  57. source_json = ring_image_data.get('modified_json')
  58. if not source_json:
  59. source_json = ring_image_data.get('detection_json')
  60. # 解析 JSON 字符串
  61. if isinstance(source_json, str):
  62. try:
  63. source_json = json.loads(source_json)
  64. except:
  65. return None
  66. if not source_json:
  67. return None
  68. # 开始筛选 defects
  69. defects = source_json.get("result", {}).get("defect_result", {}).get("defects", [])
  70. filtered_defects = []
  71. for defect in defects:
  72. # 寻找存在 "gray_id" 字段的数据
  73. if "gray_id" in defect:
  74. filtered_defects.append(defect)
  75. if not filtered_defects:
  76. return None # 这里暂定如果没有相关缺陷,modified_json 为 None
  77. gray_modified_json = copy.deepcopy(EMPTY_DETECTION_JSON)
  78. gray_modified_json["result"]["defect_result"]["defects"] = filtered_defects
  79. # 还可以把 Ring 图的宽高带过来,防止前端报错
  80. gray_modified_json["result"]["imageHeight"] = source_json.get("result", {}).get("imageHeight", 0)
  81. gray_modified_json["result"]["imageWidth"] = source_json.get("result", {}).get("imageWidth", 0)
  82. return gray_modified_json
  83. def get_card_with_details(db_conn: PooledMySQLConnection, card_id: int) -> Optional[Dict[str, Any]]:
  84. """获取单个卡牌的完整信息,包含主图和灰度图。"""
  85. with db_conn.cursor(dictionary=True) as cursor:
  86. # 1. 获取卡牌信息
  87. query_card = f"SELECT * FROM {settings.DB_CARD_TABLE_NAME} WHERE id = %s"
  88. cursor.execute(query_card, (card_id,))
  89. card_data = cursor.fetchone()
  90. if not card_data:
  91. return None
  92. # 2. 获取主图片 (Card Images)
  93. query_images = f"SELECT * FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id = %s"
  94. cursor.execute(query_images, (card_id,))
  95. main_image_records = cursor.fetchall()
  96. # 3. 获取灰度图片 (Gray Images)
  97. query_gray = f"SELECT * FROM {settings.DB_GRAY_IMAGE_TABLE_NAME} WHERE card_id = %s"
  98. cursor.execute(query_gray, (card_id,))
  99. gray_image_records = cursor.fetchall()
  100. # 4. 寻找 Ring 图的数据,用于辅助构建灰度图 JSON
  101. # 建立映射: ImageType -> Record Dict
  102. main_images_map = {rec['image_type']: rec for rec in main_image_records}
  103. final_images_list = []
  104. # 处理主图片
  105. for row in main_image_records:
  106. final_images_list.append(CardImageResponse.model_validate(row))
  107. # 处理灰度图片
  108. for row in gray_image_records:
  109. g_type = row['image_type']
  110. # 确定对应的 Ring 类型
  111. target_ring_type = None
  112. if g_type == ImageType.front_gray.value:
  113. target_ring_type = ImageType.front_ring.value
  114. elif g_type == ImageType.back_gray.value:
  115. target_ring_type = ImageType.back_ring.value
  116. # 构造虚拟数据
  117. # a. detection_json 写死
  118. virtual_detection_json = copy.deepcopy(EMPTY_DETECTION_JSON)
  119. # b. modified_json 动态计算
  120. virtual_modified_json = None
  121. if target_ring_type:
  122. ring_data = main_images_map.get(target_ring_type)
  123. virtual_modified_json = _construct_gray_image_json(g_type, ring_data)
  124. # 构造字典以符合 Pydantic 模型
  125. # 灰度图表里没有的字段补 None
  126. gray_image_dict = {
  127. "id": row['id'],
  128. "card_id": row['card_id'],
  129. "image_type": row['image_type'],
  130. "image_path": row['image_path'],
  131. "created_at": row['created_at'],
  132. "updated_at": row['updated_at'],
  133. # 虚拟字段
  134. "detection_json": virtual_detection_json,
  135. "modified_json": virtual_modified_json,
  136. "image_name": None,
  137. "detection_image_path": None,
  138. "modified_image_path": None,
  139. "is_edited": False # 灰度图本身不算被编辑,它只是展示 Ring 的编辑结果
  140. }
  141. final_images_list.append(CardImageResponse.model_validate(gray_image_dict))
  142. # 5. 获取分数详情 (只基于主图片计算)
  143. # 过滤掉灰度图进行分数计算,防止干扰逻辑
  144. main_images_objs = [img for img in final_images_list if
  145. img.image_type not in [ImageType.front_gray.value, ImageType.back_gray.value]]
  146. score_details = calculate_scores_from_images(main_images_objs)
  147. # 6. 对图片列表进行自定义排序
  148. # 顺序: [front_gray, back_gray, front_ring, back_ring, front_coaxial, back_coaxial]
  149. sort_priority = {
  150. ImageType.front_gray.value: 0,
  151. ImageType.back_gray.value: 1,
  152. ImageType.front_ring.value: 2,
  153. ImageType.back_ring.value: 3,
  154. ImageType.front_coaxial.value: 4,
  155. ImageType.back_coaxial.value: 5
  156. }
  157. final_images_list.sort(key=lambda x: sort_priority.get(x.image_type, 999))
  158. card_data.update({
  159. "images": final_images_list, # 包含所有图片
  160. "detection_score_detail": score_details["detection_score_detail"],
  161. "modified_score_detail": score_details["modified_score_detail"]
  162. })
  163. return card_data
  164. def get_card_list_with_images(
  165. db_conn: PooledMySQLConnection,
  166. card_id: Optional[int],
  167. card_name: Optional[str],
  168. card_type: Optional[CardType],
  169. is_edited: Optional[bool],
  170. min_detect_score: Optional[float],
  171. max_detect_score: Optional[float],
  172. min_mod_score: Optional[float],
  173. max_mod_score: Optional[float],
  174. created_start: Optional[date],
  175. created_end: Optional[date],
  176. updated_start: Optional[date],
  177. updated_end: Optional[date],
  178. sort_by: SortBy,
  179. sort_order: SortOrder,
  180. skip: int,
  181. limit: int
  182. ) -> List[Dict[str, Any]]:
  183. # 此函数逻辑主要是列表展示,不需要复杂的 JSON 构造,
  184. # 只需要把灰度图的路径也带出来即可。
  185. with db_conn.cursor(dictionary=True) as cursor:
  186. query = f"SELECT * FROM {settings.DB_CARD_TABLE_NAME}"
  187. conditions = []
  188. params = []
  189. if card_id is not None: conditions.append("id = %s"); params.append(card_id)
  190. if card_name: conditions.append("card_name LIKE %s"); params.append(f"%{card_name}%")
  191. if card_type: conditions.append("card_type = %s"); params.append(card_type.value)
  192. if is_edited is not None: conditions.append("is_edited = %s"); params.append(is_edited)
  193. if min_detect_score is not None: conditions.append("detection_score >= %s"); params.append(min_detect_score)
  194. if max_detect_score is not None: conditions.append("detection_score <= %s"); params.append(max_detect_score)
  195. if min_mod_score is not None: conditions.append("modified_score >= %s"); params.append(min_mod_score)
  196. if max_mod_score is not None: conditions.append("modified_score <= %s"); params.append(max_mod_score)
  197. if created_start: conditions.append("DATE(created_at) >= %s"); params.append(created_start)
  198. if created_end: conditions.append("DATE(created_at) <= %s"); params.append(created_end)
  199. if updated_start: conditions.append("DATE(updated_at) >= %s"); params.append(updated_start)
  200. if updated_end: conditions.append("DATE(updated_at) <= %s"); params.append(updated_end)
  201. if conditions: query += " WHERE " + " AND ".join(conditions)
  202. query += f" ORDER BY {sort_by.value} {sort_order.value}, id DESC"
  203. query += " LIMIT %s OFFSET %s"
  204. params.extend([limit, skip])
  205. cursor.execute(query, tuple(params))
  206. cards = cursor.fetchall()
  207. if not cards:
  208. return []
  209. card_ids = [card['id'] for card in cards]
  210. format_strings = ','.join(['%s'] * len(card_ids))
  211. # 1. 查询主图片
  212. image_query = (
  213. f"SELECT id, card_id, image_type, image_path, detection_image_path, modified_image_path "
  214. f"FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id IN ({format_strings})"
  215. )
  216. cursor.execute(image_query, tuple(card_ids))
  217. images = cursor.fetchall()
  218. # 2. 查询灰度图片
  219. gray_query = (
  220. f"SELECT id, card_id, image_type, image_path "
  221. f"FROM {settings.DB_GRAY_IMAGE_TABLE_NAME} WHERE card_id IN ({format_strings})"
  222. )
  223. cursor.execute(gray_query, tuple(card_ids))
  224. gray_images = cursor.fetchall()
  225. # 分组
  226. images_by_card_id = {}
  227. for img in images:
  228. cid = img['card_id']
  229. if cid not in images_by_card_id: images_by_card_id[cid] = []
  230. images_by_card_id[cid].append(img)
  231. for g_img in gray_images:
  232. cid = g_img['card_id']
  233. if cid not in images_by_card_id: images_by_card_id[cid] = []
  234. # 补齐字段结构以便前端统一处理
  235. g_img['detection_image_path'] = None
  236. g_img['modified_image_path'] = None
  237. images_by_card_id[cid].append(g_img)
  238. # 附加到卡牌
  239. for card in cards:
  240. card['image_path_list'] = {}
  241. card['detection_image_path_list'] = {}
  242. card['modified_image_path_list'] = {}
  243. related_images = images_by_card_id.get(card['id'], [])
  244. for image_data in related_images:
  245. image_type = image_data['image_type']
  246. if image_type:
  247. card['image_path_list'][image_type] = image_data.get('image_path')
  248. card['detection_image_path_list'][image_type] = image_data.get('detection_image_path')
  249. card['modified_image_path_list'][image_type] = image_data.get('modified_image_path')
  250. return cards
  251. def get_card_list_and_count(
  252. db_conn: PooledMySQLConnection,
  253. card_id: Optional[int],
  254. card_name: Optional[str],
  255. card_type: Optional[CardType],
  256. is_edited: Optional[bool],
  257. min_detect_score: Optional[float],
  258. max_detect_score: Optional[float],
  259. min_mod_score: Optional[float],
  260. max_mod_score: Optional[float],
  261. created_start: Optional[date],
  262. created_end: Optional[date],
  263. updated_start: Optional[date],
  264. updated_end: Optional[date],
  265. sort_by: SortBy,
  266. sort_order: SortOrder,
  267. skip: int,
  268. limit: int
  269. ) -> Dict[str, Any]:
  270. with db_conn.cursor(dictionary=True) as cursor:
  271. conditions = []
  272. params = []
  273. if card_id is not None: conditions.append("id = %s"); params.append(card_id)
  274. if card_name: conditions.append("card_name LIKE %s"); params.append(f"%{card_name}%")
  275. if card_type: conditions.append("card_type = %s"); params.append(card_type.value)
  276. if is_edited is not None: conditions.append("is_edited = %s"); params.append(is_edited)
  277. if min_detect_score is not None: conditions.append("detection_score >= %s"); params.append(min_detect_score)
  278. if max_detect_score is not None: conditions.append("detection_score <= %s"); params.append(max_detect_score)
  279. if min_mod_score is not None: conditions.append("modified_score >= %s"); params.append(min_mod_score)
  280. if max_mod_score is not None: conditions.append("modified_score <= %s"); params.append(max_mod_score)
  281. if created_start: conditions.append("DATE(created_at) >= %s"); params.append(created_start)
  282. if created_end: conditions.append("DATE(created_at) <= %s"); params.append(created_end)
  283. if updated_start: conditions.append("DATE(updated_at) >= %s"); params.append(updated_start)
  284. if updated_end: conditions.append("DATE(updated_at) <= %s"); params.append(updated_end)
  285. where_clause = ""
  286. if conditions: where_clause = " WHERE " + " AND ".join(conditions)
  287. # Count
  288. count_query = f"SELECT COUNT(*) as total FROM {settings.DB_CARD_TABLE_NAME}" + where_clause
  289. cursor.execute(count_query, tuple(params))
  290. total_count = cursor.fetchone()['total']
  291. # List
  292. data_query = f"SELECT * FROM {settings.DB_CARD_TABLE_NAME}" + where_clause
  293. data_query += f" ORDER BY {sort_by.value} {sort_order.value}, id DESC"
  294. data_query += " LIMIT %s OFFSET %s"
  295. data_params = params.copy()
  296. data_params.extend([limit, skip])
  297. cursor.execute(data_query, tuple(data_params))
  298. cards = cursor.fetchall()
  299. if cards:
  300. card_ids = [card['id'] for card in cards]
  301. format_strings = ','.join(['%s'] * len(card_ids))
  302. # 主图
  303. image_query = (
  304. f"SELECT id, card_id, image_type, image_path, detection_image_path, modified_image_path "
  305. f"FROM {settings.DB_IMAGE_TABLE_NAME} WHERE card_id IN ({format_strings})"
  306. )
  307. cursor.execute(image_query, tuple(card_ids))
  308. images = cursor.fetchall()
  309. # [NEW] 灰度图
  310. gray_query = (
  311. f"SELECT id, card_id, image_type, image_path "
  312. f"FROM {settings.DB_GRAY_IMAGE_TABLE_NAME} WHERE card_id IN ({format_strings})"
  313. )
  314. cursor.execute(gray_query, tuple(card_ids))
  315. gray_images = cursor.fetchall()
  316. images_by_card_id = {}
  317. for img in images:
  318. cid = img['card_id']
  319. if cid not in images_by_card_id: images_by_card_id[cid] = []
  320. images_by_card_id[cid].append(img)
  321. # 混入灰度图
  322. for g_img in gray_images:
  323. cid = g_img['card_id']
  324. if cid not in images_by_card_id: images_by_card_id[cid] = []
  325. g_img['detection_image_path'] = None
  326. g_img['modified_image_path'] = None
  327. images_by_card_id[cid].append(g_img)
  328. for card in cards:
  329. card['image_path_list'] = {}
  330. card['detection_image_path_list'] = {}
  331. card['modified_image_path_list'] = {}
  332. related_images = images_by_card_id.get(card['id'], [])
  333. for image_data in related_images:
  334. image_type = image_data['image_type']
  335. if image_type:
  336. card['image_path_list'][image_type] = image_data.get('image_path')
  337. card['detection_image_path_list'][image_type] = image_data.get('detection_image_path')
  338. card['modified_image_path_list'][image_type] = image_data.get('modified_image_path')
  339. return {
  340. "total": total_count,
  341. "list": cards
  342. }