ruạṛ
import argparse import json import os import shlex import sys from typing import List, Dict from app.errors.error_codes import ErrorCode from app.errors.exceptions import BartError from app.utils.job_activity_logger import JobActivityLogger from app.utils.utility_functions import read_homedir, does_path_exists, run_sudo_as_user activity_logger: JobActivityLogger = None # Deprecated. Keeping for fallback class ImmutablePathChecker: """Simplified immutable path checker with clean separation of concerns.""" def __init__(self, activity_logger: JobActivityLogger): self.logger = activity_logger def _normalize_path(self, path: str, base_path: str, homedir: str, username: str = None) -> str: """Normalize a path to an absolute path, handling various input formats.""" if os.path.isabs(path): return os.path.normpath(path) base = path.strip('/') # Remove username prefix if present if username and base.startswith(username + '/'): base = base[len(username) + 1:] # Remove destination relative path prefix if present if base_path != homedir: dest_relative = os.path.relpath(base_path, homedir) if dest_relative != '.' and base.startswith(dest_relative + '/'): base = base[len(dest_relative) + 1:] return os.path.normpath(os.path.join(base_path, base)) def _normalize_exclude_paths(self, exclude_paths: List[str], dest_abs: str, homedir: str, username: str) -> List[ str]: """Normalize all exclude paths upfront.""" if not exclude_paths: return [] homedir_abs = os.path.abspath(homedir) exclude_paths_abs = [] for exclude_path in exclude_paths: exclude_abs = self._normalize_path(exclude_path, dest_abs, homedir_abs, username) # Ensure exclusion path is under dest if exclude_abs == dest_abs or exclude_abs.startswith(dest_abs + os.sep): exclude_paths_abs.append(exclude_abs) return exclude_paths_abs def _build_find_command(self, dest_abs: str, exclude_paths_abs: List[str]) -> str: """Build find command with exclusions in clean format.""" find_cmd_parts = ["/usr/bin/find", dest_abs] if exclude_paths_abs: # Build exclude expression: \( -path path1 -o -path path2 ... \) exclude_expr_parts = ["\\("] for i, exclude_abs in enumerate(exclude_paths_abs): if i > 0: exclude_expr_parts.append("-o") exclude_expr_parts.extend(["-path", exclude_abs]) exclude_expr_parts.append("\\)") # Join exclude expression (parentheses are already escaped, don't quote them) exclude_expr = " ".join( p if p in ["\\(", "\\)"] else shlex.quote(p) for p in exclude_expr_parts ) find_cmd_parts.extend([exclude_expr, "-prune", "-o"]) find_cmd_parts.append("-print0") # Build final command string (don't quote the exclude expression) find_cmd = " ".join( p if "\\(" in p and "\\)" in p else shlex.quote(p) for p in find_cmd_parts ) return find_cmd def collect_immutable_paths(self, dest: str, homedir: str = None, exclude_paths: List[str] = None, username: str = None) -> List[tuple]: """Collect immutable paths under destination, excluding specified paths.""" immutable_paths = [] exclude_paths = exclude_paths or [] if not does_path_exists(dest): self.logger.warn(f"Path does not exist: {dest}") return immutable_paths dest_abs = os.path.abspath(dest) # Extract homedir from dest if not provided if not homedir: parts = dest_abs.split(os.sep) for i in range(len(parts), 0, -1): potential_homedir = os.sep + os.path.join(*parts[:i]) if potential_homedir.startswith('/home') and len(parts[:i]) >= 2: homedir = potential_homedir break if not homedir: error_msg = f"Could not determine homedir from path {dest}" self.logger.error(error_msg) raise BartError(ErrorCode.VALIDATION_FAILED, error_msg) # Extract username from homedir if not provided if not username: username = os.path.basename(homedir) try: # Step 1: Normalize all exclude paths upfront exclude_paths_abs = self._normalize_exclude_paths(exclude_paths, dest_abs, homedir, username) # Step 2: Build find command find_cmd = self._build_find_command(dest_abs, exclude_paths_abs) # Step 3: Build pipeline lsattr_cmd = "/usr/bin/lsattr -d" xargs_cmd = f"/usr/bin/xargs -0 -n 1 {lsattr_cmd}" awk_script = r"""awk '$1 ~ /i/ && $NF !~ /:$/ { cmd="test -d \"" $NF "\" && echo DIRECTORY || echo FILE" cmd | getline type close(cmd) print type, $NF }'""" pipeline = f"{find_cmd} | {xargs_cmd} | {awk_script}" # Step 4: Execute pipeline output = run_sudo_as_user(username, ["/bin/sh", "-c", pipeline]) # Step 5: Parse output seen_paths = set() if output and output.strip(): for line in output.splitlines(): line = line.strip() if not line: continue parts = line.split(None, 1) if len(parts) != 2: continue path_type, path_abs = parts path_type = path_type.upper() path_abs = os.path.normpath(path_abs) # Include dest itself and all children if path_abs == dest_abs or path_abs.startswith(dest_abs + os.sep): if path_abs not in seen_paths: seen_paths.add(path_abs) immutable_paths.append((path_type, path_abs)) immutable_paths.sort() except Exception as e: error_str = str(e).lower() error_msg = str(e) if "operation not supported" in error_str: self.logger.warn("Filesystem does not support file attributes, proceeding without immutable path check") return immutable_paths if "permission denied" in error_str or "password is required" in error_str: error_msg = f"Permission denied while checking immutable paths for {dest}: {error_msg}" self.logger.error(error_msg) raise BartError(ErrorCode.PERMISSION_DENIED, error_msg) error_msg = f"Failed to check immutable paths for {dest}: {error_msg}" self.logger.error(error_msg) raise BartError(ErrorCode.COMMAND_FAILED, error_msg) return immutable_paths def check_multiple_paths(self, paths: List[str], homedir: str, exclude_paths: List[str] = None, username: str = None) -> Dict[str, List[Dict]]: """Check multiple paths for immutable files/directories.""" all_immutable_paths = {} homedir_abs = os.path.abspath(homedir) if not username: username = os.path.basename(homedir) for path in paths: restore_dest = self._normalize_path(path, homedir_abs, homedir_abs, username) immutable_paths_with_type = self.collect_immutable_paths(restore_dest, homedir, exclude_paths, username) # Convert to relative paths and format as objects immutable_paths_relative = [] for path_type, abs_path in immutable_paths_with_type: if abs_path.startswith(homedir_abs + os.sep): rel_path = abs_path[len(homedir_abs):].lstrip(os.sep) immutable_paths_relative.append({ "type": path_type.lower(), "path": rel_path }) elif abs_path == homedir_abs: if not os.path.isabs(path): rel_path = path else: rel_path = abs_path immutable_paths_relative.append({ "type": path_type.lower(), "path": rel_path }) else: immutable_paths_relative.append({ "type": path_type.lower(), "path": abs_path }) all_immutable_paths[path] = immutable_paths_relative return all_immutable_paths @classmethod def check_immutable_paths(cls, username: str, paths: List[str], cbs_job_id: int, is_cli: bool = False, exclude_paths: List[str] = None) -> Dict: """Main entry point for checking immutable paths.""" global activity_logger if isinstance(cbs_job_id, str): cbs_job_id = int(cbs_job_id) activity_logger = JobActivityLogger(cbs_job_id, echo_to_stdout=is_cli) activity_logger.info(f"<======== Starting Immutable Paths Check ========>") homedir = read_homedir(username) if not homedir: error_msg = f"User {username} does not exist on the server." activity_logger.error(error_msg) raise BartError(ErrorCode.USER_NOT_FOUND, error_msg) checker = cls(activity_logger) all_immutable_paths_dict = checker.check_multiple_paths(paths, homedir, exclude_paths, username) all_immutable_paths = [] for path_list in all_immutable_paths_dict.values(): all_immutable_paths.extend(path_list) total_immutable = len(all_immutable_paths) activity_logger.info(f"Immutable paths check completed. Found {total_immutable} immutable path(s)") return { "message": "Immutable paths check completed", "username": username, "total_immutable_paths": total_immutable, "immutable_paths": all_immutable_paths, } def main(): """Command-line interface for checking immutable paths.""" parser = argparse.ArgumentParser( description="Check for immutable paths (chattr +i) for a given user and paths", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Check a single path python -m app.utils.immutable_path_checker_v2 cvlmhste "public_html/wp-content" 1234 # Check multiple paths (comma-separated) python -m app.utils.immutable_path_checker_v2 cvlmhste "public_html/wp-content,public_html/wp-includes" 1234 # Check with absolute paths python -m app.utils.immutable_path_checker_v2 cvlmhste "/home1/cvlmhste/public_html/wp-content" 1234 # Check with exclusions python -m app.utils.immutable_path_checker_v2 cvlmhste "public_html" 1234 --exclude "public_html/wp-includes,public_html/wp-admin" """ ) parser.add_argument("username", help="Username to check immutable paths for") parser.add_argument("paths", help="Comma-separated list of paths to check (relative to homedir or absolute)") parser.add_argument("cbs_job_id", type=int, help="CBS job ID for logging") parser.add_argument("--exclude", dest="exclude_paths", help="Comma-separated list of paths to exclude from check") parser.add_argument("--json", action="store_true", help="Output results as JSON") args = parser.parse_args() paths_list = [p.strip() for p in args.paths.split(',') if p.strip()] if not paths_list: print("Error: No valid paths provided", file=sys.stderr) sys.exit(1) exclude_paths_list = None if args.exclude_paths: exclude_paths_list = [p.strip() for p in args.exclude_paths.split(',') if p.strip()] try: result = ImmutablePathChecker.check_immutable_paths( username=args.username, paths=paths_list, cbs_job_id=args.cbs_job_id, is_cli=True, exclude_paths=exclude_paths_list ) if args.json: print(json.dumps(result, indent=2)) else: print("\n" + "=" * 60) print("IMMUTABLE PATHS CHECK RESULTS") print("=" * 60) print(f"Username: {result['username']}") print(f"Total Immutable Paths: {result['total_immutable_paths']}") print(f"\nDetails:") immutable_paths = result['immutable_paths'] if immutable_paths: for imm_path_obj in immutable_paths: path_type = imm_path_obj.get('type', 'unknown') path = imm_path_obj.get('path', '') print(f" [{path_type.upper()}] {path}") else: print(" (no immutable paths)") print("=" * 60 + "\n") sys.exit(0 if result['total_immutable_paths'] == 0 else 1) except BartError as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) except Exception as e: print(f"Unexpected error: {e}", file=sys.stderr) import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": main()
cải xoăn