ruạṛ
# -*- coding: utf-8 -*-

# Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://opensource.org/licenses/UPL.

import base64
import gzip
import logging
import os
import pickle
import time
try:
    # python2
    import xmlrpclib
except ImportError:
    # python3
    import xmlrpc.client as xmlrpclib
import zlib

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

from osms import config

PICKLE_AUTH_FILENAME = '/var/lib/oracle-cloud-agent/plugins/osms/loginAuth.pkl'

CLIENT_CAPS = {
    'packages.runTransaction': {'version': '1', 'value': '1'},
    'packages.rollBack': {'version': '1', 'value': '1'},
    'packages.verify': {'version': '1', 'value': '1'},
    'packages.extended_profile': {'version': '2', 'value': '1'},
    'packages.update': {'version': '2', 'value': '2'},
}


def get_client_caps_header():
    """Return the ``X-RHN-Client-Capability`` header built from CLIENT_CAPS.

    Each capability is rendered as ``name(version)=value`` and the entries
    are joined with commas into a single header value.
    """
    headers = ['%s(%s)=%s' % (k, v['version'], v['value']) for k, v in CLIENT_CAPS.items()]
    return {'X-RHN-Client-Capability': ','.join(headers)}


def read_cached_login():
    """Load the login info cached in PICKLE_AUTH_FILENAME.

    Returns the cached ``loginInfo`` dict, or None when the cache cannot be
    read, belongs to a different system id than the configured one, or has
    passed its ``X-RHN-Auth-Expire-Offset``.
    """
    try:
        # NOTE(review): pickle.load can execute arbitrary code if the cache
        # file is tampered with. write_cached_login() restricts the file to
        # mode 0600; confirm that is sufficient for the deployment.
        with open(PICKLE_AUTH_FILENAME, 'rb') as pickle_auth:
            data = pickle.load(pickle_auth)

        created_time = data['time']
        login_info = data['loginInfo']

        # The cache is only valid for the system id it was issued to.
        id_ver = xmlrpclib.loads(config.getSystemId())[0][0]['system_id']
        cached_id_ver = 'ID-%s' % login_info['X-RHN-Server-Id']
        if id_ver != cached_id_ver:
            return None

        expire_time = created_time + float(login_info['X-RHN-Auth-Expire-Offset'])
        if time.time() > expire_time:
            logging.debug('Cached loginInfo has expired, created: %s, expire: %s.', created_time, expire_time)
            return None

        return login_info
    except Exception as err:
        # Best effort: any failure simply means "no usable cache".
        logging.debug('Failed to read cached loginInfo: %s', err)
        return None


def write_cached_login(login_info):
    """Persist ``login_info`` (with the current time) to PICKLE_AUTH_FILENAME.

    Returns True on success and False if anything goes wrong; failures are
    logged at debug level and never raised.
    """
    try:
        data = {
            'time': time.time(),
            'loginInfo': login_info,
        }

        pickle_auth_dir = os.path.dirname(PICKLE_AUTH_FILENAME)
        if not os.access(pickle_auth_dir, os.W_OK):
            os.mkdir(pickle_auth_dir)
            os.chmod(pickle_auth_dir, int('0700', 8))

        with open(PICKLE_AUTH_FILENAME, 'wb') as pickle_auth:
            # Restrict the file before any credential data is written to it.
            os.chmod(PICKLE_AUTH_FILENAME, int('0600', 8))
            pickle.dump(data, pickle_auth)

        logging.debug(
            'Wrote cached loginInfo at %s with expiration of %s seconds.',
            data['time'], login_info['X-RHN-Auth-Expire-Offset']
        )
        return True
    except Exception as err:
        logging.debug('Failed to write cached loginInfo: %s', err)
        return False


def get_proxies(cfg, external=False):
    """Build the ``proxies`` mapping passed to requests, or None.

    ``external`` selects the ``enableExternalProxy`` switch instead of
    ``enableProxy``. None is returned when the selected switch is off or no
    proxy host is configured. Credentials are embedded in the proxy URL only
    when ``enableProxyAuth`` is set and both user and password are present;
    otherwise the unauthenticated proxy URL is used.
    """
    key = 'enableExternalProxy' if external else 'enableProxy'
    if not cfg[key]:
        return None

    proxy_host = config.getProxySetting()
    if not proxy_host:
        return None

    # Default to the unauthenticated URL so ``proxy`` is always bound, even
    # when proxy auth is disabled or the credentials are incomplete.
    proxy = 'http://%s' % proxy_host
    if cfg['enableProxyAuth']:
        proxy_user = cfg['proxyUser']
        proxy_password = cfg['proxyPassword']
        if proxy_user and proxy_password:
            proxy = 'http://%s:%s@%s' % (proxy_user, proxy_password, proxy_host)

    # NOTE(review): only the 'http' scheme is mapped here; requests picks the
    # proxy by URL scheme, so https:// URLs would not use it. Confirm intent.
    return {'http': proxy}


def get_proxy(cfg, external=False):
    """Return ``(proxy_url, proxy_username, proxy_password)``.

    Every element is None unless the selected proxy switch is enabled and a
    proxy host is configured; the credentials are filled in only when
    ``enableProxyAuth`` is also set.
    """
    proxy_url = None
    proxy_username = None
    proxy_password = None

    key = 'enableExternalProxy' if external else 'enableProxy'
    if cfg[key]:
        proxy_host = config.getProxySetting()
        if proxy_host:
            proxy_url = 'http://' + proxy_host
            if cfg['enableProxyAuth']:
                proxy_username = cfg['proxyUser']
                proxy_password = cfg['proxyPassword']

    return proxy_url, proxy_username, proxy_password


def get_cacert(cfg):
    """Return the configured CA certificate path if it is readable, else None.

    ``sslCACert`` may be a single path or a list of paths; for a list only
    the first entry is considered.
    """
    cacert = cfg['sslCACert']
    if cacert and isinstance(cacert, list):
        cacert = cacert[0]
    # Guard against None / empty values, which os.access() rejects with a
    # TypeError instead of returning False.
    if cacert and os.access(cacert, os.R_OK):
        return cacert
    return None


def get_headers(cfg, login_info=None):
    """Build the HTTP headers sent with requests to the server.

    Always includes the client and transport capability headers, plus
    ``X-Tenant-Id`` when ``cfg['tenantId']`` is set. When ``login_info`` is
    given, the required ``X-RHN-*`` auth headers are copied from it as
    strings.

    Raises:
        Exception: if ``login_info`` lacks one of the required headers.
    """
    headers = get_client_caps_header()
    if cfg['tenantId']:
        headers['X-Tenant-Id'] = cfg['tenantId']
    headers['X-RHN-Transport-Capability'] = 'follow-redirects=3'

    if login_info:
        needed_headers = [
            'X-RHN-Server-Id',
            'X-RHN-Auth-User-Id',
            'X-RHN-Auth',
            'X-RHN-Auth-Server-Time',
            'X-RHN-Auth-Expire-Offset',
        ]
        for header in needed_headers:
            if header not in login_info:
                raise Exception('Missing required login information %s' % header)
            headers[header] = str(login_info[header])

    return headers


def decode_response_content(response):
    """Return the response body with transfer/content encodings undone.

    Handles a base64 ``Content-Transfer-Encoding`` and the ``x-zlib`` /
    ``x-gzip`` ``Content-Encoding`` values.
    """
    # requests and urllib3 will automatically decode content in 'gzip' and 'deflate' encoding.
    content = response.content
    if response.headers.get('Content-Transfer-Encoding', '').lower() == 'base64':
        content = base64.b64decode(content)
    content_encoding = response.headers.get('Content-Encoding', '').lower()
    if content_encoding == 'x-zlib':
        content = zlib.decompress(content)
    if content_encoding == 'x-gzip':
        # NOTE(review): gzip.decompress only exists on Python 3.2+, although
        # this module also imports the Python 2 xmlrpclib. Confirm support.
        content = gzip.decompress(content)
    return content


def log_request(**kwargs):
    """Log every request parameter at debug level.

    NOTE(review): the values logged can include the proxy URL (which may
    embed credentials) and the request headers.
    """
    for key, value in kwargs.items():
        logging.debug('request parameter %s: %s', key, value)


def log_response(response):
    """Log the request line, request headers, status and response headers."""
    logging.debug('request method and url: %s %s', response.request.method, response.request.url)
    logging.debug('request headers: %s', response.request.headers)
    logging.debug('response status code: %s', response.status_code)
    logging.debug('response headers: %s', response.headers)


class RequestsTransport(xmlrpclib.SafeTransport):
    """XML-RPC transport that sends its requests through ``requests``."""

    def __init__(self, use_https=True, verify=None, proxies=None, timeout=120, headers=None, **kwargs):
        """Store the connection settings applied to every request.

        ``verify``, ``proxies`` and ``timeout`` are passed straight to
        ``requests``; ``headers`` are base headers merged with any
        per-request headers. Remaining keyword arguments go to
        ``SafeTransport``.
        """
        self.use_https = use_https
        self.verify = verify
        self.proxies = proxies
        self.timeout = timeout
        self.headers = headers
        self.last_response = None
        xmlrpclib.SafeTransport.__init__(self, **kwargs)

    def retry_adapter(self):
        """Return an HTTPAdapter that retries 5xx responses with backoff."""
        methods = ["HEAD", "GET", "OPTIONS", "POST"]
        retry_kwargs = {
            'total': 5,
            'status_forcelist': [500, 502, 503, 504],
            'backoff_factor': 20,
        }
        try:
            # urllib3 >= 1.26 spells the option ``allowed_methods``.
            retry_strategy = Retry(allowed_methods=methods, **retry_kwargs)
        except TypeError:
            # Older urllib3 only knows ``method_whitelist`` (removed in 2.0).
            retry_strategy = Retry(method_whitelist=methods, **retry_kwargs)
        return HTTPAdapter(max_retries=retry_strategy)

    def request(self, host, handler, request_body, verbose=False, headers=None):
        """POST ``request_body`` to the server and return the parsed reply.

        ``headers`` are merged over the transport's base headers for this
        request only. The raw response is kept in ``self.last_response``.

        Raises:
            xmlrpclib.ProtocolError: if the server answers with an HTTP
                error status.
        """
        scheme = 'https' if self.use_https else 'http'
        url = '%s://%s%s' % (scheme, host, handler)

        if self.headers and headers:
            # Merge into a copy: dict.update() returns None and would also
            # leak per-request headers into the shared base headers.
            combined_headers = dict(self.headers)
            combined_headers.update(headers)
        else:
            combined_headers = self.headers or headers

        auth = None
        log_request(
            url=url, verify=self.verify, auth=auth, proxies=self.proxies, timeout=self.timeout, headers=combined_headers
        )

        # The context manager releases the session's pooled connections; the
        # response body has already been read when post() returns.
        with requests.Session() as session:
            # Mount on the full '<scheme>://' prefix. A shorter prefix such as
            # 'https' loses to the session's default 'https://' adapter, so
            # the retry strategy would never be used.
            session.mount('%s://' % scheme, self.retry_adapter())
            response = session.post(
                url, data=request_body, verify=self.verify, auth=auth, proxies=self.proxies, timeout=self.timeout,
                headers=combined_headers
            )
        log_response(response)

        try:
            response.raise_for_status()
        except requests.RequestException as e:
            raise xmlrpclib.ProtocolError(url, response.status_code, str(e), response.headers)

        self.last_response = response
        content = decode_response_content(response)

        p, u = self.getparser()
        p.feed(content)
        p.close()
        return u.close()


class OsmsServer(xmlrpclib.ServerProxy):
    """Server proxy configured from the up2date config that logs in on creation.

    NOTE(review): ``__request`` and ``__request1`` are name-mangled to
    ``_OsmsServer__request*``. ``xmlrpclib.ServerProxy`` dispatches
    attribute-style calls through its own ``_ServerProxy__request``, so calls
    such as ``self.up2date.login(...)`` do not appear to reach the retry
    wrapper defined here. Confirm whether an alias
    (``_ServerProxy__request = __request``) was intended; enabling it would
    also send ``login_info`` as HTTP headers on every call.
    """

    def __init__(self, timeout=120, **kwargs):
        """Create the transport from the configuration and perform a login."""
        cfg = config.initUp2dateConfig()
        server_url = config.getServerlURL()[0]

        use_https = server_url.startswith('https')
        verify = None
        if use_https:
            verify = get_cacert(cfg)

        self.transport = RequestsTransport(
            use_https, verify=verify, proxies=get_proxies(cfg), timeout=timeout, headers=get_headers(cfg)
        )
        xmlrpclib.ServerProxy.__init__(self, server_url, self.transport, **kwargs)

        self.login_info = None
        self.login()

    def __request(self, methodname, params):
        """Call ``methodname``, retrying once after a fresh login on auth error.

        An auth error is a ProtocolError whose ``X-RHN-Fault-Code`` header has
        an absolute value of 34; any other ProtocolError is re-raised.
        """
        # Retry once on auth error
        try:
            return self.__request1(methodname, params)
        except xmlrpclib.ProtocolError as err:
            try:
                is_auth_error = abs(int(err.headers.get('X-RHN-Fault-Code'))) == 34
            except Exception:
                is_auth_error = False
            if not is_auth_error:
                # Re-raise by name: on Python 2 a bare ``raise`` here could
                # re-raise the exception swallowed by the inner handler.
                raise err

            logging.debug('retry on auth error')
            self.login(force_update=True)
            return self.__request1(methodname, params)

    def __request1(self, methodname, params):
        """Serialize one XML-RPC call, send it with the login headers, unwrap it."""
        # ServerProxy stores its settings under ``_ServerProxy__*``. Writing
        # ``self.__encoding`` here would mangle to ``_OsmsServer__encoding``
        # and fall through to ServerProxy.__getattr__ instead.
        encoding = self._ServerProxy__encoding
        request = xmlrpclib.dumps(
            params, methodname, encoding=encoding, allow_none=self._ServerProxy__allow_none
        ).encode(encoding, 'xmlcharrefreplace')

        response = self._ServerProxy__transport.request(
            self._ServerProxy__host,
            self._ServerProxy__handler,
            request,
            verbose=self._ServerProxy__verbose,
            headers=self.login_info
        )

        if len(response) == 1:
            response = response[0]

        return response

    def login(self, force_update=False):
        """Obtain login info, preferring the on-disk cache.

        With ``force_update`` (or when no valid cache exists) a new login is
        requested from the server and written to the cache. The result is
        merged into ``self.login_info`` when that is already a dict, otherwise
        it replaces it. Returns ``self.login_info``, or None when no system id
        is configured.
        """
        system_id = config.getSystemId()
        if not system_id:
            self.login_info = None
            return None

        if not force_update:
            login_info = read_cached_login()
            if not login_info:
                force_update = True

        if force_update:
            login_info = self.up2date.login(system_id)
            write_cached_login(login_info)

        if isinstance(self.login_info, dict):
            self.login_info.update(login_info)
        else:
            self.login_info = login_info

        return self.login_info
cải xoăn