| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- import base64
- import json
- from typing import List, Optional
- from fastapi import APIRouter, Depends, Header, HTTPException, status
- from mysql.connector.pooling import PooledMySQLConnection
- from pydantic import BaseModel, Field
- from app.core.config import settings
- from app.core.database_loader import get_db_connection
- from app.core.logger import get_logger
- logger = get_logger(__name__)  # logger obtained for this module (passes the module's __name__)
- router = APIRouter()  # router instance that the endpoints defined in this module are registered on
- db_dependency = Depends(get_db_connection)  # shared Depends(get_db_connection) marker, used as the default of `db_conn` parameters
class BindCardRequest(BaseModel):
    # Request body of the bind-card endpoint: one target user plus the card
    # IDs to attach and to detach. Documented with comments rather than a
    # docstring so the generated schema description is left unchanged.

    # ID of the user whose bindings are modified; required, must be >= 0.
    user_id: int = Field(..., ge=0)
    # Card IDs to bind to the user; defaults to an empty list.
    bind_card_id: List[int] = Field(default_factory=list)
    # Card IDs to unbind from the user; defaults to an empty list.
    unbind_card_id: List[int] = Field(default_factory=list)
def _auth_exception(detail: str = "用户认证信息无效") -> HTTPException:
    """Build (but do not raise) the 401 error used by the auth helpers here.

    Args:
        detail: Message stored as the exception's ``detail``.

    Returns:
        An ``HTTPException`` with status 401 whose ``WWW-Authenticate``
        header is set to ``X-USER-BASE64``.
    """
    challenge_headers = {"WWW-Authenticate": "X-USER-BASE64"}
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers=challenge_headers,
    )
    return unauthorized
def _decode_user_base64(user_base64: str) -> dict:
    """Parse the user info passed in the ``X-USER-BASE64`` header.

    The value is decoded as URL-safe Base64 (missing ``=`` padding is
    restored), then as UTF-8 JSON. If the JSON object has a ``"user"`` key,
    that nested value is the result; otherwise the object itself is.

    Args:
        user_base64: Raw header value.

    Returns:
        The decoded user object as a dict.

    Raises:
        HTTPException: 401 when the value cannot be decoded, or when the
            decoded result is not a JSON object.
    """
    try:
        # Restore stripped padding so the length is a multiple of four.
        padding = "=" * (-len(user_base64) % 4)
        decoded_bytes = base64.urlsafe_b64decode(f"{user_base64}{padding}".encode("utf-8"))
        payload = json.loads(decoded_bytes.decode("utf-8"))
    except Exception as exc:
        # Deliberately broad: the header is untrusted input and any decoding
        # failure must surface as 401 rather than as a server error.
        raise _auth_exception("X-USER-BASE64 解析失败") from exc

    user = payload.get("user", payload) if isinstance(payload, dict) else None
    # A payload such as {"user": "x"} or a top-level JSON array would
    # otherwise reach callers that call .get() on the result and crash.
    if not isinstance(user, dict):
        raise _auth_exception("X-USER-BASE64 解析失败")
    return user
def get_current_user(x_user_base64: Optional[str] = Header(None, alias="X-USER-BASE64")) -> dict:
    """Resolve the calling user from the ``X-USER-BASE64`` request header.

    Args:
        x_user_base64: Header value; ``None`` when the header is absent.

    Returns:
        A dict with keys ``id`` (int), ``is_admin`` (True when ``"admin"`` is
        in the role list), ``roleCodeList``, ``nickname``, ``account`` and
        ``raw`` (the decoded user object).

    Raises:
        HTTPException: 401 when the header is missing or empty, cannot be
            decoded into an object, lacks ``id``, has a non-list
            ``roleCodeList``, or has an ``id`` that is not integer-like.
    """
    if not x_user_base64:
        raise _auth_exception("缺少 X-USER-BASE64 请求头")

    user_data = _decode_user_base64(x_user_base64)
    # The header is untrusted: a decoded value that is not an object would
    # make the .get() calls below raise AttributeError (a 500, not a 401).
    if not isinstance(user_data, dict):
        raise _auth_exception("X-USER-BASE64 解析失败")

    user_id = user_data.get("id")
    # A missing or falsy role list is treated as "no roles".
    role_code_list = user_data.get("roleCodeList") or []

    if user_id is None:
        raise _auth_exception("X-USER-BASE64 缺少用户 id")
    if not isinstance(role_code_list, list):
        raise _auth_exception("X-USER-BASE64 的 roleCodeList 格式错误")

    # bool is a subclass of int, so int(True) == 1 would silently turn
    # `"id": true` into user 1.
    if isinstance(user_id, bool):
        raise _auth_exception("X-USER-BASE64 的用户 id 格式错误")
    try:
        user_id = int(user_id)
    except (TypeError, ValueError, OverflowError) as exc:
        # OverflowError: json.loads accepts `Infinity`, and int(inf) raises it.
        raise _auth_exception("X-USER-BASE64 的用户 id 格式错误") from exc

    return {
        "id": user_id,
        "is_admin": "admin" in role_code_list,
        "roleCodeList": role_code_list,
        "nickname": user_data.get("nickname"),
        "account": user_data.get("account"),
        "raw": user_data,
    }
def require_admin_user(current_user: dict = Depends(get_current_user)) -> dict:
    """Dependency that only lets administrators through.

    Args:
        current_user: User dict produced by ``get_current_user``.

    Returns:
        The same ``current_user`` dict when its ``is_admin`` entry is truthy.

    Raises:
        HTTPException: 403 when the user is not an administrator.
    """
    if current_user.get("is_admin"):
        return current_user
    raise HTTPException(status_code=403, detail="该请求需要管理员权限")
def check_card_permission(db_conn: PooledMySQLConnection, current_user: dict, card_id: int):
    """Verify that ``current_user`` may access the card ``card_id``.

    Administrators pass without a database lookup; any other user needs a
    matching row in the user-card binding table.

    Args:
        db_conn: Connection used to query the binding table.
        current_user: User dict; reads ``is_admin`` and ``id``.
        card_id: ID of the card being accessed.

    Returns:
        None when access is allowed.

    Raises:
        HTTPException: 403 when a non-admin user has no binding for the card.
    """
    if current_user.get("is_admin"):
        return

    with db_conn.cursor() as cursor:
        lookup_sql = (
            f"SELECT 1 FROM `{settings.DB_USER_CARD_TABLE_NAME}` "
            "WHERE user_id = %s AND card_id = %s LIMIT 1"
        )
        cursor.execute(lookup_sql, (current_user["id"], card_id))
        binding_row = cursor.fetchone()

    if not binding_row:
        raise HTTPException(status_code=403, detail="没有该卡片权限")
@router.post("/bind_card", status_code=200, summary="(管理员)给外部用户绑定卡片ID [用户调用]")
def bind_card_to_user(
    data: BindCardRequest,
    current_user: dict = Depends(require_admin_user),
    db_conn: PooledMySQLConnection = db_dependency
):
    """Bind and/or unbind cards for one user in a single transaction.

    The caller must pass ``require_admin_user``. Duplicate IDs inside each
    list are dropped while keeping first-seen order.

    Args:
        data: Target ``user_id`` plus the card IDs to bind and to unbind.
        current_user: Admin user dict; its ``id`` is written to the log.
        db_conn: Connection used for the lookup, delete, insert and commit.

    Returns:
        A dict with ``message``, ``user_id``, the de-duplicated
        ``bind_card_id`` / ``unbind_card_id`` lists, and the cursor row
        counts ``deleted_count`` / ``inserted_count``.

    Raises:
        HTTPException: 400 when both lists are empty; 404 when any requested
            card ID is absent from the card table; 500 for any other failure.
            The connection is rolled back in every failure case.
    """
    try:
        # Unbind first, then bind; if a card appears in both lists, the bind list wins.
        to_bind = list(dict.fromkeys(data.bind_card_id))
        to_unbind = list(dict.fromkeys(data.unbind_card_id))
        touched_ids = list(dict.fromkeys([*to_unbind, *to_bind]))
        if len(touched_ids) == 0:
            raise HTTPException(status_code=400, detail="绑定列表和解绑列表不能同时为空")

        deleted_count = 0
        inserted_count = 0
        with db_conn.cursor(dictionary=True) as cursor:
            # Every referenced card must exist before anything is modified.
            lookup_marks = ",".join("%s" for _ in touched_ids)
            cursor.execute(
                f"SELECT id FROM `{settings.DB_CARD_TABLE_NAME}` WHERE id IN ({lookup_marks})",
                tuple(touched_ids)
            )
            known_ids = {row["id"] for row in cursor.fetchall()}
            unknown_ids = sorted(set(touched_ids).difference(known_ids))
            if unknown_ids:
                raise HTTPException(status_code=404, detail=f"卡片未发现: {unknown_ids}")

            if to_unbind:
                unbind_marks = ",".join("%s" for _ in to_unbind)
                delete_sql = (
                    f"DELETE FROM `{settings.DB_USER_CARD_TABLE_NAME}` "
                    f"WHERE user_id = %s AND card_id IN ({unbind_marks})"
                )
                cursor.execute(delete_sql, (data.user_id, *to_unbind))
                deleted_count = cursor.rowcount

            if to_bind:
                # INSERT IGNORE is used so cards already bound do not fail the batch.
                insert_sql = (
                    f"INSERT IGNORE INTO `{settings.DB_USER_CARD_TABLE_NAME}` (user_id, card_id) VALUES (%s, %s)"
                )
                cursor.executemany(insert_sql, [(data.user_id, card_id) for card_id in to_bind])
                inserted_count = cursor.rowcount

        db_conn.commit()
        logger.info(
            f"Admin {current_user['id']} updated card bindings for external user {data.user_id}: "
            f"bind={to_bind}, unbind={to_unbind}"
        )
        return {
            "message": "卡片绑定关系更新成功",
            "user_id": data.user_id,
            "bind_card_id": to_bind,
            "unbind_card_id": to_unbind,
            "deleted_count": deleted_count,
            "inserted_count": inserted_count
        }
    except HTTPException:
        # Deliberate 4xx responses: undo any partial work and pass them through unchanged.
        db_conn.rollback()
        raise
    except Exception as e:
        db_conn.rollback()
        logger.error(f"Bind card failed: {e}")
        raise HTTPException(status_code=500, detail="卡片绑定失败")
|