인증/인가 — JWT와 OAuth2
FastAPI는 OAuth2PasswordBearer와 JWT를 조합해 표준 토큰 인증을 구현합니다.
설치
pip install python-jose[cryptography] # JWT
pip install passlib[bcrypt] # 비밀번호 해싱
비밀번호 해싱
from passlib.context import CryptContext
# bcrypt 해싱 컨텍스트
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
# 사용 예
hashed = hash_password("my-secret-password")
print(verify_password("my-secret-password", hashed)) # True
print(verify_password("wrong-password", hashed)) # False
JWT 토큰 생성·검증
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from pydantic import BaseModel
SECRET_KEY = "your-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class TokenData(BaseModel):
username: str | None = None
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
payload = data.copy()
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
payload.update({"exp": expire})
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str) -> TokenData:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise ValueError("토큰에 사용자 정보 없음")
return TokenData(username=username)
except JWTError:
raise ValueError("유효하지 않은 토큰")
전체 인증 흐름
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from datetime import timedelta
from typing import Annotated
app = FastAPI()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2: 토큰을 Authorization: Bearer <token> 헤더에서 추출
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
# 사용자 DB (실제로는 DB 사용)
fake_users_db = {
"alice": {
"username": "alice",
"hashed_password": pwd_context.hash("password123"),
"email": "alice@example.com",
"role": "admin",
"disabled": False,
},
"bob": {
"username": "bob",
"hashed_password": pwd_context.hash("bobpass"),
"email": "bob@example.com",
"role": "user",
"disabled": False,
},
}
# 현재 사용자 의존성
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)]
) -> dict:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="인증 정보를 확인할 수 없습니다",
headers={"WWW-Authenticate": "Bearer"},
)
try:
token_data = decode_token(token)
except ValueError:
raise credentials_exception
user = fake_users_db.get(token_data.username)
if user is None or user["disabled"]:
raise credentials_exception
return user
# 활성 사용자만 허용
async def get_active_user(
current_user: Annotated[dict, Depends(get_current_user)]
) -> dict:
if current_user["disabled"]:
raise HTTPException(status_code=400, detail="비활성화된 계정")
return current_user
# 관리자 권한
async def require_admin(
user: Annotated[dict, Depends(get_active_user)]
) -> dict:
if user["role"] != "admin":
raise HTTPException(status_code=403, detail="관리자 권한 필요")
return user
CurrentUser = Annotated[dict, Depends(get_active_user)]
AdminUser = Annotated[dict, Depends(require_admin)]
# 로그인 엔드포인트
@app.post("/auth/login")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
user = fake_users_db.get(form_data.username)
if not user or not pwd_context.verify(form_data.password, user["hashed_password"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="아이디 또는 비밀번호가 틀렸습니다",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(
data={"sub": user["username"]},
expires_delta=timedelta(minutes=30),
)
return {"access_token": access_token, "token_type": "bearer"}
# 보호된 엔드포인트
@app.get("/users/me")
async def read_me(current_user: CurrentUser):
return {
"username": current_user["username"],
"email": current_user["email"],
"role": current_user["role"],
}
@app.get("/admin/users")
async def list_all_users(admin: AdminUser):
return list(fake_users_db.values())
@app.post("/auth/logout")
async def logout(current_user: CurrentUser):
# JWT는 서버에서 무효화 불가 — 클라이언트에서 토큰 삭제
# 서버 측 무효화: Redis 블랙리스트 사용
return {"message": "로그아웃 완료"}
Refresh Token 패턴
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from datetime import timedelta
app = FastAPI()
class TokenPair(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
def create_token_pair(username: str) -> TokenPair:
access_token = create_access_token(
{"sub": username, "type": "access"},
expires_delta=timedelta(minutes=30),
)
refresh_token = create_access_token(
{"sub": username, "type": "refresh"},
expires_delta=timedelta(days=7),
)
return TokenPair(access_token=access_token, refresh_token=refresh_token)
@app.post("/auth/refresh")
async def refresh_token(refresh_token: str) -> TokenPair:
try:
token_data = decode_token(refresh_token)
except ValueError:
raise HTTPException(status_code=401, detail="유효하지 않은 리프레시 토큰")
# 새 토큰 쌍 발급
return create_token_pair(token_data.username)
정리
| 구성 요소 | 역할 |
|---|---|
OAuth2PasswordBearer | Bearer 토큰 추출 |
OAuth2PasswordRequestForm | 로그인 폼 파싱 |
passlib + bcrypt | 비밀번호 해싱 |
python-jose + JWT | 토큰 생성·검증 |
Depends(get_current_user) | 인증 의존성 주입 |
| Refresh Token | Access Token 갱신 |
JWT 인증에서 Access Token은 짧게(30분), Refresh Token은 길게(7일) 유지하는 것이 표준입니다.