【FastAPI】JWTトークンを使用した認証方法について

当ページのリンクには広告が含まれています。
  • URLをコピーしました!

近年では良く用いられているJWTトークンを使用した認証を実装いたします。

まずは任意のエンドポイントに認証処理を追加します。

この記事のサンプルコード

FastAPIの基礎についての記事まとめ

目次

OAuth2PasswordBearer

auth/oauth2.pyを作成して、認証に必要な情報を記載します。

エンドポイントを認証によって保護する場合には、fastapi.securityOAuth2PasswordBearerを用います。

from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

python-multipartをインストールしていな方はpip install python-multipartをお願いします

認証機能のインスタンスを作成し、引数のtokenUrl='token'でトークン認証に用いるURLを指定しています。

ローカルの場合だと、127.0.0.1:8000/tokenをURLを指しています。こちらのエンドポイントは後程作成します。

エンドポイントに追加

作成出来たら任意のエンドポイントに認証機能を追加します。

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from db.database import get_db
from db import db_article
from schemas import ArticleBase, ArticleDisplay
from auth.oauth2 import oauth2_scheme

router = APIRouter(
    prefix='/article',
    tags=['article']
)

@router.get('/{id}', response_model=ArticleDisplay)
def get_article(id: int, db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)):
    return db_article.get_article(db, id)

get_article()token: str =Depends(oauth2_scheme)を記載しました。

ドキュメントを確認してみると

一番右上にはAuthorize、先ほどのエンドポイントには鍵マークが表示されています。

Authorizeボタンを押してみると下記が表示されます。

これで見た目だけは認証機能が追加されました。次にトークン用のエンドポイントを作成します。

トークン作成関数の作成

トークン生成用の関数を作成します。

python-joseのインストールが必要ですので、pip install python-joseをお願いします。

https://fastapi.tiangolo.com/ja/tutorial/security/oauth2-jwt/#jwt_1

rom fastapi.security import OAuth2PasswordBearer
from typing import Optional
from datetime import datetime, timedelta
from jose import jwt
 
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
 
SECRET_KEY = '77407c7339a6c00544e51af1101c4abb4aea2a31157ca5f7dfd87da02a628107'
ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_MINUTES = 30
 
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
  to_encode = data.copy()
  if expires_delta:
    expire = datetime.utcnow() + expires_delta
  else:
    expire = datetime.utcnow() + timedelta(minutes=15)
  to_encode.update({"exp": expire})
  encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  return encoded_jwt

引数のdataから、SEACRET_KEYALGORITHMによってJWTキーを作成しています。

シークレットキーの作成は下記の様に作成できます。

openssl rand -hex 32
0a0b62ffe010d8a54940837fcbe22af717b5a2205951867777aebac6adb35539

トークン生成用エンドポイントの作成

先程作成したトークン生成関数をエンドポイントに紐づけます。

エンドポイントのURLは/tokenになります。

from fastapi import APIRouter, HTTPException, status
from fastapi.param_functions import Depends
from fastapi.security.oauth2 import OAuth2PasswordRequestForm
from sqlalchemy.orm.session import Session
from db.database import get_db
from db import models
from db.hash import Hash
from auth import oauth2

router = APIRouter(
    tags=['authentication']
)

@router.post('/token')
def get_token(request: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    user = db.query(models.User).filter(models.User.username == request.username).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail='Invalid credentials'
        )
    if not Hash.verify(user.password, request.password):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail='Incorrect password'
        )
    access_token = oauth2.create_access_token(data={'sub': user.username})
    return {
        'access_token': access_token,
        'token_type': 'bearer',
        'user_id': user.id,
        'username': user.username
    }

処理としては

  • OAuth2PasswordRequestFormからusernamepasswordを受け取る
  • データベースのUserモデルからデータを検索
  • ユーザーがいなければエラー
  • ハッシュ化したパスワードを照合し、合致しなければエラー
  • アクセストークンの生成

となります。

データベースの作成やパスワードのハッシュ化については下記記事をご参照ください。

エンドポイントの確認

ドキュメントに戻って、認証が通るか確認してみましょう。

usernamepasswordを入力すると無事認証が通りました。

次に、先ほど作成したトークン生成用のエンドポイントを確認します。

usernamepasswordを入力すると・・・

レスポンスでaccess_tokenなどが返ってきました!

念のため、外部からもAPIを叩いてみましょう。Postmanから実行してみます。

https://www.postman.com/

問題なさそうです!

作成したこのエンドポイントは、承認処理が必要なエンドポイントに自動的に実行されます。

次に作成されたアクセストークンが正しいものなのか検証するための関数を作成します。

トークンからユーザー情報を取得する関数を作成

from fastapi.param_functions import Depends
from fastapi.security import OAuth2PasswordBearer
from typing import Optional
from datetime import datetime, timedelta
from jose import jwt
from jose.exceptions import JWTError
from sqlalchemy.orm.session import Session
from fastapi import HTTPException, status
from db.database import get_db
from db import db_user
 
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
 
SECRET_KEY = '0a0b62ffe010d8a54940837fcbe22af717b5a2205951867777aebac6adb35539'
ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_MINUTES = 30
 
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
  to_encode = data.copy()
  if expires_delta:
    expire = datetime.utcnow() + expires_delta
  else:
    expire = datetime.utcnow() + timedelta(minutes=15)
  to_encode.update({"exp": expire})
  encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  return encoded_jwt

def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Colud not validate credentials',
        headers={'WWW-Authenticate': "Bearer"}
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    
    user = db_user.get_user_by_username(db, username)
    if user is None:
        raise credentials_exception
    return user
from fastapi import HTTPException, status
from sqlalchemy.orm.session import Session
from db.models import User

def get_user_by_username(db: Session, username: int):
    user = db.query(User).filter(User.username == id).first()
    if not username:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
            detail=f'User with id {username} not found')
    return user

認証が必要なエンドポイントの修正

current_user: str = Depends(get_current_user)を引数に追加し、認証機能を追加しました。

@router.post('/', response_model=ArticleDisplay)
def create_article(
    request: ArticleBase, 
    db: Session = Depends(get_db), 
    current_user: str = Depends(get_current_user)
    ):
    return db_article.create_article(db, request)

保護したいAPIに追加すれば完了です。

この記事のサンプルコード

FastAPIの基礎についての記事まとめ

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

コメント

コメントする

目次