ruạṛ
import base64 import json import time import socket from typing import Optional from fastapi import Request, Response from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding from app.utils.request_context import cbs_callback_uuid_context, cbs_custom_callback_url_context import logging class SignedTokenStrategy: def __init__(self, valid_user: str, public_key_path: str): self.valid_user = valid_user self.public_key_path = public_key_path self.current_hostname = socket.gethostname() self.logger = logging.getLogger("bart") async def authenticate(self, request: Request) -> Optional[Response]: auth_header = request.headers.get("Authorization") cbs_custom_callback_url = request.headers.get("X-BART-CBS-Custom-Callback-URL") if not auth_header or not auth_header.startswith("Basic "): self._auth_log_error("MISSING_OR_INCORRECT_HEADER", request) return Response("Unauthorized", status_code=401, headers={"WWW-Authenticate": "Basic"}) try: # Step 1: Split and decode encoded = auth_header.split(" ")[1] if ":" not in encoded: self._auth_log_error("BAD_FORMAT", request) return Response("Invalid token", status_code=401) user_encoded, token_b64_encoded = encoded.split(":", 1) user = base64.b64decode(user_encoded).decode() token_json = base64.b64decode(token_b64_encoded).decode() if user != self.valid_user: self._auth_log_error("INVALID_USER", request) return Response("Invalid token", status_code=403) # Step 2: Decode token and extract payload & signature token = json.loads(token_json) payload = token["payload"] signature_b64 = token["signature"] # Step 3: Validate fields job_id = payload.get("job_id") server_hostname = payload.get("server_hostname") expiry = payload.get("expiry") uuid = payload.get("uuid") required_fields = {"job_id": job_id, "server_hostname": server_hostname, "expiry": expiry, "uuid": uuid} missing = [k for k, v in required_fields.items() if not v] if missing: self._auth_log_error("MISSING_FIELDS", request) return Response("Invalid token", status_code=401) if int(time.time()) > int(expiry): self._auth_log_error(f"EXPIRED [{int(time.time())}:{int(expiry)}]", request) return Response("Invalid token", status_code=401) if not self._is_valid_hostname_match(server_hostname, request): self._auth_log_error("HOSTNAME_MISMATCH", request) return Response("Invalid token", status_code=401) # Step 4: Verify signature if not self._verify_signature(payload, signature_b64): self._auth_log_error("BAD_SIGNATURE", request) return Response("Invalid token", status_code=401) # Step 5: Store uuid in context cbs_callback_uuid_context.set(uuid) if cbs_custom_callback_url: cbs_custom_callback_url_context.set(cbs_custom_callback_url) return None # Authenticated successfully except Exception as e: self._auth_log_error("EXCEPTION", request) return Response("Invalid token", status_code=401) def _is_valid_hostname_match(self, server_hostname: str, request) -> bool: # Check if it matches hostname if server_hostname == self.current_hostname: return True # Check if it's one of the machine's IPs try: local_ips = socket.gethostbyname_ex(socket.gethostname())[2] # local_ips = ["127.0.0.1", "localhost"] # Support local IPs if dev envt if server_hostname in local_ips: return True except Exception as e: self.logger.warning(f"Failed to resolve local IPs: {e}") self._auth_log_warning("SERVER_HOSTNAME_MISMATCH", request) return True # Returning true for now, will restrict this later def _verify_signature(self, payload: dict, signature_b64: str) -> bool: try: with open(self.public_key_path, "rb") as key_file: public_key = serialization.load_pem_public_key(key_file.read()) canonical = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode() signature = base64.b64decode(signature_b64) public_key.verify( signature, canonical, padding.PKCS1v15(), hashes.SHA256() ) return True except Exception: return False def _auth_log_error(self, code: str, request: Request): self.logger.error(f"[AUTH:{code}] Authentication failure from {request.client.host}") def _auth_log_warning(self, code: str, request: Request): self.logger.warning(f"[AUTH:{code}] Authentication failure from {request.client.host}. Continuing anyway.")
cải xoăn