#!/usr/bin/env python
#     ___ ___ _  _ ___ ___    _ _____ ___ ___
#    / __| __| \| | __| _ \  /_\_   _| __|   \
#   | (_ | _|| .` | _||   / / _ \| | | _|| |) |
#    \___|___|_|\_|___|_|_\/_/_\_\_|_|___|___/_ _____
#   |   \ / _ \  | \| |/ _ \_   _| | __|   \_ _|_   _|
#   | |) | (_) | | .` | (_) || |   | _|| |) | |  | |
#   |___/ \___/  |_|\_|\___/ |_|   |___|___/___| |_|
'''
module for managing yaml files
'''

import json
import os
import re

import yaml


# This is here because of a bug that causes yaml
# to incorrectly handle timezone info on timestamps
def timestamp_constructor(_, node):
    ''' return timestamps as strings'''
    return str(node.value)

yaml.add_constructor(u'tag:yaml.org,2002:timestamp', timestamp_constructor)


class _YeditLoader(yaml.SafeLoader):
    ''' SafeLoader that also returns timestamps as plain strings.

        Parsing goes through this loader instead of a bare yaml.load():
        calling yaml.load() without a Loader can construct arbitrary python
        objects and is rejected outright by newer PyYAML releases.
        A private subclass keeps the constructor override from leaking into
        the global yaml.SafeLoader.
    '''
    pass

_YeditLoader.add_constructor(u'tag:yaml.org,2002:timestamp', timestamp_constructor)


class YeditException(Exception):
    ''' Exception class for Yedit '''
    pass


class Yedit(object):
    ''' Class to modify yaml files

        Keys are parsed with re_key: a key is a sequence of list indexes
        written as [N] and names made of letters, '-', '.' and '/'.
        Because '.' is part of the name character class, 'a.b' is ONE name;
        use a character outside the class to separate names, e.g. 'a#b[0]'.

        NOTE(review): characters such as digits or '_' are accepted by
        re_valid_key (through its '.?') but are not captured by re_key, so
        'key1' is looked up as 'key'. Left untouched because the patterns
        are public class attributes.
    '''
    re_valid_key = r"(((\[-?\d+\])|([a-zA-Z-./]+)).?)+$"
    re_key = r"(?:\[(-?\d+)\])|([a-zA-Z-./]+)"

    def __init__(self, filename=None, content=None, content_type='yaml'):
        ''' store the arguments; load filename when no content was given '''
        self.content = content
        self.filename = filename
        self.__yaml_dict = content
        self.content_type = content_type
        if self.filename and not self.content:
            self.load(content_type=self.content_type)

    @property
    def yaml_dict(self):
        ''' getter method for yaml_dict '''
        return self.__yaml_dict

    @yaml_dict.setter
    def yaml_dict(self, value):
        ''' setter method for yaml_dict '''
        self.__yaml_dict = value

    @staticmethod
    def _valid_index(data, arr_ind):
        ''' return True when the index string arr_ind addresses an existing
            element of the list data (negative indexes count from the end)
        '''
        return -len(data) <= int(arr_ind) < len(data)

    @staticmethod
    def remove_entry(data, key):
        ''' remove data at location key

            Returns True when an entry was deleted and None otherwise
            (invalid key, path not found, index out of range, missing key).
        '''
        if not (key and re.match(Yedit.re_valid_key, key) and isinstance(data, (list, dict))):
            return None

        key_indexes = re.findall(Yedit.re_key, key)

        # walk down to the container that holds the entry to remove
        for arr_ind, dict_key in key_indexes[:-1]:
            if dict_key and isinstance(data, dict):
                data = data.get(dict_key, None)
            elif arr_ind and isinstance(data, list) and Yedit._valid_index(data, arr_ind):
                data = data[int(arr_ind)]
            else:
                return None

        # process last index for remove
        arr_ind, dict_key = key_indexes[-1]

        # expected list entry
        if arr_ind:
            if isinstance(data, list) and Yedit._valid_index(data, arr_ind):
                del data[int(arr_ind)]
                return True

        # expected dict entry
        elif dict_key:
            # check membership first so a missing key is reported, not raised
            if isinstance(data, dict) and dict_key in data:
                del data[dict_key]
                return True

        return None

    @staticmethod
    def add_entry(data, key, item=None):
        ''' Set item at location key inside data.

            Missing intermediate dict entries are created as empty dicts.
            List entries are only overwritten; an index that does not exist
            is never appended.

            d = {'a': {'b': 'c'}}
            key = a#b
            item = 'x'
            d becomes {'a': {'b': 'x'}}

            Returns the top level data on success, None when the key is
            invalid or the location cannot be addressed.
        '''
        if not (key and re.match(Yedit.re_valid_key, key) and isinstance(data, (list, dict))):
            return None

        curr_data = data

        key_indexes = re.findall(Yedit.re_key, key)
        for arr_ind, dict_key in key_indexes[:-1]:
            if dict_key:
                # a name can only be resolved inside a dict
                if not isinstance(data, dict):
                    return None
                if dict_key not in data:
                    data[dict_key] = {}
                data = data[dict_key]
            elif arr_ind and isinstance(data, list) and Yedit._valid_index(data, arr_ind):
                data = data[int(arr_ind)]
            else:
                return None

        # process last index for add
        arr_ind, dict_key = key_indexes[-1]

        # expected list entry
        if arr_ind and isinstance(data, list) and Yedit._valid_index(data, arr_ind):
            data[int(arr_ind)] = item

        # expected dict entry
        elif dict_key and isinstance(data, dict):
            data[dict_key] = item

        # nothing was stored; do not report success
        else:
            return None

        return curr_data

    @staticmethod
    def get_entry(data, key):
        ''' Get an item from data with key notation

            d = {'a': {'b': 'c'}}
            key = a#b
            return c

            Returns None when the key is invalid or the location is missing.
        '''
        if not (key and re.match(Yedit.re_valid_key, key) and isinstance(data, (list, dict))):
            return None

        key_indexes = re.findall(Yedit.re_key, key)
        for arr_ind, dict_key in key_indexes:
            if dict_key and isinstance(data, dict):
                data = data.get(dict_key, None)
            elif arr_ind and isinstance(data, list) and Yedit._valid_index(data, arr_ind):
                data = data[int(arr_ind)]
            else:
                return None

        return data

    def write(self):
        ''' write yaml_dict to filename as block style yaml

            Raises YeditException when no filename is set.
        '''
        if not self.filename:
            raise YeditException('Please specify a filename.')

        with open(self.filename, 'w') as yfd:
            yfd.write(yaml.safe_dump(self.yaml_dict, default_flow_style=False))

    def read(self):
        ''' read from file; return its contents or None when it is missing '''
        # check if it exists
        if not self.exists():
            return None

        contents = None
        with open(self.filename) as yfd:
            contents = yfd.read()

        return contents

    def exists(self):
        ''' return whether file exists '''
        # no filename at all (content only usage) means there is no file
        if not self.filename:
            return False

        return os.path.exists(self.filename)

    def load(self, content_type='yaml'):
        ''' parse the file into yaml_dict and return it

            content_type selects the parser ('yaml' or 'json').
            Returns None when the file is missing, empty or cannot be
            parsed; yaml_dict is left untouched in those cases.
        '''
        contents = self.read()

        if not contents:
            return None

        # check if it is yaml
        try:
            if content_type == 'yaml':
                self.yaml_dict = yaml.load(contents, Loader=_YeditLoader)
            elif content_type == 'json':
                self.yaml_dict = json.loads(contents)
        except (yaml.YAMLError, ValueError):
            # Error loading yaml or json (json.loads raises ValueError)
            return None

        return self.yaml_dict

    def get(self, key):
        ''' get a specified key'''
        try:
            entry = Yedit.get_entry(self.yaml_dict, key)
        except KeyError:
            entry = None

        return entry

    def delete(self, key):
        ''' remove key from a dict

            Returns (changed, yaml_dict).
        '''
        try:
            entry = Yedit.get_entry(self.yaml_dict, key)
        except KeyError:
            entry = None

        # compare against None so falsy values (0, '', [], {}) can be deleted
        if entry is None:
            return (False, self.yaml_dict)

        result = Yedit.remove_entry(self.yaml_dict, key)
        if not result:
            return (False, self.yaml_dict)

        return (True, self.yaml_dict)

    def put(self, key, value):
        ''' put key, value into a dict

            Returns (changed, yaml_dict); changed is False when the value is
            already present or the location cannot be addressed.
        '''
        try:
            entry = Yedit.get_entry(self.yaml_dict, key)
        except KeyError:
            entry = None

        if entry == value:
            return (False, self.yaml_dict)

        result = Yedit.add_entry(self.yaml_dict, key, value)
        if result is None:
            return (False, self.yaml_dict)

        return (True, self.yaml_dict)

    def create(self, key, value):
        ''' start a new document {key: value} when the file does not exist

            Only yaml_dict is set; nothing is written to disk here.
            Returns (changed, yaml_dict).
        '''
        if not self.exists():
            self.yaml_dict = {key: value}
            return (True, self.yaml_dict)

        return (False, self.yaml_dict)


def main():
    ''' ansible module entry point for listing and editing yaml files '''

    module = AnsibleModule(
        argument_spec=dict(
            state=dict(default='present', type='str',
                       choices=['present', 'absent', 'list']),
            debug=dict(default=False, type='bool'),
            src=dict(default=None, type='str'),
            content=dict(default=None, type='dict'),
            key=dict(default=None, type='str'),
            value=dict(default=None, type='str'),
            value_format=dict(default='yaml', choices=['yaml', 'json'], type='str'),
        ),
        #mutually_exclusive=[["src", "content"]],

        supports_check_mode=True,
    )
    state = module.params['state']

    yamlfile = Yedit(module.params['src'], module.params['content'])

    rval = yamlfile.load()
    if not rval and state != 'present':
        module.fail_json(msg=('Error opening file [%s]. Verify that the'
                              ' file exists, that it is has correct permissions, and is valid yaml.')
                         % module.params['src'])

    if state == 'list':
        module.exit_json(changed=False, results=rval, state="list")

    if state == 'absent':
        rval = yamlfile.delete(module.params['key'])
        module.exit_json(changed=rval[0], results=rval[1], state="absent")

    if state == 'present':

        if module.params['value_format'] == 'yaml':
            value = yaml.load(module.params['value'], Loader=_YeditLoader)
        elif module.params['value_format'] == 'json':
            value = json.loads(module.params['value'])

        if rval:
            rval = yamlfile.put(module.params['key'], value)
            # check mode reports the change without touching the file
            if rval[0] and not module.check_mode:
                yamlfile.write()
            module.exit_json(changed=rval[0], results=rval[1], state="present")

        if not module.params['content']:
            rval = yamlfile.create(module.params['key'], value)
        else:
            # the file could not be loaded: the supplied content becomes the
            # document that is written out
            rval = (True, yamlfile.yaml_dict)

        if not module.check_mode:
            yamlfile.write()

        module.exit_json(changed=rval[0], results=rval[1], state="present")

    module.exit_json(failed=True,
                     changed=False,
                     results='Unknown state passed. %s' % state,
                     state="unknown")

# pylint: disable=redefined-builtin, unused-wildcard-import, wildcard-import, locally-disabled
# import module snippets.  This are required
from ansible.module_utils.basic import *

if __name__ == '__main__':
    main()