import base64
import json
from typing import List, Optional

from fastapi import APIRouter, Depends, Header, HTTPException, status
from mysql.connector.pooling import PooledMySQLConnection
from pydantic import BaseModel, Field

from app.core.config import settings
from app.core.database_loader import get_db_connection
from app.core.logger import get_logger

logger = get_logger(__name__)
router = APIRouter()
db_dependency = Depends(get_db_connection)


# Request body for POST /bind_card.
# NOTE: documented with comments rather than a docstring so the generated
# JSON schema / OpenAPI description is left untouched.
class BindCardRequest(BaseModel):
    # Target user id; must be >= 0.
    user_id: int = Field(..., ge=0)
    # One or more card ids; the field keeps its singular name.
    card_id: List[int] = Field(..., min_length=1)


def _auth_exception(detail: str = "用户认证信息无效") -> HTTPException:
    """Build (not raise) a 401 HTTPException with a WWW-Authenticate header."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "X-USER-BASE64"},
    )


def _decode_user_base64(user_base64: str) -> dict:
    """Parse the X-USER-BASE64 user info passed in by the external auth system.

    The value is URL-safe base64 (missing ``=`` padding is tolerated) wrapping
    a UTF-8 JSON object. If that object has a ``"user"`` key its value is
    returned, otherwise the object itself is returned.

    Raises:
        HTTPException: 401 when the header cannot be decoded or parsed, or
            when the resulting user info is not a JSON object.
    """
    try:
        # Re-add the padding that URL-safe encoders commonly strip.
        padding = "=" * (-len(user_base64) % 4)
        decoded_bytes = base64.urlsafe_b64decode(
            f"{user_base64}{padding}".encode("utf-8")
        )
        payload = json.loads(decoded_bytes.decode("utf-8"))
    except Exception as exc:
        # Untrusted input boundary: every decode/parse failure is a 401.
        raise _auth_exception("X-USER-BASE64 解析失败") from exc

    user_data = payload.get("user", payload) if isinstance(payload, dict) else None
    # Callers use dict methods on the result; reject e.g. {"user": null} here
    # so it surfaces as a 401 instead of an AttributeError (500) later on.
    if not isinstance(user_data, dict):
        raise _auth_exception("X-USER-BASE64 解析失败")
    return user_data


def get_current_user(
    x_user_base64: Optional[str] = Header(None, alias="X-USER-BASE64"),
) -> dict:
    """FastAPI dependency that builds the current user from X-USER-BASE64.

    Returns:
        A dict with ``id`` (int), ``is_admin`` (True when ``"admin"`` is in
        ``roleCodeList``), ``roleCodeList``, ``nickname``, ``account`` and
        ``raw`` (the decoded user object).

    Raises:
        HTTPException: 401 when the header is missing or malformed, the user
            id is absent or not convertible to int, or ``roleCodeList`` is
            not a list.
    """
    if not x_user_base64:
        raise _auth_exception("缺少 X-USER-BASE64 请求头")

    user_data = _decode_user_base64(x_user_base64)
    user_id = user_data.get("id")
    # A missing or falsy roleCodeList is treated as "no roles".
    role_code_list = user_data.get("roleCodeList") or []

    if user_id is None:
        raise _auth_exception("X-USER-BASE64 缺少用户 id")
    if not isinstance(role_code_list, list):
        raise _auth_exception("X-USER-BASE64 的 roleCodeList 格式错误")

    try:
        user_id = int(user_id)
    except (TypeError, ValueError, OverflowError) as exc:
        # OverflowError: json.loads accepts Infinity, and int(inf) overflows.
        raise _auth_exception("X-USER-BASE64 的用户 id 格式错误") from exc

    return {
        "id": user_id,
        "is_admin": "admin" in role_code_list,
        "roleCodeList": role_code_list,
        "nickname": user_data.get("nickname"),
        "account": user_data.get("account"),
        "raw": user_data,
    }


def require_admin_user(current_user: dict = Depends(get_current_user)) -> dict:
    """FastAPI dependency that lets only admin users through.

    Raises:
        HTTPException: 403 when ``current_user["is_admin"]`` is falsy.
    """
    if not current_user.get("is_admin"):
        raise HTTPException(status_code=403, detail="该请求需要管理员权限")
    return current_user


def check_card_permission(
    db_conn: PooledMySQLConnection, current_user: dict, card_id: int
) -> None:
    """Ensure ``current_user`` may access ``card_id``.

    Admins pass straight through; regular users need a matching row in the
    user-card binding table.

    Raises:
        HTTPException: 403 when a non-admin user has no binding for the card.
    """
    if current_user.get("is_admin"):
        return

    with db_conn.cursor() as cursor:
        # The table name comes from settings; the values are bound parameters.
        cursor.execute(
            f"SELECT 1 FROM `{settings.DB_USER_CARD_TABLE_NAME}` "
            "WHERE user_id = %s AND card_id = %s LIMIT 1",
            (current_user["id"], card_id),
        )
        has_binding = cursor.fetchone() is not None

    if not has_binding:
        raise HTTPException(status_code=403, detail="没有该卡片权限")


# Admin-only endpoint: bind one or more existing cards to a user.
# Responds 404 (nothing written) when any requested card id is absent from the
# card table, and 500 on any other failure after rolling the transaction back.
# NOTE: documented with comments rather than a docstring because FastAPI would
# publish a docstring as the operation description.
@router.post("/bind_card", status_code=200, summary="(管理员)给外部用户绑定卡片ID [用户调用]")
def bind_card_to_user(
    data: BindCardRequest,
    current_user: dict = Depends(require_admin_user),
    db_conn: PooledMySQLConnection = db_dependency,
) -> dict:
    try:
        # The request field stays named card_id, but several cards can be
        # bound at once; duplicates are dropped while keeping request order.
        card_ids = list(dict.fromkeys(data.card_id))

        with db_conn.cursor(dictionary=True) as cursor:
            format_strings = ",".join(["%s"] * len(card_ids))
            cursor.execute(
                f"SELECT id FROM `{settings.DB_CARD_TABLE_NAME}` "
                f"WHERE id IN ({format_strings})",
                tuple(card_ids),
            )
            existing_card_ids = {row["id"] for row in cursor.fetchall()}

            missing_card_ids = sorted(set(card_ids) - existing_card_ids)
            if missing_card_ids:
                raise HTTPException(
                    status_code=404, detail=f"卡片未发现: {missing_card_ids}"
                )

            bind_params = [(data.user_id, card_id) for card_id in card_ids]
            # INSERT IGNORE: presumably skips pairs that are already bound
            # (depends on a unique key on the binding table — confirm).
            cursor.executemany(
                f"INSERT IGNORE INTO `{settings.DB_USER_CARD_TABLE_NAME}` "
                "(user_id, card_id) VALUES (%s, %s)",
                bind_params,
            )
            inserted_count = cursor.rowcount

        db_conn.commit()
        logger.info(
            f"Admin {current_user['id']} bound cards {card_ids} "
            f"to external user {data.user_id}"
        )
        return {
            "message": "卡片绑定成功",
            "user_id": data.user_id,
            "card_id": card_ids,
            "inserted_count": inserted_count,
        }
    except HTTPException:
        db_conn.rollback()
        raise
    except Exception as e:
        # Log (with traceback) before rolling back, so the root cause is still
        # recorded even if the rollback itself fails.
        logger.exception(f"Bind card failed: {e}")
        db_conn.rollback()
        raise HTTPException(status_code=500, detail="卡片绑定失败") from e