#!/usr/bin/env python
"""
Pre-upgrade checks that must be run on a master before proceeding with upgrade.

The script asks ``oc`` for every deploymentconfig, replicationcontroller, pod
and service in all namespaces and reports any named port whose name does not
follow the port-name rules implemented by ``validate``. It exits with status 1
when ``oc`` cannot be run or when at least one invalid name is found.
"""
# This is a script not a python module:
# pylint: disable=invalid-name

# NOTE: This script should not require any python libs other than what is
# in the standard library.

__license__ = "ASL 2.0"

import json
import os
import re
import subprocess
import sys

# The maximum length of container.ports.name
ALLOWED_LENGTH = 15
# The valid structure of container.ports.name: alphanumeric first and last
# character with optional alphanumerics/hyphens in between. The middle group
# is optional so that a single-character name is accepted, and \Z (rather
# than $) is used so that a trailing newline is rejected.
ALLOWED_CHARS = re.compile(r'^[a-z0-9](?:[a-z0-9\-]*[a-z0-9])?\Z')
AT_LEAST_ONE_LETTER = re.compile('[a-z]')
# Look at OC_PATH for the full path. Default to 'oc'
OC_PATH = os.getenv('OC_PATH', 'oc')


def validate(value):
    """
    validate verifies that value matches required conventions

    Rules of container.ports.name validation:

    * must be less than 16 chars
    * at least one letter
    * only a-z0-9-
    * hyphens can not be leading or trailing or next to each other

    :Parameters:
       - `value`: Value to validate

    :Returns: True when `value` satisfies every rule, False otherwise
    """
    if len(value) > ALLOWED_LENGTH:
        return False

    if '--' in value:
        return False

    # We search since the letter can be anywhere
    if not AT_LEAST_ONE_LETTER.search(value):
        return False

    # We match because the pattern must cover the whole value
    if not ALLOWED_CHARS.match(value):
        return False

    return True


def list_items(kind):
    """
    list_items returns a list of items from the api

    Runs ``oc get --all-namespaces -o json <kind>`` and returns the content
    of the "items" key of the JSON output (an empty list when it is absent).

    :Parameters:
       - `kind`: Kind of item to access

    :Raises: subprocess.CalledProcessError when ``oc`` exits non-zero
    """
    response = subprocess.check_output(
        [OC_PATH, 'get', '--all-namespaces', '-o', 'json', kind])
    # check_output returns bytes on Python 3; decode explicitly so json.loads
    # receives text on every interpreter version.
    items = json.loads(response.decode('utf-8'))
    return items.get("items", [])


def get(obj, *paths):
    """
    Gets an object by walking nested dictionary keys

    :Parameters:
       - `obj`: A dictionary structure
       - `paths`: All other non-keyword arguments, the keys to follow in order

    :Returns: the value found at the end of the path, or an empty list when
        any step is missing, is None, or is not a dictionary
    """
    ret_obj = obj
    for path in paths:
        # A non-dict node cannot be descended into; treat it like a miss.
        if not isinstance(ret_obj, dict):
            return []
        if ret_obj.get(path, None) is None:
            return []
        ret_obj = ret_obj[path]
    return ret_obj


# pylint: disable=too-many-arguments
def pretty_print_errors(namespace, kind, item_name, container_name,
                        invalid_label, port_name, valid):
    """
    Prints out results in human friendly way. Nothing is printed when
    `valid` is true.

    :Parameters:
       - `namespace`: Namespace of the resource
       - `kind`: Kind of the resource
       - `item_name`: Name of the resource
       - `container_name`: Name of the container. May be "" when kind=Service.
       - `invalid_label`: The label of the invalid port. Port.name/targetPort
       - `port_name`: Name of the port
       - `valid`: True if the port is valid
    """
    if valid:
        return

    if container_name:
        print('%s/%s -n %s (Container="%s" %s="%s")' % (
            kind, item_name, namespace,
            container_name, invalid_label, port_name))
    else:
        print('%s/%s -n %s (%s="%s")' % (
            kind, item_name, namespace, invalid_label, port_name))


def print_validation_header():
    """
    Prints the error header. Should run on the first error to avoid
    overwhelming the user.
    """
    print("""\
At least one port name is invalid and must be corrected before upgrading.
Please update or remove any resources with invalid port names.

  Valid port names must:

    * be less that 16 characters
    * have at least one letter
    * contain only a-z0-9-
    * not start or end with -
    * not contain dashes next to each other ('--')
""")


# pylint: disable=too-many-arguments
def _report_port(first_error, namespace, kind, item_name, container_name,
                 invalid_label, port_name):
    """
    Validates one port name and prints a report line when it is invalid.

    The header is printed before the very first reported error only.

    :Parameters:
       - `first_error`: True while no error has been reported yet
       - remaining parameters: forwarded to `pretty_print_errors`

    :Returns: the updated `first_error` flag (False once an error was seen)
    """
    valid = validate(port_name)
    if valid:
        return first_error

    if first_error:
        print_validation_header()
    pretty_print_errors(
        namespace, kind, item_name,
        container_name, invalid_label, port_name, valid)
    return False


def main():
    """
    main is the main entry point to this script

    :Raises: SystemExit(1) when ``oc whoami`` cannot be run or when at least
        one invalid port name was reported
    """
    # Written without a trailing newline so "found" lands on the same line.
    sys.stdout.write("Checking for oc ... ")
    sys.stdout.flush()
    try:
        subprocess.check_output([OC_PATH, 'whoami'])
    except (OSError, subprocess.CalledProcessError):
        # OSError: the oc binary could not be executed at all.
        # CalledProcessError: oc ran but exited non-zero.
        print(
            'Unable to run "%s whoami"\n'
            'Please ensure OpenShift is running, and "oc" is on your system '
            'path.\n'
            'You can override the path with the OC_PATH environment variable.'
            % OC_PATH)
        raise SystemExit(1)
    print("found")

    # Where the magic happens
    first_error = True
    container_kinds = [
        ('deploymentconfigs', ("spec", "template", "spec", "containers")),
        ('replicationcontrollers', ("spec", "template", "spec", "containers")),
        ('pods', ("spec", "containers")),
    ]
    for kind, path in container_kinds:
        for item in list_items(kind):
            namespace = item["metadata"]["namespace"]
            item_name = item["metadata"]["name"]
            for container in get(item, *path):
                container_name = container["name"]
                for port in get(container, "ports"):
                    port_name = port.get("name", None)
                    if not port_name:
                        # Unnamed ports are OK
                        continue
                    first_error = _report_port(
                        first_error, namespace, kind, item_name,
                        container_name, "Port.name", port_name)

    # Services follow a different flow
    for item in list_items('services'):
        namespace = item["metadata"]["namespace"]
        item_name = item["metadata"]["name"]
        for port in get(item, "spec", "ports"):
            port_name = port.get("targetPort", None)
            if isinstance(port_name, int) or port_name is None:
                # Integer only or unnamed ports are OK
                continue
            first_error = _report_port(
                first_error, namespace, "services", item_name,
                "", "targetPort", port_name)

    # If we had at least 1 error then exit with 1
    if not first_error:
        raise SystemExit(1)


if __name__ == '__main__':
    main()