ruạṛ
from typing import List from fastapi import BackgroundTasks, APIRouter from pydantic import BaseModel from app.utils.immutable_path_scanner import ImmutableScanner from app.utils.background_task_wrapper import background_task_wrapper from app.utils.threaded_tasks import run_threaded_task router = APIRouter() class CheckImmutablePathsRequest(BaseModel): username: str paths: List[str] cbs_job_id: int exclude_paths: List[str] = None @router.post("/check-immutable-paths", summary="Check for immutable paths at restore destinations") async def check_immutable_paths(request: CheckImmutablePathsRequest, background_tasks: BackgroundTasks): background_tasks.add_task( run_check_immutable_paths, request.username, request.paths, request.cbs_job_id, request.exclude_paths, ) return { "message": "Immutable paths check request has been accepted.", "cbs_job_id": request.cbs_job_id } @background_task_wrapper def run_check_immutable_paths(username: str, paths: List[str], cbs_job_id: int, exclude_paths: List[str] = None): # Ensure cbs_job_id is an int (handle string conversion from API) if isinstance(cbs_job_id, str): cbs_job_id = int(cbs_job_id) def task(): return ImmutableScanner.check_immutable_paths(username, paths, cbs_job_id, is_cli=False, exclude_paths=exclude_paths) def success(result): return result run_threaded_task(cbs_job_id, "check-immutable-paths", task, success)
cải xoăn