Source code for resource.config

import subprocess as sp
import time
from os.path import expanduser as expand_user
from resource.base import Base_Resource

from docstring import docstring
from lib.http import HTTP_Method
from lib.parser import *
from lib.response import *
from schema.config import *
from schema.response import *
from utils.datetime import datetime_to_str
from utils.exception import extract_info
from utils.json import loads
from utils.sequence import is_list, wrap

File_Not_Found_Error = FileNotFoundError

__all__ = [
    'Config_Resource'
]


class Config_Resource(Base_Resource):
    """Resource that applies run-time configuration sent to ``/config``.

    A POST body is a mapping (or a list of mappings) whose keys select what
    to do with each listed item:

    * ``actions``    -- run a shell command (see ``__actions``);
    * ``parameters`` -- hand a value to the parser registered for the item's
      ``schema`` (see ``__parameters``);
    * ``resources``  -- write ``content`` to the file at ``path``
      (see ``__resources``).
    """

    tag = dict(name='config', description='Configuration at run-time.')
    # One-element tuple, as in the original ``routes = '/config',``.
    routes = ('/config',)
    # Parser callables keyed by the ``schema`` value of a parameter item.
    parsers = dict(json=json_parser, properties=property_parser,
                   xml=xml_parser, yaml=yaml_parser)

    # NOTE(review): documented with comments instead of a docstring on
    # purpose -- the ``docstring`` decorator is given 'config/post.yaml' as
    # its source and presumably manages ``__doc__`` itself; confirm before
    # adding a literal docstring here.
    #
    # Validates the request body, then processes every item of every known
    # section. Each item yields either a ready-made ``Base_Response`` (error
    # path of the handlers) or a plain dict that is enriched with
    # ``id``/``data``/``timestamp``, validated against the section's response
    # schema and added to ``resp``.
    @docstring(source='config/post.yaml')
    def on_post(self, req, resp):
        req_data = req.media or {}
        resp_data, valid = Config_Request_Schema(
            many=is_list(req_data),
            method=HTTP_Method.POST).validate(data=req_data)
        if not valid:
            resp_data.apply(resp)
            return

        req_data_wrap = wrap(req_data)
        if len(req_data_wrap) == 0:
            # The literal ``{request}`` placeholder is kept in the message;
            # the value is passed separately as the ``request`` keyword.
            msg = 'No content to apply configurations with the {request}'
            No_Content_Response(msg, request=req_data).apply(resp)
            return

        # Section name -> (handler, response schema). Using a table instead
        # of an if/elif chain guarantees that ``schema`` always belongs to
        # the section being processed: previously an unknown key left
        # ``schema`` unbound (UnboundLocalError) or stale from the previous
        # iteration.
        handlers = {
            'actions': (self.__actions, Config_Action_Response_Schema),
            'parameters': (self.__parameters,
                           Config_Parameter_Response_Schema),
            'resources': (self.__resources, Config_Resource_Response_Schema),
        }

        for config in req_data_wrap:
            for cfg, cfg_list in config.items():
                handler = handlers.get(cfg)
                if handler is None:
                    # Not a configuration section: nothing to apply.
                    continue
                apply_item, schema = handler
                for data in wrap(cfg_list):
                    output = apply_item(data)
                    if isinstance(output, Base_Response):
                        # The handler already built an error response.
                        output.add(resp)
                        continue
                    output_data = data.copy()
                    item_id = output_data.pop('id', None)
                    output.update(id=item_id, data=output_data,
                                  timestamp=datetime_to_str())
                    resp_data, valid = schema(
                        many=False, method=HTTP_Method.POST,
                        unknown='INCLUDE').validate(data=output)
                    if valid:
                        Content_Response(output).add(resp)
                    else:
                        resp_data.add(resp)

    def __actions(self, data):
        """Run the command described by ``data`` and report the outcome.

        Reads ``cmd``, ``args`` (default ``[]``), ``daemon`` (default
        ``False``) and ``output_format`` (default ``'plain'``) from ``data``.

        Returns a dict with ``type='action'``, ``error`` and ``return_code``.
        For non-daemon commands it also carries ``duration`` and, when the
        command produced any, ``stdout``/``stderr`` formatted by
        ``__set_std``. Daemon commands are reported as successful right after
        being started.
        """
        cmd = data.get('cmd', None)
        daemon = data.get('daemon', False)
        output_format = data.get('output_format', 'plain')
        output = dict(type='action')
        # Command and arguments are joined into one shell command line.
        run = ' '.join([cmd] + wrap(data.get('args', [])))
        start = time.time()
        proc = self.__run_cmd(cmd=run, daemon=daemon, output=output)
        if daemon:
            output.update(error=False, return_code=0)
        else:
            output.update(error=proc.returncode != 0,
                          return_code=proc.returncode,
                          duration=time.time() - start)
            self.__set_std(proc.stdout, output, 'stdout', output_format)
            self.__set_std(proc.stderr, output, 'stderr', output_format)
        return output

    def __parameters(self, data):
        """Apply one parameter change through the parser for its schema.

        Reads ``schema``, ``source``, ``path`` (wrapped into a list) and
        ``value`` from ``data``. The parser registered in ``self.parsers``
        under ``schema`` is called with ``(schema, source, path, value)`` and
        the mapping it returns is merged into the result.

        Returns a dict with ``type='parameter'`` plus the parser output, a
        ``Not_Found_Response`` when the source file does not exist, or a
        ``Bad_Request_Response`` for any other failure (including an unknown
        ``schema``, since the looked-up parser is then ``None``).
        """
        schema = data.get('schema', None)
        source = data.get('source', None)
        path = wrap(data.get('path', []))
        value = data.get('value', None)
        output = dict(type='parameter')
        try:
            source = expand_user(source)
            parser = self.parsers.get(schema)
            output.update(parser(schema, source, path, value))
            return output
        except File_Not_Found_Error as e:
            msg = f'Source {source} not found'
            self.log.exception(msg, e)
            return Not_Found_Response(msg, e, type='parameter', data=data)
        except Exception as e:
            msg = f'Source {source} not accessible'
            self.log.exception(msg, e)
            return Bad_Request_Response(e, message=msg, type='parameter',
                                        data=data)

    def __resources(self, data):
        """Write ``data['content']`` to the file at ``data['path']``.

        ``~`` in the path is expanded before opening; the file is opened in
        ``"w"`` mode, so existing content is replaced.

        Returns a dict with ``type='resource'``, ``path`` (as given, not
        expanded) and ``content``, a ``Not_Found_Response`` when the path
        cannot be found, or a ``Bad_Request_Response`` for any other failure.
        """
        path = data.get('path', None)
        content = data.get('content', None)
        output = dict(type='resource')
        try:
            fix_path = expand_user(path)
            with open(fix_path, "w") as file:
                file.write(content)
            output.update(path=path, content=content)
            return output
        except File_Not_Found_Error as e:
            msg = f'Path {path} not found'
            self.log.exception(msg, e)
            return Not_Found_Response(msg, e, type='resource', data=data)
        except Exception as e:
            msg = f'Path {path} not accessible'
            self.log.exception(msg, e)
            return Bad_Request_Response(e, message=msg, type='resource',
                                        data=data)

    def __set_std(self, data, output, key, output_format):
        """Store captured stream text in ``output[key]``.

        Nothing is stored when ``data`` is empty or ``None``. Otherwise the
        text is stripped and stored as-is (``'plain'``), as a list of lines
        (``'lines'``), or parsed with ``loads`` for any other format. When
        parsing fails, ``description`` and ``exception`` are added to
        ``output`` and the raw text is stored instead.
        """
        if not data:
            return
        data = data.strip()
        if output_format == 'plain':
            output[key] = data
        elif output_format == 'lines':
            output[key] = data.splitlines()
        else:
            try:
                output[key] = loads(data)
            except Exception as e:
                msg = f'Not valid JSON for {key}'
                self.log.exception(msg, e)
                output.update(description=msg, exception=extract_info(e))
                output[key] = data

    def __run_cmd(self, cmd, daemon, output):
        """Start ``cmd`` through the shell.

        Non-daemon commands are run to completion with captured text output
        and the ``CompletedProcess`` is returned. Daemon commands are started
        in a new session and the ``Popen`` handle is returned immediately.
        ``output`` is accepted for interface compatibility and not used.
        """
        # SECURITY: ``cmd`` comes from the request body and is executed with
        # ``shell=True``; access to this endpoint must be restricted to
        # trusted callers.
        if not daemon:
            return sp.run(cmd, check=False, shell=True,
                          stdout=sp.PIPE, stderr=sp.PIPE,
                          universal_newlines=True)
        # Nobody reads a daemon's output and the handle is dropped by the
        # caller, so pipes would be closed (or fill up) under the running
        # process; discard the output instead.
        return sp.Popen(cmd, shell=True,
                        stdout=sp.DEVNULL, stderr=sp.DEVNULL,
                        start_new_session=True)