| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- import base64
- import json
- from typing import List, Optional
- from fastapi import APIRouter, Depends, Header, HTTPException, status
- from mysql.connector.pooling import PooledMySQLConnection
- from pydantic import BaseModel, Field
- from app.core.config import settings
- from app.core.database_loader import get_db_connection
- from app.core.logger import get_logger
# Logger for this module, obtained from the project's logging helper.
logger = get_logger(__name__)

# Router that the endpoints defined in this module are registered on.
router = APIRouter()

# Shared dependency marker so endpoints can declare a DB connection parameter
# backed by get_db_connection without repeating the Depends(...) call.
db_dependency = Depends(get_db_connection)
class BindCardRequest(BaseModel):
    """Request body for binding cards to a user.

    Both fields are required. ``card_id`` keeps its singular name but holds a
    list of card ids.
    """

    # Id of the user receiving the cards; must be >= 0.
    user_id: int = Field(..., ge=0)

    # Ids of the cards to bind; the list must contain at least one element.
    card_id: List[int] = Field(..., min_length=1)
def _auth_exception(detail: str = "用户认证信息无效") -> HTTPException:
    """Build (without raising) the HTTP 401 error used for authentication failures.

    Args:
        detail: Message placed in the response ``detail`` field.

    Returns:
        An ``HTTPException`` with status 401 and a ``WWW-Authenticate`` header
        naming the ``X-USER-BASE64`` scheme; the caller is expected to raise it.
    """
    challenge_headers = {"WWW-Authenticate": "X-USER-BASE64"}
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers=challenge_headers,
    )
def _decode_user_base64(user_base64: str) -> dict:
    """Decode the user info passed by the external auth system in ``X-USER-BASE64``.

    The value is treated as URL-safe Base64 (trailing ``=`` padding optional)
    wrapping a UTF-8 encoded JSON object. When that object has a ``"user"``
    key, the value under it is returned; otherwise the object itself is
    returned.

    Args:
        user_base64: Raw header value.

    Returns:
        The decoded user info as a dict.

    Raises:
        HTTPException: 401 when the value cannot be decoded/parsed, or when
            the resulting user info is not a JSON object.
    """
    try:
        # Restore any stripped "=" padding: the decoder rejects input whose
        # length is not a multiple of 4.
        padding = "=" * (-len(user_base64) % 4)
        decoded_bytes = base64.urlsafe_b64decode(f"{user_base64}{padding}".encode("utf-8"))
        payload = json.loads(decoded_bytes.decode("utf-8"))
        user_data = payload.get("user", payload)
    except Exception as exc:
        # Deliberately broad: the header is external input and every decoding
        # failure (bad Base64, bad UTF-8, bad JSON, non-object payload) must
        # surface as the same 401 rather than as a server error.
        raise _auth_exception("X-USER-BASE64 解析失败") from exc

    # A payload such as {"user": "abc"} or {"user": null} parses fine but is
    # not usable user info; reject it here so callers can rely on the
    # annotated dict return type instead of failing later on ``.get``.
    if not isinstance(user_data, dict):
        raise _auth_exception("X-USER-BASE64 解析失败")
    return user_data
def get_current_user(x_user_base64: Optional[str] = Header(None, alias="X-USER-BASE64")) -> dict:
    """Resolve the calling user from the ``X-USER-BASE64`` request header.

    Args:
        x_user_base64: Raw header value; ``None`` or empty when the header is
            absent.

    Returns:
        A dict with the keys ``id`` (int), ``is_admin`` (True when
        ``"admin"`` is in the role list), ``roleCodeList``, ``nickname``,
        ``account`` and ``raw`` (the decoded user info as received).

    Raises:
        HTTPException: 401 when the header is missing, cannot be decoded,
            lacks a user id, has a non-list ``roleCodeList``, or carries a
            user id that is not an integer value.
    """
    if not x_user_base64:
        raise _auth_exception("缺少 X-USER-BASE64 请求头")

    user_data = _decode_user_base64(x_user_base64)
    # Defensive: only a JSON object can be queried with ``.get`` below; any
    # other decoded value is treated as an undecodable header.
    if not isinstance(user_data, dict):
        raise _auth_exception("X-USER-BASE64 解析失败")

    user_id = user_data.get("id")
    # A missing or null roleCodeList is treated as "no roles".
    role_code_list = user_data.get("roleCodeList") or []

    if user_id is None:
        raise _auth_exception("X-USER-BASE64 缺少用户 id")
    if not isinstance(role_code_list, list):
        raise _auth_exception("X-USER-BASE64 的 roleCodeList 格式错误")

    # int() would quietly map True/False to 1/0 and truncate 1.9 to 1, which
    # would attribute the request to a different user; reject those instead.
    # This also covers NaN/Infinity, for which is_integer() is False.
    is_fractional_float = isinstance(user_id, float) and not user_id.is_integer()
    if isinstance(user_id, bool) or is_fractional_float:
        raise _auth_exception("X-USER-BASE64 的用户 id 格式错误")

    try:
        user_id = int(user_id)
    except (TypeError, ValueError, OverflowError) as exc:
        raise _auth_exception("X-USER-BASE64 的用户 id 格式错误") from exc

    return {
        "id": user_id,
        "is_admin": "admin" in role_code_list,
        "roleCodeList": role_code_list,
        "nickname": user_data.get("nickname"),
        "account": user_data.get("account"),
        "raw": user_data,
    }
def require_admin_user(current_user: dict = Depends(get_current_user)) -> dict:
    """Dependency that lets only administrators through.

    Args:
        current_user: User dict produced by ``get_current_user``.

    Returns:
        The same user dict, unchanged, when its ``is_admin`` entry is truthy.

    Raises:
        HTTPException: 403 when the user is not an administrator.
    """
    if current_user.get("is_admin"):
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="该请求需要管理员权限",
    )
def check_card_permission(db_conn: PooledMySQLConnection, current_user: dict, card_id: int):
    """Ensure the current user is allowed to access the given card.

    Administrators pass immediately. Any other user must have a row linking
    their id to ``card_id`` in the user-card binding table named by
    ``settings.DB_USER_CARD_TABLE_NAME``.

    Args:
        db_conn: Database connection used for the lookup.
        current_user: User dict; ``is_admin`` and ``id`` are read from it.
        card_id: Id of the card being accessed.

    Raises:
        HTTPException: 403 when no matching binding row exists.
    """
    if current_user.get("is_admin"):
        return

    with db_conn.cursor() as cursor:
        # Only existence matters, hence SELECT 1 ... LIMIT 1.
        cursor.execute(
            f"SELECT 1 FROM `{settings.DB_USER_CARD_TABLE_NAME}` WHERE user_id = %s AND card_id = %s LIMIT 1",
            (current_user["id"], card_id),
        )
        binding_row = cursor.fetchone()

    if not binding_row:
        raise HTTPException(status_code=403, detail="没有该卡片权限")
@router.post("/bind_card", status_code=200, summary="给外部用户绑定卡片ID [用户调用]")
def bind_card_to_user(
    data: BindCardRequest,
    current_user: dict = Depends(require_admin_user),
    db_conn: PooledMySQLConnection = db_dependency
):
    """Bind one or more cards to an external user.

    The caller must pass the ``require_admin_user`` dependency. Every
    requested card id must exist in the card table; if any is missing nothing
    is inserted. Bindings are written with ``INSERT IGNORE``.

    Args:
        data: Request body holding the target ``user_id`` and the ``card_id``
            list.
        current_user: The authenticated administrator (its ``id`` is logged).
        db_conn: Database connection used for the lookup and the insert.

    Returns:
        A dict with ``message``, ``user_id``, the de-duplicated ``card_id``
        list and ``inserted_count`` (the cursor's ``rowcount`` after the
        insert; presumably the number of newly created bindings).

    Raises:
        HTTPException: 404 when some card ids do not exist, 500 on any other
            failure. The transaction is rolled back in both cases.
    """
    try:
        # The request field keeps the name card_id but may carry several ids;
        # dict.fromkeys drops duplicates while preserving first-seen order.
        unique_ids = list(dict.fromkeys(data.card_id))
        placeholders = ",".join("%s" for _ in unique_ids)

        with db_conn.cursor(dictionary=True) as cursor:
            cursor.execute(
                f"SELECT id FROM `{settings.DB_CARD_TABLE_NAME}` WHERE id IN ({placeholders})",
                tuple(unique_ids),
            )
            found_ids = {row["id"] for row in cursor.fetchall()}

            # Refuse the whole request if any requested card is unknown.
            absent_ids = sorted(set(unique_ids).difference(found_ids))
            if absent_ids:
                raise HTTPException(status_code=404, detail=f"卡片未发现: {absent_ids}")

            cursor.executemany(
                f"INSERT IGNORE INTO `{settings.DB_USER_CARD_TABLE_NAME}` (user_id, card_id) VALUES (%s, %s)",
                [(data.user_id, one_id) for one_id in unique_ids],
            )
            inserted_count = cursor.rowcount

        db_conn.commit()
        logger.info(f"Admin {current_user['id']} bound cards {unique_ids} to external user {data.user_id}")
        return {
            "message": "卡片绑定成功",
            "user_id": data.user_id,
            "card_id": unique_ids,
            "inserted_count": inserted_count,
        }
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 404 above) pass through unchanged
        # once the transaction has been rolled back.
        db_conn.rollback()
        raise
    except Exception as e:
        db_conn.rollback()
        logger.error(f"Bind card failed: {e}")
        raise HTTPException(status_code=500, detail="卡片绑定失败")
|