본문으로 건너뛰기
Advertisement

인증/인가 — JWT와 OAuth2

FastAPI는 OAuth2PasswordBearerJWT를 조합해 표준 토큰 인증을 구현합니다.


설치

pip install python-jose[cryptography]  # JWT
pip install passlib[bcrypt] # 비밀번호 해싱

비밀번호 해싱

from passlib.context import CryptContext

# bcrypt 해싱 컨텍스트
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
return pwd_context.hash(password)


def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)


# 사용 예
hashed = hash_password("my-secret-password")
print(verify_password("my-secret-password", hashed)) # True
print(verify_password("wrong-password", hashed)) # False

JWT 토큰 생성·검증

from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from pydantic import BaseModel

SECRET_KEY = "your-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


class TokenData(BaseModel):
username: str | None = None


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
payload = data.copy()
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
payload.update({"exp": expire})
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)


def decode_token(token: str) -> TokenData:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise ValueError("토큰에 사용자 정보 없음")
return TokenData(username=username)
except JWTError:
raise ValueError("유효하지 않은 토큰")

전체 인증 흐름

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from datetime import timedelta
from typing import Annotated

app = FastAPI()

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2: 토큰을 Authorization: Bearer <token> 헤더에서 추출
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")

# 사용자 DB (실제로는 DB 사용)
fake_users_db = {
"alice": {
"username": "alice",
"hashed_password": pwd_context.hash("password123"),
"email": "alice@example.com",
"role": "admin",
"disabled": False,
},
"bob": {
"username": "bob",
"hashed_password": pwd_context.hash("bobpass"),
"email": "bob@example.com",
"role": "user",
"disabled": False,
},
}


# 현재 사용자 의존성
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)]
) -> dict:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="인증 정보를 확인할 수 없습니다",
headers={"WWW-Authenticate": "Bearer"},
)
try:
token_data = decode_token(token)
except ValueError:
raise credentials_exception

user = fake_users_db.get(token_data.username)
if user is None or user["disabled"]:
raise credentials_exception
return user


# 활성 사용자만 허용
async def get_active_user(
current_user: Annotated[dict, Depends(get_current_user)]
) -> dict:
if current_user["disabled"]:
raise HTTPException(status_code=400, detail="비활성화된 계정")
return current_user


# 관리자 권한
async def require_admin(
user: Annotated[dict, Depends(get_active_user)]
) -> dict:
if user["role"] != "admin":
raise HTTPException(status_code=403, detail="관리자 권한 필요")
return user


CurrentUser = Annotated[dict, Depends(get_active_user)]
AdminUser = Annotated[dict, Depends(require_admin)]


# 로그인 엔드포인트
@app.post("/auth/login")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
user = fake_users_db.get(form_data.username)
if not user or not pwd_context.verify(form_data.password, user["hashed_password"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="아이디 또는 비밀번호가 틀렸습니다",
headers={"WWW-Authenticate": "Bearer"},
)

access_token = create_access_token(
data={"sub": user["username"]},
expires_delta=timedelta(minutes=30),
)
return {"access_token": access_token, "token_type": "bearer"}


# 보호된 엔드포인트
@app.get("/users/me")
async def read_me(current_user: CurrentUser):
return {
"username": current_user["username"],
"email": current_user["email"],
"role": current_user["role"],
}


@app.get("/admin/users")
async def list_all_users(admin: AdminUser):
return list(fake_users_db.values())


@app.post("/auth/logout")
async def logout(current_user: CurrentUser):
# JWT는 서버에서 무효화 불가 — 클라이언트에서 토큰 삭제
# 서버 측 무효화: Redis 블랙리스트 사용
return {"message": "로그아웃 완료"}

Refresh Token 패턴

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from datetime import timedelta

app = FastAPI()


class TokenPair(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"


def create_token_pair(username: str) -> TokenPair:
access_token = create_access_token(
{"sub": username, "type": "access"},
expires_delta=timedelta(minutes=30),
)
refresh_token = create_access_token(
{"sub": username, "type": "refresh"},
expires_delta=timedelta(days=7),
)
return TokenPair(access_token=access_token, refresh_token=refresh_token)


@app.post("/auth/refresh")
async def refresh_token(refresh_token: str) -> TokenPair:
try:
token_data = decode_token(refresh_token)
except ValueError:
raise HTTPException(status_code=401, detail="유효하지 않은 리프레시 토큰")

# 새 토큰 쌍 발급
return create_token_pair(token_data.username)

정리

구성 요소역할
OAuth2PasswordBearerBearer 토큰 추출
OAuth2PasswordRequestForm로그인 폼 파싱
passlib + bcrypt비밀번호 해싱
python-jose + JWT토큰 생성·검증
Depends(get_current_user)인증 의존성 주입
Refresh TokenAccess Token 갱신

JWT 인증에서 Access Token은 짧게(30분), Refresh Token은 길게(7일) 유지하는 것이 표준입니다.

Advertisement