diff options
Diffstat (limited to 'roles/openshift_health_checker/openshift_checks/logging')
6 files changed, 219 insertions, 110 deletions
diff --git a/roles/openshift_health_checker/openshift_checks/logging/curator.py b/roles/openshift_health_checker/openshift_checks/logging/curator.py index c9fc59896..f82ae64d7 100644 --- a/roles/openshift_health_checker/openshift_checks/logging/curator.py +++ b/roles/openshift_health_checker/openshift_checks/logging/curator.py @@ -1,28 +1,21 @@ -""" -Module for performing checks on an Curator logging deployment -""" +"""Check for an aggregated logging Curator deployment""" -from openshift_checks import get_var from openshift_checks.logging.logging import LoggingCheck class Curator(LoggingCheck): - """Module that checks an integrated logging Curator deployment""" + """Check for an aggregated logging Curator deployment""" name = "curator" tags = ["health", "logging"] logging_namespace = None - def run(self, tmp, task_vars): - """Check various things and gather errors. Returns: result as hash""" - - self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default="logging") + def run(self): + self.logging_namespace = self.get_var("openshift_logging_namespace", default="logging") curator_pods, error = super(Curator, self).get_pods_for_component( - self.module_executor, self.logging_namespace, "curator", - task_vars ) if error: return {"failed": True, "changed": False, "msg": error} diff --git a/roles/openshift_health_checker/openshift_checks/logging/elasticsearch.py b/roles/openshift_health_checker/openshift_checks/logging/elasticsearch.py index 01cb35b81..1e478c04d 100644 --- a/roles/openshift_health_checker/openshift_checks/logging/elasticsearch.py +++ b/roles/openshift_health_checker/openshift_checks/logging/elasticsearch.py @@ -1,35 +1,30 @@ -""" -Module for performing checks on an Elasticsearch logging deployment -""" +"""Check for an aggregated logging Elasticsearch deployment""" import json import re -from openshift_checks import get_var from openshift_checks.logging.logging import LoggingCheck class Elasticsearch(LoggingCheck): - """Module that checks an integrated logging Elasticsearch deployment""" + """Check for an aggregated logging Elasticsearch deployment""" name = "elasticsearch" tags = ["health", "logging"] logging_namespace = None - def run(self, tmp, task_vars): + def run(self): """Check various things and gather errors. Returns: result as hash""" - self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default="logging") + self.logging_namespace = self.get_var("openshift_logging_namespace", default="logging") es_pods, error = super(Elasticsearch, self).get_pods_for_component( - self.execute_module, self.logging_namespace, "es", - task_vars, ) if error: return {"failed": True, "changed": False, "msg": error} - check_error = self.check_elasticsearch(es_pods, task_vars) + check_error = self.check_elasticsearch(es_pods) if check_error: msg = ("The following Elasticsearch deployment issue was found:" @@ -41,7 +36,7 @@ class Elasticsearch(LoggingCheck): return {"failed": False, "changed": False, "msg": 'No problems found with Elasticsearch deployment.'} def _not_running_elasticsearch_pods(self, es_pods): - """Returns: list of running pods, list of errors about non-running pods""" + """Returns: list of pods that are not running, list of errors about non-running pods""" not_running = super(Elasticsearch, self).not_running_pods(es_pods) if not_running: return not_running, [( @@ -54,7 +49,7 @@ class Elasticsearch(LoggingCheck): ))] return not_running, [] - def check_elasticsearch(self, es_pods, task_vars): + def check_elasticsearch(self, es_pods): """Various checks for elasticsearch. Returns: error string""" not_running_pods, error_msgs = self._not_running_elasticsearch_pods(es_pods) running_pods = [pod for pod in es_pods if pod not in not_running_pods] @@ -65,10 +60,10 @@ class Elasticsearch(LoggingCheck): } if not pods_by_name: return 'No logging Elasticsearch pods were found. Is logging deployed?' - error_msgs += self._check_elasticsearch_masters(pods_by_name, task_vars) - error_msgs += self._check_elasticsearch_node_list(pods_by_name, task_vars) - error_msgs += self._check_es_cluster_health(pods_by_name, task_vars) - error_msgs += self._check_elasticsearch_diskspace(pods_by_name, task_vars) + error_msgs += self._check_elasticsearch_masters(pods_by_name) + error_msgs += self._check_elasticsearch_node_list(pods_by_name) + error_msgs += self._check_es_cluster_health(pods_by_name) + error_msgs += self._check_elasticsearch_diskspace(pods_by_name) return '\n'.join(error_msgs) @staticmethod @@ -76,14 +71,14 @@ class Elasticsearch(LoggingCheck): base = "exec {name} -- curl -s --cert {base}cert --key {base}key --cacert {base}ca -XGET '{url}'" return base.format(base="/etc/elasticsearch/secret/admin-", name=pod_name, url=url) - def _check_elasticsearch_masters(self, pods_by_name, task_vars): + def _check_elasticsearch_masters(self, pods_by_name): """Check that Elasticsearch masters are sane. Returns: list of error strings""" es_master_names = set() error_msgs = [] for pod_name in pods_by_name.keys(): # Compare what each ES node reports as master and compare for split brain get_master_cmd = self._build_es_curl_cmd(pod_name, "https://localhost:9200/_cat/master") - master_name_str = self._exec_oc(get_master_cmd, [], task_vars) + master_name_str = self._exec_oc(get_master_cmd, []) master_names = (master_name_str or '').split(' ') if len(master_names) > 1: es_master_names.add(master_names[1]) @@ -108,7 +103,7 @@ class Elasticsearch(LoggingCheck): return error_msgs - def _check_elasticsearch_node_list(self, pods_by_name, task_vars): + def _check_elasticsearch_node_list(self, pods_by_name): """Check that reported ES masters are accounted for by pods. Returns: list of error strings""" if not pods_by_name: @@ -116,7 +111,7 @@ class Elasticsearch(LoggingCheck): # get ES cluster nodes node_cmd = self._build_es_curl_cmd(list(pods_by_name.keys())[0], 'https://localhost:9200/_nodes') - cluster_node_data = self._exec_oc(node_cmd, [], task_vars) + cluster_node_data = self._exec_oc(node_cmd, []) try: cluster_nodes = json.loads(cluster_node_data)['nodes'] except (ValueError, KeyError): @@ -138,12 +133,12 @@ class Elasticsearch(LoggingCheck): return error_msgs - def _check_es_cluster_health(self, pods_by_name, task_vars): + def _check_es_cluster_health(self, pods_by_name): """Exec into the elasticsearch pods and check the cluster health. Returns: list of errors""" error_msgs = [] for pod_name in pods_by_name.keys(): cluster_health_cmd = self._build_es_curl_cmd(pod_name, 'https://localhost:9200/_cluster/health?pretty=true') - cluster_health_data = self._exec_oc(cluster_health_cmd, [], task_vars) + cluster_health_data = self._exec_oc(cluster_health_cmd, []) try: health_res = json.loads(cluster_health_data) if not health_res or not health_res.get('status'): @@ -162,7 +157,7 @@ class Elasticsearch(LoggingCheck): return error_msgs - def _check_elasticsearch_diskspace(self, pods_by_name, task_vars): + def _check_elasticsearch_diskspace(self, pods_by_name): """ Exec into an ES pod and query the diskspace on the persistent volume. Returns: list of errors @@ -170,7 +165,7 @@ class Elasticsearch(LoggingCheck): error_msgs = [] for pod_name in pods_by_name.keys(): df_cmd = 'exec {} -- df --output=ipcent,pcent /elasticsearch/persistent'.format(pod_name) - disk_output = self._exec_oc(df_cmd, [], task_vars) + disk_output = self._exec_oc(df_cmd, []) lines = disk_output.splitlines() # expecting one header looking like 'IUse% Use%' and one body line body_re = r'\s*(\d+)%?\s+(\d+)%?\s*$' @@ -182,7 +177,7 @@ class Elasticsearch(LoggingCheck): continue inode_pct, disk_pct = re.match(body_re, lines[1]).groups() - inode_pct_thresh = get_var(task_vars, 'openshift_check_efk_es_inode_pct', default='90') + inode_pct_thresh = self.get_var('openshift_check_efk_es_inode_pct', default='90') if int(inode_pct) >= int(inode_pct_thresh): error_msgs.append( 'Inode percent usage on the storage volume for logging ES pod "{pod}"\n' @@ -193,7 +188,7 @@ class Elasticsearch(LoggingCheck): limit=str(inode_pct_thresh), param='openshift_check_efk_es_inode_pct', )) - disk_pct_thresh = get_var(task_vars, 'openshift_check_efk_es_storage_pct', default='80') + disk_pct_thresh = self.get_var('openshift_check_efk_es_storage_pct', default='80') if int(disk_pct) >= int(disk_pct_thresh): error_msgs.append( 'Disk percent usage on the storage volume for logging ES pod "{pod}"\n' @@ -207,11 +202,9 @@ class Elasticsearch(LoggingCheck): return error_msgs - def _exec_oc(self, cmd_str, extra_args, task_vars): + def _exec_oc(self, cmd_str, extra_args): return super(Elasticsearch, self).exec_oc( - self.execute_module, self.logging_namespace, cmd_str, extra_args, - task_vars, ) diff --git a/roles/openshift_health_checker/openshift_checks/logging/fluentd.py b/roles/openshift_health_checker/openshift_checks/logging/fluentd.py index 627567293..063e707a9 100644 --- a/roles/openshift_health_checker/openshift_checks/logging/fluentd.py +++ b/roles/openshift_health_checker/openshift_checks/logging/fluentd.py @@ -1,33 +1,29 @@ -""" -Module for performing checks on an Fluentd logging deployment -""" +"""Check for an aggregated logging Fluentd deployment""" import json -from openshift_checks import get_var from openshift_checks.logging.logging import LoggingCheck class Fluentd(LoggingCheck): - """Module that checks an integrated logging Fluentd deployment""" + """Check for an aggregated logging Fluentd deployment""" + name = "fluentd" tags = ["health", "logging"] logging_namespace = None - def run(self, tmp, task_vars): + def run(self): """Check various things and gather errors. Returns: result as hash""" - self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default="logging") + self.logging_namespace = self.get_var("openshift_logging_namespace", default="logging") fluentd_pods, error = super(Fluentd, self).get_pods_for_component( - self.execute_module, self.logging_namespace, "fluentd", - task_vars, ) if error: return {"failed": True, "changed": False, "msg": error} - check_error = self.check_fluentd(fluentd_pods, task_vars) + check_error = self.check_fluentd(fluentd_pods) if check_error: msg = ("The following Fluentd deployment issue was found:" @@ -53,10 +49,9 @@ class Fluentd(LoggingCheck): ).format(label=node_selector) return fluentd_nodes, None - @staticmethod - def _check_node_labeling(nodes_by_name, fluentd_nodes, node_selector, task_vars): + def _check_node_labeling(self, nodes_by_name, fluentd_nodes, node_selector): """Note if nodes are not labeled as expected. Returns: error string""" - intended_nodes = get_var(task_vars, 'openshift_logging_fluentd_hosts', default=['--all']) + intended_nodes = self.get_var('openshift_logging_fluentd_hosts', default=['--all']) if not intended_nodes or '--all' in intended_nodes: intended_nodes = nodes_by_name.keys() nodes_missing_labels = set(intended_nodes) - set(fluentd_nodes.keys()) @@ -114,13 +109,15 @@ class Fluentd(LoggingCheck): )) return None - def check_fluentd(self, pods, task_vars): + def check_fluentd(self, pods): """Verify fluentd is running everywhere. Returns: error string""" - node_selector = get_var(task_vars, 'openshift_logging_fluentd_nodeselector', - default='logging-infra-fluentd=true') + node_selector = self.get_var( + 'openshift_logging_fluentd_nodeselector', + default='logging-infra-fluentd=true' + ) - nodes_by_name, error = self.get_nodes_by_name(task_vars) + nodes_by_name, error = self.get_nodes_by_name() if error: return error @@ -129,7 +126,7 @@ class Fluentd(LoggingCheck): return error error_msgs = [] - error = self._check_node_labeling(nodes_by_name, fluentd_nodes, node_selector, task_vars) + error = self._check_node_labeling(nodes_by_name, fluentd_nodes, node_selector) if error: error_msgs.append(error) error = self._check_nodes_have_fluentd(pods, fluentd_nodes) @@ -148,9 +145,9 @@ class Fluentd(LoggingCheck): return '\n'.join(error_msgs) - def get_nodes_by_name(self, task_vars): + def get_nodes_by_name(self): """Retrieve all the node definitions. Returns: dict(name: node), error string""" - nodes_json = self._exec_oc("get nodes -o json", [], task_vars) + nodes_json = self._exec_oc("get nodes -o json", []) try: nodes = json.loads(nodes_json) except ValueError: # no valid json - should not happen @@ -162,9 +159,9 @@ class Fluentd(LoggingCheck): for node in nodes['items'] }, None - def _exec_oc(self, cmd_str, extra_args, task_vars): - return super(Fluentd, self).exec_oc(self.execute_module, - self.logging_namespace, - cmd_str, - extra_args, - task_vars) + def _exec_oc(self, cmd_str, extra_args): + return super(Fluentd, self).exec_oc( + self.logging_namespace, + cmd_str, + extra_args, + ) diff --git a/roles/openshift_health_checker/openshift_checks/logging/kibana.py b/roles/openshift_health_checker/openshift_checks/logging/kibana.py index 442f407b1..60f94e106 100644 --- a/roles/openshift_health_checker/openshift_checks/logging/kibana.py +++ b/roles/openshift_health_checker/openshift_checks/logging/kibana.py @@ -12,7 +12,6 @@ except ImportError: from urllib.error import HTTPError, URLError import urllib.request as urllib2 -from openshift_checks import get_var from openshift_checks.logging.logging import LoggingCheck @@ -24,22 +23,20 @@ class Kibana(LoggingCheck): logging_namespace = None - def run(self, tmp, task_vars): + def run(self): """Check various things and gather errors. Returns: result as hash""" - self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default="logging") + self.logging_namespace = self.get_var("openshift_logging_namespace", default="logging") kibana_pods, error = super(Kibana, self).get_pods_for_component( - self.execute_module, self.logging_namespace, "kibana", - task_vars, ) if error: return {"failed": True, "changed": False, "msg": error} check_error = self.check_kibana(kibana_pods) if not check_error: - check_error = self._check_kibana_route(task_vars) + check_error = self._check_kibana_route() if check_error: msg = ("The following Kibana deployment issue was found:" @@ -50,7 +47,7 @@ class Kibana(LoggingCheck): # TODO(lmeyer): run it all again for the ops cluster return {"failed": False, "changed": False, "msg": 'No problems found with Kibana deployment.'} - def _verify_url_internal(self, url, task_vars): + def _verify_url_internal(self, url): """ Try to reach a URL from the host. Returns: success (bool), reason (for failure) @@ -62,7 +59,7 @@ class Kibana(LoggingCheck): # TODO(lmeyer): give users option to validate certs status_code=302, ) - result = self.execute_module('uri', args, task_vars) + result = self.execute_module('uri', args) if result.get('failed'): return result['msg'] return None @@ -114,14 +111,14 @@ class Kibana(LoggingCheck): return None - def _get_kibana_url(self, task_vars): + def _get_kibana_url(self): """ Get kibana route or report error. Returns: url (or empty), reason for failure """ # Get logging url - get_route = self._exec_oc("get route logging-kibana -o json", [], task_vars) + get_route = self._exec_oc("get route logging-kibana -o json", []) if not get_route: return None, 'no_route_exists' @@ -139,7 +136,7 @@ class Kibana(LoggingCheck): return 'https://{}/'.format(host), None - def _check_kibana_route(self, task_vars): + def _check_kibana_route(self): """ Check to see if kibana route is up and working. Returns: error string @@ -160,12 +157,12 @@ class Kibana(LoggingCheck): ), ) - kibana_url, error = self._get_kibana_url(task_vars) + kibana_url, error = self._get_kibana_url() if not kibana_url: return known_errors.get(error, error) # first, check that kibana is reachable from the master. - error = self._verify_url_internal(kibana_url, task_vars) + error = self._verify_url_internal(kibana_url) if error: if 'urlopen error [Errno 111] Connection refused' in error: error = ( @@ -190,7 +187,7 @@ class Kibana(LoggingCheck): # in production we would like the kibana route to work from outside the # cluster too; but that may not be the case, so allow disabling just this part. - if not get_var(task_vars, "openshift_check_efk_kibana_external", default=True): + if not self.get_var("openshift_check_efk_kibana_external", default=True): return None error = self._verify_url_external(kibana_url) if error: @@ -221,9 +218,9 @@ class Kibana(LoggingCheck): return error return None - def _exec_oc(self, cmd_str, extra_args, task_vars): - return super(Kibana, self).exec_oc(self.execute_module, - self.logging_namespace, - cmd_str, - extra_args, - task_vars) + def _exec_oc(self, cmd_str, extra_args): + return super(Kibana, self).exec_oc( + self.logging_namespace, + cmd_str, + extra_args, + ) diff --git a/roles/openshift_health_checker/openshift_checks/logging/logging.py b/roles/openshift_health_checker/openshift_checks/logging/logging.py index 05b4d300c..43ba6c406 100644 --- a/roles/openshift_health_checker/openshift_checks/logging/logging.py +++ b/roles/openshift_health_checker/openshift_checks/logging/logging.py @@ -5,39 +5,39 @@ Util functions for performing checks on an Elasticsearch, Fluentd, and Kibana st import json import os -from openshift_checks import OpenShiftCheck, OpenShiftCheckException, get_var +from openshift_checks import OpenShiftCheck, OpenShiftCheckException class LoggingCheck(OpenShiftCheck): - """Base class for logging component checks""" + """Base class for OpenShift aggregated logging component checks""" + + # FIXME: this should not be listed as a check, since it is not meant to be + # run by itself. name = "logging" + logging_namespace = "logging" - @classmethod - def is_active(cls, task_vars): - return super(LoggingCheck, cls).is_active(task_vars) and cls.is_first_master(task_vars) + def is_active(self): + logging_deployed = self.get_var("openshift_hosted_logging_deploy", default=False) + return logging_deployed and super(LoggingCheck, self).is_active() and self.is_first_master() - @staticmethod - def is_first_master(task_vars): - """Run only on first master and only when logging is configured. Returns: bool""" - logging_deployed = get_var(task_vars, "openshift_hosted_logging_deploy", default=True) + def is_first_master(self): + """Determine if running on first master. Returns: bool""" # Note: It would be nice to use membership in oo_first_master group, however for now it # seems best to avoid requiring that setup and just check this is the first master. - hostname = get_var(task_vars, "ansible_ssh_host") or [None] - masters = get_var(task_vars, "groups", "masters", default=None) or [None] - return logging_deployed and masters[0] == hostname + hostname = self.get_var("ansible_ssh_host") or [None] + masters = self.get_var("groups", "masters", default=None) or [None] + return masters[0] == hostname - def run(self, tmp, task_vars): - pass + def run(self): + return {} - def get_pods_for_component(self, execute_module, namespace, logging_component, task_vars): + def get_pods_for_component(self, namespace, logging_component): """Get all pods for a given component. Returns: list of pods for component, error string""" pod_output = self.exec_oc( - execute_module, namespace, "get pods -l component={} -o json".format(logging_component), [], - task_vars ) try: pods = json.loads(pod_output) @@ -45,7 +45,7 @@ class LoggingCheck(OpenShiftCheck): raise ValueError() except ValueError: # successful run but non-parsing data generally means there were no pods in the namespace - return None, 'There are no pods in the {} namespace. Is logging deployed?'.format(namespace) + return None, 'No pods were found for the "{}" logging component.'.format(logging_component) return pods['items'], None @@ -54,23 +54,22 @@ class LoggingCheck(OpenShiftCheck): """Returns: list of pods not in a ready and running state""" return [ pod for pod in pods - if any( + if not pod.get("status", {}).get("containerStatuses") or any( container['ready'] is False for container in pod['status']['containerStatuses'] ) or not any( condition['type'] == 'Ready' and condition['status'] == 'True' - for condition in pod['status']['conditions'] + for condition in pod['status'].get('conditions', []) ) ] - @staticmethod - def exec_oc(execute_module=None, namespace="logging", cmd_str="", extra_args=None, task_vars=None): + def exec_oc(self, namespace="logging", cmd_str="", extra_args=None): """ Execute an 'oc' command in the remote host. Returns: output of command and namespace, or raises OpenShiftCheckException on error """ - config_base = get_var(task_vars, "openshift", "common", "config_base") + config_base = self.get_var("openshift", "common", "config_base") args = { "namespace": namespace, "config_file": os.path.join(config_base, "master", "admin.kubeconfig"), @@ -78,7 +77,7 @@ class LoggingCheck(OpenShiftCheck): "extra_args": list(extra_args) if extra_args else [], } - result = execute_module("ocutil", args, task_vars) + result = self.execute_module("ocutil", args) if result.get("failed"): msg = ( 'Unexpected error using `oc` to validate the logging stack components.\n' diff --git a/roles/openshift_health_checker/openshift_checks/logging/logging_index_time.py b/roles/openshift_health_checker/openshift_checks/logging/logging_index_time.py new file mode 100644 index 000000000..b24e88e05 --- /dev/null +++ b/roles/openshift_health_checker/openshift_checks/logging/logging_index_time.py @@ -0,0 +1,130 @@ +""" +Check for ensuring logs from pods can be queried in a reasonable amount of time. +""" + +import json +import time + +from uuid import uuid4 + +from openshift_checks import OpenShiftCheckException +from openshift_checks.logging.logging import LoggingCheck + + +ES_CMD_TIMEOUT_SECONDS = 30 + + +class LoggingIndexTime(LoggingCheck): + """Check that pod logs are aggregated and indexed in ElasticSearch within a reasonable amount of time.""" + name = "logging_index_time" + tags = ["health", "logging"] + + logging_namespace = "logging" + + def run(self): + """Add log entry by making unique request to Kibana. Check for unique entry in the ElasticSearch pod logs.""" + try: + log_index_timeout = int( + self.get_var("openshift_check_logging_index_timeout_seconds", default=ES_CMD_TIMEOUT_SECONDS) + ) + except ValueError: + return { + "failed": True, + "msg": ('Invalid value provided for "openshift_check_logging_index_timeout_seconds". ' + 'Value must be an integer representing an amount in seconds.'), + } + + running_component_pods = dict() + + # get all component pods + self.logging_namespace = self.get_var("openshift_logging_namespace", default=self.logging_namespace) + for component, name in (['kibana', 'Kibana'], ['es', 'Elasticsearch']): + pods, error = self.get_pods_for_component(self.logging_namespace, component) + + if error: + msg = 'Unable to retrieve pods for the {} logging component: {}' + return {"failed": True, "changed": False, "msg": msg.format(name, error)} + + running_pods = self.running_pods(pods) + + if not running_pods: + msg = ('No {} pods in the "Running" state were found.' + 'At least one pod is required in order to perform this check.') + return {"failed": True, "changed": False, "msg": msg.format(name)} + + running_component_pods[component] = running_pods + + uuid = self.curl_kibana_with_uuid(running_component_pods["kibana"][0]) + self.wait_until_cmd_or_err(running_component_pods["es"][0], uuid, log_index_timeout) + return {} + + def wait_until_cmd_or_err(self, es_pod, uuid, timeout_secs): + """Retry an Elasticsearch query every second until query success, or a defined + length of time has passed.""" + deadline = time.time() + timeout_secs + interval = 1 + while not self.query_es_from_es(es_pod, uuid): + if time.time() + interval > deadline: + msg = "expecting match in Elasticsearch for message with uuid {}, but no matches were found after {}s." + raise OpenShiftCheckException(msg.format(uuid, timeout_secs)) + time.sleep(interval) + + def curl_kibana_with_uuid(self, kibana_pod): + """curl Kibana with a unique uuid.""" + uuid = self.generate_uuid() + pod_name = kibana_pod["metadata"]["name"] + exec_cmd = "exec {pod_name} -c kibana -- curl --max-time 30 -s http://localhost:5601/{uuid}" + exec_cmd = exec_cmd.format(pod_name=pod_name, uuid=uuid) + + error_str = self.exec_oc(self.logging_namespace, exec_cmd, []) + + try: + error_code = json.loads(error_str)["statusCode"] + except KeyError: + msg = ('invalid response returned from Kibana request (Missing "statusCode" key):\n' + 'Command: {}\nResponse: {}').format(exec_cmd, error_str) + raise OpenShiftCheckException(msg) + except ValueError: + msg = ('invalid response returned from Kibana request (Non-JSON output):\n' + 'Command: {}\nResponse: {}').format(exec_cmd, error_str) + raise OpenShiftCheckException(msg) + + if error_code != 404: + msg = 'invalid error code returned from Kibana request. Expecting error code "404", but got "{}" instead.' + raise OpenShiftCheckException(msg.format(error_code)) + + return uuid + + def query_es_from_es(self, es_pod, uuid): + """curl the Elasticsearch pod and look for a unique uuid in its logs.""" + pod_name = es_pod["metadata"]["name"] + exec_cmd = ( + "exec {pod_name} -- curl --max-time 30 -s -f " + "--cacert /etc/elasticsearch/secret/admin-ca " + "--cert /etc/elasticsearch/secret/admin-cert " + "--key /etc/elasticsearch/secret/admin-key " + "https://logging-es:9200/project.{namespace}*/_count?q=message:{uuid}" + ) + exec_cmd = exec_cmd.format(pod_name=pod_name, namespace=self.logging_namespace, uuid=uuid) + result = self.exec_oc(self.logging_namespace, exec_cmd, []) + + try: + count = json.loads(result)["count"] + except KeyError: + msg = 'invalid response from Elasticsearch query:\n"{}"\nMissing "count" key:\n{}' + raise OpenShiftCheckException(msg.format(exec_cmd, result)) + except ValueError: + msg = 'invalid response from Elasticsearch query:\n"{}"\nNon-JSON output:\n{}' + raise OpenShiftCheckException(msg.format(exec_cmd, result)) + + return count + + @staticmethod + def running_pods(pods): + """Filter pods that are running.""" + return [pod for pod in pods if pod['status']['phase'] == 'Running'] + + @staticmethod + def generate_uuid(): + """Wrap uuid generator. Allows for testing with expected values.""" + return str(uuid4()) |