image_data.py 14 KB


  1. import os
  2. import uuid
  3. import json
  4. from datetime import date, datetime
  5. from typing import Optional, Dict, Any, List
  6. from fastapi import APIRouter, File, UploadFile, Depends, HTTPException, Form, Query
  7. from fastapi.responses import JSONResponse, FileResponse
  8. from pydantic import BaseModel, field_validator
  9. from mysql.connector.pooling import PooledMySQLConnection
  10. from app.core.config import settings
  11. from app.core.logger import get_logger
  12. from app.core.database_loader import get_db_connection
  13. # --- 初始化 ---
  14. logger = get_logger(__name__)
  15. router = APIRouter()
  16. # 创建一个依赖项的别名
  17. db_dependency = Depends(get_db_connection)
  18. # --- Pydantic 数据模型 ---
  19. class ImageRecordResponse(BaseModel):
  20. """用于API响应的数据模型,确保数据结构一致"""
  21. img_id: int
  22. img_name: Optional[str] = None
  23. img_path: str
  24. img_result_json: Dict[str, Any]
  25. created_at: datetime
  26. @field_validator('img_result_json', mode='before')
  27. @classmethod
  28. def parse_json_string(cls, v):
  29. """
  30. 这个验证器会在Pydantic进行类型检查之前运行 (因为 pre=True)。
  31. 它负责将从数据库取出的JSON字符串转换为Python字典。
  32. """
  33. if isinstance(v, str):
  34. try:
  35. return json.loads(v)
  36. except json.JSONDecodeError:
  37. # 如果数据库中的JSON格式错误,则抛出异常
  38. raise ValueError("Invalid JSON string in database")
  39. return v
  40. # --- FIX ENDS HERE ---
  41. # --- 辅助函数 ---
  42. def map_row_to_model(row: tuple, columns: List[str]) -> ImageRecordResponse:
  43. """将数据库查询出的一行数据映射到Pydantic模型"""
  44. row_dict = dict(zip(columns, row))
  45. # 现在当 ImageRecordResponse 被调用时,上面的验证器会先生效
  46. return ImageRecordResponse(**row_dict)
  47. # --- API 接口实现 ---
  48. # 1: 存储图片和JSON数据
  49. @router.post("/insert", status_code=201, summary="1. 存储图片和JSON数据")
  50. async def create_image_data(
  51. image: UploadFile = File(..., description="要上传的图片文件"),
  52. json_data_str: str = Form(..., description="与图片关联的JSON格式字符串"),
  53. img_name: Optional[str] = Form(None, description="图片的可选名称"),
  54. db_conn: PooledMySQLConnection = db_dependency
  55. ):
  56. """
  57. 接收图片和JSON数据,存入数据库。
  58. - 图片存储在 `Data` 目录。
  59. - 记录存入MySQL,`img_id` 自动生成。
  60. """
  61. try:
  62. img_result_json = json.loads(json_data_str)
  63. except json.JSONDecodeError:
  64. raise HTTPException(status_code=400, detail="`json_data_str` 格式无效。")
  65. # 生成唯一文件名并保存图片
  66. file_extension = os.path.splitext(image.filename)[1]
  67. unique_filename = f"{uuid.uuid4()}{file_extension}"
  68. image_path = settings.DATA_DIR / unique_filename
  69. try:
  70. with open(image_path, "wb") as buffer:
  71. content = await image.read()
  72. buffer.write(content)
  73. logger.info(f"图片已保存到: {image_path}")
  74. except Exception as e:
  75. logger.error(f"保存图片失败: {e}")
  76. raise HTTPException(status_code=500, detail="无法保存图片文件。")
  77. # 将记录插入数据库
  78. cursor = None
  79. try:
  80. cursor = db_conn.cursor()
  81. query = (
  82. f"INSERT INTO {settings.DB_TABLE_NAME} (img_name, img_path, img_result_json) "
  83. "VALUES (%s, %s, %s)"
  84. )
  85. # 确保存入数据库的是JSON字符串
  86. params = (img_name, str(image_path), json.dumps(img_result_json, ensure_ascii=False))
  87. cursor.execute(query, params)
  88. db_conn.commit()
  89. new_id = cursor.lastrowid
  90. logger.info(f"新记录已创建, ID: {new_id}")
  91. return {"message": "成功存储图片和数据", "img_id": new_id}
  92. except Exception as e:
  93. db_conn.rollback()
  94. logger.error(f"数据库插入失败: {e}")
  95. if os.path.exists(image_path):
  96. os.remove(image_path)
  97. raise HTTPException(status_code=500, detail="数据库插入失败。")
  98. finally:
  99. if cursor:
  100. cursor.close()
  101. # 2: 获取数据列表 (带筛选)
  102. @router.get("/data_list", response_model=List[ImageRecordResponse], summary="2. 获取数据列表 (可筛选)")
  103. def list_image_records(
  104. start_id: Optional[int] = Query(None, description="筛选条件:起始img_id"),
  105. end_id: Optional[int] = Query(None, description="筛选条件:结束img_id"),
  106. name_like: Optional[str] = Query(None, description="筛选条件:名称模糊搜索"),
  107. start_date: Optional[date] = Query(None, description="筛选条件:起始日期 (YYYY-MM-DD)"),
  108. end_date: Optional[date] = Query(None, description="筛选条件:结束日期 (YYYY-MM-DD)"),
  109. skip: int = Query(0, ge=0, description="分页:跳过的记录数"),
  110. limit: int = Query(100, ge=1, le=1000, description="分页:每页的记录数"),
  111. db_conn: PooledMySQLConnection = db_dependency
  112. ):
  113. """
  114. 根据多个可选条件查询记录列表,并支持分页。
  115. """
  116. query = f"SELECT * FROM {settings.DB_TABLE_NAME}"
  117. conditions = []
  118. params = []
  119. if start_id is not None:
  120. conditions.append("img_id >= %s")
  121. params.append(start_id)
  122. if end_id is not None:
  123. conditions.append("img_id <= %s")
  124. params.append(end_id)
  125. if name_like:
  126. conditions.append("img_name LIKE %s")
  127. params.append(f"%{name_like}%")
  128. if start_date:
  129. conditions.append("created_at >= %s")
  130. params.append(start_date)
  131. if end_date:
  132. conditions.append("created_at < DATE_ADD(%s, INTERVAL 1 DAY)")
  133. params.append(end_date)
  134. if conditions:
  135. query += " WHERE " + " AND ".join(conditions)
  136. query += " ORDER BY img_id DESC LIMIT %s OFFSET %s"
  137. params.extend([limit, skip])
  138. cursor = None
  139. try:
  140. cursor = db_conn.cursor()
  141. cursor.execute(query, tuple(params))
  142. columns = [desc[0] for desc in cursor.description]
  143. return [map_row_to_model(row, columns) for row in cursor.fetchall()]
  144. except Exception as e:
  145. logger.error(f"查询列表失败: {e}")
  146. raise HTTPException(status_code=500, detail="获取数据列表失败。")
  147. finally:
  148. if cursor:
  149. cursor.close()
  150. # 2: 根据img_id查询
  151. @router.get("/query/{img_id}", response_model=ImageRecordResponse, summary="2. 根据img_id查询完整记录")
  152. def get_record_by_id(img_id: int, db_conn: PooledMySQLConnection = db_dependency):
  153. """获取指定ID的完整数据库记录。"""
  154. cursor = None
  155. try:
  156. cursor = db_conn.cursor()
  157. query = f"SELECT * FROM {settings.DB_TABLE_NAME} WHERE img_id = %s"
  158. cursor.execute(query, (img_id,))
  159. result = cursor.fetchone()
  160. if not result:
  161. raise HTTPException(status_code=404, detail=f"ID为 {img_id} 的记录未找到。")
  162. columns = [desc[0] for desc in cursor.description]
  163. return map_row_to_model(result, columns)
  164. except Exception as e:
  165. logger.error(f"ID查询失败 ({img_id}): {e}")
  166. if isinstance(e, HTTPException): raise e
  167. raise HTTPException(status_code=500, detail="数据库查询失败。")
  168. finally:
  169. if cursor:
  170. cursor.close()
  171. # 3: 根据img_name查询
  172. @router.get("/query/name/{img_name}", response_model=List[ImageRecordResponse], summary="3. 根据img_name查询记录")
  173. def get_records_by_name(img_name: str, db_conn: PooledMySQLConnection = db_dependency):
  174. """获取所有与指定名称匹配的记录列表。"""
  175. cursor = None
  176. try:
  177. cursor = db_conn.cursor()
  178. query = f"SELECT * FROM {settings.DB_TABLE_NAME} WHERE img_name = %s"
  179. cursor.execute(query, (img_name,))
  180. results = cursor.fetchall()
  181. if not results:
  182. return [] # 未找到则返回空列表
  183. columns = [desc[0] for desc in cursor.description]
  184. return [map_row_to_model(row, columns) for row in results]
  185. except Exception as e:
  186. logger.error(f"名称查询失败 ({img_name}): {e}")
  187. raise HTTPException(status_code=500, detail="数据库查询失败。")
  188. finally:
  189. if cursor:
  190. cursor.close()
  191. # ... (其他接口代码保持不变) ...
  192. # 5: 修改JSON数据
  193. @router.put("/update/json/{img_id}", status_code=200, summary="5. 修改指定ID记录的JSON数据")
  194. def update_record_json(
  195. img_id: int,
  196. new_json_data: Dict[str, Any],
  197. db_conn: PooledMySQLConnection = db_dependency
  198. ):
  199. """根据img_id,用请求体中的新JSON覆盖原有的JSON数据。"""
  200. cursor = None
  201. try:
  202. cursor = db_conn.cursor()
  203. new_json_str = json.dumps(new_json_data, ensure_ascii=False)
  204. query = f"UPDATE {settings.DB_TABLE_NAME} SET img_result_json = %s WHERE img_id = %s"
  205. cursor.execute(query, (new_json_str, img_id))
  206. if cursor.rowcount == 0:
  207. raise HTTPException(status_code=404, detail=f"ID为 {img_id} 的记录未找到。")
  208. db_conn.commit()
  209. logger.info(f"ID {img_id} 的JSON数据已更新。")
  210. return {"message": f"成功更新 ID {img_id} 的JSON数据"}
  211. except Exception as e:
  212. db_conn.rollback()
  213. logger.error(f"更新JSON失败 ({img_id}): {e}")
  214. if isinstance(e, HTTPException): raise e
  215. raise HTTPException(status_code=500, detail="更新JSON数据失败。")
  216. finally:
  217. if cursor:
  218. cursor.close()
  219. # 6: 获取图片文件
  220. @router.get("/image/{img_id}", summary="6. 获取指定ID的图片文件")
  221. def get_image_file(img_id: int, db_conn: PooledMySQLConnection = db_dependency):
  222. """根据img_id查找记录,并返回对应的图片文件。"""
  223. cursor = None
  224. try:
  225. cursor = db_conn.cursor()
  226. query = f"SELECT img_path FROM {settings.DB_TABLE_NAME} WHERE img_id = %s"
  227. cursor.execute(query, (img_id,))
  228. result = cursor.fetchone()
  229. if not result:
  230. raise HTTPException(status_code=404, detail=f"ID为 {img_id} 的记录未找到。")
  231. image_path = result[0]
  232. if not os.path.exists(image_path):
  233. logger.error(f"文件在服务器上未找到: {image_path} (数据库ID: {img_id})")
  234. raise HTTPException(status_code=404, detail="图片文件在服务器上不存在。")
  235. return FileResponse(image_path)
  236. except Exception as e:
  237. logger.error(f"获取图片失败 ({img_id}): {e}")
  238. if isinstance(e, HTTPException): raise e
  239. raise HTTPException(status_code=500, detail="获取图片文件失败。")
  240. finally:
  241. if cursor:
  242. cursor.close()
  243. # 7: 获取JSON数据
  244. @router.get("/json/{img_id}", summary="7. 获取指定ID的JSON数据")
  245. def get_record_json(img_id: int, db_conn: PooledMySQLConnection = db_dependency):
  246. """根据img_id查找记录,并仅返回其JSON数据部分。"""
  247. cursor = None
  248. try:
  249. cursor = db_conn.cursor()
  250. query = f"SELECT img_result_json FROM {settings.DB_TABLE_NAME} WHERE img_id = %s"
  251. cursor.execute(query, (img_id,))
  252. result = cursor.fetchone()
  253. if not result:
  254. raise HTTPException(status_code=404, detail=f"ID为 {img_id} 的记录未找到。")
  255. json_data = json.loads(result[0])
  256. return JSONResponse(content=json_data)
  257. except Exception as e:
  258. logger.error(f"获取JSON失败 ({img_id}): {e}")
  259. if isinstance(e, HTTPException): raise e
  260. raise HTTPException(status_code=500, detail="获取JSON数据失败。")
  261. finally:
  262. if cursor:
  263. cursor.close()
  264. # 8: 根据 img_id 删除记录
  265. @router.delete("/delete/{img_id}", status_code=200, summary="8. 根据img_id删除记录及其图片")
  266. def delete_record_by_id(
  267. img_id: int,
  268. db_conn: PooledMySQLConnection = db_dependency
  269. ):
  270. """
  271. 根据img_id删除数据库记录以及存储在服务器上的对应图片文件。
  272. 这是一个原子操作,如果文件删除失败,数据库更改将回滚。
  273. """
  274. cursor = None
  275. try:
  276. cursor = db_conn.cursor()
  277. # 1. 先查询记录,获取文件路径,并确认记录存在
  278. query_path = f"SELECT img_path FROM {settings.DB_TABLE_NAME} WHERE img_id = %s"
  279. cursor.execute(query_path, (img_id,))
  280. result = cursor.fetchone()
  281. if not result:
  282. raise HTTPException(status_code=404, detail=f"ID为 {img_id} 的记录未找到。")
  283. image_path = result[0]
  284. # 2. 删除数据库记录
  285. query_delete = f"DELETE FROM {settings.DB_TABLE_NAME} WHERE img_id = %s"
  286. cursor.execute(query_delete, (img_id,))
  287. # 3. 删除对应的图片文件(如果存在)
  288. if os.path.exists(image_path):
  289. try:
  290. os.remove(image_path)
  291. logger.info(f"图片文件已删除: {image_path}")
  292. except OSError as e:
  293. # 如果文件删除失败,回滚数据库操作以保持一致性
  294. db_conn.rollback()
  295. logger.error(f"删除文件失败: {image_path}. 数据库操作已回滚。错误: {e}")
  296. raise HTTPException(status_code=500, detail="删除文件失败,数据库操作已回滚。")
  297. else:
  298. logger.warning(f"数据库记录指向的文件不存在,无需删除: {image_path} (ID: {img_id})")
  299. # 4. 提交事务
  300. db_conn.commit()
  301. logger.info(f"ID {img_id} 的记录和关联文件已成功删除。")
  302. return {"message": f"成功删除 ID {img_id} 的记录及其关联文件"}
  303. except Exception as e:
  304. db_conn.rollback()
  305. logger.error(f"删除记录失败 ({img_id}): {e}")
  306. if isinstance(e, HTTPException):
  307. raise e
  308. raise HTTPException(status_code=500, detail="删除记录时发生服务器内部错误。")
  309. finally:
  310. if cursor:
  311. cursor.close()