users.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. import base64
  2. import json
  3. from typing import List, Optional
  4. from fastapi import APIRouter, Depends, Header, HTTPException, status
  5. from mysql.connector.pooling import PooledMySQLConnection
  6. from pydantic import BaseModel, Field
  7. from app.core.config import settings
  8. from app.core.database_loader import get_db_connection
  9. from app.core.logger import get_logger
  10. logger = get_logger(__name__)
  11. router = APIRouter()
  12. db_dependency = Depends(get_db_connection)
  13. class BindCardRequest(BaseModel):
  14. user_id: int = Field(..., ge=0)
  15. bind_card_id: List[int] = Field(default_factory=list)
  16. unbind_card_id: List[int] = Field(default_factory=list)
  17. def _auth_exception(detail: str = "用户认证信息无效") -> HTTPException:
  18. return HTTPException(
  19. status_code=status.HTTP_401_UNAUTHORIZED,
  20. detail=detail,
  21. headers={"WWW-Authenticate": "X-USER-BASE64"},
  22. )
  23. def _decode_user_base64(user_base64: str) -> dict:
  24. """解析外部认证系统传入的 X-USER-BASE64 用户信息。"""
  25. try:
  26. padding = "=" * (-len(user_base64) % 4)
  27. decoded_bytes = base64.urlsafe_b64decode(f"{user_base64}{padding}".encode("utf-8"))
  28. payload = json.loads(decoded_bytes.decode("utf-8"))
  29. return payload.get("user", payload)
  30. except Exception:
  31. raise _auth_exception("X-USER-BASE64 解析失败")
  32. def get_current_user(x_user_base64: Optional[str] = Header(None, alias="X-USER-BASE64")) -> dict:
  33. if not x_user_base64:
  34. raise _auth_exception("缺少 X-USER-BASE64 请求头")
  35. user_data = _decode_user_base64(x_user_base64)
  36. user_id = user_data.get("id")
  37. role_code_list = user_data.get("roleCodeList") or []
  38. if user_id is None:
  39. raise _auth_exception("X-USER-BASE64 缺少用户 id")
  40. if not isinstance(role_code_list, list):
  41. raise _auth_exception("X-USER-BASE64 的 roleCodeList 格式错误")
  42. try:
  43. user_id = int(user_id)
  44. except (TypeError, ValueError):
  45. raise _auth_exception("X-USER-BASE64 的用户 id 格式错误")
  46. return {
  47. "id": user_id,
  48. "is_admin": "admin" in role_code_list,
  49. "roleCodeList": role_code_list,
  50. "nickname": user_data.get("nickname"),
  51. "account": user_data.get("account"),
  52. "raw": user_data
  53. }
  54. def require_admin_user(current_user: dict = Depends(get_current_user)) -> dict:
  55. if not current_user.get("is_admin"):
  56. raise HTTPException(status_code=403, detail="该请求需要管理员权限")
  57. return current_user
  58. def check_card_permission(db_conn: PooledMySQLConnection, current_user: dict, card_id: int):
  59. """管理员直接放行;普通用户需要在用户-卡片绑定表中存在对应关系。"""
  60. if current_user.get("is_admin"):
  61. return
  62. with db_conn.cursor() as cursor:
  63. cursor.execute(
  64. f"SELECT 1 FROM `{settings.DB_USER_CARD_TABLE_NAME}` WHERE user_id = %s AND card_id = %s LIMIT 1",
  65. (current_user["id"], card_id)
  66. )
  67. if cursor.fetchone():
  68. return
  69. raise HTTPException(status_code=403, detail="没有该卡片权限")
  70. @router.post("/bind_card", status_code=200, summary="(管理员)给外部用户绑定卡片ID [用户调用]")
  71. def bind_card_to_user(
  72. data: BindCardRequest,
  73. current_user: dict = Depends(require_admin_user),
  74. db_conn: PooledMySQLConnection = db_dependency
  75. ):
  76. try:
  77. # 先解绑再绑定;如果同一张卡同时出现在两个列表,最终以绑定列表为准。
  78. bind_card_ids = list(dict.fromkeys(data.bind_card_id))
  79. unbind_card_ids = list(dict.fromkeys(data.unbind_card_id))
  80. operation_card_ids = list(dict.fromkeys(unbind_card_ids + bind_card_ids))
  81. if not operation_card_ids:
  82. raise HTTPException(status_code=400, detail="绑定列表和解绑列表不能同时为空")
  83. with db_conn.cursor(dictionary=True) as cursor:
  84. format_strings = ",".join(["%s"] * len(operation_card_ids))
  85. cursor.execute(
  86. f"SELECT id FROM `{settings.DB_CARD_TABLE_NAME}` WHERE id IN ({format_strings})",
  87. tuple(operation_card_ids)
  88. )
  89. existing_card_ids = {row["id"] for row in cursor.fetchall()}
  90. missing_card_ids = sorted(set(operation_card_ids) - existing_card_ids)
  91. if missing_card_ids:
  92. raise HTTPException(status_code=404, detail=f"卡片未发现: {missing_card_ids}")
  93. deleted_count = 0
  94. if unbind_card_ids:
  95. unbind_format_strings = ",".join(["%s"] * len(unbind_card_ids))
  96. cursor.execute(
  97. f"DELETE FROM `{settings.DB_USER_CARD_TABLE_NAME}` "
  98. f"WHERE user_id = %s AND card_id IN ({unbind_format_strings})",
  99. tuple([data.user_id] + unbind_card_ids)
  100. )
  101. deleted_count = cursor.rowcount
  102. inserted_count = 0
  103. if bind_card_ids:
  104. bind_params = [(data.user_id, card_id) for card_id in bind_card_ids]
  105. cursor.executemany(
  106. f"INSERT IGNORE INTO `{settings.DB_USER_CARD_TABLE_NAME}` (user_id, card_id) VALUES (%s, %s)",
  107. bind_params
  108. )
  109. inserted_count = cursor.rowcount
  110. db_conn.commit()
  111. logger.info(
  112. f"Admin {current_user['id']} updated card bindings for external user {data.user_id}: "
  113. f"bind={bind_card_ids}, unbind={unbind_card_ids}"
  114. )
  115. return {
  116. "message": "卡片绑定关系更新成功",
  117. "user_id": data.user_id,
  118. "bind_card_id": bind_card_ids,
  119. "unbind_card_id": unbind_card_ids,
  120. "deleted_count": deleted_count,
  121. "inserted_count": inserted_count
  122. }
  123. except HTTPException:
  124. db_conn.rollback()
  125. raise
  126. except Exception as e:
  127. db_conn.rollback()
  128. logger.error(f"Bind card failed: {e}")
  129. raise HTTPException(status_code=500, detail="卡片绑定失败")