# users.py (4.8 KB)
  1. import base64
  2. import json
  3. from typing import List, Optional
  4. from fastapi import APIRouter, Depends, Header, HTTPException, status
  5. from mysql.connector.pooling import PooledMySQLConnection
  6. from pydantic import BaseModel, Field
  7. from app.core.config import settings
  8. from app.core.database_loader import get_db_connection
  9. from app.core.logger import get_logger
  # Logger for this module, obtained from the project's logging helper.
  logger = get_logger(__name__)

  # Router that the endpoints defined in this module are registered on.
  router = APIRouter()

  # Reusable FastAPI dependency marker: endpoints that declare it receive
  # whatever get_db_connection provides.
  db_dependency = Depends(get_db_connection)
  class BindCardRequest(BaseModel):
      # Request body of the /bind_card endpoint.
      # (Documented with comments rather than a docstring so the generated
      # schema description stays exactly as it was.)

      # Id of the user receiving the cards; must be >= 0.
      user_id: int = Field(..., ge=0)

      # Card ids to bind. The field keeps its singular name but holds a list,
      # which must contain at least one element.
      card_id: List[int] = Field(..., min_length=1)
  def _auth_exception(detail: str = "用户认证信息无效") -> HTTPException:
      """Build (without raising) the 401 error used for authentication failures.

      Args:
          detail: Message placed in the response body.

      Returns:
          An HTTPException with status 401 and a ``WWW-Authenticate`` header
          naming the X-USER-BASE64 header; the caller is expected to raise it.
      """
      challenge_headers = {"WWW-Authenticate": "X-USER-BASE64"}
      unauthorized = HTTPException(
          status_code=status.HTTP_401_UNAUTHORIZED,
          detail=detail,
          headers=challenge_headers,
      )
      return unauthorized
  def _decode_user_base64(user_base64: str) -> dict:
      """Decode the user info passed by the external auth system in X-USER-BASE64.

      The value is treated as URL-safe base64 (trailing ``=`` padding is
      optional) wrapping a UTF-8 encoded JSON object. When that object has a
      ``"user"`` key its value is returned, otherwise the object itself.

      Args:
          user_base64: Raw header value.

      Returns:
          The decoded user object as a dict.

      Raises:
          HTTPException: 401 when the value cannot be decoded, is not valid
              JSON, or does not yield a JSON object for the user.
      """
      try:
          # Restore the padding that senders commonly strip from URL-safe base64.
          padding = "=" * (-len(user_base64) % 4)
          decoded_bytes = base64.urlsafe_b64decode(f"{user_base64}{padding}".encode("utf-8"))
          payload = json.loads(decoded_bytes.decode("utf-8"))
      except (TypeError, ValueError, RecursionError):
          # ValueError covers binascii.Error, UnicodeDecodeError and
          # json.JSONDecodeError; RecursionError covers absurdly nested JSON.
          raise _auth_exception("X-USER-BASE64 解析失败") from None

      user = payload.get("user", payload) if isinstance(payload, dict) else None
      if not isinstance(user, dict):
          # e.g. a JSON list, or {"user": "abc"} / {"user": null}: callers rely
          # on dict access, so reject here with 401 instead of failing later.
          raise _auth_exception("X-USER-BASE64 解析失败")
      return user
  def get_current_user(x_user_base64: Optional[str] = Header(None, alias="X-USER-BASE64")) -> dict:
      """Resolve the calling user from the X-USER-BASE64 request header.

      Args:
          x_user_base64: Raw header value, injected by FastAPI.

      Returns:
          A dict with keys ``id`` (int), ``is_admin`` (True when "admin" is in
          the role list), ``roleCodeList``, ``nickname``, ``account`` and
          ``raw`` (the decoded user object as received).

      Raises:
          HTTPException: 401 when the header is missing or empty, cannot be
              decoded into an object, lacks an ``id``, carries a
              ``roleCodeList`` that is not a list, or has an ``id`` that is
              not an integer value.
      """
      if not x_user_base64:
          raise _auth_exception("缺少 X-USER-BASE64 请求头")

      user_data = _decode_user_base64(x_user_base64)
      if not isinstance(user_data, dict):
          # Guard locally so a non-object payload yields 401, not AttributeError.
          raise _auth_exception("X-USER-BASE64 解析失败")

      raw_id = user_data.get("id")
      # A missing or null/empty roleCodeList is treated as "no roles".
      role_code_list = user_data.get("roleCodeList") or []

      if raw_id is None:
          raise _auth_exception("X-USER-BASE64 缺少用户 id")
      if not isinstance(role_code_list, list):
          raise _auth_exception("X-USER-BASE64 的 roleCodeList 格式错误")

      # int() would silently turn True into 1 and truncate 1.9 to 1; neither is
      # a trustworthy identity, so reject them explicitly.
      if isinstance(raw_id, bool) or (isinstance(raw_id, float) and not raw_id.is_integer()):
          raise _auth_exception("X-USER-BASE64 的用户 id 格式错误")
      try:
          user_id = int(raw_id)
      except (TypeError, ValueError, OverflowError):
          # OverflowError: json.loads accepts Infinity, which int() cannot convert.
          raise _auth_exception("X-USER-BASE64 的用户 id 格式错误") from None

      return {
          "id": user_id,
          "is_admin": "admin" in role_code_list,
          "roleCodeList": role_code_list,
          "nickname": user_data.get("nickname"),
          "account": user_data.get("account"),
          "raw": user_data,
      }
  def require_admin_user(current_user: dict = Depends(get_current_user)) -> dict:
      """Dependency that lets only administrators through.

      Args:
          current_user: User dict produced by ``get_current_user``.

      Returns:
          The same user dict, unchanged, when its ``is_admin`` entry is truthy.

      Raises:
          HTTPException: 403 when the user is not an administrator.
      """
      if current_user.get("is_admin"):
          return current_user
      raise HTTPException(status_code=403, detail="该请求需要管理员权限")
  def check_card_permission(db_conn: PooledMySQLConnection, current_user: dict, card_id: int):
      """Ensure the user may access the given card.

      Administrators pass immediately; any other user needs a matching row in
      the user-card binding table.

      Args:
          db_conn: Database connection used for the lookup.
          current_user: User dict; its ``is_admin`` and ``id`` entries are read.
          card_id: Id of the card being accessed.

      Raises:
          HTTPException: 403 when a non-admin user has no binding for the card.
      """
      if current_user.get("is_admin"):
          return

      with db_conn.cursor() as cursor:
          lookup_sql = (
              f"SELECT 1 FROM `{settings.DB_USER_CARD_TABLE_NAME}` "
              "WHERE user_id = %s AND card_id = %s LIMIT 1"
          )
          cursor.execute(lookup_sql, (current_user["id"], card_id))
          binding_row = cursor.fetchone()

      if not binding_row:
          raise HTTPException(status_code=403, detail="没有该卡片权限")
  @router.post("/bind_card", status_code=200, summary="(管理员)给外部用户绑定卡片ID [用户调用]")
  def bind_card_to_user(
      data: BindCardRequest,
      current_user: dict = Depends(require_admin_user),
      db_conn: PooledMySQLConnection = db_dependency,
  ):
      """Bind one or more cards to an external user (administrators only).

      Every requested card id must exist in the card table; otherwise nothing
      is inserted and 404 is returned. Bindings are written with
      ``INSERT IGNORE`` in a single ``executemany`` call and then committed.

      Args:
          data: Request body with the target ``user_id`` and list of ``card_id``.
          current_user: Authenticated administrator (enforced by the dependency).
          db_conn: Database connection for this request.

      Returns:
          A dict with ``message``, ``user_id``, the de-duplicated ``card_id``
          list and ``inserted_count`` (the cursor's rowcount after the insert).

      Raises:
          HTTPException: 404 when any requested card id does not exist;
              500 on any other failure (the transaction is rolled back).
      """
      try:
          # The request field keeps the name card_id but may carry several ids;
          # dict.fromkeys removes duplicates while preserving request order.
          card_ids = list(dict.fromkeys(data.card_id))

          with db_conn.cursor(dictionary=True) as cursor:
              format_strings = ",".join(["%s"] * len(card_ids))
              cursor.execute(
                  f"SELECT id FROM `{settings.DB_CARD_TABLE_NAME}` WHERE id IN ({format_strings})",
                  tuple(card_ids),
              )
              existing_card_ids = {row["id"] for row in cursor.fetchall()}

              missing_card_ids = sorted(set(card_ids) - existing_card_ids)
              if missing_card_ids:
                  raise HTTPException(status_code=404, detail=f"卡片未发现: {missing_card_ids}")

              bind_params = [(data.user_id, card_id) for card_id in card_ids]
              cursor.executemany(
                  f"INSERT IGNORE INTO `{settings.DB_USER_CARD_TABLE_NAME}` (user_id, card_id) VALUES (%s, %s)",
                  bind_params,
              )
              inserted_count = cursor.rowcount

          db_conn.commit()
          logger.info(f"Admin {current_user['id']} bound cards {card_ids} to external user {data.user_id}")
          return {
              "message": "卡片绑定成功",
              "user_id": data.user_id,
              "card_id": card_ids,
              "inserted_count": inserted_count,
          }
      except HTTPException:
          db_conn.rollback()
          raise
      except Exception as exc:
          # Log first, with traceback: if the connection itself is what broke,
          # rollback() may raise too and must not hide the original error.
          logger.exception(f"Bind card failed: {exc}")
          try:
              db_conn.rollback()
          except Exception:
              logger.exception("Rollback after failed bind_card also failed")
          raise HTTPException(status_code=500, detail="卡片绑定失败") from exc