55 lines
2.0 KiB
Python
55 lines
2.0 KiB
Python
|
|
from datetime import datetime, timedelta, timezone
|
|||
|
|
import importlib.util
|
|||
|
|
|
|||
|
|
from fastapi import Depends, HTTPException, status
|
|||
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|||
|
|
|
|||
|
|
_jose_spec = importlib.util.find_spec("jose")
|
|||
|
|
_jose_origin = _jose_spec.origin if _jose_spec else None
|
|||
|
|
|
|||
|
|
_incompatible_jose = bool(_jose_origin and _jose_origin.endswith("site-packages/jose.py"))
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
import jwt
|
|||
|
|
from jwt import InvalidTokenError as JWTError
|
|||
|
|
except Exception as exc:
|
|||
|
|
if not _incompatible_jose:
|
|||
|
|
try:
|
|||
|
|
from jose import JWTError, jwt
|
|||
|
|
except Exception as fallback_exc:
|
|||
|
|
raise RuntimeError(
|
|||
|
|
"JWT 依赖不可用,请安装 python-jose[cryptography] 或 PyJWT"
|
|||
|
|
) from fallback_exc
|
|||
|
|
else:
|
|||
|
|
raise RuntimeError(
|
|||
|
|
"检测到不兼容依赖 jose,请卸载 jose 后安装 python-jose[cryptography] 或安装 PyJWT"
|
|||
|
|
) from exc
|
|||
|
|
|
|||
|
|
from backend.config import settings
|
|||
|
|
|
|||
|
|
security = HTTPBearer()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def create_access_token(subject: str) -> str:
|
|||
|
|
expire = datetime.now(timezone.utc) + timedelta(minutes=settings.access_token_expire_minutes)
|
|||
|
|
payload = {"sub": subject, "exp": expire}
|
|||
|
|
return jwt.encode(payload, settings.jwt_secret_key, algorithm=settings.jwt_algorithm)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def verify_password(username: str, password: str) -> bool:
|
|||
|
|
return username == settings.admin_username and password == settings.admin_password
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_current_user(
|
|||
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|||
|
|
) -> str:
|
|||
|
|
token = credentials.credentials
|
|||
|
|
try:
|
|||
|
|
payload = jwt.decode(token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm])
|
|||
|
|
username: str | None = payload.get("sub")
|
|||
|
|
if not username:
|
|||
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="无效Token")
|
|||
|
|
return username
|
|||
|
|
except JWTError as exc:
|
|||
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="登录已过期") from exc
|