#!/usr/bin/env python """ Pre-upgrade checks that must be run on a master before proceeding with upgrade. """ # This is a script not a python module: # pylint: disable=invalid-name # NOTE: This script should not require any python libs other than what is # in the standard library. __license__ = "ASL 2.0" import json import os import subprocess import re # The maximum length of container.ports.name ALLOWED_LENGTH = 15 # The valid structure of container.ports.name ALLOWED_CHARS = re.compile('^[a-z0-9][a-z0-9\\-]*[a-z0-9]$') AT_LEAST_ONE_LETTER = re.compile('[a-z]') # look at OS_PATH for the full path. Default ot 'oc' OC_PATH = os.getenv('OC_PATH', 'oc') def validate(value): """ validate verifies that value matches required conventions Rules of container.ports.name validation: * must be less that 16 chars * at least one letter * only a-z0-9- * hyphens can not be leading or trailing or next to each other :Parameters: - `value`: Value to validate """ if len(value) > ALLOWED_LENGTH: return False if '--' in value: return False # We search since it can be anywhere if not AT_LEAST_ONE_LETTER.search(value): return False # We match because it must start at the beginning if not ALLOWED_CHARS.match(value): return False return True def list_items(kind): """ list_items returns a list of items from the api :Parameters: - `kind`: Kind of item to access """ response = subprocess.check_output([OC_PATH, 'get', '--all-namespaces', '-o', 'json', kind]) items = json.loads(response) return items.get("items", []) def get(obj, *paths): """ Gets an object :Parameters: - `obj`: A dictionary structure - `path`: All other non-keyword arguments """ ret_obj = obj for path in paths: if ret_obj.get(path, None) is None: return [] ret_obj = ret_obj[path] return ret_obj # pylint: disable=too-many-arguments def pretty_print_errors(namespace, kind, item_name, container_name, invalid_label, port_name, valid): """ Prints out results in human friendly way. :Parameters: - `namespace`: Namespace of the resource - `kind`: Kind of the resource - `item_name`: Name of the resource - `container_name`: Name of the container. May be "" when kind=Service. - `port_name`: Name of the port - `invalid_label`: The label of the invalid port. Port.name/targetPort - `valid`: True if the port is valid """ if not valid: if len(container_name) > 0: print('%s/%s -n %s (Container="%s" %s="%s")' % ( kind, item_name, namespace, container_name, invalid_label, port_name)) else: print('%s/%s -n %s (%s="%s")' % ( kind, item_name, namespace, invalid_label, port_name)) def print_validation_header(): """ Prints the error header. Should run on the first error to avoid overwhelming the user. """ print """\ At least one port name does not validate. Valid port names: * must be less that 16 chars * have at least one letter * only a-z0-9- * do not start or end with - * Dashes may not be next to eachother ('--') """ def main(): """ main is the main entry point to this script """ try: # the comma at the end suppresses the newline print "Checking for oc ...", subprocess.check_output([OC_PATH, 'whoami']) print "found" except: print( 'Unable to run "%s whoami"\n' 'Please ensure OpenShift is running, and "oc" is on your system ' 'path.\n' 'You can override the path with the OC_PATH environment variable.' % OC_PATH) raise SystemExit(1) # Where the magic happens first_error = True for kind, path in [ ('replicationcontrollers', ("spec", "template", "spec", "containers")), ('pods', ("spec", "containers")), ('deploymentconfigs', ("spec", "template", "spec", "containers"))]: for item in list_items(kind): namespace = item["metadata"]["namespace"] item_name = item["metadata"]["name"] for container in get(item, *path): container_name = container["name"] for port in get(container, "ports"): port_name = port.get("name", None) if not port_name: # Unnamed ports are OK continue valid = validate(port_name) if not valid and first_error: first_error = False print_validation_header() pretty_print_errors( namespace, kind, item_name, container_name, "Port.name", port_name, valid) # Services follow a different flow for item in list_items('services'): namespace = item["metadata"]["namespace"] item_name = item["metadata"]["name"] for port in get(item, "spec", "ports"): port_name = port.get("targetPort", None) if isinstance(port_name, int) or port_name is None: # Integer only or unnamed ports are OK continue valid = validate(port_name) if not valid and first_error: first_error = False print_validation_header() pretty_print_errors( namespace, "services", item_name, "", "targetPort", port_name, valid) # If we had at least 1 error then exit with 1 if not first_error: raise SystemExit(1) if __name__ == '__main__': main()