ruạṛ
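import base64
import hmac
from typing import Optional

import bcrypt
from fastapi import Request, Response

from app.auth_framework.AuthStrategy import AuthStrategy


class BasicAuthStrategy(AuthStrategy):
    """HTTP Basic authentication against an in-memory username/password map."""

    def __init__(self, valid_users: dict):
        """Store the credential map.

        valid_users is a dictionary of {username: password}. authenticate()
        compares the stored value directly with the submitted password, so the
        map currently has to hold plaintext passwords, not bcrypt hashes.
        """
        self.valid_users = valid_users

    async def authenticate(self, request: Request) -> Optional[Response]:
        """Check the request's ``Authorization: Basic`` header.

        Returns:
            None when the credentials match an entry in ``valid_users``.
            A 401 response carrying ``WWW-Authenticate: Basic`` when the header
            is missing or uses another scheme.
            A 403 response when the credentials are malformed or do not match.
        """
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Basic "):
            return Response(
                "Unauthorized",
                status_code=401,
                headers={"WWW-Authenticate": "Basic"},
            )

        try:
            encoded_credentials = auth_header.split(" ")[1]
            decoded_credentials = base64.b64decode(encoded_credentials).decode("utf-8")
            user, passwd = decoded_credentials.split(":", 1)

            expected = self.valid_users.get(user)
            supplied = passwd.encode("utf-8")
            # Unknown users are compared against the supplied value itself so
            # the same comparison runs whether or not the username exists.
            known = isinstance(expected, str)
            reference = expected.encode("utf-8") if known else supplied
            # Constant-time comparison: `==` on strings returns at the first
            # differing character, which leaks how much of a guess matched.
            matches = hmac.compare_digest(supplied, reference)
            if known and matches:
                return None  # Authentication successful
            # TODO: switch to check_password() once valid_users stores bcrypt
            # hashes. With plaintext entries the bcrypt check cannot succeed,
            # which may be why the earlier attempt was disabled.
        except ValueError:
            # Covers invalid base64, non-UTF-8 bytes and a missing ":"
            # separator. Every one of them falls through to the rejection
            # below, so a parsing error never authenticates a request.
            pass

        # NOTE(review): rejected credentials are conventionally answered with
        # 401 plus a WWW-Authenticate challenge. 403 is kept here so the
        # existing response contract does not change.
        return Response("Forbidden", status_code=403)


def check_password(password: str, hashed: str) -> bool:
    """Return True when *password* matches the bcrypt hash *hashed*.

    A missing or empty hash (for example the result of ``dict.get`` for an
    unknown user) yields False instead of raising on ``.encode()``.

    NOTE(review): *hashed* must be a real bcrypt hash. The bcrypt library is
    expected to raise rather than return False for a value that is not one,
    such as a plaintext password. Confirm this against the installed version.
    """
    if not hashed:
        return False
    # Accept a hash that is already bytes as well as the documented str form.
    hashed_bytes = hashed if isinstance(hashed, bytes) else hashed.encode()
    return bcrypt.checkpw(password.encode(), hashed_bytes)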
cải xoăn