1 year ago
#260189
AliAryaie
How to refresh a token and revoke ("burn") it on logout in FastAPI?
I use this code for authentication.
#1. auth_handler.py
import time
from decouple import config
import jwt
# Signing key and algorithm name used by the jwt.encode / jwt.decode calls
# below; both are looked up by name through decouple's config().
JWT_SECRET = config("JWT_SECRET")
JWT_ALGORITHM = config("JWT_ALGORITHM")
def response_token(token: str) -> dict:
    """
    Build the response body that carries a token.

    Returns a dict holding *token* under the "access_token" key.
    """
    body = dict(access_token=token)
    return body
def sign_jwt(user_id: "str | int", validity: int) -> dict:
    """
    Create a signed JWT for *user_id* and wrap it in the token response.

    Args:
        user_id: Identifier stored in the token under the "user_id" claim.
        validity: Lifetime in seconds; it is added to the current time and
            stored under the custom "expiry" claim that decode_jwt checks.

    Returns:
        The dict produced by response_token, i.e. {"access_token": <token>}.
    """
    # The annotation is a string because `str or int` evaluates to plain
    # `str` at definition time and silently drops the int alternative.
    payload = {
        "user_id": user_id,
        "expiry": time.time() + validity,
    }
    token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
    # Pass the raw token: response_token already adds the "access_token" key,
    # so wrapping it here as well produced a doubly nested response.
    return response_token(token)
def decode_jwt(token: str) -> dict:
    """
    Decode and verify *token*, returning its payload only while it is valid.

    Args:
        token: Encoded JWT string.

    Returns:
        The decoded payload when the signature verifies and the custom
        "expiry" claim is not in the past; otherwise an empty dict.
    """
    try:
        decoded_token = jwt.decode(
            token,
            JWT_SECRET,
            algorithms=[JWT_ALGORITHM],
        )
        still_valid = decoded_token["expiry"] >= time.time()
    except Exception:
        # Fail closed on any decoding problem (bad signature, malformed
        # token, missing or non-numeric "expiry"). Unlike a bare `except:`,
        # this lets KeyboardInterrupt and SystemExit propagate.
        return {}
    return decoded_token if still_valid else {}
and
#2. auth_bearer.py
from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from . import auth_handler
class JWTBearer(HTTPBearer):
    """
    HTTPBearer dependency that also validates the bearer token as a JWT.

    Calling an instance with a request returns the raw token string when the
    credentials use the "Bearer" scheme and the token passes verify_jwt;
    otherwise an HTTPException with status 403 is raised.
    """

    def __init__(self, auto_error: bool = True):
        """
        Forward *auto_error* unchanged to the HTTPBearer initializer.
        """
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request) -> str:
        """
        Extract the bearer credentials from *request* and validate them.

        Returns:
            The token string taken from the credentials.

        Raises:
            HTTPException: 403 when no credentials are returned by the parent
                class, when the scheme is not exactly "Bearer", or when the
                token fails verify_jwt.
        """
        credentials: HTTPAuthorizationCredentials = await super().__call__(request)
        if not credentials:
            raise HTTPException(
                status_code=403,
                detail="Invalid Authorization Code."
            )
        # Case-sensitive comparison, kept exactly as in the original check.
        if credentials.scheme != "Bearer":
            raise HTTPException(
                status_code=403,
                detail="Invalid Authentication Scheme."
            )
        if not self.verify_jwt(credentials.credentials):
            raise HTTPException(
                status_code=403,
                detail="Invalid Token OR Expired Token."
            )
        return credentials.credentials

    def verify_jwt(self, jwt_token: str) -> bool:
        """
        Report whether *jwt_token* decodes to a non-empty payload.

        Returns:
            True when auth_handler.decode_jwt yields a truthy payload, False
            when it yields an empty one or raises.
        """
        try:
            payload = auth_handler.decode_jwt(jwt_token)
        except Exception:
            # Treat any decoding failure as an invalid token, without
            # swallowing KeyboardInterrupt/SystemExit as a bare except would.
            return False
        return bool(payload)
To secure the routes, I'll leverage dependency injection via FastAPI's Depends.
from fastapi import FastAPI, Body, Depends
from schemas import UserSchema, UserLoginSchema
from auth_bearer import JWTBearer
from auth_handler import signJWT
# Application instance on which the route decorators below register handlers.
app = FastAPI()
def check_user(data: UserLoginSchema):
    """
    Tell whether the submitted login matches a stored user.

    Returns True when an entry in ``users`` has both the same email and the
    same password as *data*, and False otherwise.
    """
    return any(
        candidate.email == data.email and candidate.password == data.password
        for candidate in users
    )
@app.post("/user/signup", tags=["user"])
async def create_user(user: UserSchema = Body(...)):
    """
    Register the submitted user and respond with a token for their email.
    """
    # replace with db call, making sure to hash the password first
    users.append(user)
    # NOTE(review): section #1 defines sign_jwt(user_id, validity), while this
    # module imports and calls signJWT with a single argument - confirm which
    # name and signature is intended.
    token_response = signJWT(user.email)
    return token_response
@app.post("/user/login", tags=["user"])
async def user_login(user: UserLoginSchema = Body(...)):
    """
    Respond with a token when check_user accepts the submitted credentials,
    and with an error payload otherwise.
    """
    if not check_user(user):
        return {"error": "Wrong login details!"}
    return signJWT(user.email)
@app.post("/posts", dependencies=[Depends(JWTBearer())], tags=["posts"])
async def add_post(post: PostSchema) -> dict:
    """
    Store a new post and acknowledge it.

    The route depends on JWTBearer. The post's id is set to the current
    number of stored posts plus one before its dict form is appended to
    ``posts``.
    """
    next_id = len(posts) + 1
    post.id = next_id
    posts.append(post.dict())
    return dict(data="post added.")
This code works for sign-in and authorizes requests correctly.
However, I also want to add refresh-token functionality and a way to revoke ("burn") a token when the user logs out.
How can I implement these two features?
python
jwt
fastapi
pyjwt
0 Answers
Your Answer