import base64 import json from typing import List, Optional from fastapi import APIRouter, Depends, Header, HTTPException, status from mysql.connector.pooling import PooledMySQLConnection from pydantic import BaseModel, Field from app.core.config import settings from app.core.database_loader import get_db_connection from app.core.logger import get_logger logger = get_logger(__name__) router = APIRouter() db_dependency = Depends(get_db_connection) class BindCardRequest(BaseModel): user_id: int = Field(..., ge=0) bind_card_id: List[int] = Field(default_factory=list) unbind_card_id: List[int] = Field(default_factory=list) def _auth_exception(detail: str = "用户认证信息无效") -> HTTPException: return HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=detail, headers={"WWW-Authenticate": "X-USER-BASE64"}, ) def _decode_user_base64(user_base64: str) -> dict: """解析外部认证系统传入的 X-USER-BASE64 用户信息。""" try: padding = "=" * (-len(user_base64) % 4) decoded_bytes = base64.urlsafe_b64decode(f"{user_base64}{padding}".encode("utf-8")) payload = json.loads(decoded_bytes.decode("utf-8")) return payload.get("user", payload) except Exception: raise _auth_exception("X-USER-BASE64 解析失败") def get_current_user(x_user_base64: Optional[str] = Header(None, alias="X-USER-BASE64")) -> dict: if not x_user_base64: raise _auth_exception("缺少 X-USER-BASE64 请求头") user_data = _decode_user_base64(x_user_base64) user_id = user_data.get("id") role_code_list = user_data.get("roleCodeList") or [] if user_id is None: raise _auth_exception("X-USER-BASE64 缺少用户 id") if not isinstance(role_code_list, list): raise _auth_exception("X-USER-BASE64 的 roleCodeList 格式错误") try: user_id = int(user_id) except (TypeError, ValueError): raise _auth_exception("X-USER-BASE64 的用户 id 格式错误") return { "id": user_id, "is_admin": "admin" in role_code_list, "roleCodeList": role_code_list, "nickname": user_data.get("nickname"), "account": user_data.get("account"), "raw": user_data } def require_admin_user(current_user: dict = Depends(get_current_user)) -> dict: if not current_user.get("is_admin"): raise HTTPException(status_code=403, detail="该请求需要管理员权限") return current_user def check_card_permission(db_conn: PooledMySQLConnection, current_user: dict, card_id: int): """管理员直接放行;普通用户需要在用户-卡片绑定表中存在对应关系。""" if current_user.get("is_admin"): return with db_conn.cursor() as cursor: cursor.execute( f"SELECT 1 FROM `{settings.DB_USER_CARD_TABLE_NAME}` WHERE user_id = %s AND card_id = %s LIMIT 1", (current_user["id"], card_id) ) if cursor.fetchone(): return raise HTTPException(status_code=403, detail="没有该卡片权限") @router.post("/bind_card", status_code=200, summary="(管理员)给外部用户绑定卡片ID [用户调用]") def bind_card_to_user( data: BindCardRequest, current_user: dict = Depends(require_admin_user), db_conn: PooledMySQLConnection = db_dependency ): try: # 先解绑再绑定;如果同一张卡同时出现在两个列表,最终以绑定列表为准。 bind_card_ids = list(dict.fromkeys(data.bind_card_id)) unbind_card_ids = list(dict.fromkeys(data.unbind_card_id)) operation_card_ids = list(dict.fromkeys(unbind_card_ids + bind_card_ids)) if not operation_card_ids: raise HTTPException(status_code=400, detail="绑定列表和解绑列表不能同时为空") with db_conn.cursor(dictionary=True) as cursor: format_strings = ",".join(["%s"] * len(operation_card_ids)) cursor.execute( f"SELECT id FROM `{settings.DB_CARD_TABLE_NAME}` WHERE id IN ({format_strings})", tuple(operation_card_ids) ) existing_card_ids = {row["id"] for row in cursor.fetchall()} missing_card_ids = sorted(set(operation_card_ids) - existing_card_ids) if missing_card_ids: raise HTTPException(status_code=404, detail=f"卡片未发现: {missing_card_ids}") deleted_count = 0 if unbind_card_ids: unbind_format_strings = ",".join(["%s"] * len(unbind_card_ids)) cursor.execute( f"DELETE FROM `{settings.DB_USER_CARD_TABLE_NAME}` " f"WHERE user_id = %s AND card_id IN ({unbind_format_strings})", tuple([data.user_id] + unbind_card_ids) ) deleted_count = cursor.rowcount inserted_count = 0 if bind_card_ids: bind_params = [(data.user_id, card_id) for card_id in bind_card_ids] cursor.executemany( f"INSERT IGNORE INTO `{settings.DB_USER_CARD_TABLE_NAME}` (user_id, card_id) VALUES (%s, %s)", bind_params ) inserted_count = cursor.rowcount db_conn.commit() logger.info( f"Admin {current_user['id']} updated card bindings for external user {data.user_id}: " f"bind={bind_card_ids}, unbind={unbind_card_ids}" ) return { "message": "卡片绑定关系更新成功", "user_id": data.user_id, "bind_card_id": bind_card_ids, "unbind_card_id": unbind_card_ids, "deleted_count": deleted_count, "inserted_count": inserted_count } except HTTPException: db_conn.rollback() raise except Exception as e: db_conn.rollback() logger.error(f"Bind card failed: {e}") raise HTTPException(status_code=500, detail="卡片绑定失败")