+class OpenShiftCLIError(Exception):
+ '''Exception class for openshiftcli'''
+ pass
+ADDITIONAL_PATH_LOOKUPS = ['/usr/local/bin', os.path.expanduser('~/bin')]
+def locate_oc_binary():
+ ''' Find and return oc binary file '''
+ #
+ # oc can be in /usr/local/bin in some cases, but that may not
+ # be in $PATH due to ansible/sudo
+ paths = os.environ.get("PATH", os.defpath).split(os.pathsep) + ADDITIONAL_PATH_LOOKUPS
+ oc_binary = 'oc'
+ # Use shutil.which if it is available, otherwise fallback to a naive path search
+ try:
+ which_result = shutil.which(oc_binary, path=os.pathsep.join(paths))
+ if which_result is not None:
+ oc_binary = which_result
+ except AttributeError:
+ for path in paths:
+ if os.path.exists(os.path.join(path, oc_binary)):
+ oc_binary = os.path.join(path, oc_binary)
+ break
+ return oc_binary
+# pylint: disable=too-few-public-methods
+class OpenShiftCLI(object):
+ ''' Class to wrap the command line tools '''
+ def __init__(self,
+ namespace,
+ kubeconfig='/etc/origin/master/admin.kubeconfig',
+ verbose=False,
+ all_namespaces=False):
+ ''' Constructor for OpenshiftCLI '''
+ self.namespace = namespace
+ self.verbose = verbose
+ self.kubeconfig = Utils.create_tmpfile_copy(kubeconfig)
+ self.all_namespaces = all_namespaces
+ self.oc_binary = locate_oc_binary()
+ # Pylint allows only 5 arguments to be passed.
+ # pylint: disable=too-many-arguments
+ def _replace_content(self, resource, rname, content, force=False, sep='.'):
+ ''' replace the current object with the content '''
+ res = self._get(resource, rname)
+ if not res['results']:
+ return res
+ fname = Utils.create_tmpfile(rname + '-')
+ yed = Yedit(fname, res['results'][0], separator=sep)
+ changes = []
+ for key, value in content.items():
+ changes.append(yed.put(key, value))
+ if any([change[0] for change in changes]):
+ yed.write()
+ atexit.register(Utils.cleanup, [fname])
+ return self._replace(fname, force)
+ return {'returncode': 0, 'updated': False}
+ def _replace(self, fname, force=False):
+ '''replace the current object with oc replace'''
+ # We are removing the 'resourceVersion' to handle
+ # a race condition when modifying oc objects
+ yed = Yedit(fname)
+ results = yed.delete('metadata.resourceVersion')
+ if results[0]:
+ yed.write()
+ cmd = ['replace', '-f', fname]
+ if force:
+ cmd.append('--force')
+ return self.openshift_cmd(cmd)
+ def _create_from_content(self, rname, content):
+ '''create a temporary file and then call oc create on it'''
+ fname = Utils.create_tmpfile(rname + '-')
+ yed = Yedit(fname, content=content)
+ yed.write()
+ atexit.register(Utils.cleanup, [fname])
+ return self._create(fname)
+ def _create(self, fname):
+ '''call oc create on a filename'''
+ return self.openshift_cmd(['create', '-f', fname])
+ def _delete(self, resource, name=None, selector=None):
+ '''call oc delete on a resource'''
+ cmd = ['delete', resource]
+ if selector is not None:
+ cmd.append('--selector={}'.format(selector))
+ elif name is not None:
+ cmd.append(name)
+ else:
+ raise OpenShiftCLIError('Either name or selector is required when calling delete.')
+ return self.openshift_cmd(cmd)
+ def _process(self, template_name, create=False, params=None, template_data=None): # noqa: E501
+ '''process a template
+ template_name: the name of the template to process
+ create: whether to send to oc create after processing
+ params: the parameters for the template
+ template_data: the incoming template's data; instead of a file
+ '''
+ cmd = ['process']
+ if template_data:
+ cmd.extend(['-f', '-'])
+ else:
+ cmd.append(template_name)
+ if params:
+ param_str = ["{}={}".format(key, str(value).replace("'", r'"')) for key, value in params.items()]
+ cmd.append('-v')
+ cmd.extend(param_str)
+ results = self.openshift_cmd(cmd, output=True, input_data=template_data)
+ if results['returncode'] != 0 or not create:
+ return results
+ fname = Utils.create_tmpfile(template_name + '-')
+ yed = Yedit(fname, results['results'])
+ yed.write()
+ atexit.register(Utils.cleanup, [fname])
+ return self.openshift_cmd(['create', '-f', fname])
+ def _get(self, resource, name=None, selector=None):
+ '''return a resource by name '''
+ cmd = ['get', resource]
+ if selector is not None:
+ cmd.append('--selector={}'.format(selector))
+ elif name is not None:
+ cmd.append(name)
+ cmd.extend(['-o', 'json'])
+ rval = self.openshift_cmd(cmd, output=True)
+ # Ensure results are retuned in an array
+ if 'items' in rval:
+ rval['results'] = rval['items']
+ elif not isinstance(rval['results'], list):
+ rval['results'] = [rval['results']]
+ return rval
+ def _schedulable(self, node=None, selector=None, schedulable=True):
+ ''' perform oadm manage-node scheduable '''
+ cmd = ['manage-node']
+ if node:
+ cmd.extend(node)
+ else:
+ cmd.append('--selector={}'.format(selector))
+ cmd.append('--schedulable={}'.format(schedulable))
+ return self.openshift_cmd(cmd, oadm=True, output=True, output_type='raw') # noqa: E501
+ def _list_pods(self, node=None, selector=None, pod_selector=None):
+ ''' perform oadm list pods
+ node: the node in which to list pods
+ selector: the label selector filter if provided
+ pod_selector: the pod selector filter if provided
+ '''
+ cmd = ['manage-node']
+ if node:
+ cmd.extend(node)
+ else:
+ cmd.append('--selector={}'.format(selector))
+ if pod_selector:
+ cmd.append('--pod-selector={}'.format(pod_selector))
+ cmd.extend(['--list-pods', '-o', 'json'])
+ return self.openshift_cmd(cmd, oadm=True, output=True, output_type='raw')
+ # pylint: disable=too-many-arguments
+ def _evacuate(self, node=None, selector=None, pod_selector=None, dry_run=False, grace_period=None, force=False):
+ ''' perform oadm manage-node evacuate '''
+ cmd = ['manage-node']
+ if node:
+ cmd.extend(node)
+ else:
+ cmd.append('--selector={}'.format(selector))
+ if dry_run:
+ cmd.append('--dry-run')
+ if pod_selector:
+ cmd.append('--pod-selector={}'.format(pod_selector))
+ if grace_period:
+ cmd.append('--grace-period={}'.format(int(grace_period)))
+ if force:
+ cmd.append('--force')
+ cmd.append('--evacuate')
+ return self.openshift_cmd(cmd, oadm=True, output=True, output_type='raw')
+ def _version(self):
+ ''' return the openshift version'''
+ return self.openshift_cmd(['version'], output=True, output_type='raw')
+ def _import_image(self, url=None, name=None, tag=None):
+ ''' perform image import '''
+ cmd = ['import-image']
+ image = '{0}'.format(name)
+ if tag:
+ image += ':{0}'.format(tag)
+ cmd.append(image)
+ if url:
+ cmd.append('--from={0}/{1}'.format(url, image))
+ cmd.append('-n{0}'.format(self.namespace))
+ cmd.append('--confirm')
+ return self.openshift_cmd(cmd)
+ def _run(self, cmds, input_data):
+ ''' Actually executes the command. This makes mocking easier. '''
+ curr_env = os.environ.copy()
+ curr_env.update({'KUBECONFIG': self.kubeconfig})
+ proc = subprocess.Popen(cmds,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=curr_env)
+ stdout, stderr = proc.communicate(input_data)
+ return proc.returncode, stdout.decode('utf-8'), stderr.decode('utf-8')
+ # pylint: disable=too-many-arguments,too-many-branches
+ def openshift_cmd(self, cmd, oadm=False, output=False, output_type='json', input_data=None):
+ '''Base command for oc '''
+ cmds = [self.oc_binary]
+ if oadm:
+ cmds.append('adm')
+ cmds.extend(cmd)
+ if self.all_namespaces:
+ cmds.extend(['--all-namespaces'])
+ elif self.namespace is not None and self.namespace.lower() not in ['none', 'emtpy']: # E501
+ cmds.extend(['-n', self.namespace])
+ if self.verbose:
+ print(' '.join(cmds))
+ try:
+ returncode, stdout, stderr = self._run(cmds, input_data)
+ except OSError as ex:
+ returncode, stdout, stderr = 1, '', 'Failed to execute {}: {}'.format(subprocess.list2cmdline(cmds), ex)
+ rval = {"returncode": returncode,
+ "cmd": ' '.join(cmds)}
+ if output_type == 'json':
+ rval['results'] = {}
+ if output and stdout:
+ try:
+ rval['results'] = json.loads(stdout)
+ except ValueError as verr:
+ if "No JSON object could be decoded" in verr.args:
+ rval['err'] = verr.args
+ elif output_type == 'raw':
+ rval['results'] = stdout if output else ''
+ if self.verbose:
+ print("STDOUT: {0}".format(stdout))
+ print("STDERR: {0}".format(stderr))
+ if 'err' in rval or returncode != 0:
+ rval.update({"stderr": stderr,
+ "stdout": stdout})
+ return rval
+class Utils(object):
+ ''' utilities for openshiftcli modules '''
+ @staticmethod
+ def _write(filename, contents):
+ ''' Actually write the file contents to disk. This helps with mocking. '''
+ with open(filename, 'w') as sfd:
+ sfd.write(contents)
+ @staticmethod
+ def create_tmp_file_from_contents(rname, data, ftype='yaml'):
+ ''' create a file in tmp with name and contents'''
+ tmp = Utils.create_tmpfile(prefix=rname)
+ if ftype == 'yaml':
+ # AUDIT:no-member makes sense here due to ruamel.YAML/PyYAML usage
+ # pylint: disable=no-member
+ if hasattr(yaml, 'RoundTripDumper'):
+ Utils._write(tmp, yaml.dump(data, Dumper=yaml.RoundTripDumper))
+ else:
+ Utils._write(tmp, yaml.safe_dump(data, default_flow_style=False))
+ elif ftype == 'json':
+ Utils._write(tmp, json.dumps(data))
+ else:
+ Utils._write(tmp, data)
+ # Register cleanup when module is done
+ atexit.register(Utils.cleanup, [tmp])
+ return tmp
+ @staticmethod
+ def create_tmpfile_copy(inc_file):
+ '''create a temporary copy of a file'''
+ tmpfile = Utils.create_tmpfile('lib_openshift-')
+ Utils._write(tmpfile, open(inc_file).read())
+ # Cleanup the tmpfile
+ atexit.register(Utils.cleanup, [tmpfile])
+ return tmpfile
+ @staticmethod
+ def create_tmpfile(prefix='tmp'):
+ ''' Generates and returns a temporary file name '''
+ with tempfile.NamedTemporaryFile(prefix=prefix, delete=False) as tmp:
+ return
+ @staticmethod
+ def create_tmp_files_from_contents(content, content_type=None):
+ '''Turn an array of dict: filename, content into a files array'''
+ if not isinstance(content, list):
+ content = [content]
+ files = []
+ for item in content:
+ path = Utils.create_tmp_file_from_contents(item['path'] + '-',
+ item['data'],
+ ftype=content_type)
+ files.append({'name': os.path.basename(item['path']),
+ 'path': path})
+ return files
+ @staticmethod
+ def cleanup(files):
+ '''Clean up on exit '''
+ for sfile in files:
+ if os.path.exists(sfile):
+ if os.path.isdir(sfile):
+ shutil.rmtree(sfile)
+ elif os.path.isfile(sfile):
+ os.remove(sfile)
+ @staticmethod
+ def exists(results, _name):
+ ''' Check to see if the results include the name '''
+ if not results:
+ return False
+ if Utils.find_result(results, _name):
+ return True
+ return False
+ @staticmethod
+ def find_result(results, _name):
+ ''' Find the specified result by name'''
+ rval = None
+ for result in results:
+ if 'metadata' in result and result['metadata']['name'] == _name:
+ rval = result
+ break
+ return rval
+ @staticmethod
+ def get_resource_file(sfile, sfile_type='yaml'):
+ ''' return the service file '''
+ contents = None
+ with open(sfile) as sfd:
+ contents =
+ if sfile_type == 'yaml':
+ # AUDIT:no-member makes sense here due to ruamel.YAML/PyYAML usage
+ # pylint: disable=no-member
+ if hasattr(yaml, 'RoundTripLoader'):
+ contents = yaml.load(contents, yaml.RoundTripLoader)
+ else:
+ contents = yaml.safe_load(contents)
+ elif sfile_type == 'json':
+ contents = json.loads(contents)
+ return contents
+ @staticmethod
+ def filter_versions(stdout):
+ ''' filter the oc version output '''
+ version_dict = {}
+ version_search = ['oc', 'openshift', 'kubernetes']
+ for line in stdout.strip().split('\n'):
+ for term in version_search:
+ if not line:
+ continue
+ if line.startswith(term):
+ version_dict[term] = line.split()[-1]
+ # horrible hack to get openshift version in Openshift 3.2
+ # By default "oc version in 3.2 does not return an "openshift" version
+ if "openshift" not in version_dict:
+ version_dict["openshift"] = version_dict["oc"]
+ return version_dict
+ @staticmethod
+ def add_custom_versions(versions):
+ ''' create custom versions strings '''
+ versions_dict = {}
+ for tech, version in versions.items():
+ # clean up "-" from version
+ if "-" in version:
+ version = version.split("-")[0]
+ if version.startswith('v'):
+ versions_dict[tech + '_numeric'] = version[1:].split('+')[0]
+ # "v3.3.0.33" is what we have, we want "3.3"
+ versions_dict[tech + '_short'] = version[1:4]
+ return versions_dict
+ @staticmethod
+ def openshift_installed():
+ ''' check if openshift is installed '''
+ import rpm
+ transaction_set = rpm.TransactionSet()
+ rpmquery = transaction_set.dbMatch("name", "atomic-openshift")
+ return rpmquery.count() > 0
+ # Disabling too-many-branches. This is a yaml dictionary comparison function
+ # pylint: disable=too-many-branches,too-many-return-statements,too-many-statements
+ @staticmethod
+ def check_def_equal(user_def, result_def, skip_keys=None, debug=False):
+ ''' Given a user defined definition, compare it with the results given back by our query. '''
+ # Currently these values are autogenerated and we do not need to check them
+ skip = ['metadata', 'status']
+ if skip_keys:
+ skip.extend(skip_keys)
+ for key, value in result_def.items():
+ if key in skip:
+ continue
+ # Both are lists
+ if isinstance(value, list):
+ if key not in user_def:
+ if debug:
+ print('User data does not have key [%s]' % key)
+ print('User data: %s' % user_def)
+ return False
+ if not isinstance(user_def[key], list):
+ if debug:
+ print('user_def[key] is not a list key=[%s] user_def[key]=%s' % (key, user_def[key]))
+ return False
+ if len(user_def[key]) != len(value):
+ if debug:
+ print("List lengths are not equal.")
+ print("key=[%s]: user_def[%s] != value[%s]" % (key, len(user_def[key]), len(value)))
+ print("user_def: %s" % user_def[key])
+ print("value: %s" % value)
+ return False
+ for values in zip(user_def[key], value):
+ if isinstance(values[0], dict) and isinstance(values[1], dict):
+ if debug:
+ print('sending list - list')
+ print(type(values[0]))
+ print(type(values[1]))
+ result = Utils.check_def_equal(values[0], values[1], skip_keys=skip_keys, debug=debug)
+ if not result:
+ print('list compare returned false')
+ return False
+ elif value != user_def[key]:
+ if debug:
+ print('value should be identical')
+ print(user_def[key])
+ print(value)
+ return False
+ # recurse on a dictionary
+ elif isinstance(value, dict):
+ if key not in user_def:
+ if debug:
+ print("user_def does not have key [%s]" % key)
+ return False
+ if not isinstance(user_def[key], dict):
+ if debug:
+ print("dict returned false: not instance of dict")
+ return False
+ # before passing ensure keys match
+ api_values = set(value.keys()) - set(skip)
+ user_values = set(user_def[key].keys()) - set(skip)
+ if api_values != user_values:
+ if debug:
+ print("keys are not equal in dict")
+ print(user_values)
+ print(api_values)
+ return False
+ result = Utils.check_def_equal(user_def[key], value, skip_keys=skip_keys, debug=debug)
+ if not result:
+ if debug:
+ print("dict returned false")
+ print(result)
+ return False
+ # Verify each key, value pair is the same
+ else:
+ if key not in user_def or value != user_def[key]:
+ if debug:
+ print("value not equal; user_def does not have key")
+ print(key)
+ print(value)
+ if key in user_def:
+ print(user_def[key])
+ return False
+ if debug:
+ print('returning true')
+ return True
+class OpenShiftCLIConfig(object):
+ '''Generic Config'''
+ def __init__(self, rname, namespace, kubeconfig, options):
+ self.kubeconfig = kubeconfig
+ = rname
+ self.namespace = namespace
+ self._options = options
+ @property
+ def config_options(self):
+ ''' return config options '''
+ return self._options
+ def to_option_list(self, ascommalist=''):
+ '''return all options as a string
+ if ascommalist is set to the name of a key, and
+ the value of that key is a dict, format the dict
+ as a list of comma delimited key=value pairs'''
+ return self.stringify(ascommalist)
+ def stringify(self, ascommalist=''):
+ ''' return the options hash as cli params in a string
+ if ascommalist is set to the name of a key, and
+ the value of that key is a dict, format the dict
+ as a list of comma delimited key=value pairs '''
+ rval = []
+ for key in sorted(self.config_options.keys()):
+ data = self.config_options[key]
+ if data['include'] \
+ and (data['value'] is not None or isinstance(data['value'], int)):
+ if key == ascommalist:
+ val = ','.join(['{}={}'.format(kk, vv) for kk, vv in sorted(data['value'].items())])
+ else:
+ val = data['value']
+ rval.append('--{}={}'.format(key.replace('_', '-'), val))
+ return rval
+# pylint: disable=too-many-public-methods
+class ClusterRole(Yedit):
+ ''' Class to model an openshift ClusterRole'''
+ rules_path = "rules"
+ def __init__(self, name=None, content=None):
+ ''' Constructor for clusterrole '''
+ if content is None:
+ content = ClusterRole.builder(name).yaml_dict
+ super(ClusterRole, self).__init__(content=content)
+ self.__rules = Rule.parse_rules(self.get(ClusterRole.rules_path)) or []
+ @property
+ def rules(self):
+ return self.__rules
+ @rules.setter
+ def rules(self, data):
+ self.__rules = data
+ self.put(ClusterRole.rules_path, self.__rules)
+ def rule_exists(self, inc_rule):
+ '''attempt to find the inc_rule in the rules list'''
+ for rule in self.rules:
+ if rule == inc_rule:
+ return True
+ return False
+ def compare(self, other, verbose=False):
+ '''compare function for clusterrole'''
+ for rule in other.rules:
+ if rule not in self.rules:
+ if verbose:
+ print('Rule in other not found in self. [{}]'.format(rule))
+ return False
+ for rule in self.rules:
+ if rule not in other.rules:
+ if verbose:
+ print('Rule in self not found in other. [{}]'.format(rule))
+ return False
+ return True
+ @staticmethod
+ def builder(name='default_clusterrole', rules=None):
+ '''return a clusterrole with name and/or rules'''
+ if rules is None:
+ rules = [{'apiGroups': [""],
+ 'attributeRestrictions': None,
+ 'verbs': [],
+ 'resources': []}]
+ content = {
+ 'apiVersion': 'v1',
+ 'kind': 'ClusterRole',
+ 'metadata': {'name': '{}'.format(name)},
+ 'rules': rules,
+ }
+ return ClusterRole(content=content)
+class DeploymentConfig(Yedit):
+ ''' Class to model an openshift DeploymentConfig'''
+ default_deployment_config = '''
+apiVersion: v1
+kind: DeploymentConfig
+ name: default_dc
+ namespace: default
+ replicas: 0
+ selector:
+ default_dc: default_dc
+ strategy:
+ resources: {}
+ rollingParams:
+ intervalSeconds: 1
+ maxSurge: 0
+ maxUnavailable: 25%
+ timeoutSeconds: 600
+ updatePercent: -25
+ updatePeriodSeconds: 1
+ type: Rolling
+ template:
+ metadata:
+ spec:
+ containers:
+ - env:
+ - name: default
+ value: default
+ image: default
+ imagePullPolicy: IfNotPresent
+ name: default_dc
+ ports:
+ - containerPort: 8000
+ hostPort: 8000
+ protocol: TCP
+ name: default_port
+ resources: {}
+ terminationMessagePath: /dev/termination-log
+ dnsPolicy: ClusterFirst
+ hostNetwork: true
+ nodeSelector:
+ type: compute
+ restartPolicy: Always
+ securityContext: {}
+ serviceAccount: default
+ serviceAccountName: default
+ terminationGracePeriodSeconds: 30
+ triggers:
+ - type: ConfigChange
+ replicas_path = "spec.replicas"
+ env_path = "spec.template.spec.containers[0].env"
+ volumes_path = "spec.template.spec.volumes"
+ container_path = "spec.template.spec.containers"
+ volume_mounts_path = "spec.template.spec.containers[0].volumeMounts"
+ def __init__(self, content=None):
+ ''' Constructor for deploymentconfig '''
+ if not content:
+ content = DeploymentConfig.default_deployment_config
+ super(DeploymentConfig, self).__init__(content=content)
+ def add_env_value(self, key, value):
+ ''' add key, value pair to env array '''
+ rval = False
+ env = self.get_env_vars()
+ if env:
+ env.append({'name': key, 'value': value})
+ rval = True
+ else:
+ result = self.put(DeploymentConfig.env_path, {'name': key, 'value': value})
+ rval = result[0]
+ return rval
+ def exists_env_value(self, key, value):
+ ''' return whether a key, value pair exists '''
+ results = self.get_env_vars()
+ if not results:
+ return False
+ for result in results:
+ if result['name'] == key and result['value'] == value:
+ return True
+ return False
+ def exists_env_key(self, key):
+ ''' return whether a key, value pair exists '''
+ results = self.get_env_vars()
+ if not results:
+ return False
+ for result in results:
+ if result['name'] == key:
+ return True
+ return False
+ def get_env_var(self, key):
+ '''return a environment variables '''
+ results = self.get(DeploymentConfig.env_path) or []
+ if not results:
+ return None
+ for env_var in results:
+ if env_var['name'] == key:
+ return env_var
+ return None
+ def get_env_vars(self):
+ '''return a environment variables '''
+ return self.get(DeploymentConfig.env_path) or []
+ def delete_env_var(self, keys):
+ '''delete a list of keys '''
+ if not isinstance(keys, list):
+ keys = [keys]
+ env_vars_array = self.get_env_vars()
+ modified = False
+ idx = None
+ for key in keys:
+ for env_idx, env_var in enumerate(env_vars_array):
+ if env_var['name'] == key:
+ idx = env_idx
+ break
+ if idx:
+ modified = True
+ del env_vars_array[idx]
+ if modified:
+ return True
+ return False
+ def update_env_var(self, key, value):
+ '''place an env in the env var list'''
+ env_vars_array = self.get_env_vars()
+ idx = None
+ for env_idx, env_var in enumerate(env_vars_array):
+ if env_var['name'] == key:
+ idx = env_idx
+ break
+ if idx:
+ env_vars_array[idx]['value'] = value
+ else:
+ self.add_env_value(key, value)
+ return True
+ def exists_volume_mount(self, volume_mount):
+ ''' return whether a volume mount exists '''
+ exist_volume_mounts = self.get_volume_mounts()
+ if not exist_volume_mounts:
+ return False
+ volume_mount_found = False
+ for exist_volume_mount in exist_volume_mounts:
+ if exist_volume_mount['name'] == volume_mount['name']:
+ volume_mount_found = True
+ break
+ return volume_mount_found
+ def exists_volume(self, volume):
+ ''' return whether a volume exists '''
+ exist_volumes = self.get_volumes()
+ volume_found = False
+ for exist_volume in exist_volumes:
+ if exist_volume['name'] == volume['name']:
+ volume_found = True
+ break
+ return volume_found
+ def find_volume_by_name(self, volume, mounts=False):
+ ''' return the index of a volume '''
+ volumes = []
+ if mounts:
+ volumes = self.get_volume_mounts()
+ else:
+ volumes = self.get_volumes()
+ for exist_volume in volumes:
+ if exist_volume['name'] == volume['name']:
+ return exist_volume
+ return None
+ def get_replicas(self):
+ ''' return replicas setting '''
+ return self.get(DeploymentConfig.replicas_path)
+ def get_volume_mounts(self):
+ '''return volume mount information '''
+ return self.get_volumes(mounts=True)
+ def get_volumes(self, mounts=False):
+ '''return volume mount information '''
+ if mounts:
+ return self.get(DeploymentConfig.volume_mounts_path) or []
+ return self.get(DeploymentConfig.volumes_path) or []
+ def delete_volume_by_name(self, volume):
+ '''delete a volume '''
+ modified = False
+ exist_volume_mounts = self.get_volume_mounts()
+ exist_volumes = self.get_volumes()
+ del_idx = None
+ for idx, exist_volume in enumerate(exist_volumes):
+ if 'name' in exist_volume and exist_volume['name'] == volume['name']:
+ del_idx = idx
+ break
+ if del_idx != None:
+ del exist_volumes[del_idx]
+ modified = True
+ del_idx = None
+ for idx, exist_volume_mount in enumerate(exist_volume_mounts):
+ if 'name' in exist_volume_mount and exist_volume_mount['name'] == volume['name']:
+ del_idx = idx
+ break
+ if del_idx != None:
+ del exist_volume_mounts[idx]
+ modified = True
+ return modified
+ def add_volume_mount(self, volume_mount):
+ ''' add a volume or volume mount to the proper location '''
+ exist_volume_mounts = self.get_volume_mounts()
+ if not exist_volume_mounts and volume_mount:
+ self.put(DeploymentConfig.volume_mounts_path, [volume_mount])
+ else:
+ exist_volume_mounts.append(volume_mount)
+ def add_volume(self, volume):
+ ''' add a volume or volume mount to the proper location '''
+ exist_volumes = self.get_volumes()
+ if not volume:
+ return
+ if not exist_volumes:
+ self.put(DeploymentConfig.volumes_path, [volume])
+ else:
+ exist_volumes.append(volume)
+ def update_replicas(self, replicas):
+ ''' update replicas value '''
+ self.put(DeploymentConfig.replicas_path, replicas)
+ def update_volume(self, volume):
+ '''place an env in the env var list'''
+ exist_volumes = self.get_volumes()
+ if not volume:
+ return False
+ # update the volume
+ update_idx = None
+ for idx, exist_vol in enumerate(exist_volumes):
+ if exist_vol['name'] == volume['name']:
+ update_idx = idx
+ break
+ if update_idx != None:
+ exist_volumes[update_idx] = volume
+ else:
+ self.add_volume(volume)
+ return True
+ def update_volume_mount(self, volume_mount):
+ '''place an env in the env var list'''
+ modified = False
+ exist_volume_mounts = self.get_volume_mounts()
+ if not volume_mount:
+ return False
+ # update the volume mount
+ for exist_vol_mount in exist_volume_mounts:
+ if exist_vol_mount['name'] == volume_mount['name']:
+ if 'mountPath' in exist_vol_mount and \
+ str(exist_vol_mount['mountPath']) != str(volume_mount['mountPath']):
+ exist_vol_mount['mountPath'] = volume_mount['mountPath']
+ modified = True
+ break
+ if not modified:
+ self.add_volume_mount(volume_mount)
+ modified = True
+ return modified
+ def needs_update_volume(self, volume, volume_mount):
+ ''' verify a volume update is needed '''
+ exist_volume = self.find_volume_by_name(volume)
+ exist_volume_mount = self.find_volume_by_name(volume, mounts=True)
+ results = []
+ results.append(exist_volume['name'] == volume['name'])
+ if 'secret' in volume:
+ results.append('secret' in exist_volume)
+ results.append(exist_volume['secret']['secretName'] == volume['secret']['secretName'])
+ results.append(exist_volume_mount['name'] == volume_mount['name'])
+ results.append(exist_volume_mount['mountPath'] == volume_mount['mountPath'])
+ elif 'emptyDir' in volume:
+ results.append(exist_volume_mount['name'] == volume['name'])
+ results.append(exist_volume_mount['mountPath'] == volume_mount['mountPath'])
+ elif 'persistentVolumeClaim' in volume:
+ pvc = 'persistentVolumeClaim'
+ results.append(pvc in exist_volume)
+ if results[-1]:
+ results.append(exist_volume[pvc]['claimName'] == volume[pvc]['claimName'])
+ if 'claimSize' in volume[pvc]:
+ results.append(exist_volume[pvc]['claimSize'] == volume[pvc]['claimSize'])
+ elif 'hostpath' in volume:
+ results.append('hostPath' in exist_volume)
+ results.append(exist_volume['hostPath']['path'] == volume_mount['mountPath'])
+ return not all(results)
+ def needs_update_replicas(self, replicas):
+ ''' verify whether a replica update is needed '''
+ current_reps = self.get(DeploymentConfig.replicas_path)
+ return not current_reps == replicas
+ ''' Handle route options '''
+ # pylint: disable=too-many-arguments
+ def __init__(self,
+ sname,
+ namespace,
+ kubeconfig):
+ ''' constructor for handling group options '''
+ self.kubeconfig = kubeconfig
+ = sname
+ self.namespace = namespace
+ = {}
+ self.create_dict()
+ def create_dict(self):
+ ''' return a service as a dict '''
+['apiVersion'] = 'v1'
+['kind'] = 'Group'
+['metadata'] = {}
+['metadata']['name'] =
+['users'] = None
+# pylint: disable=too-many-instance-attributes
+class Group(Yedit):
+ ''' Class to wrap the oc command line tools '''
+ kind = 'group'
+ def __init__(self, content):
+ '''Group constructor'''
+ super(Group, self).__init__(content=content)
+from __future__ import print_function
+import atexit
+import copy
+import json
+import os
+import re
+import shutil
+import subprocess
+import tempfile
+# pylint: disable=import-error
+ import ruamel.yaml as yaml
+except ImportError:
+ import yaml
+from ansible.module_utils.basic import AnsibleModule
+class ProjectConfig(OpenShiftCLIConfig):
+ ''' project config object '''
+ def __init__(self, rname, namespace, kubeconfig, project_options):
+ super(ProjectConfig, self).__init__(rname, None, kubeconfig, project_options)
+class Project(Yedit):
+ ''' Class to wrap the oc command line tools '''
+ annotations_path = "metadata.annotations"
+ kind = 'Project'
+ annotation_prefix = ''
+ def __init__(self, content):
+ '''Project constructor'''
+ super(Project, self).__init__(content=content)
+ def get_annotations(self):
+ ''' return the annotations'''
+ return self.get(Project.annotations_path) or {}
+ def add_annotations(self, inc_annos):
+ ''' add an annotation to the other annotations'''
+ if not isinstance(inc_annos, list):
+ inc_annos = [inc_annos]
+ annos = self.get_annotations()
+ if not annos:
+ self.put(Project.annotations_path, inc_annos)
+ else:
+ for anno in inc_annos:
+ for key, value in anno.items():
+ annos[key] = value
+ return True
+ def find_annotation(self, key):
+ ''' find an annotation'''
+ annotations = self.get_annotations()
+ for anno in annotations:
+ if Project.annotation_prefix + key == anno:
+ return annotations[anno]
+ return None
+ def delete_annotation(self, inc_anno_keys):
+ ''' remove an annotation from a project'''
+ if not isinstance(inc_anno_keys, list):
+ inc_anno_keys = [inc_anno_keys]
+ annos = self.get(Project.annotations_path) or {}
+ if not annos:
+ return True
+ removed = False
+ for inc_anno in inc_anno_keys:
+ anno = self.find_annotation(inc_anno)
+ if anno:
+ del annos[Project.annotation_prefix + anno]
+ removed = True
+ return removed
+ def update_annotation(self, key, value):
+ ''' remove an annotation for a project'''
+ annos = self.get(Project.annotations_path) or {}
+ if not annos:
+ return True
+ updated = False
+ anno = self.find_annotation(key)
+ if anno:
+ annos[Project.annotation_prefix + key] = value
+ updated = True
+ else:
+ self.add_annotations({Project.annotation_prefix + key: value})
+ return updated
+class PersistentVolumeClaimConfig(object):
+ ''' Handle pvc options '''
+ # pylint: disable=too-many-arguments
+ def __init__(self,
+ sname,
+ namespace,
+ kubeconfig,
+ access_modes=None,
+ vol_capacity='1G',
+ selector=None,
+ storage_class_name=None):
+ ''' constructor for handling pvc options '''
+ self.kubeconfig = kubeconfig
+ = sname
+ self.namespace = namespace
+ self.access_modes = access_modes
+ self.vol_capacity = vol_capacity
+ = {}
+ self.selector = selector
+ self.storage_class_name = storage_class_name
+ self.create_dict()
+ def create_dict(self):
+ ''' return a service as a dict '''
+ # version
+['apiVersion'] = 'v1'
+ # kind
+['kind'] = 'PersistentVolumeClaim'
+ # metadata
+['metadata'] = {}
+['metadata']['name'] =
+ # spec
+['spec'] = {}
+['spec']['accessModes'] = ['ReadWriteOnce']
+ if self.access_modes:
+['spec']['accessModes'] = self.access_modes
+ if self.selector:
+['spec']['selector'] = {'matchLabels': self.selector}
+ # storage capacity
+['spec']['resources'] = {}
+['spec']['resources']['requests'] = {}
+['spec']['resources']['requests']['storage'] = self.vol_capacity
+ if self.storage_class_name:
+['spec']['storageClassName'] = self.storage_class_name
+# pylint: disable=too-many-instance-attributes,too-many-public-methods
+class PersistentVolumeClaim(Yedit):
+ ''' Class to wrap the oc command line tools '''
+ access_modes_path = "spec.accessModes"
+ volume_capacity_path = ""
+ volume_name_path = "spec.volumeName"
+ bound_path = "status.phase"
+ kind = 'PersistentVolumeClaim'
+ selector_path = "spec.selector.matchLabels"
+ storage_class_name_path = "spec.storageClassName"
+ def __init__(self, content):
+ '''PersistentVolumeClaim constructor'''
+ super(PersistentVolumeClaim, self).__init__(content=content)
+ self._access_modes = None
+ self._volume_capacity = None
+ self._volume_name = None
+ self._selector = None
+ self._storage_class_name = None
+ @property
+ def storage_class_name(self):
+ ''' storage_class_name property '''
+ if self._storage_class_name is None:
+ self._storage_class_name = self.get_storage_class_name()
+ return self._storage_class_name
+ @storage_class_name.setter
+ def storage_class_name(self, data):
+ ''' storage_class_name property setter'''
+ self._storage_class_name = data
+ @property
+ def volume_name(self):
+ ''' volume_name property '''
+ if self._volume_name is None:
+ self._volume_name = self.get_volume_name()
+ return self._volume_name
+ @volume_name.setter
+ def volume_name(self, data):
+ ''' volume_name property setter'''
+ self._volume_name = data
+ @property
+ def selector(self):
+ ''' selector property '''
+ if self._selector is None:
+ self._selector = self.get_selector()
+ if not isinstance(self._selector, dict):
+ self._selector = dict(self._selector)
+ return self._selector
+ @selector.setter
+ def selector(self, data):
+ ''' selector property setter'''
+ if not isinstance(data, dict):
+ data = dict(data)
+ self._selector = data
+ @property
+ def access_modes(self):
+ ''' access_modes property '''
+ if self._access_modes is None:
+ self._access_modes = self.get_access_modes()
+ if not isinstance(self._access_modes, list):
+ self._access_modes = list(self._access_modes)
+ return self._access_modes
+ @access_modes.setter
+ def access_modes(self, data):
+ ''' access_modes property setter'''
+ if not isinstance(data, list):
+ data = list(data)
+ self._access_modes = data
+ @property
+ def volume_capacity(self):
+ ''' volume_capacity property '''
+ if self._volume_capacity is None:
+ self._volume_capacity = self.get_volume_capacity()
+ return self._volume_capacity
+ @volume_capacity.setter
+ def volume_capacity(self, data):
+ ''' volume_capacity property setter'''
+ self._volume_capacity = data
+ def get_storage_class_name(self):
+ '''get storage_class_name'''
+ return self.get(PersistentVolumeClaim.storage_class_name_path) or []
+ def get_selector(self):
+ '''get selector'''
+ return self.get(PersistentVolumeClaim.selector_path) or []
+ def get_access_modes(self):
+ '''get access_modes'''
+ return self.get(PersistentVolumeClaim.access_modes_path) or []
+ def get_volume_capacity(self):
+ '''get volume_capacity'''
+ return self.get(PersistentVolumeClaim.volume_capacity_path) or []
+ def get_volume_name(self):
+ '''get volume_name'''
+ return self.get(PersistentVolumeClaim.volume_name_path) or []
+ def is_bound(self):
+ '''return whether volume is bound'''
+ return self.get(PersistentVolumeClaim.bound_path) or []
+ #### ADD #####
+ def add_access_mode(self, inc_mode):
+ ''' add an access_mode'''
+ if self.access_modes:
+ self.access_modes.append(inc_mode)
+ else:
+ self.put(PersistentVolumeClaim.access_modes_path, [inc_mode])
+ return True
+ #### /ADD #####
+ #### Remove #####
+ def remove_access_mode(self, inc_mode):
+ ''' remove an access_mode'''
+ try:
+ self.access_modes.remove(inc_mode)
+ except ValueError as _:
+ return False
+ return True
+ #### /REMOVE #####
+ #### UPDATE #####
+ def update_access_mode(self, inc_mode):
+ ''' update an access_mode'''
+ try:
+ index = self.access_modes.index(inc_mode)
+ except ValueError as _:
+ return self.add_access_mode(inc_mode)
+ self.access_modes[index] = inc_mode
+ return True
+ #### /UPDATE #####
+ #### FIND ####
+ def find_access_mode(self, inc_mode):
+ ''' find a user '''
+ index = None
+ try:
+ index = self.access_modes.index(inc_mode)
+ except ValueError as _:
+ return index
+ return index
+class ReplicationController(DeploymentConfig):
+ ''' Class to model a replicationcontroller openshift object.
+ Currently we are modeled after a deployment config since they
+ are very similar. In the future, when the need arises we
+ will add functionality to this class.
+ '''
+ replicas_path = "spec.replicas"
+ env_path = "spec.template.spec.containers[0].env"
+ volumes_path = "spec.template.spec.volumes"
+ container_path = "spec.template.spec.containers"
+ volume_mounts_path = "spec.template.spec.containers[0].volumeMounts"
+ def __init__(self, content):
+ ''' Constructor for ReplicationController '''
+ super(ReplicationController, self).__init__(content=content)
+class RoleBindingConfig(object):
+ ''' Handle rolebinding config '''
+ # pylint: disable=too-many-arguments
+ def __init__(self,
+ name,
+ namespace,
+ kubeconfig,
+ group_names=None,
+ role_ref=None,
+ subjects=None,
+ usernames=None):
+ ''' constructor for handling rolebinding options '''
+ self.kubeconfig = kubeconfig
+ = name
+ self.namespace = namespace
+ self.group_names = group_names
+ self.role_ref = role_ref
+ self.subjects = subjects
+ self.usernames = usernames
+ = {}
+ self.create_dict()
+ def create_dict(self):
+ ''' create a default rolebinding as a dict '''
+['apiVersion'] = 'v1'
+['kind'] = 'RoleBinding'
+['groupNames'] = self.group_names
+['metadata']['name'] =
+['metadata']['namespace'] = self.namespace
+['roleRef'] = self.role_ref
+['subjects'] = self.subjects
+['userNames'] = self.usernames
+# pylint: disable=too-many-instance-attributes,too-many-public-methods
+class RoleBinding(Yedit):
+ ''' Class to model a rolebinding openshift object'''
+ group_names_path = "groupNames"
+ role_ref_path = "roleRef"
+ subjects_path = "subjects"
+ user_names_path = "userNames"
+ kind = 'RoleBinding'
+ def __init__(self, content):
+ '''RoleBinding constructor'''
+ super(RoleBinding, self).__init__(content=content)
+ self._subjects = None
+ self._role_ref = None
+ self._group_names = None
+ self._user_names = None
+ @property
+ def subjects(self):
+ ''' subjects property '''
+ if self._subjects is None:
+ self._subjects = self.get_subjects()
+ return self._subjects
+ @subjects.setter
+ def subjects(self, data):
+ ''' subjects property setter'''
+ self._subjects = data
+ @property
+ def role_ref(self):
+ ''' role_ref property '''
+ if self._role_ref is None:
+ self._role_ref = self.get_role_ref()
+ return self._role_ref
+ @role_ref.setter
+ def role_ref(self, data):
+ ''' role_ref property setter'''
+ self._role_ref = data
+ @property
+ def group_names(self):
+ ''' group_names property '''
+ if self._group_names is None:
+ self._group_names = self.get_group_names()
+ return self._group_names
+ @group_names.setter
+ def group_names(self, data):
+ ''' group_names property setter'''
+ self._group_names = data
+ @property
+ def user_names(self):
+ ''' user_names property '''
+ if self._user_names is None:
+ self._user_names = self.get_user_names()
+ return self._user_names
+ @user_names.setter
+ def user_names(self, data):
+ ''' user_names property setter'''
+ self._user_names = data
+ def get_group_names(self):
+ ''' return groupNames '''
+ return self.get(RoleBinding.group_names_path) or []
+ def get_user_names(self):
+ ''' return usernames '''
+ return self.get(RoleBinding.user_names_path) or []
+ def get_role_ref(self):
+ ''' return role_ref '''
+ return self.get(RoleBinding.role_ref_path) or {}
+ def get_subjects(self):
+ ''' return subjects '''
+ return self.get(RoleBinding.subjects_path) or []
+ #### ADD #####
+ def add_subject(self, inc_subject):
+ ''' add a subject '''
+ if self.subjects:
+ # pylint: disable=no-member
+ self.subjects.append(inc_subject)
+ else:
+ self.put(RoleBinding.subjects_path, [inc_subject])
+ return True
+ def add_role_ref(self, inc_role_ref):
+ ''' add a role_ref '''
+ if not self.role_ref:
+ self.put(RoleBinding.role_ref_path, {"name": inc_role_ref})
+ return True
+ return False
+ def add_group_names(self, inc_group_names):
+ ''' add a group_names '''
+ if self.group_names:
+ # pylint: disable=no-member
+ self.group_names.append(inc_group_names)
+ else:
+ self.put(RoleBinding.group_names_path, [inc_group_names])
+ return True
+ def add_user_name(self, inc_user_name):
+ ''' add a username '''
+ if self.user_names:
+ # pylint: disable=no-member
+ self.user_names.append(inc_user_name)
+ else:
+ self.put(RoleBinding.user_names_path, [inc_user_name])
+ return True
+ #### /ADD #####
+ #### Remove #####
+ def remove_subject(self, inc_subject):
+ ''' remove a subject '''
+ try:
+ # pylint: disable=no-member
+ self.subjects.remove(inc_subject)
+ except ValueError as _:
+ return False
+ return True
+ def remove_role_ref(self, inc_role_ref):
+ ''' remove a role_ref '''
+ if self.role_ref and self.role_ref['name'] == inc_role_ref:
+ del self.role_ref['name']
+ return True
+ return False
+ def remove_group_name(self, inc_group_name):
+ ''' remove a groupname '''
+ try:
+ # pylint: disable=no-member
+ self.group_names.remove(inc_group_name)
+ except ValueError as _:
+ return False
+ return True
+ def remove_user_name(self, inc_user_name):
+ ''' remove a username '''
+ try:
+ # pylint: disable=no-member
+ self.user_names.remove(inc_user_name)
+ except ValueError as _:
+ return False
+ return True
+ #### /REMOVE #####
+ #### UPDATE #####
+ def update_subject(self, inc_subject):
+ ''' update a subject '''
+ try:
+ # pylint: disable=no-member
+ index = self.subjects.index(inc_subject)
+ except ValueError as _:
+ return self.add_subject(inc_subject)
+ self.subjects[index] = inc_subject
+ return True
+ def update_group_name(self, inc_group_name):
+ ''' update a groupname '''
+ try:
+ # pylint: disable=no-member
+ index = self.group_names.index(inc_group_name)
+ except ValueError as _:
+ return self.add_group_names(inc_group_name)
+ self.group_names[index] = inc_group_name
+ return True
+ def update_user_name(self, inc_user_name):
+ ''' update a username '''
+ try:
+ # pylint: disable=no-member
+ index = self.user_names.index(inc_user_name)
+ except ValueError as _:
+ return self.add_user_name(inc_user_name)
+ self.user_names[index] = inc_user_name
+ return True
+ def update_role_ref(self, inc_role_ref):
+ ''' update a role_ref '''
+ self.role_ref['name'] = inc_role_ref
+ return True
+ #### /UPDATE #####
+ #### FIND ####
+ def find_subject(self, inc_subject):
+ ''' find a subject '''
+ index = None
+ try:
+ # pylint: disable=no-member
+ index = self.subjects.index(inc_subject)
+ except ValueError as _:
+ return index
+ return index
+ def find_group_name(self, inc_group_name):
+ ''' find a group_name '''
+ index = None
+ try:
+ # pylint: disable=no-member
+ index = self.group_names.index(inc_group_name)
+ except ValueError as _:
+ return index
+ return index
+ def find_user_name(self, inc_user_name):
+ ''' find a user_name '''
+ index = None
+ try:
+ # pylint: disable=no-member
+ index = self.user_names.index(inc_user_name)
+ except ValueError as _:
+ return index
+ return index
+ def find_role_ref(self, inc_role_ref):
+ ''' find a user_name '''
+ if self.role_ref and self.role_ref['name'] == inc_role_ref['name']:
+ return self.role_ref
+ return None
+class RouteConfig(object):
+ ''' Handle route options '''
+ # pylint: disable=too-many-arguments
+ def __init__(self,
+ sname,
+ namespace,
+ kubeconfig,
+ labels=None,
+ destcacert=None,
+ cacert=None,
+ cert=None,
+ key=None,
+ host=None,
+ tls_termination=None,
+ service_name=None,
+ wildcard_policy=None,
+ weight=None,
+ port=None):
+ ''' constructor for handling route options '''
+ self.kubeconfig = kubeconfig
+ = sname
+ self.namespace = namespace
+ self.labels = labels
+ = host
+ self.tls_termination = tls_termination
+ self.destcacert = destcacert
+ self.cacert = cacert
+ self.cert = cert
+ self.key = key
+ self.service_name = service_name
+ self.port = port
+ = {}
+ self.wildcard_policy = wildcard_policy
+ if wildcard_policy is None:
+ self.wildcard_policy = 'None'
+ self.weight = weight
+ if weight is None:
+ self.weight = 100
+ self.create_dict()
+ def create_dict(self):
+ ''' return a service as a dict '''
+['apiVersion'] = 'v1'
+['kind'] = 'Route'
+['metadata'] = {}
+['metadata']['name'] =
+['metadata']['namespace'] = self.namespace
+ if self.labels:
+['metadata']['labels'] = self.labels
+['spec'] = {}
+['spec']['host'] =
+ if self.tls_termination:
+['spec']['tls'] = {}
+['spec']['tls']['termination'] = self.tls_termination
+ if self.tls_termination != 'passthrough':
+['spec']['tls']['key'] = self.key
+['spec']['tls']['caCertificate'] = self.cacert
+['spec']['tls']['certificate'] = self.cert
+ if self.tls_termination == 'reencrypt':
+['spec']['tls']['destinationCACertificate'] = self.destcacert
+['spec']['to'] = {'kind': 'Service',
+ 'name': self.service_name,
+ 'weight': self.weight}
+['spec']['wildcardPolicy'] = self.wildcard_policy
+ if self.port:
+['spec']['port'] = {}
+['spec']['port']['targetPort'] = self.port
+# pylint: disable=too-many-instance-attributes,too-many-public-methods
+class Route(Yedit):
+ ''' Class to wrap the oc command line tools '''
+ wildcard_policy = "spec.wildcardPolicy"
+ host_path = ""
+ port_path = "spec.port.targetPort"
+ service_path = ""
+ weight_path = ""
+ cert_path = "spec.tls.certificate"
+ cacert_path = "spec.tls.caCertificate"
+ destcacert_path = "spec.tls.destinationCACertificate"
+ termination_path = "spec.tls.termination"
+ key_path = "spec.tls.key"
+ kind = 'route'
+ def __init__(self, content):
+ '''Route constructor'''
+ super(Route, self).__init__(content=content)
+ def get_destcacert(self):
+ ''' return cert '''
+ return self.get(Route.destcacert_path)
+ def get_cert(self):
+ ''' return cert '''
+ return self.get(Route.cert_path)
+ def get_key(self):
+ ''' return key '''
+ return self.get(Route.key_path)
+ def get_cacert(self):
+ ''' return cacert '''
+ return self.get(Route.cacert_path)
+ def get_service(self):
+ ''' return service name '''
+ return self.get(Route.service_path)
+ def get_weight(self):
+ ''' return service weight '''
+ return self.get(Route.weight_path)
+ def get_termination(self):
+ ''' return tls termination'''
+ return self.get(Route.termination_path)
+ def get_host(self):
+ ''' return host '''
+ return self.get(Route.host_path)
+ def get_port(self):
+ ''' return port '''
+ return self.get(Route.port_path)
+ def get_wildcard_policy(self):
+ ''' return wildcardPolicy '''
+ return self.get(Route.wildcard_policy)
+ '''class to represent a clusterrole rule
+ Example Rule Object's yaml:
+ - apiGroups:
+ - ""
+ attributeRestrictions: null
+ resources:
+ - persistentvolumes
+ verbs:
+ - create
+ - delete
+ - deletecollection
+ - get
+ - list
+ - patch
+ - update
+ - watch
+ '''
+ def __init__(self,
+ api_groups=None,
+ attr_restrictions=None,
+ resources=None,
+ verbs=None):
+ self.__api_groups = api_groups if api_groups is not None else [""]
+ self.__verbs = verbs if verbs is not None else []
+ self.__resources = resources if resources is not None else []
+ self.__attribute_restrictions = attr_restrictions if attr_restrictions is not None else None
+ @property
+ def verbs(self):
+ '''property for verbs'''
+ if self.__verbs is None:
+ return []
+ return self.__verbs
+ @verbs.setter
+ def verbs(self, data):
+ '''setter for verbs'''
+ self.__verbs = data
+ @property
+ def api_groups(self):
+ '''property for api_groups'''
+ if self.__api_groups is None:
+ return []
+ return self.__api_groups
+ @api_groups.setter
+ def api_groups(self, data):
+ '''setter for api_groups'''
+ self.__api_groups = data
+ @property
+ def resources(self):
+ '''property for resources'''
+ if self.__resources is None:
+ return []
+ return self.__resources
+ @resources.setter
+ def resources(self, data):
+ '''setter for resources'''
+ self.__resources = data
+ @property
+ def attribute_restrictions(self):
+ '''property for attribute_restrictions'''
+ return self.__attribute_restrictions
+ @attribute_restrictions.setter
+ def attribute_restrictions(self, data):
+ '''setter for attribute_restrictions'''
+ self.__attribute_restrictions = data
+ def add_verb(self, inc_verb):
+ '''add a verb to the verbs array'''
+ self.verbs.append(inc_verb)
+ def add_api_group(self, inc_apigroup):
+ '''add an api_group to the api_groups array'''
+ self.api_groups.append(inc_apigroup)
+ def add_resource(self, inc_resource):
+ '''add an resource to the resources array'''
+ self.resources.append(inc_resource)
+ def remove_verb(self, inc_verb):
+ '''add a verb to the verbs array'''
+ try:
+ self.verbs.remove(inc_verb)
+ return True
+ except ValueError:
+ pass
+ return False
+ def remove_api_group(self, inc_api_group):
+ '''add a verb to the verbs array'''
+ try:
+ self.api_groups.remove(inc_api_group)
+ return True
+ except ValueError:
+ pass
+ return False
+ def remove_resource(self, inc_resource):
+ '''add a verb to the verbs array'''
+ try:
+ self.resources.remove(inc_resource)
+ return True
+ except ValueError:
+ pass
+ return False
+ def __eq__(self, other):
+ '''return whether rules are equal'''
+ return (self.attribute_restrictions == other.attribute_restrictions and
+ self.api_groups == other.api_groups and
+ self.resources == other.resources and
+ self.verbs == other.verbs)
+ @staticmethod
+ def parse_rules(inc_rules):
+ '''create rules from an array'''
+ results = []
+ for rule in inc_rules:
+ results.append(Rule(rule.get('apiGroups', ['']),
+ rule.get('attributeRestrictions', None),
+ rule.get('resources', []),
+ rule.get('verbs', [])))
+ return results
+class SecurityContextConstraintsConfig(object):
+ ''' Handle scc options '''
+ # pylint: disable=too-many-arguments
+ def __init__(self,
+ sname,
+ kubeconfig,
+ options=None,
+ fs_group='MustRunAs',
+ default_add_capabilities=None,
+ groups=None,
+ priority=None,
+ required_drop_capabilities=None,
+ run_as_user='MustRunAsRange',
+ se_linux_context='MustRunAs',
+ supplemental_groups='RunAsAny',
+ users=None,
+ annotations=None):
+ ''' constructor for handling scc options '''
+ self.kubeconfig = kubeconfig
+ = sname
+ self.options = options
+ self.fs_group = fs_group
+ self.default_add_capabilities = default_add_capabilities
+ self.groups = groups
+ self.priority = priority
+ self.required_drop_capabilities = required_drop_capabilities
+ self.run_as_user = run_as_user
+ self.se_linux_context = se_linux_context
+ self.supplemental_groups = supplemental_groups
+ self.users = users
+ self.annotations = annotations
+ = {}
+ self.create_dict()
+ def create_dict(self):
+ ''' assign the correct properties for a scc dict '''
+ # allow options
+ if self.options:
+ for key, value in self.options.items():
+[key] = value
+ else:
+['allowHostDirVolumePlugin'] = False
+['allowHostIPC'] = False
+['allowHostNetwork'] = False
+['allowHostPID'] = False
+['allowHostPorts'] = False
+['allowPrivilegedContainer'] = False
+['allowedCapabilities'] = None
+ # version
+['apiVersion'] = 'v1'
+ # kind
+['kind'] = 'SecurityContextConstraints'
+ # defaultAddCapabilities
+['defaultAddCapabilities'] = self.default_add_capabilities
+ # fsGroup
+['fsGroup']['type'] = self.fs_group
+ # groups
+['groups'] = []
+ if self.groups:
+['groups'] = self.groups
+ # metadata
+['metadata'] = {}
+['metadata']['name'] =
+ if self.annotations:
+ for key, value in self.annotations.items():
+['metadata'][key] = value
+ # priority
+['priority'] = self.priority
+ # requiredDropCapabilities
+['requiredDropCapabilities'] = self.required_drop_capabilities
+ # runAsUser
+['runAsUser'] = {'type': self.run_as_user}
+ # seLinuxContext
+['seLinuxContext'] = {'type': self.se_linux_context}
+ # supplementalGroups
+['supplementalGroups'] = {'type': self.supplemental_groups}
+ # users
+['users'] = []
+ if self.users:
+['users'] = self.users
+# pylint: disable=too-many-instance-attributes,too-many-public-methods,no-member
+class SecurityContextConstraints(Yedit):
+ ''' Class to wrap the oc command line tools '''
+ default_add_capabilities_path = "defaultAddCapabilities"
+ fs_group_path = "fsGroup"
+ groups_path = "groups"
+ priority_path = "priority"
+ required_drop_capabilities_path = "requiredDropCapabilities"
+ run_as_user_path = "runAsUser"
+ se_linux_context_path = "seLinuxContext"
+ supplemental_groups_path = "supplementalGroups"
+ users_path = "users"
+ kind = 'SecurityContextConstraints'
+ def __init__(self, content):
+ '''SecurityContextConstraints constructor'''
+ super(SecurityContextConstraints, self).__init__(content=content)
+ self._users = None
+ self._groups = None
+ @property
+ def users(self):
+ ''' users property getter '''
+ if self._users is None:
+ self._users = self.get_users()
+ return self._users
+ @property
+ def groups(self):
+ ''' groups property getter '''
+ if self._groups is None:
+ self._groups = self.get_groups()
+ return self._groups
+ @users.setter
+ def users(self, data):
+ ''' users property setter'''
+ self._users = data
+ @groups.setter
+ def groups(self, data):
+ ''' groups property setter'''
+ self._groups = data
+ def get_users(self):
+ '''get scc users'''
+ return self.get(SecurityContextConstraints.users_path) or []
+ def get_groups(self):
+ '''get scc groups'''
+ return self.get(SecurityContextConstraints.groups_path) or []
+ def add_user(self, inc_user):
+ ''' add a user '''
+ if self.users:
+ self.users.append(inc_user)
+ else:
+ self.put(SecurityContextConstraints.users_path, [inc_user])
+ return True
+ def add_group(self, inc_group):
+ ''' add a group '''
+ if self.groups:
+ self.groups.append(inc_group)
+ else:
+ self.put(SecurityContextConstraints.groups_path, [inc_group])
+ return True
+ def remove_user(self, inc_user):
+ ''' remove a user '''
+ try:
+ self.users.remove(inc_user)
+ except ValueError as _:
+ return False
+ return True
+ def remove_group(self, inc_group):
+ ''' remove a group '''
+ try:
+ self.groups.remove(inc_group)
+ except ValueError as _:
+ return False
+ return True
+ def update_user(self, inc_user):
+ ''' update a user '''
+ try:
+ index = self.users.index(inc_user)
+ except ValueError as _:
+ return self.add_user(inc_user)
+ self.users[index] = inc_user
+ return True
+ def update_group(self, inc_group):
+ ''' update a group '''
+ try:
+ index = self.groups.index(inc_group)
+ except ValueError as _:
+ return self.add_group(inc_group)
+ self.groups[index] = inc_group
+ return True
+ def find_user(self, inc_user):
+ ''' find a user '''
+ index = None
+ try:
+ index = self.users.index(inc_user)
+ except ValueError as _:
+ return index
+ return index
+ def find_group(self, inc_group):
+ ''' find a group '''
+ index = None
+ try:
+ index = self.groups.index(inc_group)
+ except ValueError as _:
+ return index
+ return index
+class SecretConfig(object):
+ ''' Handle secret options '''
+ # pylint: disable=too-many-arguments
+ def __init__(self,
+ sname,
+ namespace,
+ kubeconfig,
+ secrets=None,
+ stype=None,
+ annotations=None):
+ ''' constructor for handling secret options '''
+ self.kubeconfig = kubeconfig
+ = sname
+ self.type = stype
+ self.namespace = namespace
+ self.secrets = secrets
+ self.annotations = annotations
+ = {}
+ self.create_dict()
+ def create_dict(self):
+ ''' assign the correct properties for a secret dict '''
+['apiVersion'] = 'v1'
+['kind'] = 'Secret'
+['type'] = self.type
+['metadata'] = {}
+['metadata']['name'] =
+['metadata']['namespace'] = self.namespace
+['data'] = {}
+ if self.secrets:
+ for key, value in self.secrets.items():
+['data'][key] = value
+ if self.annotations:
+['metadata']['annotations'] = self.annotations
+# pylint: disable=too-many-instance-attributes
+class Secret(Yedit):
+ ''' Class to wrap the oc command line tools '''
+ secret_path = "data"
+ kind = 'secret'
+ def __init__(self, content):
+ '''secret constructor'''
+ super(Secret, self).__init__(content=content)
+ self._secrets = None
+ @property
+ def secrets(self):
+ '''secret property getter'''
+ if self._secrets is None:
+ self._secrets = self.get_secrets()
+ return self._secrets
+ @secrets.setter
+ def secrets(self):
+ '''secret property setter'''
+ if self._secrets is None:
+ self._secrets = self.get_secrets()
+ return self._secrets
+ def get_secrets(self):
+ ''' returns all of the defined secrets '''
+ return self.get(Secret.secret_path) or {}
+ def add_secret(self, key, value):
+ ''' add a secret '''
+ if self.secrets:
+ self.secrets[key] = value
+ else:
+ self.put(Secret.secret_path, {key: value})
+ return True
+ def delete_secret(self, key):
+ ''' delete secret'''
+ try:
+ del self.secrets[key]
+ except KeyError as _:
+ return False
+ return True
+ def find_secret(self, key):
+ ''' find secret'''
+ rval = None
+ try:
+ rval = self.secrets[key]
+ except KeyError as _:
+ return None
+ return {'key': key, 'value': rval}
+ def update_secret(self, key, value):
+ ''' update a secret'''
+ if key in self.secrets:
+ self.secrets[key] = value
+ else:
+ self.add_secret(key, value)
+ return True
+class ServiceConfig(object):
+ ''' Handle service options '''
+ # pylint: disable=too-many-arguments
+ def __init__(self,
+ sname,
+ namespace,
+ ports,
+ selector=None,
+ labels=None,
+ cluster_ip=None,
+ portal_ip=None,
+ session_affinity=None,
+ service_type=None,
+ external_ips=None):
+ ''' constructor for handling service options '''
+ = sname
+ self.namespace = namespace
+ self.ports = ports
+ self.selector = selector
+ self.labels = labels
+ self.cluster_ip = cluster_ip
+ self.portal_ip = portal_ip
+ self.session_affinity = session_affinity
+ self.service_type = service_type
+ self.external_ips = external_ips
+ = {}
+ self.create_dict()
+ def create_dict(self):
+ ''' instantiates a service dict '''
+['apiVersion'] = 'v1'
+['kind'] = 'Service'
+['metadata'] = {}
+['metadata']['name'] =
+['metadata']['namespace'] = self.namespace
+ if self.labels:
+['metadata']['labels'] = {}
+ for lab, lab_value in self.labels.items():
+['metadata']['labels'][lab] = lab_value
+['spec'] = {}
+ if self.ports:
+['spec']['ports'] = self.ports
+ else:
+['spec']['ports'] = []
+ if self.selector:
+['spec']['selector'] = self.selector
+['spec']['sessionAffinity'] = self.session_affinity or 'None'
+ if self.cluster_ip:
+['spec']['clusterIP'] = self.cluster_ip
+ if self.portal_ip:
+['spec']['portalIP'] = self.portal_ip
+ if self.service_type:
+['spec']['type'] = self.service_type
+ if self.external_ips:
+['spec']['externalIPs'] = self.external_ips
+# pylint: disable=too-many-instance-attributes,too-many-public-methods
+class Service(Yedit):
+ ''' Class to model the oc service object '''
+ port_path = "spec.ports"
+ portal_ip = "spec.portalIP"
+ cluster_ip = "spec.clusterIP"
+ selector_path = 'spec.selector'
+ kind = 'Service'
+ external_ips = "spec.externalIPs"
+ def __init__(self, content):
+ '''Service constructor'''
+ super(Service, self).__init__(content=content)
+ def get_ports(self):
+ ''' get a list of ports '''
+ return self.get(Service.port_path) or []
+ def get_selector(self):
+ ''' get the service selector'''
+ return self.get(Service.selector_path) or {}
+ def add_ports(self, inc_ports):
+ ''' add a port object to the ports list '''
+ if not isinstance(inc_ports, list):
+ inc_ports = [inc_ports]
+ ports = self.get_ports()
+ if not ports:
+ self.put(Service.port_path, inc_ports)
+ else:
+ ports.extend(inc_ports)
+ return True
+ def find_ports(self, inc_port):
+ ''' find a specific port '''
+ for port in self.get_ports():
+ if port['port'] == inc_port['port']:
+ return port
+ return None
+ def delete_ports(self, inc_ports):
+ ''' remove a port from a service '''
+ if not isinstance(inc_ports, list):
+ inc_ports = [inc_ports]
+ ports = self.get(Service.port_path) or []
+ if not ports:
+ return True
+ removed = False
+ for inc_port in inc_ports:
+ port = self.find_ports(inc_port)
+ if port:
+ ports.remove(port)
+ removed = True
+ return removed
+ def add_cluster_ip(self, sip):
+ '''add cluster ip'''
+ self.put(Service.cluster_ip, sip)
+ def add_portal_ip(self, pip):
+ '''add cluster ip'''
+ self.put(Service.portal_ip, pip)
+ def get_external_ips(self):
+ ''' get a list of external_ips '''
+ return self.get(Service.external_ips) or []
+ def add_external_ips(self, inc_external_ips):
+ ''' add an external_ip to the external_ips list '''
+ if not isinstance(inc_external_ips, list):
+ inc_external_ips = [inc_external_ips]
+ external_ips = self.get_external_ips()
+ if not external_ips:
+ self.put(Service.external_ips, inc_external_ips)
+ else:
+ external_ips.extend(inc_external_ips)
+ return True
+ def find_external_ips(self, inc_external_ip):
+ ''' find a specific external IP '''
+ val = None
+ try:
+ idx = self.get_external_ips().index(inc_external_ip)
+ val = self.get_external_ips()[idx]
+ except ValueError:
+ pass
+ return val
+ def delete_external_ips(self, inc_external_ips):
+ ''' remove an external IP from a service '''
+ if not isinstance(inc_external_ips, list):
+ inc_external_ips = [inc_external_ips]
+ external_ips = self.get(Service.external_ips) or []
+ if not external_ips:
+ return True
+ removed = False
+ for inc_external_ip in inc_external_ips:
+ external_ip = self.find_external_ips(inc_external_ip)
+ if external_ip:
+ external_ips.remove(external_ip)
+ removed = True
+ return removed
+ '''Service account config class
+ This class stores the options and returns a default service account
+ '''
+ # pylint: disable=too-many-arguments
+ def __init__(self, sname, namespace, kubeconfig, secrets=None, image_pull_secrets=None):
+ = sname
+ self.kubeconfig = kubeconfig
+ self.namespace = namespace
+ self.secrets = secrets or []
+ self.image_pull_secrets = image_pull_secrets or []
+ = {}
+ self.create_dict()
+ def create_dict(self):
+ ''' instantiate a properly structured volume '''
+['apiVersion'] = 'v1'
+['kind'] = 'ServiceAccount'
+['metadata'] = {}
+['metadata']['name'] =
+['metadata']['namespace'] = self.namespace
+['secrets'] = []
+ if self.secrets:
+ for sec in self.secrets:
+['secrets'].append({"name": sec})
+['imagePullSecrets'] = []
+ if self.image_pull_secrets:
+ for sec in self.image_pull_secrets:
+['imagePullSecrets'].append({"name": sec})
+class ServiceAccount(Yedit):
+ ''' Class to wrap the oc command line tools '''
+ image_pull_secrets_path = "imagePullSecrets"
+ secrets_path = "secrets"
+ def __init__(self, content):
+ '''ServiceAccount constructor'''
+ super(ServiceAccount, self).__init__(content=content)
+ self._secrets = None
+ self._image_pull_secrets = None
+ @property
+ def image_pull_secrets(self):
+ ''' property for image_pull_secrets '''
+ if self._image_pull_secrets is None:
+ self._image_pull_secrets = self.get(ServiceAccount.image_pull_secrets_path) or []
+ return self._image_pull_secrets
+ @image_pull_secrets.setter
+ def image_pull_secrets(self, secrets):
+ ''' property for secrets '''
+ self._image_pull_secrets = secrets
+ @property
+ def secrets(self):
+ ''' property for secrets '''
+ if not self._secrets:
+ self._secrets = self.get(ServiceAccount.secrets_path) or []
+ return self._secrets
+ @secrets.setter
+ def secrets(self, secrets):
+ ''' property for secrets '''
+ self._secrets = secrets
+ def delete_secret(self, inc_secret):
+ ''' remove a secret '''
+ remove_idx = None
+ for idx, sec in enumerate(self.secrets):
+ if sec['name'] == inc_secret:
+ remove_idx = idx
+ break
+ if remove_idx:
+ del self.secrets[remove_idx]
+ return True
+ return False
+ def delete_image_pull_secret(self, inc_secret):
+ ''' remove a image_pull_secret '''
+ remove_idx = None
+ for idx, sec in enumerate(self.image_pull_secrets):
+ if sec['name'] == inc_secret:
+ remove_idx = idx
+ break
+ if remove_idx:
+ del self.image_pull_secrets[remove_idx]
+ return True
+ return False
+ def find_secret(self, inc_secret):
+ '''find secret'''
+ for secret in self.secrets:
+ if secret['name'] == inc_secret:
+ return secret
+ return None
+ def find_image_pull_secret(self, inc_secret):
+ '''find secret'''
+ for secret in self.image_pull_secrets:
+ if secret['name'] == inc_secret:
+ return secret
+ return None
+ def add_secret(self, inc_secret):
+ '''add secret'''
+ if self.secrets:
+ self.secrets.append({"name": inc_secret}) # pylint: disable=no-member
+ else:
+ self.put(ServiceAccount.secrets_path, [{"name": inc_secret}])
+ def add_image_pull_secret(self, inc_secret):
+ '''add image_pull_secret'''
+ if self.image_pull_secrets:
+ self.image_pull_secrets.append({"name": inc_secret}) # pylint: disable=no-member
+ else:
+ self.put(ServiceAccount.image_pull_secrets_path, [{"name": inc_secret}])
+class StorageClassConfig(object):
+ ''' Handle service options '''
+ # pylint: disable=too-many-arguments
+ def __init__(self,
+ name,
+ provisioner,
+ parameters=None,
+ annotations=None,
+ default_storage_class="false",
+ api_version='v1',
+ kubeconfig='/etc/origin/master/admin.kubeconfig'):
+ ''' constructor for handling storageclass options '''
+ = name
+ self.parameters = parameters
+ self.annotations = annotations
+ self.provisioner = provisioner
+ self.api_version = api_version
+ self.default_storage_class = str(default_storage_class).lower()
+ self.kubeconfig = kubeconfig
+ = {}
+ self.create_dict()
+ def create_dict(self):
+ ''' instantiates a storageclass dict '''
+['apiVersion'] = self.api_version
+['kind'] = 'StorageClass'
+['metadata'] = {}
+['metadata']['name'] =
+['metadata']['annotations'] = {}
+ if self.annotations is not None:
+['metadata']['annotations'] = self.annotations
+['metadata']['annotations'][''] = \
+ self.default_storage_class
+['provisioner'] = self.provisioner
+['parameters'] = {}
+ if self.parameters is not None:
+ # default to aws if no params were passed
+ else:
+['parameters']['type'] = 'gp2'
+# pylint: disable=too-many-instance-attributes,too-many-public-methods
+class StorageClass(Yedit):
+ ''' Class to model the oc storageclass object '''
+ annotations_path = "metadata.annotations"
+ provisioner_path = "provisioner"
+ parameters_path = "parameters"
+ kind = 'StorageClass'
+ def __init__(self, content):
+ '''StorageClass constructor'''
+ super(StorageClass, self).__init__(content=content)
+ def get_annotations(self):
+ ''' get a list of ports '''
+ return self.get(StorageClass.annotations_path) or {}
+ def get_parameters(self):
+ ''' get the service selector'''
+ return self.get(StorageClass.parameters_path) or {}
+ ''' Handle user options '''
+ def __init__(self,
+ kubeconfig,
+ username,
+ full_name):
+ ''' constructor for handling user options '''
+ self.kubeconfig = kubeconfig
+ self.username = username
+ self.full_name = full_name
+ = {}
+ self.create_dict()
+ def create_dict(self):
+ ''' return a user as a dict '''
+['apiVersion'] = 'v1'
+['fullName'] = self.full_name
+['groups'] = None
+['identities'] = None
+['kind'] = 'User'
+['metadata'] = {}
+['metadata']['name'] = self.username
+# pylint: disable=too-many-instance-attributes
+class User(Yedit):
+ ''' Class to wrap the oc command line tools '''
+ kind = 'user'
+ def __init__(self, content):
+ '''User constructor'''
+ super(User, self).__init__(content=content)
+ ''' Class to represent an openshift volume object'''
+ volume_mounts_path = {"pod": "spec.containers[0].volumeMounts",
+ "dc": "spec.template.spec.containers[0].volumeMounts",
+ "rc": "spec.template.spec.containers[0].volumeMounts",
+ }
+ volumes_path = {"pod": "spec.volumes",
+ "dc": "spec.template.spec.volumes",
+ "rc": "spec.template.spec.volumes",
+ }
+ @staticmethod
+ def create_volume_structure(volume_info):
+ ''' return a properly structured volume '''
+ volume_mount = None
+ volume = {'name': volume_info['name']}
+ volume_type = volume_info['type'].lower()
+ if volume_type == 'secret':
+ volume['secret'] = {}
+ volume[volume_info['type']] = {'secretName': volume_info['secret_name']}
+ volume_mount = {'mountPath': volume_info['path'],
+ 'name': volume_info['name']}
+ elif volume_type == 'emptydir':
+ volume['emptyDir'] = {}
+ volume_mount = {'mountPath': volume_info['path'],
+ 'name': volume_info['name']}
+ elif volume_type == 'pvc' or volume_type == 'persistentvolumeclaim':
+ volume['persistentVolumeClaim'] = {}
+ volume['persistentVolumeClaim']['claimName'] = volume_info['claimName']
+ volume['persistentVolumeClaim']['claimSize'] = volume_info['claimSize']
+ elif volume_type == 'hostpath':
+ volume['hostPath'] = {}
+ volume['hostPath']['path'] = volume_info['path']
+ elif volume_type == 'configmap':
+ volume['configMap'] = {}
+ volume['configMap']['name'] = volume_info['configmap_name']
+ volume_mount = {'mountPath': volume_info['path'],
+ 'name': volume_info['name']}
+ return (volume, volume_mount)