ruạṛ
#!/usr/bin/env python
"""Collect Oracle Cloud Agent diagnostics into a gzipped tar archive.

The module builds a list of *operations*.  Every operation is a callable
taking ``(tar, cmdDict, checkDict)``: it may add files to ``tar`` (a ``Tar``
instance), store command output in ``cmdDict`` and store check results in
``checkDict``.  Running ``Tar(...)()`` executes every operation and then
stores both dictionaries in the archive as JSON files.

The code deliberately avoids syntax newer than Python 2.6 (no f-strings, no
annotations) because the script also inspects OL6 hosts.
"""
import grp
import tarfile
import time
import re
from json import dump
from os import environ, listdir, popen, unlink, path, stat
from pwd import getpwnam
from stat import S_IMODE

environ['TZ'] = 'UTC'
# The Python documentation says a TZ change must be followed by tzset() for
# localtime() to honour it reliably; tzset() only exists on Unix.
if hasattr(time, 'tzset'):
    time.tzset()

# Errors that mean "this file could not be read"; /proc entries in particular
# can vanish between listing and opening when a process exits.
_READ_ERRORS = (IOError, OSError, UnicodeError)


def _read_file(filename):
    """Return the whole content of ``filename``, closing the handle."""
    with open(filename) as handle:
        return handle.read()


def _run_command(cmd):
    """Run ``cmd`` through ``popen`` and return its stripped output."""
    pipe = popen(cmd)
    try:
        return pipe.read().strip()
    finally:
        pipe.close()


class Tar:
    """Gzipped tar archive that runs a list of operations and stores results."""

    def __init__(self, filename, ops):
        """Open ``filename`` for writing; ``ops`` are the operations to run."""
        self.filename = filename
        self.tar = tarfile.open(self.filename, 'w:gz')
        self.ops = ops

    def __call__(self):
        """Run every operation, then archive both result dictionaries as JSON.

        The JSON files are written to the current working directory, added
        to the archive and removed again, even when writing or adding fails.
        """
        cmds = {}
        checks = {}
        fname = 'cmd-result.json'
        cname = 'check-result.json'
        for operation in self.ops:
            operation(self, cmds, checks)
        try:
            for name, results in ((fname, cmds), (cname, checks)):
                # Close the file before archiving it so that the archived
                # copy is guaranteed to be completely flushed.
                with open(name, 'w') as handle:
                    dump(results, handle, sort_keys=True, indent=4,
                         separators=(',', ': '))
                self.tar.add(name)
        finally:
            for name in (fname, cname):
                if path.exists(name):
                    unlink(name)

    def add(self, filename):
        """Add ``filename`` to the archive."""
        self.tar.add(filename)

    def __del__(self):
        self.close()

    def close(self):
        """Finalise the archive; safe to call more than once."""
        # ``tar`` is missing when tarfile.open() failed in __init__.
        tar = getattr(self, 'tar', None)
        if tar is not None:
            tar.close()


class Copydir:
    """Operation that archives every entry directly inside a directory."""

    def __init__(self, directory):
        """Record the entries of ``directory``; a missing directory is empty."""
        try:
            names = listdir(directory)
        except OSError:
            # Instances are created while the module-level operation list is
            # built; an unreadable directory must not abort the collection.
            names = []
        self.files = [directory + '/' + fname for fname in names]

    def __call__(self, tar, cmdDict, checkDict):
        for fname in self.files:
            tar.add(fname)


class Copyfile:
    """Operation that archives a single file when it exists."""

    def __init__(self, filename):
        self.filename = filename

    def __call__(self, tar, cmdDict, checkDict):
        if path.exists(self.filename):
            # Symlinks are resolved, so the archive holds the real path.
            tar.add(path.realpath(self.filename))


class Cmd:
    """Operation that stores a shell command's output under the command text."""

    def __init__(self, cmd, sudo=False):
        """Remember ``cmd``; when ``sudo`` is true it runs via /usr/bin/sudo."""
        self.cmd = cmd
        if not sudo:
            self.sudo = ''
        else:
            self.sudo = '/usr/bin/sudo'

    def __call__(self, tar, cmdDict, checkDict):
        cmdDict[self.cmd] = _run_command(self.sudo + ' ' + self.cmd)


class Check:
    """Operation that stores the result of a zero-argument function."""

    def __init__(self, function, key, error_repr=None):
        """Store ``function()`` under ``key``.

        When the function raises, ``error_repr`` is stored instead, or the
        ``repr`` of the exception when ``error_repr`` is None.
        """
        self.function = function
        self.key = key
        self.error_repr = error_repr

    def __call__(self, tar, cmdDict, checkDict):
        try:
            checkDict[self.key] = self.function()
        except Exception as e:
            if self.error_repr is None:
                checkDict[self.key] = repr(e)
            else:
                checkDict[self.key] = self.error_repr


class Pmap:
    """Operation that records ``pmap -x`` output for processes with a name."""

    def __init__(self, process_name=None):
        """Snapshot the command line of every numeric entry under /proc."""
        self.process_name = process_name
        self.pids = {}
        try:
            entries = listdir('/proc')
        except OSError:
            entries = []
        for process in entries:
            if not '0' <= process[0] <= '9':
                continue
            try:
                self.pids[process] = _read_file('/proc/%s/cmdline' % process)
            except _READ_ERRORS:
                # The process exited (or is unreadable); skip it.
                continue

    def __call__(self, tar, cmdDict, checkDict):
        for pid in self.pids:
            # First NUL-separated argument, reduced to its base name.
            pname = self.pids[pid].split('\000')[0].split('/')[-1]
            if self.process_name == pname:
                cmd = 'pmap -x %s # pmap for %s' % (pid, pname)
                Cmd(cmd, True)(tar, cmdDict, checkDict)


class RunState(Pmap):
    """Operation that records the ``State:`` line of matching processes."""

    def __init__(self, match, cmdSuffix='RunState'):
        """Match processes whose command line contains ``match``."""
        Pmap.__init__(self)
        self.match = match
        self.cmdSuffix = cmdSuffix

    def __call__(self, tar, cmdDict, checkDict):
        states = []
        for process in self.pids:
            cmdline = self.pids[process]
            if self.match not in cmdline:
                continue
            try:
                with open('/proc/' + process + '/status') as handle:
                    lines = handle.readlines()
            except _READ_ERRORS:
                # The process went away after the snapshot was taken.
                continue
            for line in lines:
                if line.startswith('State:'):
                    states.append(cmdline + ' ' + line)
        cmdDict['Internal(%s)' % self.cmdSuffix] = ''.join(states)


class If:
    """Operation that runs ``then_`` or ``else_`` depending on ``cond_``.

    ``cond_`` is either a plain value or an operation-style callable whose
    result is tested for truth.
    """

    def __init__(self, cond_, then_,
                 else_=lambda tar, cmdDict, checkDict: None):
        self.cond_ = cond_
        self.then_ = then_
        self.else_ = else_

    def __call__(self, tar, cmdDict, checkDict):
        if callable(self.cond_):
            outcome = self.cond_(tar, cmdDict, checkDict)
        else:
            outcome = self.cond_
        if outcome:
            return self.then_(tar, cmdDict, checkDict)
        return self.else_(tar, cmdDict, checkDict)


class Progn:
    """Operation that runs several operations in order."""

    def __init__(self, *ops):
        self.ops = ops

    def __call__(self, tar, cmdDict, checkDict):
        for op in self.ops:
            op(tar, cmdDict, checkDict)


class Or:
    """Operation returning the first true result among its operands.

    Callable operands are invoked as operations; other operands are used as
    values.  ``False`` is returned when no operand yields a true value.
    """

    def __init__(self, *ops):
        self.ops = ops

    def __call__(self, tar, cmdDict, checkDict):
        for op in self.ops:
            if callable(op):
                res = op(tar, cmdDict, checkDict)
            else:
                res = op
            if res:
                return res
        return False


def correct_ownership_p(owner, group, directory):
    """Return 'expected' when ``directory`` belongs to ``owner``:``group``.

    ``owner`` is a user name and ``group`` a group name.  ``KeyError`` is
    raised when either name is unknown.
    """
    dirstat = stat(directory)
    # The group is resolved through the group database; looking it up as a
    # user name only works when a user of the same name happens to exist.
    if (getpwnam(owner).pw_uid == dirstat.st_uid
            and grp.getgrnam(group).gr_gid == dirstat.st_gid):
        return 'expected'
    return 'unexpected'


def group_available():
    """Report whether a free GID exists in the system GID range.

    The range is read from /etc/login.defs, with fallbacks for settings
    that are absent.
    """
    gmax = -1
    gsmax = -1
    gmin = 123456789
    gsmin = 123456789
    with open('/etc/login.defs') as handle:
        for line in handle:
            fields = line.strip().split()
            if len(fields) != 2:
                continue
            name, value = fields
            if name == 'SYS_GID_MAX':
                gsmax = max(gsmax, int(value))
            elif name == 'GID_MAX':
                # Parsed like the other limits; not used by the search below.
                gmax = max(gmax, int(value))
            elif name == 'SYS_GID_MIN':
                gsmin = min(gsmin, int(value))
            elif name == 'GID_MIN':
                gmin = min(gmin, int(value))
    # Fallbacks for settings that login.defs does not define.
    if gmin == 123456789:
        gmin = 1000
    if gmax == -1:
        gmax = 60000
    if gsmax == -1:
        gsmax = gmin - 1
    if gsmin == 123456789:
        gsmin = 101
    if gsmax <= gsmin:
        return 'unexpected(login.defs configured incorrectly)'
    for gid in range(gsmax, gsmin - 1, -1):
        try:
            grp.getgrgid(gid)
        except KeyError:
            # No group owns this GID, so it is available.
            return 'expected'
    return 'unexpected(all GIDs used)'


def clock_skew():
    """Return 'detected' when agent.log contains a clock skew message."""
    with open('/var/log/oracle-cloud-agent/agent.log', 'r') as f:
        for line in f:
            if 'is not within allowed clock skew' in line:
                return 'detected'
    return 'not detected'


def check_output(cmd, expected_output):
    """Return 'expected' when the stripped output of ``cmd`` matches."""
    if _run_command(cmd) == expected_output.strip():
        return "expected"
    return "unexpected"


def check_prelinking():
    """Describe whether prelink is installed and the agent config is valid."""
    cmd = 'rpm -q prelink'
    prelink_output = _run_command(cmd)
    if re.search(r'^prelink-.*', prelink_output):
        if check_prelink_config():
            return 'Detected and config file is error free'
        return 'Detected and config file have errors'
    return 'not detected'


def check_prelink_config():
    """Return True when the agent prelink config has both required lines.

    An unreadable or missing config file yields False.
    """
    filename = '/etc/prelink.conf.d/oracle-cloud-agent-prelink.conf'
    try:
        prelink_config = set(_read_file(filename).splitlines())
    except _READ_ERRORS:
        return False
    return ('-b /usr/lib{,64}/oracle-cloud-agent' in prelink_config
            and '-b /usr/libexec{,64}/oracle-cloud-agent' in prelink_config)


def check_cmd_output(cmd, string):
    """Return True when the stripped output of ``cmd`` equals ``string``."""
    return _run_command(cmd) == string


operations = [
    Copydir('/var/log/oracle-cloud-agent'),
    Cmd('uname -a'),
    Cmd('uptime'),
    Cmd('sestatus'),
    Cmd('sysctl crypto.fips_enabled'),
    Copyfile('/etc/system-release'),
    Copyfile('/etc/oracle-cloud-agent/agent.yml'),
    Copyfile('/etc/oracle-cloud-agent/updater.yml'),
    Copyfile('/var/lib/oracle-cloud-agent/pool2/install.log'),
    Cmd('rpm -qi oracle-cloud-agent'),
    Cmd('rpm -V oracle-cloud-agent'),
    # Check the service status. Connect stderr to stdout so that OL6 users
    # don't see the warning message for systemctl not found.
    Cmd('systemctl is-enabled oracle-cloud-agent-updater.service 2>&1'),
    Cmd('systemctl is-active oracle-cloud-agent-updater.service 2>&1'),
    Cmd('systemctl is-enabled oracle-cloud-agent.service 2>&1'),
    Cmd('systemctl is-active oracle-cloud-agent.service 2>&1'),
    Cmd('ps auxww'),
    Cmd('visudo -c 2>&1', True),
    Cmd('grep oracle-cloud-agent /etc/passwd'),
    Cmd('grep adm /etc/passwd'),
    Cmd('tail -100 /var/log/oracle-cloud-agent/agent.log'),
    Cmd('tail -100 /var/log/oracle-cloud-agent/updater.log'),
    Cmd('curl -s -H "Authorization: Bearer Oracle" http://169.254.169.254/opc/v1/instance/'
        ),
    Check(
        lambda: "expected"
        if S_IMODE(stat('/var/log').st_mode) == 0o755 else "unexpected",
        '/var/log directory permissions'),
    Check(
        lambda: "expected"
        if S_IMODE(stat('/var/log/oracle-cloud-agent').st_mode) == 0o2750
        else "unexpected",
        '/var/log/oracle-cloud-agent directory permissions'),
    Check(
        lambda: "expected"
        if S_IMODE(stat('/var/log/oracle-cloud-agent/agent.log').st_mode
                   ) == 0o644 else 'unexpected',
        '/var/log/oracle-cloud-agent/agent.log permissions'),
    Check(
        lambda: "expected"
        if S_IMODE(stat('/var/log/oracle-cloud-agent/updater.log').st_mode
                   ) == 0o644 else 'unexpected',
        '/var/log/oracle-cloud-agent/updater.log permissions'),
    Check(lambda: correct_ownership_p('root', 'root', '/var/log'),
          '/var/log directory ownership'),
    Check(
        lambda: correct_ownership_p('oracle-cloud-agent', 'adm',
                                    '/var/log/oracle-cloud-agent'),
        '/var/log/oracle-cloud-agent directory ownership'),
    Check(
        lambda: correct_ownership_p('oracle-cloud-agent', 'adm',
                                    '/var/log/oracle-cloud-agent/agent.log'),
        '/var/log/oracle-cloud-agent/agent.log file ownership'),
    Check(
        lambda: correct_ownership_p('oracle-cloud-agent-updater', 'adm',
                                    '/var/log/oracle-cloud-agent/updater.log'),
        '/var/log/oracle-cloud-agent/updater.log file ownership'),
    Check(lambda: getpwnam('oracle-cloud-agent') and "exists",
          'oracle-cloud-agent user', 'missing'),
    Check(lambda: getpwnam('adm') and "exists", 'adm user', 'missing'),
    Check(group_available, 'checking GID availability'),
    Check(clock_skew, 'checking clock skew'),
    Check(check_prelinking, 'checking prelinking'),
    # The extra files below are only collected when the 'uname -a' output
    # recorded earlier in this list contains an el6 kernel marker.
    If(
        Or(lambda tar, cmds, checks: len(cmds['uname -a'].split('el6uek')) == 2,
           lambda tar, cmds, checks: len(cmds['uname -a'].split('.el6.')) == 2),
        Progn(
            Check(
                lambda: "ok"
                if (stat('/var/log/btmp').st_size % 384 == 0) else "corrupt",
                'btmp size'),
            Copyfile('/etc/init/oracle-cloud-agent.conf'),
            Copyfile('/etc/init/oracle-cloud-agent-updater.conf'))),
    Pmap('oracle-cloud-agent'),
    Pmap('oracle-cloud-agent-updater'),
    Copyfile('/etc/multipath.conf'),
    RunState('oracle-cloud-agent')
]

if __name__ == '__main__':
    outfile = time.strftime('oca-diag-%Y-%m-%d.%H-%M-%S.tar.gz',
                            time.localtime())
    tar_file = Tar(outfile, operations)
    tar_file()
    # Finalise the archive explicitly instead of relying on garbage
    # collection to trigger __del__.
    tar_file.close()
    del tar_file
    print('Diagnostics collected in ' + outfile)
cải xoăn