ruạṛ
#!/usr/bin/env python3
"""Detect immutable (chattr +i) files and directories under destination paths."""
import argparse
import json
import os
import shlex
import sys
from typing import List, Dict, Optional

from app.errors.error_codes import ErrorCode
from app.errors.exceptions import BartError
from app.utils.job_activity_logger import JobActivityLogger
from app.utils.utility_functions import run_sudo_as_user, read_homedir, does_path_exists

# Module-level logger; assigned by ImmutableScanner.check_immutable_paths().
activity_logger: Optional[JobActivityLogger] = None


def _abs(p: str) -> str:
    """Return *p* as a normalized absolute path (symlinks are not resolved)."""
    return os.path.normpath(os.path.abspath(p))


class ImmutableScanner:
    """
    Scan destination paths for immutable (chattr +i) files/dirs.

    - One pass: { lsattr -d -- DEST; lsattr -aR -- DEST; }
    - Emits homedir-relative paths (e.g., 'public_html/...').
    - Excludes can be absolute or dest-relative.
    """

    def __init__(self, username: str, homedir: str):
        """Store the user to run lsattr as and the normalized home directory."""
        self.username = username
        self.homedir_abs = _abs(homedir)

    # ---- subprocess helpers ----

    def _run_as_user(self, shell_cmd: str) -> str:
        """
        Run *shell_cmd* through /bin/sh as ``self.username`` and return its output.

        Returns "" when the helper returns None, or when the failure message
        indicates that lsattr is unsupported or missing.

        Raises:
            RuntimeError: for any other failure (original exception chained).
        """
        try:
            output = run_sudo_as_user(self.username, ["/bin/sh", "-c", shell_cmd])
        except Exception as e:
            error_str = str(e).lower()
            # Treat unsupported/missing lsattr as empty result (no fail-fast here)
            if "operation not supported" in error_str or "not found" in error_str:
                return ""
            raise RuntimeError(f"command failed: {shell_cmd}\nstderr: {str(e)}") from e
        return "" if output is None else output

    def _lsattr_all(self, dest_abs: str) -> List[str]:
        """Return the raw lsattr output lines for DEST itself and everything below it."""
        quoted = shlex.quote(dest_abs)
        cmd = "{ lsattr -d -- " + quoted + "; lsattr -aR -- " + quoted + "; }"
        out = self._run_as_user(cmd)
        return out.splitlines() if out else []

    # ---- parsing & filters ----

    @staticmethod
    def _parse_immutable(lsattr_lines: List[str]) -> List[str]:
        """
        Keep entries where first field contains lowercase 'i'.
        Skip '/path:' headers. Returns sorted, de-duplicated absolute paths.

        'lsattr -a' also lists the '..' entry of every directory; those are
        skipped because normalizing 'DIR/..' would yield DIR's parent, which
        for the scan root lies outside the scanned destination.
        """
        immutable = set()
        for raw in lsattr_lines:
            line = raw.strip()
            if not line or line.endswith(":"):
                continue
            fields = line.split(None, 1)
            if len(fields) != 2:
                continue
            attrs, path = fields
            if "i" not in attrs:
                continue
            if os.path.basename(path) == "..":
                # Parent-directory alias: never a distinct entry under DEST.
                continue
            immutable.add(_abs(path))
        return sorted(immutable)

    @staticmethod
    def _apply_excludes(paths_abs: List[str], excludes_abs: List[str]) -> List[str]:
        """Drop every path equal to, or located under, one of *excludes_abs*."""
        if not excludes_abs:
            return paths_abs
        excl = [_abs(e) for e in excludes_abs]
        return [
            p
            for p in paths_abs
            if not any(p == e or p.startswith(e + os.sep) for e in excl)
        ]

    def _normalize_excludes(self, dest_abs: str, excludes: Optional[List[str]]) -> List[str]:
        """
        Convert *excludes* to absolute paths and keep only those under *dest_abs*.

        Absolute excludes are used as-is. Relative excludes have surrounding
        '/' stripped, then an optional '<username>/' prefix and an optional
        '<dest relative to homedir>/' prefix removed, and are finally joined
        onto *dest_abs*.
        """
        if not excludes:
            return []

        # Loop-invariant: DEST expressed relative to the homedir.
        dest_relative = os.path.relpath(dest_abs, self.homedir_abs)
        user_prefix = self.username + '/'

        normalized: List[str] = []
        for e in excludes:
            if e.startswith(os.sep):
                # Absolute path - use as-is
                exclude_abs = _abs(e)
            else:
                base = e.strip('/')
                # Remove username prefix if present (e.g., "cvlmhste/public_html/wp-content")
                if base.startswith(user_prefix):
                    base = base[len(user_prefix):]
                # Remove destination relative path prefix if present
                # e.g., if dest is /home1/user/public_html and base is "public_html/wp-content"
                if dest_relative != '.' and base.startswith(dest_relative + '/'):
                    base = base[len(dest_relative) + 1:]
                exclude_abs = _abs(os.path.join(dest_abs, base))

            # Ensure exclusion path is under dest
            if exclude_abs == dest_abs or exclude_abs.startswith(dest_abs + os.sep):
                normalized.append(exclude_abs)
        return normalized

    # ---- public API ----

    def scan_dest(
        self,
        dest_abs: str,
        excludes: Optional[List[str]] = None,
    ) -> List[Dict[str, str]]:
        """
        Scan a single DEST (absolute) and return:
        [{"path": "public_html/...", "type": "file|directory"}, ...]

        Returns [] when DEST is empty/blank or does not exist. Paths outside
        the homedir are kept absolute.
        """
        # An empty path would otherwise normalize to the current working directory.
        if not dest_abs or not dest_abs.strip():
            return []

        dest_abs = _abs(dest_abs)
        if not does_path_exists(dest_abs):
            return []

        excludes_abs = self._normalize_excludes(dest_abs, excludes)
        lines = self._lsattr_all(dest_abs)
        imm_abs = self._apply_excludes(self._parse_immutable(lines), excludes_abs)

        # Convert to homedir-relative and attach type
        home_prefix = self.homedir_abs + os.sep
        out: List[Dict[str, str]] = []
        for full in imm_abs:
            if full.startswith(home_prefix):
                rel = full[len(self.homedir_abs):].lstrip(os.sep)
            elif full == self.homedir_abs:
                rel = os.path.basename(self.homedir_abs)
            else:
                rel = full  # outside homedir; keep absolute
            ptype = "directory" if os.path.isdir(full) else "file"
            out.append({"path": rel, "type": ptype})

        out.sort(key=lambda x: (x["path"], x["type"]))
        return out

    def scan_many(
        self,
        dests_abs: List[str],
        excludes: Optional[List[str]] = None,
    ) -> List[Dict[str, str]]:
        """Scan multiple absolute dests and return a combined, de-duplicated list."""
        seen = set()
        uniq: List[Dict[str, str]] = []
        for d in dests_abs:
            for item in self.scan_dest(d, excludes):
                key = (item["path"], item["type"])
                if key in seen:
                    continue
                seen.add(key)
                uniq.append(item)
        uniq.sort(key=lambda x: (x["path"], x["type"]))
        return uniq

    # ----------------- CLI helpers -----------------

    def _read_homedir(self, username: str) -> str:
        """
        Return the home directory reported by read_homedir() for *username*.

        Raises:
            BartError: with ErrorCode.USER_NOT_FOUND when no homedir is returned.
        """
        homedir = read_homedir(username)
        if not homedir:
            error_msg = f"User {username} does not exist on the server."
            # The module logger is only set once check_immutable_paths() has run.
            if activity_logger is not None:
                activity_logger.error(error_msg)
            raise BartError(ErrorCode.USER_NOT_FOUND, error_msg)
        return homedir

    def _normalize_dest(self, username: str, homedir_abs: str, p: str) -> str:
        """
        Accepts absolute, homedir-relative, or username-prefixed paths.

        - '/home1/u/public_html'          -> absolute kept
        - 'public_html'                   -> homedir/public_html
        - 'u/public_html' (matches user)  -> homedir/public_html

        A blank input is returned as the empty string.
        """
        p = p.strip()
        if not p:
            return p
        if p.startswith("/"):
            return _abs(p)
        # username-prefixed?
        if p.startswith(username + "/"):
            p = p[len(username) + 1:]
        return _abs(os.path.join(homedir_abs, p))

    @classmethod
    def check_immutable_paths(
        cls,
        username: str,
        paths: List[str],
        cbs_job_id: int,
        is_cli: bool = False,
        exclude_paths: Optional[List[str]] = None,
    ) -> Dict:
        """
        Scan *paths* for immutable entries on behalf of *username*.

        Installs the module-level activity logger for *cbs_job_id*, normalizes
        each destination against the user's homedir, scans them and returns a
        dict with "message", "username", "total_immutable_paths" and
        "immutable_paths". Blank entries in *paths* are ignored.

        Raises:
            BartError: with ErrorCode.USER_NOT_FOUND when the homedir lookup fails.
        """
        global activity_logger
        activity_logger = JobActivityLogger(cbs_job_id, echo_to_stdout=is_cli)
        activity_logger.info("<======== Starting Immutable Paths Check ========>")

        homedir = read_homedir(username)
        if not homedir:
            error_msg = f"User {username} does not exist on the server."
            activity_logger.error(error_msg)
            raise BartError(ErrorCode.USER_NOT_FOUND, error_msg)
        homedir_abs = _abs(homedir)

        scanner = cls(username=username, homedir=homedir_abs)

        # Skip blank entries: an empty destination would resolve to the cwd.
        dests_abs = [
            scanner._normalize_dest(username, homedir_abs, p)
            for p in (paths or [])
            if p and p.strip()
        ]

        immutable_paths = scanner.scan_many(dests_abs=dests_abs, excludes=exclude_paths)

        total_immutable = len(immutable_paths)
        activity_logger.info(
            f"Immutable paths check completed. Found {total_immutable} immutable path(s)"
        )

        return {
            "message": "Immutable paths check completed",
            "username": username,
            "total_immutable_paths": total_immutable,
            "immutable_paths": immutable_paths,
        }


def main():
    """
    CLI entry point: list immutable entries under the given destinations.

    Exit status: 0 when nothing immutable is found, 2 when at least one entry
    is found, 1 on usage or scan errors.
    """
    ap = argparse.ArgumentParser(
        description="List immutable (chattr +i) entries under destination paths."
    )
    ap.add_argument("username", help="Run lsattr as this user (sudo -u USER).")
    ap.add_argument(
        "paths",
        help="Comma-separated destinations (absolute, homedir-relative, or username-prefixed).",
    )
    ap.add_argument(
        "--exclude",
        default="",
        help="Comma-separated excludes (absolute or dest-relative), applied to each dest.",
    )
    ap.add_argument("--json", action="store_true", help="Emit JSON")
    args = ap.parse_args()

    username = args.username
    homedir = read_homedir(username)
    if not homedir:
        print(f"ERROR: User {username} does not exist on the server.", file=sys.stderr)
        sys.exit(1)
    homedir_abs = _abs(homedir)

    # Accept mixed formats; normalize all to absolute using username+homedir
    raw_dests = [p.strip() for p in args.paths.split(",") if p.strip()]
    if not raw_dests:
        print("ERROR: no destination paths provided", file=sys.stderr)
        sys.exit(1)

    scanner = ImmutableScanner(username=username, homedir=homedir_abs)
    dests = [scanner._normalize_dest(username, homedir_abs, p) for p in raw_dests]
    excludes = [e.strip() for e in args.exclude.split(",") if e.strip()] if args.exclude else []

    try:
        immutable_paths = scanner.scan_many(dests_abs=dests, excludes=excludes)
    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)

    payload = {
        "username": username,
        "total_immutable_paths": len(immutable_paths),
        "immutable_paths": immutable_paths,  # requested shape
    }

    if args.json:
        print(json.dumps(payload, indent=2))
    else:
        print(f"Immutable paths for user={username}")
        for item in immutable_paths:
            print(f"[{item['type']}] {item['path']}")
        print(f"\nTOTAL: {len(immutable_paths)}")

    # Exit: 0 if none; 2 if found
    sys.exit(2 if immutable_paths else 0)


if __name__ == "__main__":
    main()
cải xoăn