From 210fc2d3849a1baf9c1d8535044d92df23424274 Mon Sep 17 00:00:00 2001 From: Luke Meyer Date: Fri, 16 Jun 2017 17:24:01 -0400 Subject: openshift_checks: refactor to internalize task_vars Move task_vars into instance variable so we don't have to pass it around everywhere. Also store tmp. Make sure both are filled in on execute_module. In the process, is_active became an instance method, and task_vars is basically never used directly outside of test code. --- .../openshift_checks/logging/curator.py | 7 +--- .../openshift_checks/logging/elasticsearch.py | 43 ++++++++++------------ .../openshift_checks/logging/fluentd.py | 42 ++++++++++----------- .../openshift_checks/logging/kibana.py | 37 +++++++++---------- .../openshift_checks/logging/logging.py | 33 +++++++---------- .../openshift_checks/logging/logging_index_time.py | 28 +++++++------- 6 files changed, 85 insertions(+), 105 deletions(-) (limited to 'roles/openshift_health_checker/openshift_checks/logging') diff --git a/roles/openshift_health_checker/openshift_checks/logging/curator.py b/roles/openshift_health_checker/openshift_checks/logging/curator.py index 7e932cf98..f82ae64d7 100644 --- a/roles/openshift_health_checker/openshift_checks/logging/curator.py +++ b/roles/openshift_health_checker/openshift_checks/logging/curator.py @@ -1,6 +1,5 @@ """Check for an aggregated logging Curator deployment""" -from openshift_checks import get_var from openshift_checks.logging.logging import LoggingCheck @@ -12,13 +11,11 @@ class Curator(LoggingCheck): logging_namespace = None - def run(self, tmp, task_vars): - self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default="logging") + def run(self): + self.logging_namespace = self.get_var("openshift_logging_namespace", default="logging") curator_pods, error = super(Curator, self).get_pods_for_component( - self.execute_module, self.logging_namespace, "curator", - task_vars ) if error: return {"failed": True, "changed": False, "msg": error} diff --git a/roles/openshift_health_checker/openshift_checks/logging/elasticsearch.py b/roles/openshift_health_checker/openshift_checks/logging/elasticsearch.py index d57cb9376..1e478c04d 100644 --- a/roles/openshift_health_checker/openshift_checks/logging/elasticsearch.py +++ b/roles/openshift_health_checker/openshift_checks/logging/elasticsearch.py @@ -3,7 +3,6 @@ import json import re -from openshift_checks import get_var from openshift_checks.logging.logging import LoggingCheck @@ -15,19 +14,17 @@ class Elasticsearch(LoggingCheck): logging_namespace = None - def run(self, tmp, task_vars): + def run(self): """Check various things and gather errors. Returns: result as hash""" - self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default="logging") + self.logging_namespace = self.get_var("openshift_logging_namespace", default="logging") es_pods, error = super(Elasticsearch, self).get_pods_for_component( - self.execute_module, self.logging_namespace, "es", - task_vars, ) if error: return {"failed": True, "changed": False, "msg": error} - check_error = self.check_elasticsearch(es_pods, task_vars) + check_error = self.check_elasticsearch(es_pods) if check_error: msg = ("The following Elasticsearch deployment issue was found:" @@ -52,7 +49,7 @@ class Elasticsearch(LoggingCheck): ))] return not_running, [] - def check_elasticsearch(self, es_pods, task_vars): + def check_elasticsearch(self, es_pods): """Various checks for elasticsearch. Returns: error string""" not_running_pods, error_msgs = self._not_running_elasticsearch_pods(es_pods) running_pods = [pod for pod in es_pods if pod not in not_running_pods] @@ -63,10 +60,10 @@ class Elasticsearch(LoggingCheck): } if not pods_by_name: return 'No logging Elasticsearch pods were found. Is logging deployed?' - error_msgs += self._check_elasticsearch_masters(pods_by_name, task_vars) - error_msgs += self._check_elasticsearch_node_list(pods_by_name, task_vars) - error_msgs += self._check_es_cluster_health(pods_by_name, task_vars) - error_msgs += self._check_elasticsearch_diskspace(pods_by_name, task_vars) + error_msgs += self._check_elasticsearch_masters(pods_by_name) + error_msgs += self._check_elasticsearch_node_list(pods_by_name) + error_msgs += self._check_es_cluster_health(pods_by_name) + error_msgs += self._check_elasticsearch_diskspace(pods_by_name) return '\n'.join(error_msgs) @staticmethod @@ -74,14 +71,14 @@ class Elasticsearch(LoggingCheck): base = "exec {name} -- curl -s --cert {base}cert --key {base}key --cacert {base}ca -XGET '{url}'" return base.format(base="/etc/elasticsearch/secret/admin-", name=pod_name, url=url) - def _check_elasticsearch_masters(self, pods_by_name, task_vars): + def _check_elasticsearch_masters(self, pods_by_name): """Check that Elasticsearch masters are sane. Returns: list of error strings""" es_master_names = set() error_msgs = [] for pod_name in pods_by_name.keys(): # Compare what each ES node reports as master and compare for split brain get_master_cmd = self._build_es_curl_cmd(pod_name, "https://localhost:9200/_cat/master") - master_name_str = self._exec_oc(get_master_cmd, [], task_vars) + master_name_str = self._exec_oc(get_master_cmd, []) master_names = (master_name_str or '').split(' ') if len(master_names) > 1: es_master_names.add(master_names[1]) @@ -106,7 +103,7 @@ class Elasticsearch(LoggingCheck): return error_msgs - def _check_elasticsearch_node_list(self, pods_by_name, task_vars): + def _check_elasticsearch_node_list(self, pods_by_name): """Check that reported ES masters are accounted for by pods. Returns: list of error strings""" if not pods_by_name: @@ -114,7 +111,7 @@ class Elasticsearch(LoggingCheck): # get ES cluster nodes node_cmd = self._build_es_curl_cmd(list(pods_by_name.keys())[0], 'https://localhost:9200/_nodes') - cluster_node_data = self._exec_oc(node_cmd, [], task_vars) + cluster_node_data = self._exec_oc(node_cmd, []) try: cluster_nodes = json.loads(cluster_node_data)['nodes'] except (ValueError, KeyError): @@ -136,12 +133,12 @@ class Elasticsearch(LoggingCheck): return error_msgs - def _check_es_cluster_health(self, pods_by_name, task_vars): + def _check_es_cluster_health(self, pods_by_name): """Exec into the elasticsearch pods and check the cluster health. Returns: list of errors""" error_msgs = [] for pod_name in pods_by_name.keys(): cluster_health_cmd = self._build_es_curl_cmd(pod_name, 'https://localhost:9200/_cluster/health?pretty=true') - cluster_health_data = self._exec_oc(cluster_health_cmd, [], task_vars) + cluster_health_data = self._exec_oc(cluster_health_cmd, []) try: health_res = json.loads(cluster_health_data) if not health_res or not health_res.get('status'): @@ -160,7 +157,7 @@ class Elasticsearch(LoggingCheck): return error_msgs - def _check_elasticsearch_diskspace(self, pods_by_name, task_vars): + def _check_elasticsearch_diskspace(self, pods_by_name): """ Exec into an ES pod and query the diskspace on the persistent volume. Returns: list of errors @@ -168,7 +165,7 @@ class Elasticsearch(LoggingCheck): error_msgs = [] for pod_name in pods_by_name.keys(): df_cmd = 'exec {} -- df --output=ipcent,pcent /elasticsearch/persistent'.format(pod_name) - disk_output = self._exec_oc(df_cmd, [], task_vars) + disk_output = self._exec_oc(df_cmd, []) lines = disk_output.splitlines() # expecting one header looking like 'IUse% Use%' and one body line body_re = r'\s*(\d+)%?\s+(\d+)%?\s*$' @@ -180,7 +177,7 @@ class Elasticsearch(LoggingCheck): continue inode_pct, disk_pct = re.match(body_re, lines[1]).groups() - inode_pct_thresh = get_var(task_vars, 'openshift_check_efk_es_inode_pct', default='90') + inode_pct_thresh = self.get_var('openshift_check_efk_es_inode_pct', default='90') if int(inode_pct) >= int(inode_pct_thresh): error_msgs.append( 'Inode percent usage on the storage volume for logging ES pod "{pod}"\n' @@ -191,7 +188,7 @@ class Elasticsearch(LoggingCheck): limit=str(inode_pct_thresh), param='openshift_check_efk_es_inode_pct', )) - disk_pct_thresh = get_var(task_vars, 'openshift_check_efk_es_storage_pct', default='80') + disk_pct_thresh = self.get_var('openshift_check_efk_es_storage_pct', default='80') if int(disk_pct) >= int(disk_pct_thresh): error_msgs.append( 'Disk percent usage on the storage volume for logging ES pod "{pod}"\n' @@ -205,11 +202,9 @@ class Elasticsearch(LoggingCheck): return error_msgs - def _exec_oc(self, cmd_str, extra_args, task_vars): + def _exec_oc(self, cmd_str, extra_args): return super(Elasticsearch, self).exec_oc( - self.execute_module, self.logging_namespace, cmd_str, extra_args, - task_vars, ) diff --git a/roles/openshift_health_checker/openshift_checks/logging/fluentd.py b/roles/openshift_health_checker/openshift_checks/logging/fluentd.py index 1e1e7f2bd..063e707a9 100644 --- a/roles/openshift_health_checker/openshift_checks/logging/fluentd.py +++ b/roles/openshift_health_checker/openshift_checks/logging/fluentd.py @@ -2,7 +2,6 @@ import json -from openshift_checks import get_var from openshift_checks.logging.logging import LoggingCheck @@ -14,19 +13,17 @@ class Fluentd(LoggingCheck): logging_namespace = None - def run(self, tmp, task_vars): + def run(self): """Check various things and gather errors. Returns: result as hash""" - self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default="logging") + self.logging_namespace = self.get_var("openshift_logging_namespace", default="logging") fluentd_pods, error = super(Fluentd, self).get_pods_for_component( - self.execute_module, self.logging_namespace, "fluentd", - task_vars, ) if error: return {"failed": True, "changed": False, "msg": error} - check_error = self.check_fluentd(fluentd_pods, task_vars) + check_error = self.check_fluentd(fluentd_pods) if check_error: msg = ("The following Fluentd deployment issue was found:" @@ -52,10 +49,9 @@ class Fluentd(LoggingCheck): ).format(label=node_selector) return fluentd_nodes, None - @staticmethod - def _check_node_labeling(nodes_by_name, fluentd_nodes, node_selector, task_vars): + def _check_node_labeling(self, nodes_by_name, fluentd_nodes, node_selector): """Note if nodes are not labeled as expected. Returns: error string""" - intended_nodes = get_var(task_vars, 'openshift_logging_fluentd_hosts', default=['--all']) + intended_nodes = self.get_var('openshift_logging_fluentd_hosts', default=['--all']) if not intended_nodes or '--all' in intended_nodes: intended_nodes = nodes_by_name.keys() nodes_missing_labels = set(intended_nodes) - set(fluentd_nodes.keys()) @@ -113,13 +109,15 @@ class Fluentd(LoggingCheck): )) return None - def check_fluentd(self, pods, task_vars): + def check_fluentd(self, pods): """Verify fluentd is running everywhere. Returns: error string""" - node_selector = get_var(task_vars, 'openshift_logging_fluentd_nodeselector', - default='logging-infra-fluentd=true') + node_selector = self.get_var( + 'openshift_logging_fluentd_nodeselector', + default='logging-infra-fluentd=true' + ) - nodes_by_name, error = self.get_nodes_by_name(task_vars) + nodes_by_name, error = self.get_nodes_by_name() if error: return error @@ -128,7 +126,7 @@ class Fluentd(LoggingCheck): return error error_msgs = [] - error = self._check_node_labeling(nodes_by_name, fluentd_nodes, node_selector, task_vars) + error = self._check_node_labeling(nodes_by_name, fluentd_nodes, node_selector) if error: error_msgs.append(error) error = self._check_nodes_have_fluentd(pods, fluentd_nodes) @@ -147,9 +145,9 @@ class Fluentd(LoggingCheck): return '\n'.join(error_msgs) - def get_nodes_by_name(self, task_vars): + def get_nodes_by_name(self): """Retrieve all the node definitions. Returns: dict(name: node), error string""" - nodes_json = self._exec_oc("get nodes -o json", [], task_vars) + nodes_json = self._exec_oc("get nodes -o json", []) try: nodes = json.loads(nodes_json) except ValueError: # no valid json - should not happen @@ -161,9 +159,9 @@ class Fluentd(LoggingCheck): for node in nodes['items'] }, None - def _exec_oc(self, cmd_str, extra_args, task_vars): - return super(Fluentd, self).exec_oc(self.execute_module, - self.logging_namespace, - cmd_str, - extra_args, - task_vars) + def _exec_oc(self, cmd_str, extra_args): + return super(Fluentd, self).exec_oc( + self.logging_namespace, + cmd_str, + extra_args, + ) diff --git a/roles/openshift_health_checker/openshift_checks/logging/kibana.py b/roles/openshift_health_checker/openshift_checks/logging/kibana.py index 551e8dfa0..60f94e106 100644 --- a/roles/openshift_health_checker/openshift_checks/logging/kibana.py +++ b/roles/openshift_health_checker/openshift_checks/logging/kibana.py @@ -12,7 +12,6 @@ except ImportError: from urllib.error import HTTPError, URLError import urllib.request as urllib2 -from openshift_checks import get_var from openshift_checks.logging.logging import LoggingCheck @@ -24,22 +23,20 @@ class Kibana(LoggingCheck): logging_namespace = None - def run(self, tmp, task_vars): + def run(self): """Check various things and gather errors. Returns: result as hash""" - self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default="logging") + self.logging_namespace = self.get_var("openshift_logging_namespace", default="logging") kibana_pods, error = super(Kibana, self).get_pods_for_component( - self.execute_module, self.logging_namespace, "kibana", - task_vars, ) if error: return {"failed": True, "changed": False, "msg": error} check_error = self.check_kibana(kibana_pods) if not check_error: - check_error = self._check_kibana_route(task_vars) + check_error = self._check_kibana_route() if check_error: msg = ("The following Kibana deployment issue was found:" @@ -50,7 +47,7 @@ class Kibana(LoggingCheck): # TODO(lmeyer): run it all again for the ops cluster return {"failed": False, "changed": False, "msg": 'No problems found with Kibana deployment.'} - def _verify_url_internal(self, url, task_vars): + def _verify_url_internal(self, url): """ Try to reach a URL from the host. Returns: success (bool), reason (for failure) @@ -62,7 +59,7 @@ class Kibana(LoggingCheck): # TODO(lmeyer): give users option to validate certs status_code=302, ) - result = self.execute_module('uri', args, None, task_vars) + result = self.execute_module('uri', args) if result.get('failed'): return result['msg'] return None @@ -114,14 +111,14 @@ class Kibana(LoggingCheck): return None - def _get_kibana_url(self, task_vars): + def _get_kibana_url(self): """ Get kibana route or report error. Returns: url (or empty), reason for failure """ # Get logging url - get_route = self._exec_oc("get route logging-kibana -o json", [], task_vars) + get_route = self._exec_oc("get route logging-kibana -o json", []) if not get_route: return None, 'no_route_exists' @@ -139,7 +136,7 @@ class Kibana(LoggingCheck): return 'https://{}/'.format(host), None - def _check_kibana_route(self, task_vars): + def _check_kibana_route(self): """ Check to see if kibana route is up and working. Returns: error string @@ -160,12 +157,12 @@ class Kibana(LoggingCheck): ), ) - kibana_url, error = self._get_kibana_url(task_vars) + kibana_url, error = self._get_kibana_url() if not kibana_url: return known_errors.get(error, error) # first, check that kibana is reachable from the master. - error = self._verify_url_internal(kibana_url, task_vars) + error = self._verify_url_internal(kibana_url) if error: if 'urlopen error [Errno 111] Connection refused' in error: error = ( @@ -190,7 +187,7 @@ class Kibana(LoggingCheck): # in production we would like the kibana route to work from outside the # cluster too; but that may not be the case, so allow disabling just this part. - if not get_var(task_vars, "openshift_check_efk_kibana_external", default=True): + if not self.get_var("openshift_check_efk_kibana_external", default=True): return None error = self._verify_url_external(kibana_url) if error: @@ -221,9 +218,9 @@ class Kibana(LoggingCheck): return error return None - def _exec_oc(self, cmd_str, extra_args, task_vars): - return super(Kibana, self).exec_oc(self.execute_module, - self.logging_namespace, - cmd_str, - extra_args, - task_vars) + def _exec_oc(self, cmd_str, extra_args): + return super(Kibana, self).exec_oc( + self.logging_namespace, + cmd_str, + extra_args, + ) diff --git a/roles/openshift_health_checker/openshift_checks/logging/logging.py b/roles/openshift_health_checker/openshift_checks/logging/logging.py index 46fd4f5c7..a48e1c728 100644 --- a/roles/openshift_health_checker/openshift_checks/logging/logging.py +++ b/roles/openshift_health_checker/openshift_checks/logging/logging.py @@ -5,7 +5,7 @@ Util functions for performing checks on an Elasticsearch, Fluentd, and Kibana st import json import os -from openshift_checks import OpenShiftCheck, OpenShiftCheckException, get_var +from openshift_checks import OpenShiftCheck, OpenShiftCheckException class LoggingCheck(OpenShiftCheck): @@ -14,31 +14,27 @@ class LoggingCheck(OpenShiftCheck): name = "logging" logging_namespace = "logging" - @classmethod - def is_active(cls, task_vars): - logging_deployed = get_var(task_vars, "openshift_hosted_logging_deploy", default=False) - return super(LoggingCheck, cls).is_active(task_vars) and cls.is_first_master(task_vars) and logging_deployed + def is_active(self): + logging_deployed = self.get_var("openshift_hosted_logging_deploy", default=False) + return logging_deployed and super(LoggingCheck, self).is_active() and self.is_first_master() - @staticmethod - def is_first_master(task_vars): - """Run only on first master. Returns: bool""" + def is_first_master(self): + """Determine if running on first master. Returns: bool""" # Note: It would be nice to use membership in oo_first_master group, however for now it # seems best to avoid requiring that setup and just check this is the first master. - hostname = get_var(task_vars, "ansible_ssh_host") or [None] - masters = get_var(task_vars, "groups", "masters", default=None) or [None] - return masters and masters[0] == hostname + hostname = self.get_var("ansible_ssh_host") or [None] + masters = self.get_var("groups", "masters", default=None) or [None] + return masters[0] == hostname - def run(self, tmp, task_vars): + def run(self): pass - def get_pods_for_component(self, execute_module, namespace, logging_component, task_vars): + def get_pods_for_component(self, namespace, logging_component): """Get all pods for a given component. Returns: list of pods for component, error string""" pod_output = self.exec_oc( - execute_module, namespace, "get pods -l component={} -o json".format(logging_component), [], - task_vars ) try: pods = json.loads(pod_output) @@ -64,14 +60,13 @@ class LoggingCheck(OpenShiftCheck): ) ] - @staticmethod - def exec_oc(execute_module=None, namespace="logging", cmd_str="", extra_args=None, task_vars=None): + def exec_oc(self, namespace="logging", cmd_str="", extra_args=None): """ Execute an 'oc' command in the remote host. Returns: output of command and namespace, or raises OpenShiftCheckException on error """ - config_base = get_var(task_vars, "openshift", "common", "config_base") + config_base = self.get_var("openshift", "common", "config_base") args = { "namespace": namespace, "config_file": os.path.join(config_base, "master", "admin.kubeconfig"), @@ -79,7 +74,7 @@ class LoggingCheck(OpenShiftCheck): "extra_args": list(extra_args) if extra_args else [], } - result = execute_module("ocutil", args, None, task_vars) + result = self.execute_module("ocutil", args) if result.get("failed"): msg = ( 'Unexpected error using `oc` to validate the logging stack components.\n' diff --git a/roles/openshift_health_checker/openshift_checks/logging/logging_index_time.py b/roles/openshift_health_checker/openshift_checks/logging/logging_index_time.py index 2ddd7549d..b24e88e05 100644 --- a/roles/openshift_health_checker/openshift_checks/logging/logging_index_time.py +++ b/roles/openshift_health_checker/openshift_checks/logging/logging_index_time.py @@ -7,7 +7,7 @@ import time from uuid import uuid4 -from openshift_checks import get_var, OpenShiftCheckException +from openshift_checks import OpenShiftCheckException from openshift_checks.logging.logging import LoggingCheck @@ -21,11 +21,11 @@ class LoggingIndexTime(LoggingCheck): logging_namespace = "logging" - def run(self, tmp, task_vars): + def run(self): """Add log entry by making unique request to Kibana. Check for unique entry in the ElasticSearch pod logs.""" try: log_index_timeout = int( - get_var(task_vars, "openshift_check_logging_index_timeout_seconds", default=ES_CMD_TIMEOUT_SECONDS) + self.get_var("openshift_check_logging_index_timeout_seconds", default=ES_CMD_TIMEOUT_SECONDS) ) except ValueError: return { @@ -37,11 +37,9 @@ class LoggingIndexTime(LoggingCheck): running_component_pods = dict() # get all component pods - self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default=self.logging_namespace) + self.logging_namespace = self.get_var("openshift_logging_namespace", default=self.logging_namespace) for component, name in (['kibana', 'Kibana'], ['es', 'Elasticsearch']): - pods, error = self.get_pods_for_component( - self.execute_module, self.logging_namespace, component, task_vars, - ) + pods, error = self.get_pods_for_component(self.logging_namespace, component) if error: msg = 'Unable to retrieve pods for the {} logging component: {}' @@ -56,29 +54,29 @@ class LoggingIndexTime(LoggingCheck): running_component_pods[component] = running_pods - uuid = self.curl_kibana_with_uuid(running_component_pods["kibana"][0], task_vars) - self.wait_until_cmd_or_err(running_component_pods["es"][0], uuid, log_index_timeout, task_vars) + uuid = self.curl_kibana_with_uuid(running_component_pods["kibana"][0]) + self.wait_until_cmd_or_err(running_component_pods["es"][0], uuid, log_index_timeout) return {} - def wait_until_cmd_or_err(self, es_pod, uuid, timeout_secs, task_vars): + def wait_until_cmd_or_err(self, es_pod, uuid, timeout_secs): """Retry an Elasticsearch query every second until query success, or a defined length of time has passed.""" deadline = time.time() + timeout_secs interval = 1 - while not self.query_es_from_es(es_pod, uuid, task_vars): + while not self.query_es_from_es(es_pod, uuid): if time.time() + interval > deadline: msg = "expecting match in Elasticsearch for message with uuid {}, but no matches were found after {}s." raise OpenShiftCheckException(msg.format(uuid, timeout_secs)) time.sleep(interval) - def curl_kibana_with_uuid(self, kibana_pod, task_vars): + def curl_kibana_with_uuid(self, kibana_pod): """curl Kibana with a unique uuid.""" uuid = self.generate_uuid() pod_name = kibana_pod["metadata"]["name"] exec_cmd = "exec {pod_name} -c kibana -- curl --max-time 30 -s http://localhost:5601/{uuid}" exec_cmd = exec_cmd.format(pod_name=pod_name, uuid=uuid) - error_str = self.exec_oc(self.execute_module, self.logging_namespace, exec_cmd, [], task_vars) + error_str = self.exec_oc(self.logging_namespace, exec_cmd, []) try: error_code = json.loads(error_str)["statusCode"] @@ -97,7 +95,7 @@ class LoggingIndexTime(LoggingCheck): return uuid - def query_es_from_es(self, es_pod, uuid, task_vars): + def query_es_from_es(self, es_pod, uuid): """curl the Elasticsearch pod and look for a unique uuid in its logs.""" pod_name = es_pod["metadata"]["name"] exec_cmd = ( @@ -108,7 +106,7 @@ class LoggingIndexTime(LoggingCheck): "https://logging-es:9200/project.{namespace}*/_count?q=message:{uuid}" ) exec_cmd = exec_cmd.format(pod_name=pod_name, namespace=self.logging_namespace, uuid=uuid) - result = self.exec_oc(self.execute_module, self.logging_namespace, exec_cmd, [], task_vars) + result = self.exec_oc(self.logging_namespace, exec_cmd, []) try: count = json.loads(result)["count"] -- cgit v1.2.3