ruạṛ
import json
import os
import secrets
import subprocess
import sys
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from app.config import bart_jobs_folder
from app.errors.error_codes import ErrorCode
from app.errors.exceptions import BartError
from app.utils.job_activity_logger import JobActivityLogger
from app.utils.utility_functions import read_json_file

DELAY_SECONDS = 15
BULK_USERS_INPUT_NAME = "bulk_users.json"
BULK_USERS_OUTPUT_NAME = "bulk_users_output.json"
BULK_USERS_OUTPUT_TRANSFORMED_NAME = "bulk_users_output_transformed.json"
START_BULK_USER_BACKUP_SCRIPT = "/usr/local/bin/start_bulk_user_backup.py"
# Long-running bulk backup; override with BART_BULK_USER_BACKUP_TIMEOUT_SEC if needed.
_DEFAULT_TIMEOUT_SEC = int(os.getenv("BART_BULK_USER_BACKUP_TIMEOUT_SEC", "86400"))
# Number of trailing characters of script output kept in logs and error messages.
_OUTPUT_TAIL_CHARS = 2000

# Per-job logger; (re)bound by set_job_logger() at the start of each job.
activity_logger: Optional[JobActivityLogger] = None


def multi_user_backup(request_body: Dict[str, Any], is_cli: bool = False) -> Dict[str, Any]:
    """Run a multi-user backup job end to end and return per-reference callback data.

    Steps: bind the per-job activity logger, create the job folder, write the
    ``bulk_users.json`` input, run the bulk backup script, then turn the
    script's ``bulk_users_output.json`` into the callback ``action_data``.

    Args:
        request_body: Request payload; reads ``cbs_job_id``, ``backup_requests``,
            ``volumes`` and ``zfs_pool_ids``.
        is_cli: When true, the activity logger also echoes to stdout.

    Returns:
        Mapping of reference (preference) id to that reference's backup result.

    Raises:
        BartError: If ``cbs_job_id`` is missing, on I/O failures, on script
            failure/timeout, or when the script output is missing/malformed.
    """
    cbs_job_id = request_body.get("cbs_job_id")
    if cbs_job_id is None or str(cbs_job_id).strip() == "":
        # Without an id every run would share ``job_None`` and the privileged
        # script would receive ``--bulk-job-id=None``; refuse early instead.
        raise BartError(ErrorCode.VALIDATION_FAILED, "Request is missing cbs_job_id")
    set_job_logger(cbs_job_id, is_cli)
    activity_logger.info("<======== Starting Multi-User Backup ========>")
    job_folder = create_job_folder(cbs_job_id)
    save_input_json_file(job_folder, request_body)
    # NOTE: for local testing without the real script, call
    # call_multi_user_backup_script_dummy(job_folder) here instead.
    output_json_path = call_multi_user_backup_script(job_folder, cbs_job_id)
    activity_logger.info(f"output_json_path for cbs_job_id {cbs_job_id} is {output_json_path}")
    return generate_callback_data(output_json_path)


def set_job_logger(job_id: Any, is_cli: bool) -> None:
    """Bind the module-level ``activity_logger`` to a new logger for ``job_id``."""
    global activity_logger
    activity_logger = JobActivityLogger(job_id, echo_to_stdout=is_cli)


def create_job_folder(cbs_id: Any) -> str:
    """Create (if needed) and return ``<bart_jobs_folder>/job_<cbs_id>``.

    Raises:
        BartError: PERMISSION_DENIED if the directory cannot be created.
    """
    job_folder = f"{bart_jobs_folder}/job_{cbs_id}"
    try:
        os.makedirs(job_folder, exist_ok=True)
    except OSError as e:
        # Map to BartError like every other filesystem write in this module.
        activity_logger.error(f"Failed to create job folder {job_folder}: {e}")
        raise BartError(ErrorCode.PERMISSION_DENIED, f"Cannot create {job_folder}: {e}") from e
    activity_logger.success(f"Created job folder: {job_folder}")
    return job_folder


def save_input_json_file(job_folder: str, request_body: Dict[str, Any]) -> None:
    """Write the bulk-backup input file (``bulk_users.json``) into ``job_folder``.

    ``backup_requests`` (request id -> job dict) is flattened into a ``jobs``
    list. Each job gets its request id copied into ``preference_id`` so results
    can be mapped back. Entries whose value is not a dict are skipped.

    Raises:
        BartError: VALIDATION_FAILED if ``backup_requests`` is not a mapping;
            PERMISSION_DENIED if the file cannot be written.
    """
    assert activity_logger is not None
    input_json_path = os.path.join(job_folder, BULK_USERS_INPUT_NAME)
    backup_requests = request_body.get("backup_requests") or {}
    if not isinstance(backup_requests, dict):
        activity_logger.error(
            f"backup_requests must be a JSON object, got {type(backup_requests).__name__}"
        )
        raise BartError(ErrorCode.VALIDATION_FAILED, "backup_requests must be a JSON object")
    jobs: List[Dict[str, Any]] = [
        {**job, "preference_id": str(request_id)}
        for request_id, job in backup_requests.items()
        if isinstance(job, dict)
    ]
    bulk_users_payload = {
        "volumes": request_body.get("volumes"),
        "zfs_pool_ids": request_body.get("zfs_pool_ids"),
        "jobs": jobs,
    }
    try:
        with open(input_json_path, "w", encoding="utf-8") as f:
            json.dump(bulk_users_payload, f, indent=2)
    except OSError as e:
        activity_logger.error(f"Failed to write input JSON: {e}")
        raise BartError(ErrorCode.PERMISSION_DENIED, f"Cannot write {input_json_path}: {e}") from e
    activity_logger.note(f"Wrote transformed request payload to {input_json_path}")


def _reference_id_from_bulk_job_id(job_id: Any) -> str:
    """
    Derive CBS reference id from composite ``job_id`` (e.g. ``11008_2`` → ``2``).

    Uses the substring after the first ``_``; if there is no underscore, returns
    ``str(job_id)`` (stripped). ``None`` yields ``""``.
    """
    if job_id is None:
        return ""
    s = str(job_id).strip()
    if "_" in s:
        return s.split("_", 1)[1]
    return s


def write_bulk_users_output_transformed(job_folder: str) -> str:
    """Copy the bulk output, adding ``preference_id`` derived from each row's ``job_id``.

    Reads ``bulk_users_output.json`` (must be a JSON array), skips non-object
    rows, and writes ``bulk_users_output_transformed.json`` next to it.

    Returns:
        Path of the transformed file.

    Raises:
        BartError: FILE_NOT_FOUND, VALIDATION_FAILED or PERMISSION_DENIED.
    """
    assert activity_logger is not None
    src_path = os.path.join(job_folder, BULK_USERS_OUTPUT_NAME)
    out_path = os.path.join(job_folder, BULK_USERS_OUTPUT_TRANSFORMED_NAME)
    if not os.path.isfile(src_path):
        activity_logger.error(f"Bulk output not found: {src_path}")
        raise BartError(ErrorCode.FILE_NOT_FOUND, f"Missing bulk output file: {src_path}")
    try:
        data = read_json_file(src_path)
    except ValueError as e:
        activity_logger.error(f"Invalid JSON in {src_path}: {e}")
        raise BartError(ErrorCode.VALIDATION_FAILED, str(e)) from e
    if not isinstance(data, list):
        activity_logger.error(f"Expected JSON array in {src_path}, got {type(data).__name__}")
        raise BartError(ErrorCode.VALIDATION_FAILED, f"{src_path} must contain a JSON array")
    transformed: List[Dict[str, Any]] = []
    for row in data:
        if not isinstance(row, dict):
            activity_logger.warn(f"Skipping non-object row in {src_path}: {row!r}")
            continue
        ref = _reference_id_from_bulk_job_id(row.get("job_id"))
        transformed.append({**row, "preference_id": ref})
    try:
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(transformed, f, indent=2)
    except OSError as e:
        activity_logger.error(f"Failed to write transformed bulk output: {e}")
        raise BartError(ErrorCode.PERMISSION_DENIED, f"Cannot write {out_path}: {e}") from e
    activity_logger.success(
        f"Wrote transformed bulk output ({len(transformed)} row(s)) to {out_path}"
    )
    return out_path


def call_multi_user_backup_script(job_folder: str, cbs_job_id: str) -> Optional[str]:
    """Run the privileged bulk-backup script against the job's input file.

    The command is ``sudo -n <python> START_BULK_USER_BACKUP_SCRIPT
    --bulk-job-id=<id> --bulk-users-json=<input>``, passed as an argv list
    (no shell). stdout/stderr are captured; only their tails are logged.

    Returns:
        Path to ``bulk_users_output.json`` in ``job_folder`` if the script
        produced it, otherwise ``None``.

    Raises:
        BartError: FILE_NOT_FOUND if the input file or script is missing;
            TIMEOUT if the script exceeds ``_DEFAULT_TIMEOUT_SEC``;
            COMMAND_FAILED if it cannot be started or exits non-zero.
    """
    input_json_path = os.path.join(job_folder, BULK_USERS_INPUT_NAME)
    if not os.path.isfile(input_json_path):
        activity_logger.error(f"Input file not found: {input_json_path}")
        raise BartError(ErrorCode.FILE_NOT_FOUND, f"Missing bulk input file: {input_json_path}")
    if not os.path.isfile(START_BULK_USER_BACKUP_SCRIPT):
        activity_logger.error(f"Backup script not found: {START_BULK_USER_BACKUP_SCRIPT}")
        raise BartError(
            ErrorCode.FILE_NOT_FOUND,
            f"Multi-user backup script is not installed: {START_BULK_USER_BACKUP_SCRIPT}",
        )
    cmd = [
        "sudo",
        "-n",  # non-interactive: fail instead of prompting for a password
        sys.executable,
        START_BULK_USER_BACKUP_SCRIPT,
        f"--bulk-job-id={cbs_job_id}",
        f"--bulk-users-json={input_json_path}",
    ]
    activity_logger.note(f"Running bulk user backup: {' '.join(cmd)}")
    activity_logger.info(f"Script timeout set to {_DEFAULT_TIMEOUT_SEC} seconds")
    started = time.monotonic()
    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=_DEFAULT_TIMEOUT_SEC,
            check=False,
        )
    except subprocess.TimeoutExpired as e:
        elapsed = time.monotonic() - started
        activity_logger.error(f"Bulk user backup script timed out after {elapsed:.1f}s")
        raise BartError(
            ErrorCode.TIMEOUT,
            f"start_bulk_user_backup.py exceeded {_DEFAULT_TIMEOUT_SEC} seconds",
        ) from e
    except OSError as e:
        activity_logger.error(f"Failed to execute bulk user backup script: {e}")
        raise BartError(ErrorCode.COMMAND_FAILED, f"Could not run backup script: {e}") from e

    elapsed = time.monotonic() - started
    activity_logger.info(
        f"Bulk user backup script finished in {elapsed:.3f} seconds (exit {result.returncode})"
    )
    if result.stdout and result.stdout.strip():
        activity_logger.debug(
            f"Script stdout (last 2k): {result.stdout.strip()[-_OUTPUT_TAIL_CHARS:]!r}"
        )
    if result.stderr and result.stderr.strip():
        activity_logger.note(
            f"Script stderr (last 2k): {result.stderr.strip()[-_OUTPUT_TAIL_CHARS:]!r}"
        )
    if result.returncode != 0:
        # Prefer non-blank stderr, then stdout. Keep only the tail so a long run
        # cannot flood the log line and the exception message.
        err_output = next(
            (s.strip() for s in (result.stderr, result.stdout) if s and s.strip()),
            "",
        )
        err_tail = err_output[-_OUTPUT_TAIL_CHARS:] if err_output else "no output"
        activity_logger.error(
            f"Bulk user backup script failed with exit code {result.returncode}: {err_tail}"
        )
        raise BartError(
            ErrorCode.COMMAND_FAILED,
            f"start_bulk_user_backup.py failed (rc={result.returncode}): {err_tail}",
        )

    output_path = os.path.join(job_folder, BULK_USERS_OUTPUT_NAME)
    if os.path.isfile(output_path):
        return output_path
    activity_logger.warn(f"Script exited 0 but produced no output file: {output_path}")
    return None


def _jobs_from_request(request_data: Dict[str, Any]) -> list:
    """Return [(reference_id, job_dict), ...] from ``jobs`` or ``backup_requests``.

    A non-empty ``jobs`` list takes precedence. The reference id comes from
    ``preference_id`` (falling back to ``job_id``), and jobs with neither are
    skipped. Otherwise the ``backup_requests`` mapping keys are used.
    """
    jobs = request_data.get("jobs")
    if isinstance(jobs, list) and jobs:
        out = []
        for j in jobs:
            if not isinstance(j, dict):
                continue
            ref = j.get("preference_id")
            if ref is None:
                ref = j.get("job_id")
            if ref is None:
                continue
            out.append((str(ref), j))
        return out
    backup_requests = request_data.get("backup_requests") or {}
    if isinstance(backup_requests, dict) and backup_requests:
        return [(str(ref), j) for ref, j in backup_requests.items() if isinstance(j, dict)]
    return []


def _docroot_paths_from_job(job: Dict[str, Any]) -> list:
    """Return the non-empty ``path`` values (as str) of a job's ``docroots`` entries."""
    return [
        str(d["path"])
        for d in job.get("docroots") or []
        if isinstance(d, dict) and d.get("path")
    ]


def _write_dummy_bulk_artifact_files(
    reference_id: str,
    file_list_path: str,
    changed_files_list_path: str,
) -> None:
    """Create placeholder file-list and backup-diff files for dummy multi-user backup."""
    assert activity_logger is not None
    ref_s = str(reference_id)
    # Random nonce so each dummy run produces distinguishable file contents.
    nonce = secrets.token_hex(8)
    try:
        os.makedirs(os.path.dirname(file_list_path), exist_ok=True)
        with open(file_list_path, "w", encoding="utf-8") as f:
            f.write(
                f"# BART dummy bulk file list (reference_id={ref_s} nonce={nonce})\n"
                f"public_html/index.php\n"
                f"homedir/tmp/.dummy-{nonce[:12]}.dat\n"
            )
        with open(changed_files_list_path, "w", encoding="utf-8") as f:
            f.write(
                f"# BART dummy bulk changed files (reference_id={ref_s} nonce={nonce})\n"
                f"M\tpublic_html/wp-config.php\n"
                f"A\tmail/.Trash_{nonce[:6]}\n"
            )
    except OSError as e:
        activity_logger.error(f"Failed to write dummy artifacts under {ref_s}: {e}")
        raise BartError(
            ErrorCode.PERMISSION_DENIED,
            f"Cannot write dummy bulk files for reference {ref_s}: {e}",
        ) from e
    activity_logger.note(f"Wrote dummy artifacts {file_list_path} and {changed_files_list_path}")


def call_multi_user_backup_script_dummy(job_folder: str) -> str:
    """
    Simulate bulk backup output: read ``bulk_users.json`` and write
    ``bulk_users_output.json`` as a JSON array of per-job results (same shape
    the real script is expected to produce).

    Every job is reported as ``success``. Placeholder artifact files are written
    under ``<job folder>/<reference_id>/``.
    """
    input_json_path = os.path.join(job_folder, BULK_USERS_INPUT_NAME)
    output_json_path = os.path.join(job_folder, BULK_USERS_OUTPUT_NAME)
    if not os.path.isfile(input_json_path):
        activity_logger.error(f"Input file not found: {input_json_path}")
        raise BartError(ErrorCode.FILE_NOT_FOUND, f"Missing bulk input file: {input_json_path}")
    try:
        request_data = read_json_file(input_json_path)
    except ValueError as e:
        activity_logger.error(f"Invalid JSON in {input_json_path}: {e}")
        raise BartError(ErrorCode.VALIDATION_FAILED, str(e)) from e
    if not isinstance(request_data, dict):
        raise BartError(
            ErrorCode.VALIDATION_FAILED, f"{input_json_path} must contain a JSON object"
        )
    cbs_job_id = request_data.get("cbs_job_id")
    job_prefix = (
        f"{bart_jobs_folder}/job_{cbs_job_id}" if cbs_job_id is not None else job_folder
    )
    output_rows = []
    for reference_id, job in _jobs_from_request(request_data):
        ref_s = str(reference_id)
        user_subdir = os.path.normpath(os.path.join(job_prefix, ref_s))
        file_list_path = os.path.join(user_subdir, "file-list.txt")
        changed_files_list_path = os.path.join(user_subdir, "backup-diff.txt")
        _write_dummy_bulk_artifact_files(ref_s, file_list_path, changed_files_list_path)
        succeeded_docroots = _docroot_paths_from_job(job)
        succeeded_dbs = [str(x) for x in (job.get("databases") or []) if x is not None]
        output_rows.append(
            {
                "job_id": str(job.get("job_id")),
                "preference_id": reference_id,
                "status": "success",
                "size_bytes": 1024,
                "succeeded_docroots": succeeded_docroots,
                "succeeded_dbs": succeeded_dbs,
                "failed_docroots": [],
                "failed_dbs": [],
                "file_list_path": file_list_path,
                "changed_files_list_path": changed_files_list_path,
            }
        )
    try:
        with open(output_json_path, "w", encoding="utf-8") as f:
            json.dump(output_rows, f, indent=2)
    except OSError as e:
        activity_logger.error(f"Failed to write output JSON: {e}")
        raise BartError(
            ErrorCode.PERMISSION_DENIED, f"Cannot write {output_json_path}: {e}"
        ) from e
    activity_logger.success(
        f"Wrote dummy bulk output ({len(output_rows)} row(s)) to {output_json_path}"
    )
    return output_json_path


def generate_callback_data_dummy(request_body, output_json_path):
    """Return placeholder ``action_data`` for every key of ``backup_requests``.

    Dummy implementation: sleeps ``DELAY_SECONDS`` to mimic work and ignores
    ``output_json_path``. Paths embed today's UTC date.
    """
    time.sleep(DELAY_SECONDS)
    backup_requests = request_body.get("backup_requests") or {}
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    action_data: Dict[str, Any] = {}
    for n, req_id in enumerate(backup_requests.keys(), start=1):
        action_data[req_id] = {
            "backup_status": "success",
            "file_list_path": f"customer_scheduled/custbacktest{n}/{today}/file_list.json",
            "changed_file_list_path": (
                f"customer_scheduled/custbacktest{n}/{today}/changed_file_list.json"
            ),
            "backup_size_bytes": 1024,
        }
    activity_logger.note("Filled placeholder paths for action_data")
    return action_data


def generate_callback_data(output_json_path: Optional[str]) -> Dict[str, Any]:
    """Build the callback ``action_data`` mapping from the bulk script's output file.

    The file must hold a JSON array of per-job result objects. Each row is keyed
    by its ``preference_id``. When that is absent, the id is derived from the
    composite ``job_id`` (as write_bulk_users_output_transformed does). Non-object
    rows and rows with no usable id are skipped with a warning. If two rows share
    an id, the later row wins.

    Raises:
        BartError: FILE_NOT_FOUND if the path is empty/missing; VALIDATION_FAILED
            if the file is not valid JSON or not a JSON array.
    """
    if not output_json_path or not os.path.isfile(output_json_path):
        activity_logger.error(f"Output file not found: {output_json_path}")
        raise BartError(
            ErrorCode.FILE_NOT_FOUND, f"Missing bulk output file: {output_json_path}"
        )
    try:
        data = read_json_file(output_json_path)
    except ValueError as e:
        activity_logger.error(f"Invalid JSON in {output_json_path}: {e}")
        raise BartError(ErrorCode.VALIDATION_FAILED, str(e)) from e
    if not isinstance(data, list):
        activity_logger.error(
            f"Expected JSON array in {output_json_path}, got {type(data).__name__}"
        )
        raise BartError(
            ErrorCode.VALIDATION_FAILED, f"{output_json_path} must contain a JSON array"
        )
    action_data: Dict[str, Any] = {}
    for row in data:
        if not isinstance(row, dict):
            activity_logger.warn(f"Skipping non-object row in {output_json_path}: {row!r}")
            continue
        pref = row.get("preference_id")
        if pref is None:
            # The script may report only the composite job_id (e.g. "11008_2").
            pref = _reference_id_from_bulk_job_id(row.get("job_id"))
        pref_s = str(pref)
        if not pref_s:
            activity_logger.warn(
                f"Skipping row without preference_id/job_id in {output_json_path}: {row!r}"
            )
            continue
        if pref_s in action_data:
            activity_logger.warn(
                f"Duplicate preference_id {pref_s!r} in {output_json_path}; later row wins"
            )
        action_data[pref_s] = {
            "backup_status": row.get("status", "unknown"),
            "file_list_path": row.get("file_list_path", ""),
            "changed_file_list_path": row.get("changed_files_list_path", ""),
            "backup_size_bytes": row.get("size_bytes", 0),
            "succeeded_docroots": row.get("succeeded_docroots", []),
            "succeeded_dbs": row.get("succeeded_dbs", []),
            "failed_docroots": row.get("failed_docroots", []),
            "failed_dbs": row.get("failed_dbs", []),
        }
    activity_logger.note(
        f"Built action_data for {len(action_data)} reference(s) from {output_json_path}"
    )
    return action_data
cải xoăn