WARNING: This file is programmatically generated.
This file is parsed by pylint-ignore
to determine which
Pylint messages
should be ignored.
- Do not edit this file manually.
- To update, use
pylint-ignore --update-ignorefile
The recommended approach to using pylint-ignore
is:
- If a message refers to a valid issue, update your code rather than ignoring the message.
- If a message should always be ignored (globally), then do so
via the usual
pylintrc
or setup.cfg
files rather than this pylint-ignore.md
file.
- If a message is a false positive, add a comment of this form to your code:
# pylint:disable=<symbol> ; explain why this is a false positive
- E0001: syntax-error (1x)
- E1101: no-member (1x)
- W0201: attribute-defined-outside-init (1x)
- W0212: protected-access (8x)
- W0511: fixme (19x)
- W0611: unused-import (1x)
- W0622: redefined-builtin (4x)
- W0703: broad-except (15x)
- W0707: raise-missing-from (1x)
- W1113: keyword-arg-before-vararg (2x)
- C0123: unidiomatic-typecheck (1x)
- R0801: duplicate-code (16x)
- R0903: too-few-public-methods (8x)
- R0912: too-many-branches (15x)
- R0915: too-many-statements (10x)
- R1702: too-many-nested-blocks (4x)
- R1722: consider-using-sys-exit (1x)
message: invalid syntax (<unknown>, line 28)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
26: StateReplyCheck,
27: )
> 28: <<<<<<< HEAD
29: from .exceptions import ConfigError, InvalidUsage
30: from .util import str2total_seconds
message: Class '' has no 'callbacks' member
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
51: def start_metrics_server(port, callbacks=None, addr=""):
...
53: handler = CallbackMetricsHandler.factory(REGISTRY)
54: if callbacks is not None:
> 55: handler.callbacks += callbacks
56: httpd = _ThreadingSimpleServer((addr, port), handler)
57: t = threading.Thread(target=httpd.serve_forever)
message: Attribute 'task' defined outside __init__
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
49: def start(self, loop):
...
57: self.loop = loop
58: self.queue = asyncio.Queue(maxsize=self.queue_size, loop=loop)
> 59: self.task = asyncio.ensure_future(self.consume(), loop=loop)
60: self.started = True
61:
message: Access to a protected member _checks of a client class
author : Rick Nitsche <[email protected]>
date : 2021-04-16T09:28:17
170: def add_result(self, result):
...
182: self._result.update(result.results)
183: self._status.update(result.status)
> 184: self._checks.update(result._checks)
185: self._state.update(result._state)
186: self._embedded.update(result._embedded)
message: Access to a protected member _state of a client class
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
170: def add_result(self, result):
...
183: self._status.update(result.status)
184: self._checks.update(result._checks)
> 185: self._state.update(result._state)
186: self._embedded.update(result._embedded)
187: if self._error:
message: Access to a protected member _embedded of a client class
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
170: def add_result(self, result):
...
184: self._checks.update(result._checks)
185: self._state.update(result._state)
> 186: self._embedded.update(result._embedded)
187: if self._error:
188: if result._error:
message: Access to a protected member _error of a client class
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
170: def add_result(self, result):
...
186: self._embedded.update(result._embedded)
187: if self._error:
> 188: if result._error:
189: self._error = self._error + " ;" + result._error
190: else:
message: Access to a protected member _error of a client class
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
170: def add_result(self, result):
...
187: if self._error:
188: if result._error:
> 189: self._error = self._error + " ;" + result._error
190: else:
191: self._error = result._error
message: Access to a protected member _error of a client class
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
170: def add_result(self, result):
...
189: self._error = self._error + " ;" + result._error
190: else:
> 191: self._error = result._error
192: if self._msg:
193: self._msg.append(result._msg)
message: Access to a protected member _msg of a client class
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
170: def add_result(self, result):
...
191: self._error = result._error
192: if self._msg:
> 193: self._msg.append(result._msg)
194: else:
195: self._msg = result._msg
message: Access to a protected member _msg of a client class
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
170: def add_result(self, result):
...
193: self._msg.append(result._msg)
194: else:
> 195: self._msg = result._msg
196:
197: def _add_reply(self, name: str, result: Dict[str, Tuple[str, int]]):
message: TODO: we should smarten up the signal handling here. It should be added
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
22:
23:
> 24: # TODO: we should smarten up the signal handling here. It should be added
25: # directly to the event loop below, this will add the handler at import time.
26: def signal_handler(*_):
message: TODO: pretty much all logging messages config out of this module are ignored
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
101: logger = logging.getLogger(__name__)
102:
> 103: # TODO: pretty much all logging messages config out of this module are ignored
104: # as the default level has not yet been applied, some workaround should be
105: # figured out. For the moment, just uncomment the line below
message: TODO: should this be logged here?
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
95: def remove_hosts(self, hosts: List[str]) -> bool:
...
115: f"Could not remove from blocklist. Requested hosts {bad_hosts} unknown."
116: )
> 117: logger.debug(msg) # TODO: should this be logged here?
118:
119: raise InvalidUsage(
message: TODO: This will be used by certain kotekan endpoints that do not accept
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
54: def main_loop(
...
119:
120: # Parse URL query parameters
> 121: # TODO: This will be used by certain kotekan endpoints that do not accept
122: # POST but need parameters specified. If we find another scheme to
123: # make this work we should remove this feature as it is somewhat
message: TODO: Add an option to overwrite values only if present in request?
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
43: def __init__(self, name, conf, forwarder, state):
...
166: f"and in `send_state`)."
167: )
> 168: # TODO: Add an option to overwrite values only if present in request?
169: except KeyError:
170: # That the values are being sent from the state doesn't mean they need to
message: TODO: Add an option to overwrite values only if present in request?
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
44: def __init__(self, name, conf, forwarder, state):
...
167: f"and in `send_state`)."
168: )
> 169: # TODO: Add an option to overwrite values only if present in request?
170: except KeyError:
171: # That the values are being sent from the state doesn't mean they need to
message: TODO: Add an option to overwrite values only if present in request?
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
44: def __init__(self, name, conf, forwarder, state):
...
167: f"and in `send_state`)."
168: )
> 169: # TODO: Add an option to overwrite values only if present in request?
170: except KeyError:
171: # That the values are being sent from the state doesn't mean they need to
message: TODO: Add an option to overwrite values only if present in request?
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
45: def __init__(self, name, conf, forwarder, state):
...
168: f"and in `send_state`)."
169: )
> 170: # TODO: Add an option to overwrite values only if present in request?
171: except KeyError:
172: # That the values are being sent from the state doesn't mean they need to
message: TODO: raise log level in failure case?
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
168: def _call_endpoints_on_start(self):
...
189: result = self.redis_sync.blpop(f"{name}:res")[1]
190: self.redis_sync.delete(f"{name}:res")
> 191: # TODO: raise log level in failure case?
192: logger.debug(f"Called /{endpoint.name} on start, result: {result}")
193:
message: TODO: validate the endpoint config in here
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
273: def _load_endpoint_config(config):
...
298: conf["name"] = name
299:
> 300: # TODO: validate the endpoint config in here
301: config["endpoints"].append(conf)
message: TODO: move into config.py
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
292: def _load_config(self, config_path: os.PathLike):
...
330:
331: # Validate slack posting rules
> 332: # TODO: move into config.py
333: for rdict in self.config["slack_rules"]:
334: if "logger" not in rdict or "channel" not in rdict:
message: TODO: should we do that concurrently?
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
283: def _load_checks(self, check_dict: Dict) -> List[Check]:
...
463:
464: # Forward the request to group and then to other coco endpoints
> 465: # TODO: should we do that concurrently?
466: for forward in self.forwards_external:
467: result_forward = await forward.trigger(
message: TODO: should we do that concurrently?
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
282: def _load_checks(self, check_dict: Dict) -> List[Check]:
...
468:
469: # Forward the request to group and then to other coco endpoints
> 470: # TODO: should we do that concurrently?
471: for forward in self.forwards_external:
472: result_forward = await forward.trigger(
message: TODO: should we do that concurrently?
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
289: def _load_checks(self, check_dict: Dict) -> List[Check]:
...
469:
470: # Forward the request to group and then to other coco endpoints
> 471: # TODO: should we do that concurrently?
472: for forward in self.forwards_external:
473: result_forward = await forward.trigger(
message: TODO: should we do that concurrently?
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
288: def _load_checks(self, check_dict: Dict) -> List[Check]:
...
474:
475: # Forward the request to group and then to other coco endpoints
> 476: # TODO: should we do that concurrently?
477: for forward in self.forwards_external:
478: result_forward = await forward.trigger(
message: TODO: run these concurrently?
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
283: def _load_checks(self, check_dict: Dict) -> List[Check]:
...
492: result_forward = await forward.trigger(self.type, {}, hosts)
493: result.embed(forward.name, result_forward)
> 494: # TODO: run these concurrently?
495:
496: if self.get_state:
message: TODO: run these concurrently?
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
282: def _load_checks(self, check_dict: Dict) -> List[Check]:
...
497: result_forward = await forward.trigger(self.type, {}, hosts)
498: result.embed(forward.name, result_forward)
> 499: # TODO: run these concurrently?
500:
501: if self.get_state:
message: TODO: run these concurrently?
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
289: def _load_checks(self, check_dict: Dict) -> List[Check]:
...
498: result_forward = await forward.trigger(self.type, {}, hosts)
499: result.embed(forward.name, result_forward)
> 500: # TODO: run these concurrently?
501:
502: if self.get_state:
message: TODO: run these concurrently?
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
288: def _load_checks(self, check_dict: Dict) -> List[Check]:
...
503: result_forward = await forward.trigger(self.type, {}, hosts)
504: result.embed(forward.name, result_forward)
> 505: # TODO: run these concurrently?
506:
507: if self.get_state:
message: Unused InternalError imported from exceptions
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
26: StateReplyCheck,
27: )
> 28: from .exceptions import ConfigError, InvalidUsage, InternalError
29:
30: ON_FAILURE_ACTIONS = ["call", "call_single_host"]
message: Redefining built-in 'callable'
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
688: def __init__(
...
690: name: str,
691: type_: Union[str, List[str]],
> 692: callable: Callable[[sanic.request.Request], Optional[dict]],
693: ):
694: self.name = name
message: Redefining built-in 'callable'
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
693: def __init__(
...
695: name: str,
696: type_: Union[str, List[str]],
> 697: callable: Callable[[sanic.request.Request], Optional[dict]],
698: ):
699: self.name = name
message: Redefining built-in 'callable'
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
694: def __init__(
...
696: name: str,
697: type_: Union[str, List[str]],
> 698: callable: Callable[[sanic.request.Request], Optional[dict]],
699: ):
700: self.name = name
message: Redefining built-in 'callable'
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
699: def __init__(
...
701: name: str,
702: type_: Union[str, List[str]],
> 703: callable: Callable[[sanic.request.Request], Optional[dict]],
704: ):
705: self.name = name
message: Catching too general exception Exception
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
48: def __init__(self, conf, reset=False, check_config=False):
...
136: try:
137: self.qworker.start()
> 138: except Exception:
139: self.qworker.join()
140:
message: Catching too general exception Exception
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
146: def __del__(self):
...
154: try:
155: self.redis_sync.rpush("queue", "coco_shutdown")
> 156: except Exception as e:
157: logger.error(
158: f"Failed sending shutdown command to worker (have to kill it): {type(e)}: {e}"
message: Catching too general exception Exception
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
54: def main_loop(
...
155: # Unexpected exceptions are returned as HTTP 500 errors, and dump a
156: # traceback
> 157: except Exception as e:
158: etype = e.__class__.__qualname__
159: msg = e.args[0] if e.args else None
message: Catching too general exception Exception
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
132: def __init__(self, *args, token=None, **kwargs):
...
158: )
159: )
> 160: except Exception as e:
161: print(
162: "Sending message to slack server failed: {}\nThis was the message:\n\t{}".format(
message: Catching too general exception Exception
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
204: def emit(self, record):
...
208: payload["channel"] = self.channel
209: self.queue.push(payload)
> 210: except Exception:
211: self.handleError(record)
212:
message: Catching too general exception Exception
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
148: def add_condition(self, condition):
...
211: ) as r:
212: r.raise_for_status()
> 213: except Exception as e:
214: logger.error(
215: f"Scheduler failed calling {self.name}: ({e}). Has coco's sanic server crashed?"
message: Catching too general exception Exception
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
210: def init_metrics(self):
...
309: except AsyncioTimeoutError:
310: return host, ("Timeout", 0)
> 311: except Exception as e:
312: return host, (str(e), 0)
313: finally:
message: Catching too general exception Exception
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
550: def client_send_request(host, port, metrics_port, args):
...
587: try:
588: q_size = await metric.get("coco_queue_length_total", metrics_port, host)
> 589: except Exception as err:
590: if not isinstance(err, asyncio.CancelledError):
591: print(f"Couldn't get queue fill level from cocod: {err}")
message: Catching too general exception Exception
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
555: def client_send_request(host, port, metrics_port, args):
...
592: try:
593: q_size = await metric.get("coco_queue_length_total", metrics_port, host)
> 594: except Exception as err:
595: if not isinstance(err, asyncio.CancelledError):
596: print(f"Couldn't get queue fill level from cocod: {err}")
message: Catching too general exception Exception
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
556: def client_send_request(host, port, metrics_port, args):
...
593: try:
594: q_size = await metric.get("coco_queue_length_total", metrics_port, host)
> 595: except Exception as err:
596: if not isinstance(err, asyncio.CancelledError):
597: print(f"Couldn't get queue fill level from cocod: {err}")
message: Catching too general exception Exception
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
561: def client_send_request(host, port, metrics_port, args):
...
598: try:
599: q_size = await metric.get("coco_queue_length_total", metrics_port, host)
> 600: except Exception as err:
601: if not isinstance(err, asyncio.CancelledError):
602: print(f"Couldn't get queue fill level from cocod: {err}")
message: Catching too general exception Exception
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
550: def client_send_request(host, port, metrics_port, args):
...
615: except ContentTypeError:
616: result = {"Error": await resp.text()}
> 617: except Exception as e:
618: return False, f"coco-client: Sending request failed: {e}"
619: else:
message: Catching too general exception Exception
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
555: def client_send_request(host, port, metrics_port, args):
...
620: except ContentTypeError:
621: result = {"Error": await resp.text()}
> 622: except Exception as e:
623: return False, f"coco-client: Sending request failed: {e}"
624: else:
message: Catching too general exception Exception
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
556: def client_send_request(host, port, metrics_port, args):
...
621: except ContentTypeError:
622: result = {"Error": await resp.text()}
> 623: except Exception as e:
624: return False, f"coco-client: Sending request failed: {e}"
625: else:
message: Catching too general exception Exception
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
561: def client_send_request(host, port, metrics_port, args):
...
626: except ContentTypeError:
627: result = {"Error": await resp.text()}
> 628: except Exception as e:
629: return False, f"coco-client: Sending request failed: {e}"
630: else:
message: Consider explicitly re-raising using the 'from' keyword
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
54: def main_loop(
...
128: endpoint = endpoints[endpoint_name]
129: except KeyError:
> 130: raise InvalidPath(f"Endpoint /{endpoint_name} not found.")
131:
132: # Check that it is being requested with the correct method
message: Keyword argument before variable positional arguments list in the definition of __init__ function
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
191: class SlackLogHandler(logging.Handler):
...
192: """Logging handler to post to Slack."""
193:
> 194: def __init__(self, channel, queue=None, *args, **kwargs):
195: super().__init__(*args, **kwargs)
196:
message: Keyword argument before variable positional arguments list in the definition of __init__ function
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
227: class SlackLogFormatter(logging.Formatter):
...
233: """
234:
> 235: def __init__(self, title=None, *args, **kwargs):
236: super().__init__(*args, **kwargs)
237: self.title = title
message: Using type() instead of isinstance() for a typecheck.
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
190: def merge_dict_tree(a, b):
...
210:
211: # Different types should return b
> 212: if type(a) != type(b):
213: return b
214:
message: Similar lines in 2 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint:279
==coco.endpoint_REMOTE_1678491:273
)
)
self.has_external_forwards = True
# Internal forwards
forward_to_coco = forward_dict.get("coco", None)
self._load_internal_forward(forward_to_coco, self.forwards_internal)
def _load_checks(self, check_dict: Dict) -> List[Check]:
checks = list()
if not check_dict:
return checks
try:
name = check_dict["name"]
except KeyError as e:
raise ConfigError(
f"Name missing from forward reply check block: {check_dict}."
) from e
save_to_state = check_dict.get("save_reply_to_state", None)
if save_to_state:
if not isinstance(save_to_state, str):
raise ConfigError(
f"'save_reply_to_state' in check for '{name}' in '{self.name}"
f".conf' is of type '{type(save_to_state).__name__}' "
f"(expected str)."
)
logger.debug(
f"Endpoint {self.name} will save replies to state: {save_to_state}."
)
on_failure = check_dict.get("on_failure", None)
if on_failure:
for action, endpoint in on_failure.items():
if not isinstance(endpoint, str):
raise ConfigError(
f"'on_failure'-endpoint in forward to '{name}' in "
f"'{self.name}.conf' is of type "
f"'{type(endpoint).__name__}' (expected str)."
)
if action not in ON_FAILURE_ACTIONS:
raise ConfigError(
f"Unknown 'on_failure'-action in '{name}' ('{self.name}."
f"conf'): {action}. Use one of {ON_FAILURE_ACTIONS}."
)
reply = check_dict.get("reply", None)
if reply:
if not isinstance(reply, dict):
raise ConfigError(
f"Value 'reply' defining checks in '{name}' has type "
f"{type(reply).__name__} (expected dict)."
)
values = reply.get("value", None)
types = reply.get("type", None)
identical = reply.get("identical", None)
state = reply.get("state", None)
state_hash = reply.get("state_hash", None)
num_hosts_warning = check_dict.get("num_hosts_warning", None)
if not (values or types or identical or state or state_hash):
logger.info(
f"In {self.name}.conf '{name}' has a 'reply' block, but it's empty."
)
return checks
if values:
checks.append(
ValueReplyCheck(
name,
values,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if types:
checks.append(
TypeReplyCheck(
name,
types,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if identical:
checks.append(
IdenticalReplyCheck(
name,
identical,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if state:
checks.append(
StateReplyCheck(
name,
state,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if state_hash:
checks.append(
StateHashReplyCheck(
name,
state_hash,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
return checks
async def call(self, request, hosts=None, params=None):
"""
Call the endpoint.
Returns
-------
:class:`Result`
The result of the endpoint call.
"""
self.logger.debug("endpoint called")
if params is None:
params = []
if self.enforce_group:
hosts = None
result = Result(self.name)
if self.before:
for forward in self.before:
result_forward = await forward.trigger(self.type, {}, hosts)
result.embed(forward.name, result_forward)
# TODO: run these concurrently?
# Only forward values we expect
filtered_request = copy(self.values)
if request is None:
request = dict()
if filtered_request:
for key, value in filtered_request.items():
try:
if not isinstance(request[key], value):
msg = (
f"{self.name} received value '{key}'' of type "
f"{type(request[key]).__name__} (expected {value.__name__})."
)
self.logger.warning(msg)
raise InvalidUsage(msg)
except KeyError as e:
msg = f"{self.name} requires value '{key}'."
self.logger.warning(msg)
raise InvalidUsage(msg) from e
# save the state change:
if self.save_state:
for path in self.save_state:
self.state.write(path, request.get(key), key)
filtered_request[key] = request.pop(key)
# log the request content
msg = ""
if filtered_request:
for k, v in filtered_request.items():
msg = f"{msg}{k}: {v}\n"
msg = msg[:-1]
self.logger.info(msg)
# Send values from state if not found in request (some type checking is done in constructor
# and when state changed)
if self.send_state:
send_state = self.state.read(self.send_state)
if filtered_request:
send_state.update(filtered_request)
filtered_request = send_state
# Forward the request to group and then to other coco endpoints
# TODO: should we do that concurrently?
for forward in self.forwards_external:
result_forward = await forward.trigger(
self.type, filtered_request, hosts, params
)
result.add_result(result_forward)
for forward in self.forwards_internal:
result_forward = await forward.trigger(
self.type, filtered_request, hosts, params
)
result.embed(forward.name, result_forward)
# Look for result type parameter in request
if request:
result.type = request.pop("coco_report_type", self.report_type)
else:
result.type = self.report_type
# Report any additional values in the request
if request:
for key in request.keys():
msg = f"Found additional value '{key}' in request to /{self.name}."
self.logger.warning(msg)
result.add_message(msg)
if self.after:
for forward in self.after:
result_forward = await forward.trigger(self.type, {}, hosts)
result.embed(forward.name, result_forward)
# TODO: run these concurrently?
if self.get_state:
result.state(self.state.extract(self.get_state))
if result.success:
if self.set_state:
for path, value in self.set_state.items():
self.state.write(path, value)
self.write_timestamp()
self.logger.debug("Success!")
return result
def write_timestamp(self):
"""
Write a Unix timestamp (float) to the state.
Does nothing if the endpoint doesn't have a path specified in `timestamp`.
"""
if not self.timestamp_path:
return
self.state.write(self.timestamp_path, time.time())
self.logger.debug(
f"/{self.name} saved timestamp to state: {self.timestamp_path}"
)
def client_call(self, host, port, metrics_port, args):
"""
Call from a client.
Send a request to coco daemon at <host>. Return the reply as json or an error string.
Parameters
----------
host : str
Address of coco daemon.
port : int
Port of coco daemon.
metrics_port : int
Port of the prometheus server
args : :class:`Namespace`
Is expected to include all values of the endpoint.
"""
data = copy(self.values)
if data:
for key, type_ in data.items():
data[key] = self._parse_container_arg(key, type_, vars(args)[key])
else:
data = dict()
args.endpoint = self.name
args.type = self.type
args.data = data
return self.client_send_request(host, port, metrics_port, args)
@staticmethod
def client_send_request(host, port, metrics_port, args):
"""
Send a request to an endpoint.
Parameters
----------
host : str
Host.
port : int
Port.
metrics_port : int
Port of the prometheus server
endpoint : str
Endpoint name.
type : str
HTTP request type.
data : json
JSON data.
args : :class:`argparse.Namespace`
Namespace populated by argparse. May contain the report type
(`report : str`), the refresh time for the client in seconds
('client-refresh-time' : int) and ('silent' : boolean) to suppress printing
anything but the result.
Returns
-------
bool
Success
json or str
The reply
"""
data = args.data
endpoint = args.endpoint
type_ = args.type
data["coco_report_type"] = args.report
async def print_queue_size(metric_request_count):
try:
q_size = await metric.get("coco_queue_length_total", metrics_port, host)
except Exception as err:
if not isinstance(err, asyncio.CancelledError):
print(f"Couldn't get queue fill level from cocod: {err}")
return
print(
f"\rThere are {int(q_size)} requests in the queue.",
sep=" ",
end="",
flush=True,
)
if metric_request_count % 2:
print(" ", sep=" ", end="", flush=True)
else:
print(".", sep=" ", end="", flush=True)
async def send_request():
url = f"http://{host}:{port}/{endpoint}"
if not args.silent:
print("Sending request...")
async with ClientSession() as session:
try:
command = getattr(session, type_.lower())
async with command(url, json=data) as resp:
try:
result = await resp.json()
except ContentTypeError:
result = {"Error": await resp.text()}
except Exception as e:
return False, f"coco-client: Sending request failed: {e}"
else:
return True, result
async def request_and_wait():
"""
Send the request and while waiting get and print queue fill level metric.
Returns
-------
Done task for request result.
"""
main_request = asyncio.create_task(send_request())
if not args.silent:
metric_request_count = 0
while True:
metric_request_count = metric_request_count + 1
queue_size = asyncio.create_task(
print_queue_size(metric_request_count)
)
# Wait until either main request or request for metric is done
done, _ = await asyncio.wait(
{main_request, queue_size}, return_when=asyncio.FIRST_COMPLETED
)
if main_request in done:
queue_size.cancel()
print("\n")
break
# Wait a moment before getting metric again
wait = asyncio.create_task(asyncio.sleep(args.client_refresh_time))
# Cancel waiting in case main request is done
done, _ = await asyncio.wait(
{main_request, wait}, return_when=asyncio.FIRST_COMPLETED
)
if main_request in done:
wait.cancel()
print("\n")
break
return await main_request
return asyncio.run(request_and_wait())
@staticmethod
def _parse_container_arg(key, type_, arg):
if type_ in (list, dict):
try:
value = json.loads(arg)
except json.JSONDecodeError as e:
raise InvalidUsage(f"Failure parsing argument '{key}': {e}") from e
return value
return arg
class LocalEndpoint:
"""An endpoint that will execute a callable solely within coco.
Parameters
----------
name
Endpoint name.
type_
Type of request to accept. Either a string ("POST", ...) or a list of
strings.
callable
A callable that will be called to execute the endpoint.
"""
call_on_start = False
def __init__(
self,
name: str,
type_: Union[str, List[str]],
callable: Callable[[sanic.request.Request], Optional[dict]],
):
self.name = name
self.type = type_
self.callable = callable
self.schedule = None
async def call(self, request, **_):
"""Call the local endpoint."""
return await self.callable(request)
message: Similar lines in 2 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint_BASE_1678491:274
==coco.endpoint_LOCAL_1678491:280
)
)
self.has_external_forwards = True
# Internal forwards
forward_to_coco = forward_dict.get("coco", None)
self._load_internal_forward(forward_to_coco, self.forwards_internal)
def _load_checks(self, check_dict: Dict) -> List[Check]:
checks = list()
if not check_dict:
return checks
try:
name = check_dict["name"]
except KeyError as e:
raise ConfigError(
f"Name missing from forward reply check block: {check_dict}."
) from e
save_to_state = check_dict.get("save_reply_to_state", None)
if save_to_state:
if not isinstance(save_to_state, str):
raise ConfigError(
f"'save_reply_to_state' in check for '{name}' in '{self.name}"
f".conf' is of type '{type(save_to_state).__name__}' "
f"(expected str)."
)
logger.debug(
f"Endpoint {self.name} will save replies to state: {save_to_state}."
)
on_failure = check_dict.get("on_failure", None)
if on_failure:
for action, endpoint in on_failure.items():
if not isinstance(endpoint, str):
raise ConfigError(
f"'on_failure'-endpoint in forward to '{name}' in "
f"'{self.name}.conf' is of type "
f"'{type(endpoint).__name__}' (expected str)."
)
if action not in ON_FAILURE_ACTIONS:
raise ConfigError(
f"Unknown 'on_failure'-action in '{name}' ('{self.name}."
f"conf'): {action}. Use one of {ON_FAILURE_ACTIONS}."
)
reply = check_dict.get("reply", None)
if reply:
if not isinstance(reply, dict):
raise ConfigError(
f"Value 'reply' defining checks in '{name}' has type "
f"{type(reply).__name__} (expected dict)."
)
values = reply.get("value", None)
types = reply.get("type", None)
identical = reply.get("identical", None)
state = reply.get("state", None)
state_hash = reply.get("state_hash", None)
num_hosts_warning = check_dict.get("num_hosts_warning", None)
if not (values or types or identical or state or state_hash):
logger.info(
f"In {self.name}.conf '{name}' has a 'reply' block, but it's empty."
)
return checks
if values:
checks.append(
ValueReplyCheck(
name,
values,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if types:
checks.append(
TypeReplyCheck(
name,
types,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if identical:
checks.append(
IdenticalReplyCheck(
name,
identical,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if state:
checks.append(
StateReplyCheck(
name,
state,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if state_hash:
checks.append(
StateHashReplyCheck(
name,
state_hash,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
return checks
async def call(self, request, hosts=None, params=None):
"""
Call the endpoint.
Returns
-------
:class:`Result`
The result of the endpoint call.
"""
self.logger.info("endpoint called")
if params is None:
params = []
if self.enforce_group:
hosts = None
result = Result(self.name)
if self.before:
for forward in self.before:
result_forward = await forward.trigger(self.type, {}, hosts)
result.embed(forward.name, result_forward)
# TODO: run these concurrently?
# Only forward values we expect
filtered_request = copy(self.values)
if request is None:
request = dict()
if filtered_request:
for key, value in filtered_request.items():
try:
if not isinstance(request[key], value):
msg = (
f"{self.name} received value '{key}'' of type "
f"{type(request[key]).__name__} (expected {value.__name__})."
)
self.logger.warning(msg)
raise InvalidUsage(msg)
except KeyError as e:
msg = f"{self.name} requires value '{key}'."
self.logger.warning(msg)
raise InvalidUsage(msg) from e
# save the state change:
if self.save_state:
for path in self.save_state:
self.state.write(path, request.get(key), key)
filtered_request[key] = request.pop(key)
self.logger.debug(f'Request "{filtered_request}"')
# Send values from state if not found in request (some type checking is done in constructor
# and when state changed)
if self.send_state:
send_state = self.state.read(self.send_state)
if filtered_request:
send_state.update(filtered_request)
filtered_request = send_state
# Forward the request to group and then to other coco endpoints
# TODO: should we do that concurrently?
for forward in self.forwards_external:
result_forward = await forward.trigger(
self.type, filtered_request, hosts, params
)
result.add_result(result_forward)
for forward in self.forwards_internal:
result_forward = await forward.trigger(
self.type, filtered_request, hosts, params
)
result.embed(forward.name, result_forward)
# Look for result type parameter in request
if request:
result.type = request.pop("coco_report_type", self.report_type)
else:
result.type = self.report_type
# Report any additional values in the request
if request:
for key in request.keys():
msg = f"Found additional value '{key}' in request to /{self.name}."
self.logger.warning(msg)
result.add_message(msg)
if self.after:
for forward in self.after:
result_forward = await forward.trigger(self.type, {}, hosts)
result.embed(forward.name, result_forward)
# TODO: run these concurrently?
if self.get_state:
result.state(self.state.extract(self.get_state))
if result.success:
if self.set_state:
for path, value in self.set_state.items():
self.state.write(path, value)
self.write_timestamp()
self.logger.debug("Success!")
return result
def write_timestamp(self):
"""
Write a Unix timestamp (float) to the state.
Does nothing if the endpoint doesn't have a path specified in `timestamp`.
"""
if not self.timestamp_path:
return
self.state.write(self.timestamp_path, time.time())
self.logger.debug(
f"/{self.name} saved timestamp to state: {self.timestamp_path}"
)
def client_call(self, host, port, metrics_port, args):
"""
Call from a client.
Send a request to coco daemon at <host>. Return the reply as json or an error string.
Parameters
----------
host : str
Address of coco daemon.
port : int
Port of coco daemon.
metrics_port : int
Port of the prometheus server
args : :class:`Namespace`
Is expected to include all values of the endpoint.
"""
data = copy(self.values)
if data:
for key, type_ in data.items():
data[key] = self._parse_container_arg(key, type_, vars(args)[key])
else:
data = dict()
args.endpoint = self.name
args.type = self.type
args.data = data
return self.client_send_request(host, port, metrics_port, args)
@staticmethod
def client_send_request(host, port, metrics_port, args):
"""
Send a request to an endpoint.
Parameters
----------
host : str
Host.
port : int
Port.
metrics_port : int
Port of the prometheus server
endpoint : str
Endpoint name.
type : str
HTTP request type.
data : json
JSON data.
args : :class:`argparse.Namespace`
Namespace populated by argparse. May contain the report type
(`report : str`), the refresh time for the client in seconds
('client-refresh-time' : int) and ('silent' : boolean) to suppress printing
anything but the result.
Returns
-------
bool
Success
json or str
The reply
"""
data = args.data
endpoint = args.endpoint
type_ = args.type
data["coco_report_type"] = args.report
async def print_queue_size(metric_request_count):
try:
q_size = await metric.get("coco_queue_length_total", metrics_port, host)
except Exception as err:
if not isinstance(err, asyncio.CancelledError):
print(f"Couldn't get queue fill level from cocod: {err}")
return
print(
f"\rThere are {int(q_size)} requests in the queue.",
sep=" ",
end="",
flush=True,
)
if metric_request_count % 2:
print(" ", sep=" ", end="", flush=True)
else:
print(".", sep=" ", end="", flush=True)
async def send_request():
url = f"http://{host}:{port}/{endpoint}"
if not args.silent:
print("Sending request...")
async with ClientSession() as session:
try:
command = getattr(session, type_.lower())
async with command(url, json=data) as resp:
try:
result = await resp.json()
except ContentTypeError:
result = {"Error": await resp.text()}
except Exception as e:
return False, f"coco-client: Sending request failed: {e}"
else:
return True, result
async def request_and_wait():
"""
Send the request and while waiting get and print queue fill level metric.
Returns
-------
Done task for request result.
"""
main_request = asyncio.create_task(send_request())
if not args.silent:
metric_request_count = 0
while True:
metric_request_count = metric_request_count + 1
queue_size = asyncio.create_task(
print_queue_size(metric_request_count)
)
# Wait until either main request or request for metric is done
done, _ = await asyncio.wait(
{main_request, queue_size}, return_when=asyncio.FIRST_COMPLETED
)
if main_request in done:
queue_size.cancel()
print("\n")
break
# Wait a moment before getting metric again
wait = asyncio.create_task(asyncio.sleep(args.client_refresh_time))
# Cancel waiting in case main request is done
done, _ = await asyncio.wait(
{main_request, wait}, return_when=asyncio.FIRST_COMPLETED
)
if main_request in done:
wait.cancel()
print("\n")
break
return await main_request
return asyncio.run(request_and_wait())
@staticmethod
def _parse_container_arg(key, type_, arg):
if type_ in (list, dict):
try:
value = json.loads(arg)
except json.JSONDecodeError as e:
raise InvalidUsage(f"Failure parsing argument '{key}': {e}") from e
return value
return arg
class LocalEndpoint:
"""An endpoint that will execute a callable solely within coco.
Parameters
----------
name
Endpoint name.
type_
Type of request to accept. Either a string ("POST", ...) or a list of
strings.
callable
A callable that will be called to execute the endpoint.
"""
call_on_start = False
def __init__(
self,
name: str,
type_: Union[str, List[str]],
callable: Callable[[sanic.request.Request], Optional[dict]],
):
self.name = name
self.type = type_
self.callable = callable
self.schedule = None
async def call(self, request, **_):
"""Call the local endpoint."""
return await self.callable(request)
message: Similar lines in 2 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint:33
==coco.endpoint_LOCAL_1678491:34
logger = logging.getLogger(__name__)
class Endpoint:
"""
An endpoint.
Does whatever the config says.
"""
def __init__(self, name, conf, forwarder, state):
logger.debug(f"Loading {name}.conf")
self.name = name
if conf is None:
conf = dict()
self.description = conf.get("description", "")
self.type = conf.get("type", "GET")
self.group = conf.get("group")
self.callable = conf.get("callable", False)
self.call_on_start = conf.get("call_on_start", False)
self.forwarder = forwarder
self.state = state
self.report_type = conf.get("report_type", "CODES_OVERVIEW")
self.values = copy(conf.get("values", None))
self.get_state = conf.get("get_state", None)
self.send_state = conf.get("send_state", None)
self.save_state = conf.get("save_state", None)
self.set_state = conf.get("set_state", None)
self.schedule = conf.get("schedule", None)
self.enforce_group = bool(conf.get("enforce_group", False))
self.forward_checks = dict()
# Setup the endpoint logger
self.logger = logging.getLogger(f"{__name__}.{self.name}")
if self.values:
for key, value in self.values.items():
self.values[key] = locate(value)
if self.values[key] is None:
raise RuntimeError(
f"Value {key} of endpoint {name} is of unknown type "
f"{value}."
)
if not self.state:
return
# To hold forward calls: first external ones than internal (coco) endpoints.
self.has_external_forwards = False
self._load_calls(conf.get("call", None))
self.before = list()
self.after = list()
self._load_internal_forward(conf.get("before"), self.before)
self._load_internal_forward(conf.get("after"), self.after)
self.timestamp_path = conf.get("timestamp", None)
if self.timestamp_path:
if self.state.find_or_create(self.timestamp_path):
logger.info(
f"`{self.timestamp_path}` is not empty. /{name} will overwrite "
f"it with timestamps."
)
if self.save_state:
if isinstance(self.save_state, str):
self.save_state = [self.save_state]
# Check if state path exists
for save_state in self.save_state:
path = self.state.find_or_create(save_state)
if not path:
self.logger.debug(
f"state path `{save_state}` configured in `save_state` for "
f"endpoint `{name}` is empty."
)
# If save_state is set, the configured values have to match.
if self.values:
# Check if endpoint value types match the associated part of the saved state
for key in self.values.keys():
try:
if not (
path[key] is None
or isinstance(path[key], self.values[key])
):
raise RuntimeError(
f"Value {key} in configured initial state at /{save_state}/ "
f"has type {type(path[key]).__name__} "
f"(expected {self.values[key].__name__})."
)
except KeyError:
# That the values are being saved in the state doesn't mean they need to
# exist in the initially loaded state, but write a debug line.
self.logger.debug(
f"Value {key} not found in configured initial state at "
f"/{save_state}/."
)
except TypeError as e:
raise ConfigError(
f"Value {key} has unknown type {self.values[key]} in "
f"config of endpoint /{self.name}."
) from e
else:
self.logger.warning(
f"{self.name}.conf has set save_state ({save_state}), but no "
f"values are listed. This endpoint will ignore all data sent to it."
)
# If send_state is set, the configured values have to match.
if self.send_state:
# Check if state path exists
path = self.state.find_or_create(self.send_state)
if not path:
self.logger.warning(
f"state path `{self.send_state}` configured in "
f"`send_state` for endpoint `{name}` is empty."
)
if self.values:
# Check if endpoint value types match the associated part of the send_state
for key in self.values.keys():
try:
if not isinstance(path[key], self.values[key]):
raise RuntimeError(
f"Value {key} in configured initial state at /{self.send_state}/ "
f"has type {type(path[key]).__name__} "
f"(expected {self.values[key].__name__})."
)
# It exists both in the values and the state
self.logger.debug(
f"Value {key} is required by this endpoint so it will never "
f"get sent from state (the key was found in both `values` "
f"and in `send_state`)."
)
# TODO: Add an option to overwrite values only if present in request?
except KeyError:
# That the values are being sent from the state doesn't mean they need to
# exist in the value list.
pass
# Check if get state path exists
if self.get_state:
path = self.state.find_or_create(self.get_state)
if not path:
self.logger.warning(
f"state path `{self.get_state}` configured in "
f"`get_state` for endpoint `{name}` is empty."
)
def _load_internal_forward(self, dict_, list_):
"""
Load Forward's from the config dictionary, generate objects and place in list.
Parameters
----------
dict_ : Dict, str, List[Dict] or List[str]
Config dict(s) describing an internal forward or just string(s) with endpoint name.
list_ : List[CocoForward]
The list to save the Forward objects in.
"""
if not dict_:
return
if not isinstance(dict_, list):
dict_ = [dict_]
for f in dict_:
if isinstance(f, dict):
try:
name = f["name"]
except KeyError as e:
raise ConfigError(
f"Found and internal forwarding block in {self.name}.cong that is missing "
f"field 'name'."
) from e
try:
request = f.pop("request")
except KeyError:
request = None
list_.append(
CocoForward(
name, self.forwarder, None, request, self._load_checks(f)
)
)
else:
if not isinstance(f, str):
raise ConfigError(
f"Found '{type(f)}' in {self.name}.conf in an internal forwarding block "
f"(expected str or dict)."
)
list_.append(CocoForward(f, self.forwarder, None, None, None))
def _load_calls(self, forward_dict):
"""Parse the dict from forwarding config and save the Forward objects."""
self.forwards_external = list()
self.forwards_internal = list()
if forward_dict is None:
if self.group is None:
raise ConfigError(
f"'{self.name}.conf' is missing config option 'group'. Or "
f"it needs to set 'call: forward: null'."
)
self.forwards_external.append(
ExternalForward(self.name, self.forwarder, self.group, None, None)
)
self.has_external_forwards = True
else:
# External forwards
forward_ext = forward_dict.get("forward", [self.name])
# could be a string or list(str):
if forward_ext:
if self.group is None:
raise ConfigError(
f"'{self.name}.conf' is missing config option 'group'. "
f"Or it needs to set 'call: forward: null'."
)
if not isinstance(forward_ext, list):
forward_ext = [forward_ext]
for f in forward_ext:
if isinstance(f, str):
self.forwards_external.append(
ExternalForward(f, self.forwarder, self.group, None, None)
)
# could also be a block where there are checks configured for each forward call
elif isinstance(f, dict):
try:
name = f["name"]
except KeyError as e:
raise ConfigError(
f"Entry in forward call from "
f"/{self.name} is missing field 'name'."
) from e
timeout = f.get("timeout", None)
if timeout is not None:
timeout = str2total_seconds(timeout)
self.forwards_external.append(
ExternalForward(
name,
self.forwarder,
self.group,
None,
self._load_checks(f),
timeout,
)
)
self.has_external_forwards = True
# Internal forwards
forward_to_coco = forward_dict.get("coco", None)
self._load_internal_forward(forward_to_coco, self.forwards_internal)
def _load_checks(self, check_dict: Dict) -> List[Check]:
checks = list()
if not check_dict:
return checks
try:
name = check_dict["name"]
except KeyError as e:
raise ConfigError(
f"Name missing from forward reply check block: {check_dict}."
) from e
save_to_state = check_dict.get("save_reply_to_state", None)
if save_to_state:
if not isinstance(save_to_state, str):
raise ConfigError(
f"'save_reply_to_state' in check for '{name}' in '{self.name}"
f".conf' is of type '{type(save_to_state).__name__}' "
f"(expected str)."
)
logger.debug(
f"Endpoint {self.name} will save replies to state: {save_to_state}."
)
on_failure = check_dict.get("on_failure", None)
if on_failure:
for action, endpoint in on_failure.items():
if not isinstance(endpoint, str):
raise ConfigError(
f"'on_failure'-endpoint in forward to '{name}' in "
f"'{self.name}.conf' is of type "
f"'{type(endpoint).__name__}' (expected str)."
)
if action not in ON_FAILURE_ACTIONS:
raise ConfigError(
f"Unknown 'on_failure'-action in '{name}' ('{self.name}."
f"conf'): {action}. Use one of {ON_FAILURE_ACTIONS}."
)
reply = check_dict.get("reply", None)
if reply:
if not isinstance(reply, dict):
raise ConfigError(
f"Value 'reply' defining checks in '{name}' has type "
f"{type(reply).__name__} (expected dict)."
)
values = reply.get("value", None)
types = reply.get("type", None)
identical = reply.get("identical", None)
state = reply.get("state", None)
state_hash = reply.get("state_hash", None)
num_hosts_warning = check_dict.get("num_hosts_warning", None)
if not (values or types or identical or state or state_hash):
logger.info(
f"In {self.name}.conf '{name}' has a 'reply' block, but it's empty."
)
return checks
if values:
checks.append(
ValueReplyCheck(
name,
values,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if types:
checks.append(
TypeReplyCheck(
name,
types,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if identical:
checks.append(
IdenticalReplyCheck(
name,
identical,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if state:
checks.append(
StateReplyCheck(
name,
state,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if state_hash:
checks.append(
StateHashReplyCheck(
name,
state_hash,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
return checks
async def call(self, request, hosts=None, params=None):
"""
Call the endpoint.
Returns
-------
:class:`Result`
The result of the endpoint call.
"""
message: Similar lines in 2 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint_BASE_1678491:33
==coco.endpoint_REMOTE_1678491:32
logger = logging.getLogger(__name__)
class Endpoint:
"""
An endpoint.
Does whatever the config says.
"""
def __init__(self, name, conf, forwarder, state):
logger.debug(f"Loading {name}.conf")
self.name = name
if conf is None:
conf = dict()
self.description = conf.get("description", "")
self.type = conf.get("type", "GET")
self.group = conf.get("group")
self.callable = conf.get("callable", False)
self.call_on_start = conf.get("call_on_start", False)
self.forwarder = forwarder
self.state = state
self.report_type = conf.get("report_type", "CODES_OVERVIEW")
self.values = copy(conf.get("values", None))
self.get_state = conf.get("get_state", None)
self.send_state = conf.get("send_state", None)
self.save_state = conf.get("save_state", None)
self.set_state = conf.get("set_state", None)
self.schedule = conf.get("schedule", None)
self.enforce_group = bool(conf.get("enforce_group", False))
self.forward_checks = dict()
# Setup the endpoint logger
self.logger = logging.getLogger(f"{__name__}.{self.name}")
if self.values:
for key, value in self.values.items():
self.values[key] = locate(value)
if self.values[key] is None:
raise RuntimeError(
f"Value {key} of endpoint {name} is of unknown type "
f"{value}."
)
if not self.state:
return
# To hold forward calls: first external ones than internal (coco) endpoints.
self.has_external_forwards = False
self._load_calls(conf.get("call", None))
self.before = list()
self.after = list()
self._load_internal_forward(conf.get("before"), self.before)
self._load_internal_forward(conf.get("after"), self.after)
self.timestamp_path = conf.get("timestamp", None)
if self.timestamp_path:
if self.state.find_or_create(self.timestamp_path):
logger.info(
f"`{self.timestamp_path}` is not empty. /{name} will overwrite "
f"it with timestamps."
)
if self.save_state:
if isinstance(self.save_state, str):
self.save_state = [self.save_state]
# Check if state path exists
for save_state in self.save_state:
path = self.state.find_or_create(save_state)
if not path:
self.logger.debug(
f"state path `{save_state}` configured in `save_state` for "
f"endpoint `{name}` is empty."
)
# If save_state is set, the configured values have to match.
if self.values:
# Check if endpoint value types match the associated part of the saved state
for key in self.values.keys():
try:
if not (
path[key] is None
or isinstance(path[key], self.values[key])
):
raise RuntimeError(
f"Value {key} in configured initial state at /{save_state}/ "
f"has type {type(path[key]).__name__} "
f"(expected {self.values[key].__name__})."
)
except KeyError:
# That the values are being saved in the state doesn't mean they need to
# exist in the initially loaded state, but write a debug line.
self.logger.debug(
f"Value {key} not found in configured initial state at "
f"/{save_state}/."
)
except TypeError as e:
raise ConfigError(
f"Value {key} has unknown type {self.values[key]} in "
f"config of endpoint /{self.name}."
) from e
else:
self.logger.warning(
f"{self.name}.conf has set save_state ({save_state}), but no "
f"values are listed. This endpoint will ignore all data sent to it."
)
# If send_state is set, the configured values have to match.
if self.send_state:
# Check if state path exists
path = self.state.find_or_create(self.send_state)
if not path:
self.logger.warning(
f"state path `{self.send_state}` configured in "
f"`send_state` for endpoint `{name}` is empty."
)
if self.values:
# Check if endpoint value types match the associated part of the send_state
for key in self.values.keys():
try:
if not isinstance(path[key], self.values[key]):
raise RuntimeError(
f"Value {key} in configured initial state at /{self.send_state}/ "
f"has type {type(path[key]).__name__} "
f"(expected {self.values[key].__name__})."
)
# It exists both in the values and the state
self.logger.debug(
f"Value {key} is required by this endpoint so it will never "
f"get sent from state (the key was found in both `values` "
f"and in `send_state`)."
)
# TODO: Add an option to overwrite values only if present in request?
except KeyError:
# That the values are being sent from the state doesn't mean they need to
# exist in the value list.
pass
# Check if get state path exists
if self.get_state:
path = self.state.find_or_create(self.get_state)
if not path:
self.logger.warning(
f"state path `{self.get_state}` configured in "
f"`get_state` for endpoint `{name}` is empty."
)
def _load_internal_forward(self, dict_, list_):
"""
Load Forward's from the config dictionary, generate objects and place in list.
Parameters
----------
dict_ : Dict, str, List[Dict] or List[str]
Config dict(s) describing an internal forward or just string(s) with endpoint name.
list_ : List[CocoForward]
The list to save the Forward objects in.
"""
if not dict_:
return
if not isinstance(dict_, list):
dict_ = [dict_]
for f in dict_:
if isinstance(f, dict):
try:
name = f["name"]
except KeyError as e:
raise ConfigError(
f"Found and internal forwarding block in {self.name}.cong that is missing "
f"field 'name'."
) from e
try:
request = f.pop("request")
except KeyError:
request = None
list_.append(
CocoForward(
name, self.forwarder, None, request, self._load_checks(f)
)
)
else:
if not isinstance(f, str):
raise ConfigError(
f"Found '{type(f)}' in {self.name}.conf in an internal forwarding block "
f"(expected str or dict)."
)
list_.append(CocoForward(f, self.forwarder, None, None, None))
def _load_calls(self, forward_dict):
"""Parse the dict from forwarding config and save the Forward objects."""
self.forwards_external = list()
self.forwards_internal = list()
if forward_dict is None:
if self.group is None:
raise ConfigError(
f"'{self.name}.conf' is missing config option 'group'. Or "
f"it needs to set 'call: forward: null'."
)
self.forwards_external.append(
ExternalForward(self.name, self.forwarder, self.group, None, None)
)
self.has_external_forwards = True
else:
# External forwards
forward_ext = forward_dict.get("forward", [self.name])
# could be a string or list(str):
if forward_ext:
if self.group is None:
raise ConfigError(
f"'{self.name}.conf' is missing config option 'group'. "
f"Or it needs to set 'call: forward: null'."
)
if not isinstance(forward_ext, list):
forward_ext = [forward_ext]
for f in forward_ext:
if isinstance(f, str):
self.forwards_external.append(
ExternalForward(f, self.forwarder, self.group, None, None)
)
# could also be a block where there are checks configured for each forward call
elif isinstance(f, dict):
try:
name = f["name"]
except KeyError as e:
raise ConfigError(
f"Entry in forward call from "
f"/{self.name} is missing field 'name'."
) from e
self.forwards_external.append(
ExternalForward(
name,
self.forwarder,
self.group,
None,
self._load_checks(f),
)
)
self.has_external_forwards = True
# Internal forwards
forward_to_coco = forward_dict.get("coco", None)
self._load_internal_forward(forward_to_coco, self.forwards_internal)
def _load_checks(self, check_dict: Dict) -> List[Check]:
checks = list()
if not check_dict:
return checks
try:
name = check_dict["name"]
except KeyError as e:
raise ConfigError(
f"Name missing from forward reply check block: {check_dict}."
) from e
save_to_state = check_dict.get("save_reply_to_state", None)
if save_to_state:
if not isinstance(save_to_state, str):
raise ConfigError(
f"'save_reply_to_state' in check for '{name}' in '{self.name}"
f".conf' is of type '{type(save_to_state).__name__}' "
f"(expected str)."
)
logger.debug(
f"Endpoint {self.name} will save replies to state: {save_to_state}."
)
on_failure = check_dict.get("on_failure", None)
if on_failure:
for action, endpoint in on_failure.items():
if not isinstance(endpoint, str):
raise ConfigError(
f"'on_failure'-endpoint in forward to '{name}' in "
f"'{self.name}.conf' is of type "
f"'{type(endpoint).__name__}' (expected str)."
)
if action not in ON_FAILURE_ACTIONS:
raise ConfigError(
f"Unknown 'on_failure'-action in '{name}' ('{self.name}."
f"conf'): {action}. Use one of {ON_FAILURE_ACTIONS}."
)
reply = check_dict.get("reply", None)
if reply:
if not isinstance(reply, dict):
raise ConfigError(
f"Value 'reply' defining checks in '{name}' has type "
f"{type(reply).__name__} (expected dict)."
)
values = reply.get("value", None)
types = reply.get("type", None)
identical = reply.get("identical", None)
state = reply.get("state", None)
state_hash = reply.get("state_hash", None)
num_hosts_warning = check_dict.get("num_hosts_warning", None)
if not (values or types or identical or state or state_hash):
logger.info(
f"In {self.name}.conf '{name}' has a 'reply' block, but it's empty."
)
return checks
if values:
checks.append(
ValueReplyCheck(
name,
values,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if types:
checks.append(
TypeReplyCheck(
name,
types,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if identical:
checks.append(
IdenticalReplyCheck(
name,
identical,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if state:
checks.append(
StateReplyCheck(
name,
state,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if state_hash:
checks.append(
StateHashReplyCheck(
name,
state_hash,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
return checks
async def call(self, request, hosts=None, params=None):
"""
Call the endpoint.
Returns
-------
:class:`Result`
The result of the endpoint call.
"""
message: Similar lines in 2 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint_BASE_1678491:29
==coco.endpoint_LOCAL_1678491:30
ON_FAILURE_ACTIONS = ["call", "call_single_host"]
# Module level logger, note that there is also a class level, endpoint specific
# logger
logger = logging.getLogger(__name__)
class Endpoint:
"""
An endpoint.
Does whatever the config says.
"""
def __init__(self, name, conf, forwarder, state):
logger.debug(f"Loading {name}.conf")
self.name = name
if conf is None:
conf = dict()
self.description = conf.get("description", "")
self.type = conf.get("type", "GET")
self.group = conf.get("group")
self.callable = conf.get("callable", False)
self.call_on_start = conf.get("call_on_start", False)
self.forwarder = forwarder
self.state = state
self.report_type = conf.get("report_type", "CODES_OVERVIEW")
self.values = copy(conf.get("values", None))
self.get_state = conf.get("get_state", None)
self.send_state = conf.get("send_state", None)
self.save_state = conf.get("save_state", None)
self.set_state = conf.get("set_state", None)
self.schedule = conf.get("schedule", None)
self.enforce_group = bool(conf.get("enforce_group", False))
self.forward_checks = dict()
# Setup the endpoint logger
self.logger = logging.getLogger(f"{__name__}.{self.name}")
if self.values:
for key, value in self.values.items():
self.values[key] = locate(value)
if self.values[key] is None:
raise RuntimeError(
f"Value {key} of endpoint {name} is of unknown type "
f"{value}."
)
if not self.state:
return
# To hold forward calls: first external ones than internal (coco) endpoints.
self.has_external_forwards = False
self._load_calls(conf.get("call", None))
self.before = list()
self.after = list()
self._load_internal_forward(conf.get("before"), self.before)
self._load_internal_forward(conf.get("after"), self.after)
self.timestamp_path = conf.get("timestamp", None)
if self.timestamp_path:
if self.state.find_or_create(self.timestamp_path):
logger.info(
f"`{self.timestamp_path}` is not empty. /{name} will overwrite "
f"it with timestamps."
)
if self.save_state:
if isinstance(self.save_state, str):
self.save_state = [self.save_state]
# Check if state path exists
for save_state in self.save_state:
path = self.state.find_or_create(save_state)
if not path:
self.logger.debug(
f"state path `{save_state}` configured in `save_state` for "
f"endpoint `{name}` is empty."
)
# If save_state is set, the configured values have to match.
if self.values:
# Check if endpoint value types match the associated part of the saved state
for key in self.values.keys():
try:
if not (
path[key] is None
or isinstance(path[key], self.values[key])
):
raise RuntimeError(
f"Value {key} in configured initial state at /{save_state}/ "
f"has type {type(path[key]).__name__} "
f"(expected {self.values[key].__name__})."
)
except KeyError:
# That the values are being saved in the state doesn't mean they need to
# exist in the initially loaded state, but write a debug line.
self.logger.debug(
f"Value {key} not found in configured initial state at "
f"/{save_state}/."
)
except TypeError as e:
raise ConfigError(
f"Value {key} has unknown type {self.values[key]} in "
f"config of endpoint /{self.name}."
) from e
else:
self.logger.warning(
f"{self.name}.conf has set save_state ({save_state}), but no "
f"values are listed. This endpoint will ignore all data sent to it."
)
# If send_state is set, the configured values have to match.
if self.send_state:
# Check if state path exists
path = self.state.find_or_create(self.send_state)
if not path:
self.logger.warning(
f"state path `{self.send_state}` configured in "
f"`send_state` for endpoint `{name}` is empty."
)
if self.values:
# Check if endpoint value types match the associated part of the send_state
for key in self.values.keys():
try:
if not isinstance(path[key], self.values[key]):
raise RuntimeError(
f"Value {key} in configured initial state at /{self.send_state}/ "
f"has type {type(path[key]).__name__} "
f"(expected {self.values[key].__name__})."
)
# It exists both in the values and the state
self.logger.debug(
f"Value {key} is required by this endpoint so it will never "
f"get sent from state (the key was found in both `values` "
f"and in `send_state`)."
)
# TODO: Add an option to overwrite values only if present in request?
except KeyError:
# That the values are being sent from the state doesn't mean they need to
# exist in the value list.
pass
# Check if get state path exists
if self.get_state:
path = self.state.find_or_create(self.get_state)
if not path:
self.logger.warning(
f"state path `{self.get_state}` configured in "
f"`get_state` for endpoint `{name}` is empty."
)
def _load_internal_forward(self, dict_, list_):
"""
Load Forward's from the config dictionary, generate objects and place in list.
Parameters
----------
dict_ : Dict, str, List[Dict] or List[str]
Config dict(s) describing an internal forward or just string(s) with endpoint name.
list_ : List[CocoForward]
The list to save the Forward objects in.
"""
if not dict_:
return
if not isinstance(dict_, list):
dict_ = [dict_]
for f in dict_:
if isinstance(f, dict):
try:
name = f["name"]
except KeyError as e:
raise ConfigError(
f"Found and internal forwarding block in {self.name}.cong that is missing "
f"field 'name'."
) from e
try:
request = f.pop("request")
except KeyError:
request = None
list_.append(
CocoForward(
name, self.forwarder, None, request, self._load_checks(f)
)
)
else:
if not isinstance(f, str):
raise ConfigError(
f"Found '{type(f)}' in {self.name}.conf in an internal forwarding block "
f"(expected str or dict)."
)
list_.append(CocoForward(f, self.forwarder, None, None, None))
def _load_calls(self, forward_dict):
"""Parse the dict from forwarding config and save the Forward objects."""
self.forwards_external = list()
self.forwards_internal = list()
if forward_dict is None:
if self.group is None:
raise ConfigError(
f"'{self.name}.conf' is missing config option 'group'. Or "
f"it needs to set 'call: forward: null'."
)
self.forwards_external.append(
ExternalForward(self.name, self.forwarder, self.group, None, None)
)
self.has_external_forwards = True
else:
# External forwards
forward_ext = forward_dict.get("forward", [self.name])
# could be a string or list(str):
if forward_ext:
if self.group is None:
raise ConfigError(
f"'{self.name}.conf' is missing config option 'group'. "
f"Or it needs to set 'call: forward: null'."
)
if not isinstance(forward_ext, list):
forward_ext = [forward_ext]
for f in forward_ext:
if isinstance(f, str):
self.forwards_external.append(
ExternalForward(f, self.forwarder, self.group, None, None)
)
# could also be a block where there are checks configured for each forward call
elif isinstance(f, dict):
try:
name = f["name"]
except KeyError as e:
raise ConfigError(
f"Entry in forward call from "
f"/{self.name} is missing field 'name'."
) from e
message: Similar lines in 2 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint:30
==coco.endpoint_REMOTE_1678491:29
ON_FAILURE_ACTIONS = ["call", "call_single_host"]
# Module level logger, note that there is also a class level, endpoint specific logger
logger = logging.getLogger(__name__)
class Endpoint:
"""
An endpoint.
Does whatever the config says.
"""
def __init__(self, name, conf, forwarder, state):
logger.debug(f"Loading {name}.conf")
self.name = name
if conf is None:
conf = dict()
self.description = conf.get("description", "")
self.type = conf.get("type", "GET")
self.group = conf.get("group")
self.callable = conf.get("callable", False)
self.call_on_start = conf.get("call_on_start", False)
self.forwarder = forwarder
self.state = state
self.report_type = conf.get("report_type", "CODES_OVERVIEW")
self.values = copy(conf.get("values", None))
self.get_state = conf.get("get_state", None)
self.send_state = conf.get("send_state", None)
self.save_state = conf.get("save_state", None)
self.set_state = conf.get("set_state", None)
self.schedule = conf.get("schedule", None)
self.enforce_group = bool(conf.get("enforce_group", False))
self.forward_checks = dict()
# Setup the endpoint logger
self.logger = logging.getLogger(f"{__name__}.{self.name}")
if self.values:
for key, value in self.values.items():
self.values[key] = locate(value)
if self.values[key] is None:
raise RuntimeError(
f"Value {key} of endpoint {name} is of unknown type "
f"{value}."
)
if not self.state:
return
# To hold forward calls: first external ones than internal (coco) endpoints.
self.has_external_forwards = False
self._load_calls(conf.get("call", None))
self.before = list()
self.after = list()
self._load_internal_forward(conf.get("before"), self.before)
self._load_internal_forward(conf.get("after"), self.after)
self.timestamp_path = conf.get("timestamp", None)
if self.timestamp_path:
if self.state.find_or_create(self.timestamp_path):
logger.info(
f"`{self.timestamp_path}` is not empty. /{name} will overwrite "
f"it with timestamps."
)
if self.save_state:
if isinstance(self.save_state, str):
self.save_state = [self.save_state]
# Check if state path exists
for save_state in self.save_state:
path = self.state.find_or_create(save_state)
if not path:
self.logger.debug(
f"state path `{save_state}` configured in `save_state` for "
f"endpoint `{name}` is empty."
)
# If save_state is set, the configured values have to match.
if self.values:
# Check if endpoint value types match the associated part of the saved state
for key in self.values.keys():
try:
if not (
path[key] is None
or isinstance(path[key], self.values[key])
):
raise RuntimeError(
f"Value {key} in configured initial state at /{save_state}/ "
f"has type {type(path[key]).__name__} "
f"(expected {self.values[key].__name__})."
)
except KeyError:
# That the values are being saved in the state doesn't mean they need to
# exist in the initially loaded state, but write a debug line.
self.logger.debug(
f"Value {key} not found in configured initial state at "
f"/{save_state}/."
)
except TypeError as e:
raise ConfigError(
f"Value {key} has unknown type {self.values[key]} in "
f"config of endpoint /{self.name}."
) from e
else:
self.logger.warning(
f"{self.name}.conf has set save_state ({save_state}), but no "
f"values are listed. This endpoint will ignore all data sent to it."
)
# If send_state is set, the configured values have to match.
if self.send_state:
# Check if state path exists
path = self.state.find_or_create(self.send_state)
if not path:
self.logger.warning(
f"state path `{self.send_state}` configured in "
f"`send_state` for endpoint `{name}` is empty."
)
if self.values:
# Check if endpoint value types match the associated part of the send_state
for key in self.values.keys():
try:
if not isinstance(path[key], self.values[key]):
raise RuntimeError(
f"Value {key} in configured initial state at /{self.send_state}/ "
f"has type {type(path[key]).__name__} "
f"(expected {self.values[key].__name__})."
)
# It exists both in the values and the state
self.logger.debug(
f"Value {key} is required by this endpoint so it will never "
f"get sent from state (the key was found in both `values` "
f"and in `send_state`)."
)
# TODO: Add an option to overwrite values only if present in request?
except KeyError:
# That the values are being sent from the state doesn't mean they need to
# exist in the value list.
pass
# Check if get state path exists
if self.get_state:
path = self.state.find_or_create(self.get_state)
if not path:
self.logger.warning(
f"state path `{self.get_state}` configured in "
f"`get_state` for endpoint `{name}` is empty."
)
def _load_internal_forward(self, dict_, list_):
"""
Load Forward's from the config dictionary, generate objects and place in list.
Parameters
----------
dict_ : Dict, str, List[Dict] or List[str]
Config dict(s) describing an internal forward or just string(s) with endpoint name.
list_ : List[CocoForward]
The list to save the Forward objects in.
"""
if not dict_:
return
if not isinstance(dict_, list):
dict_ = [dict_]
for f in dict_:
if isinstance(f, dict):
try:
name = f["name"]
except KeyError as e:
raise ConfigError(
f"Found and internal forwarding block in {self.name}.cong that is missing "
f"field 'name'."
) from e
try:
request = f.pop("request")
except KeyError:
request = None
list_.append(
CocoForward(
name, self.forwarder, None, request, self._load_checks(f)
)
)
else:
if not isinstance(f, str):
raise ConfigError(
f"Found '{type(f)}' in {self.name}.conf in an internal forwarding block "
f"(expected str or dict)."
)
list_.append(CocoForward(f, self.forwarder, None, None, None))
def _load_calls(self, forward_dict):
"""Parse the dict from forwarding config and save the Forward objects."""
self.forwards_external = list()
self.forwards_internal = list()
if forward_dict is None:
if self.group is None:
raise ConfigError(
f"'{self.name}.conf' is missing config option 'group'. Or "
f"it needs to set 'call: forward: null'."
)
self.forwards_external.append(
ExternalForward(self.name, self.forwarder, self.group, None, None)
)
self.has_external_forwards = True
else:
# External forwards
forward_ext = forward_dict.get("forward", [self.name])
# could be a string or list(str):
if forward_ext:
if self.group is None:
raise ConfigError(
f"'{self.name}.conf' is missing config option 'group'. "
f"Or it needs to set 'call: forward: null'."
)
if not isinstance(forward_ext, list):
forward_ext = [forward_ext]
for f in forward_ext:
if isinstance(f, str):
self.forwards_external.append(
ExternalForward(f, self.forwarder, self.group, None, None)
)
# could also be a block where there are checks configured for each forward call
elif isinstance(f, dict):
try:
name = f["name"]
except KeyError as e:
raise ConfigError(
f"Entry in forward call from "
f"/{self.name} is missing field 'name'."
) from e
message: Similar lines in 2 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint:33
==coco.endpoint_BASE_1678491:33
logger = logging.getLogger(__name__)
class Endpoint:
"""
An endpoint.
Does whatever the config says.
"""
def __init__(self, name, conf, forwarder, state):
logger.debug(f"Loading {name}.conf")
self.name = name
if conf is None:
conf = dict()
self.description = conf.get("description", "")
self.type = conf.get("type", "GET")
self.group = conf.get("group")
self.callable = conf.get("callable", False)
self.call_on_start = conf.get("call_on_start", False)
self.forwarder = forwarder
self.state = state
self.report_type = conf.get("report_type", "CODES_OVERVIEW")
self.values = copy(conf.get("values", None))
self.get_state = conf.get("get_state", None)
self.send_state = conf.get("send_state", None)
self.save_state = conf.get("save_state", None)
self.set_state = conf.get("set_state", None)
self.schedule = conf.get("schedule", None)
self.enforce_group = bool(conf.get("enforce_group", False))
self.forward_checks = dict()
# Setup the endpoint logger
self.logger = logging.getLogger(f"{__name__}.{self.name}")
if self.values:
for key, value in self.values.items():
self.values[key] = locate(value)
if self.values[key] is None:
raise RuntimeError(
f"Value {key} of endpoint {name} is of unknown type "
f"{value}."
)
if not self.state:
return
# To hold forward calls: first external ones than internal (coco) endpoints.
self.has_external_forwards = False
self._load_calls(conf.get("call", None))
self.before = list()
self.after = list()
self._load_internal_forward(conf.get("before"), self.before)
self._load_internal_forward(conf.get("after"), self.after)
self.timestamp_path = conf.get("timestamp", None)
if self.timestamp_path:
if self.state.find_or_create(self.timestamp_path):
logger.info(
f"`{self.timestamp_path}` is not empty. /{name} will overwrite "
f"it with timestamps."
)
if self.save_state:
if isinstance(self.save_state, str):
self.save_state = [self.save_state]
# Check if state path exists
for save_state in self.save_state:
path = self.state.find_or_create(save_state)
if not path:
self.logger.debug(
f"state path `{save_state}` configured in `save_state` for "
f"endpoint `{name}` is empty."
)
# If save_state is set, the configured values have to match.
if self.values:
# Check if endpoint value types match the associated part of the saved state
for key in self.values.keys():
try:
if not (
path[key] is None
or isinstance(path[key], self.values[key])
):
raise RuntimeError(
f"Value {key} in configured initial state at /{save_state}/ "
f"has type {type(path[key]).__name__} "
f"(expected {self.values[key].__name__})."
)
except KeyError:
# That the values are being saved in the state doesn't mean they need to
# exist in the initially loaded state, but write a debug line.
self.logger.debug(
f"Value {key} not found in configured initial state at "
f"/{save_state}/."
)
except TypeError as e:
raise ConfigError(
f"Value {key} has unknown type {self.values[key]} in "
f"config of endpoint /{self.name}."
) from e
else:
self.logger.warning(
f"{self.name}.conf has set save_state ({save_state}), but no "
f"values are listed. This endpoint will ignore all data sent to it."
)
# If send_state is set, the configured values have to match.
if self.send_state:
# Check if state path exists
path = self.state.find_or_create(self.send_state)
if not path:
self.logger.warning(
f"state path `{self.send_state}` configured in "
f"`send_state` for endpoint `{name}` is empty."
)
if self.values:
# Check if endpoint value types match the associated part of the send_state
for key in self.values.keys():
try:
if not isinstance(path[key], self.values[key]):
raise RuntimeError(
f"Value {key} in configured initial state at /{self.send_state}/ "
f"has type {type(path[key]).__name__} "
f"(expected {self.values[key].__name__})."
)
# It exists both in the values and the state
self.logger.debug(
f"Value {key} is required by this endpoint so it will never "
f"get sent from state (the key was found in both `values` "
f"and in `send_state`)."
)
# TODO: Add an option to overwrite values only if present in request?
except KeyError:
# That the values are being sent from the state doesn't mean they need to
# exist in the value list.
pass
# Check if get state path exists
if self.get_state:
path = self.state.find_or_create(self.get_state)
if not path:
self.logger.warning(
f"state path `{self.get_state}` configured in "
f"`get_state` for endpoint `{name}` is empty."
)
def _load_internal_forward(self, dict_, list_):
"""
Load Forward's from the config dictionary, generate objects and place in list.
Parameters
----------
dict_ : Dict, str, List[Dict] or List[str]
Config dict(s) describing an internal forward or just string(s) with endpoint name.
list_ : List[CocoForward]
The list to save the Forward objects in.
"""
if not dict_:
return
if not isinstance(dict_, list):
dict_ = [dict_]
for f in dict_:
if isinstance(f, dict):
try:
name = f["name"]
except KeyError as e:
raise ConfigError(
f"Found and internal forwarding block in {self.name}.cong that is missing "
f"field 'name'."
) from e
try:
request = f.pop("request")
except KeyError:
request = None
list_.append(
CocoForward(
name, self.forwarder, None, request, self._load_checks(f)
)
)
else:
if not isinstance(f, str):
raise ConfigError(
f"Found '{type(f)}' in {self.name}.conf in an internal forwarding block "
f"(expected str or dict)."
)
list_.append(CocoForward(f, self.forwarder, None, None, None))
def _load_calls(self, forward_dict):
"""Parse the dict from forwarding config and save the Forward objects."""
self.forwards_external = list()
self.forwards_internal = list()
if forward_dict is None:
if self.group is None:
raise ConfigError(
f"'{self.name}.conf' is missing config option 'group'. Or "
f"it needs to set 'call: forward: null'."
)
self.forwards_external.append(
ExternalForward(self.name, self.forwarder, self.group, None, None)
)
self.has_external_forwards = True
else:
# External forwards
forward_ext = forward_dict.get("forward", [self.name])
# could be a string or list(str):
if forward_ext:
if self.group is None:
raise ConfigError(
f"'{self.name}.conf' is missing config option 'group'. "
f"Or it needs to set 'call: forward: null'."
)
if not isinstance(forward_ext, list):
forward_ext = [forward_ext]
for f in forward_ext:
if isinstance(f, str):
self.forwards_external.append(
ExternalForward(f, self.forwarder, self.group, None, None)
)
# could also be a block where there are checks configured for each forward call
elif isinstance(f, dict):
try:
name = f["name"]
except KeyError as e:
raise ConfigError(
f"Entry in forward call from "
f"/{self.name} is missing field 'name'."
) from e
message: Similar lines in 2 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint_LOCAL_1678491:34
==coco.endpoint_REMOTE_1678491:32
logger = logging.getLogger(__name__)
class Endpoint:
"""
An endpoint.
Does whatever the config says.
"""
def __init__(self, name, conf, forwarder, state):
logger.debug(f"Loading {name}.conf")
self.name = name
if conf is None:
conf = dict()
self.description = conf.get("description", "")
self.type = conf.get("type", "GET")
self.group = conf.get("group")
self.callable = conf.get("callable", False)
self.call_on_start = conf.get("call_on_start", False)
self.forwarder = forwarder
self.state = state
self.report_type = conf.get("report_type", "CODES_OVERVIEW")
self.values = copy(conf.get("values", None))
self.get_state = conf.get("get_state", None)
self.send_state = conf.get("send_state", None)
self.save_state = conf.get("save_state", None)
self.set_state = conf.get("set_state", None)
self.schedule = conf.get("schedule", None)
self.enforce_group = bool(conf.get("enforce_group", False))
self.forward_checks = dict()
# Setup the endpoint logger
self.logger = logging.getLogger(f"{__name__}.{self.name}")
if self.values:
for key, value in self.values.items():
self.values[key] = locate(value)
if self.values[key] is None:
raise RuntimeError(
f"Value {key} of endpoint {name} is of unknown type "
f"{value}."
)
if not self.state:
return
# To hold forward calls: first external ones than internal (coco) endpoints.
self.has_external_forwards = False
self._load_calls(conf.get("call", None))
self.before = list()
self.after = list()
self._load_internal_forward(conf.get("before"), self.before)
self._load_internal_forward(conf.get("after"), self.after)
self.timestamp_path = conf.get("timestamp", None)
if self.timestamp_path:
if self.state.find_or_create(self.timestamp_path):
logger.info(
f"`{self.timestamp_path}` is not empty. /{name} will overwrite "
f"it with timestamps."
)
if self.save_state:
if isinstance(self.save_state, str):
self.save_state = [self.save_state]
# Check if state path exists
for save_state in self.save_state:
path = self.state.find_or_create(save_state)
if not path:
self.logger.debug(
f"state path `{save_state}` configured in `save_state` for "
f"endpoint `{name}` is empty."
)
# If save_state is set, the configured values have to match.
if self.values:
# Check if endpoint value types match the associated part of the saved state
for key in self.values.keys():
try:
if not (
path[key] is None
or isinstance(path[key], self.values[key])
):
raise RuntimeError(
f"Value {key} in configured initial state at /{save_state}/ "
f"has type {type(path[key]).__name__} "
f"(expected {self.values[key].__name__})."
)
except KeyError:
# That the values are being saved in the state doesn't mean they need to
# exist in the initially loaded state, but write a debug line.
self.logger.debug(
f"Value {key} not found in configured initial state at "
f"/{save_state}/."
)
except TypeError as e:
raise ConfigError(
f"Value {key} has unknown type {self.values[key]} in "
f"config of endpoint /{self.name}."
) from e
else:
self.logger.warning(
f"{self.name}.conf has set save_state ({save_state}), but no "
f"values are listed. This endpoint will ignore all data sent to it."
)
# If send_state is set, the configured values have to match.
if self.send_state:
# Check if state path exists
path = self.state.find_or_create(self.send_state)
if not path:
self.logger.warning(
f"state path `{self.send_state}` configured in "
f"`send_state` for endpoint `{name}` is empty."
)
if self.values:
# Check if endpoint value types match the associated part of the send_state
for key in self.values.keys():
try:
if not isinstance(path[key], self.values[key]):
raise RuntimeError(
f"Value {key} in configured initial state at /{self.send_state}/ "
f"has type {type(path[key]).__name__} "
f"(expected {self.values[key].__name__})."
)
# It exists both in the values and the state
self.logger.debug(
f"Value {key} is required by this endpoint so it will never "
f"get sent from state (the key was found in both `values` "
f"and in `send_state`)."
)
# TODO: Add an option to overwrite values only if present in request?
except KeyError:
# That the values are being sent from the state doesn't mean they need to
# exist in the value list.
pass
# Check if get state path exists
if self.get_state:
path = self.state.find_or_create(self.get_state)
if not path:
self.logger.warning(
f"state path `{self.get_state}` configured in "
f"`get_state` for endpoint `{name}` is empty."
)
def _load_internal_forward(self, dict_, list_):
"""
Load Forward's from the config dictionary, generate objects and place in list.
Parameters
----------
dict_ : Dict, str, List[Dict] or List[str]
Config dict(s) describing an internal forward or just string(s) with endpoint name.
list_ : List[CocoForward]
The list to save the Forward objects in.
"""
if not dict_:
return
if not isinstance(dict_, list):
dict_ = [dict_]
for f in dict_:
if isinstance(f, dict):
try:
name = f["name"]
except KeyError as e:
raise ConfigError(
f"Found and internal forwarding block in {self.name}.cong that is missing "
f"field 'name'."
) from e
try:
request = f.pop("request")
except KeyError:
request = None
list_.append(
CocoForward(
name, self.forwarder, None, request, self._load_checks(f)
)
)
else:
if not isinstance(f, str):
raise ConfigError(
f"Found '{type(f)}' in {self.name}.conf in an internal forwarding block "
f"(expected str or dict)."
)
list_.append(CocoForward(f, self.forwarder, None, None, None))
def _load_calls(self, forward_dict):
"""Parse the dict from forwarding config and save the Forward objects."""
self.forwards_external = list()
self.forwards_internal = list()
if forward_dict is None:
if self.group is None:
raise ConfigError(
f"'{self.name}.conf' is missing config option 'group'. Or "
f"it needs to set 'call: forward: null'."
)
self.forwards_external.append(
ExternalForward(self.name, self.forwarder, self.group, None, None)
)
self.has_external_forwards = True
else:
# External forwards
forward_ext = forward_dict.get("forward", [self.name])
# could be a string or list(str):
if forward_ext:
if self.group is None:
raise ConfigError(
f"'{self.name}.conf' is missing config option 'group'. "
f"Or it needs to set 'call: forward: null'."
)
if not isinstance(forward_ext, list):
forward_ext = [forward_ext]
for f in forward_ext:
if isinstance(f, str):
self.forwards_external.append(
ExternalForward(f, self.forwarder, self.group, None, None)
)
# could also be a block where there are checks configured for each forward call
elif isinstance(f, dict):
try:
name = f["name"]
except KeyError as e:
raise ConfigError(
f"Entry in forward call from "
f"/{self.name} is missing field 'name'."
) from e
message: Similar lines in 2 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint:279
==coco.endpoint_BASE_1678491:274
)
)
self.has_external_forwards = True
# Internal forwards
forward_to_coco = forward_dict.get("coco", None)
self._load_internal_forward(forward_to_coco, self.forwards_internal)
def _load_checks(self, check_dict: Dict) -> List[Check]:
checks = list()
if not check_dict:
return checks
try:
name = check_dict["name"]
except KeyError as e:
raise ConfigError(
f"Name missing from forward reply check block: {check_dict}."
) from e
save_to_state = check_dict.get("save_reply_to_state", None)
if save_to_state:
if not isinstance(save_to_state, str):
raise ConfigError(
f"'save_reply_to_state' in check for '{name}' in '{self.name}"
f".conf' is of type '{type(save_to_state).__name__}' "
f"(expected str)."
)
logger.debug(
f"Endpoint {self.name} will save replies to state: {save_to_state}."
)
on_failure = check_dict.get("on_failure", None)
if on_failure:
for action, endpoint in on_failure.items():
if not isinstance(endpoint, str):
raise ConfigError(
f"'on_failure'-endpoint in forward to '{name}' in "
f"'{self.name}.conf' is of type "
f"'{type(endpoint).__name__}' (expected str)."
)
if action not in ON_FAILURE_ACTIONS:
raise ConfigError(
f"Unknown 'on_failure'-action in '{name}' ('{self.name}."
f"conf'): {action}. Use one of {ON_FAILURE_ACTIONS}."
)
reply = check_dict.get("reply", None)
if reply:
if not isinstance(reply, dict):
raise ConfigError(
f"Value 'reply' defining checks in '{name}' has type "
f"{type(reply).__name__} (expected dict)."
)
values = reply.get("value", None)
types = reply.get("type", None)
identical = reply.get("identical", None)
state = reply.get("state", None)
state_hash = reply.get("state_hash", None)
num_hosts_warning = check_dict.get("num_hosts_warning", None)
if not (values or types or identical or state or state_hash):
logger.info(
f"In {self.name}.conf '{name}' has a 'reply' block, but it's empty."
)
return checks
if values:
checks.append(
ValueReplyCheck(
name,
values,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if types:
checks.append(
TypeReplyCheck(
name,
types,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if identical:
checks.append(
IdenticalReplyCheck(
name,
identical,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if state:
checks.append(
StateReplyCheck(
name,
state,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if state_hash:
checks.append(
StateHashReplyCheck(
name,
state_hash,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
return checks
async def call(self, request, hosts=None, params=None):
"""
Call the endpoint.
Returns
-------
:class:`Result`
The result of the endpoint call.
"""
message: Similar lines in 2 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint_LOCAL_1678491:280
==coco.endpoint_REMOTE_1678491:273
)
)
self.has_external_forwards = True
# Internal forwards
forward_to_coco = forward_dict.get("coco", None)
self._load_internal_forward(forward_to_coco, self.forwards_internal)
def _load_checks(self, check_dict: Dict) -> List[Check]:
checks = list()
if not check_dict:
return checks
try:
name = check_dict["name"]
except KeyError as e:
raise ConfigError(
f"Name missing from forward reply check block: {check_dict}."
) from e
save_to_state = check_dict.get("save_reply_to_state", None)
if save_to_state:
if not isinstance(save_to_state, str):
raise ConfigError(
f"'save_reply_to_state' in check for '{name}' in '{self.name}"
f".conf' is of type '{type(save_to_state).__name__}' "
f"(expected str)."
)
logger.debug(
f"Endpoint {self.name} will save replies to state: {save_to_state}."
)
on_failure = check_dict.get("on_failure", None)
if on_failure:
for action, endpoint in on_failure.items():
if not isinstance(endpoint, str):
raise ConfigError(
f"'on_failure'-endpoint in forward to '{name}' in "
f"'{self.name}.conf' is of type "
f"'{type(endpoint).__name__}' (expected str)."
)
if action not in ON_FAILURE_ACTIONS:
raise ConfigError(
f"Unknown 'on_failure'-action in '{name}' ('{self.name}."
f"conf'): {action}. Use one of {ON_FAILURE_ACTIONS}."
)
reply = check_dict.get("reply", None)
if reply:
if not isinstance(reply, dict):
raise ConfigError(
f"Value 'reply' defining checks in '{name}' has type "
f"{type(reply).__name__} (expected dict)."
)
values = reply.get("value", None)
types = reply.get("type", None)
identical = reply.get("identical", None)
state = reply.get("state", None)
state_hash = reply.get("state_hash", None)
num_hosts_warning = check_dict.get("num_hosts_warning", None)
if not (values or types or identical or state or state_hash):
logger.info(
f"In {self.name}.conf '{name}' has a 'reply' block, but it's empty."
)
return checks
if values:
checks.append(
ValueReplyCheck(
name,
values,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if types:
checks.append(
TypeReplyCheck(
name,
types,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if identical:
checks.append(
IdenticalReplyCheck(
name,
identical,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if state:
checks.append(
StateReplyCheck(
name,
state,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
if state_hash:
checks.append(
StateHashReplyCheck(
name,
state_hash,
on_failure,
save_to_state,
self.forwarder,
self.state,
num_hosts_warning,
)
)
return checks
async def call(self, request, hosts=None, params=None):
"""
Call the endpoint.
Returns
-------
:class:`Result`
The result of the endpoint call.
"""
message: Similar lines in 2 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint:2
==coco.endpoint_LOCAL_1678491:2
import asyncio
import logging
from copy import copy
import time
from typing import Optional, Callable, Union, List, Dict
import json
from pydoc import locate
from aiohttp import (
ClientSession,
ContentTypeError,
)
import sanic
from .result import Result
from .request_forwarder import ExternalForward, CocoForward
from . import metric
from .check import (
Check,
ValueReplyCheck,
TypeReplyCheck,
IdenticalReplyCheck,
StateHashReplyCheck,
StateReplyCheck,
)
from .exceptions import ConfigError, InvalidUsage
from .util import str2total_seconds
ON_FAILURE_ACTIONS = ["call", "call_single_host"]
# Module level logger, note that there is also a class level, endpoint specific
message: Similar lines in 3 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint:2
==coco.endpoint_BASE_1678491:2
==coco.endpoint_LOCAL_1678491:2
import asyncio
import logging
from copy import copy
import time
from typing import Optional, Callable, Union, List, Dict
import json
from pydoc import locate
from aiohttp import (
ClientSession,
ContentTypeError,
)
import sanic
from .result import Result
from .request_forwarder import ExternalForward, CocoForward
from . import metric
from .check import (
Check,
ValueReplyCheck,
TypeReplyCheck,
IdenticalReplyCheck,
StateHashReplyCheck,
StateReplyCheck,
)
from .exceptions import ConfigError, InvalidUsage
message: Similar lines in 4 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint:468
==coco.endpoint_BASE_1678491:457
==coco.endpoint_LOCAL_1678491:463
==coco.endpoint_REMOTE_1678491:462
if self.send_state:
send_state = self.state.read(self.send_state)
if filtered_request:
send_state.update(filtered_request)
filtered_request = send_state
# Forward the request to group and then to other coco endpoints
# TODO: should we do that concurrently?
for forward in self.forwards_external:
result_forward = await forward.trigger(
self.type, filtered_request, hosts, params
)
result.add_result(result_forward)
for forward in self.forwards_internal:
result_forward = await forward.trigger(
self.type, filtered_request, hosts, params
)
result.embed(forward.name, result_forward)
# Look for result type parameter in request
if request:
result.type = request.pop("coco_report_type", self.report_type)
else:
result.type = self.report_type
# Report any additional values in the request
if request:
for key in request.keys():
msg = f"Found additional value '{key}' in request to /{self.name}."
self.logger.warning(msg)
result.add_message(msg)
if self.after:
for forward in self.after:
result_forward = await forward.trigger(self.type, {}, hosts)
result.embed(forward.name, result_forward)
# TODO: run these concurrently?
if self.get_state:
result.state(self.state.extract(self.get_state))
if result.success:
if self.set_state:
for path, value in self.set_state.items():
self.state.write(path, value)
self.write_timestamp()
self.logger.debug("Success!")
return result
def write_timestamp(self):
"""
Write a Unix timestamp (float) to the state.
Does nothing if the endpoint doesn't have a path specified in `timestamp`.
"""
if not self.timestamp_path:
return
self.state.write(self.timestamp_path, time.time())
self.logger.debug(
f"/{self.name} saved timestamp to state: {self.timestamp_path}"
)
def client_call(self, host, port, metrics_port, args):
"""
Call from a client.
Send a request to coco daemon at <host>. Return the reply as json or an error string.
Parameters
----------
host : str
Address of coco daemon.
port : int
Port of coco daemon.
metrics_port : int
Port of the prometheus server
args : :class:`Namespace`
Is expected to include all values of the endpoint.
"""
data = copy(self.values)
if data:
for key, type_ in data.items():
data[key] = self._parse_container_arg(key, type_, vars(args)[key])
else:
data = dict()
args.endpoint = self.name
args.type = self.type
args.data = data
return self.client_send_request(host, port, metrics_port, args)
@staticmethod
def client_send_request(host, port, metrics_port, args):
"""
Send a request to an endpoint.
Parameters
----------
host : str
Host.
port : int
Port.
metrics_port : int
Port of the prometheus server
endpoint : str
Endpoint name.
type : str
HTTP request type.
data : json
JSON data.
args : :class:`argparse.Namespace`
Namespace populated by argparse. May contain the report type
(`report : str`), the refresh time for the client in seconds
('client-refresh-time' : int) and ('silent' : boolean) to suppress printing
anything but the result.
Returns
-------
bool
Success
json or str
The reply
"""
data = args.data
endpoint = args.endpoint
type_ = args.type
data["coco_report_type"] = args.report
async def print_queue_size(metric_request_count):
try:
q_size = await metric.get("coco_queue_length_total", metrics_port, host)
except Exception as err:
if not isinstance(err, asyncio.CancelledError):
print(f"Couldn't get queue fill level from cocod: {err}")
return
print(
f"\rThere are {int(q_size)} requests in the queue.",
sep=" ",
end="",
flush=True,
)
if metric_request_count % 2:
print(" ", sep=" ", end="", flush=True)
else:
print(".", sep=" ", end="", flush=True)
async def send_request():
url = f"http://{host}:{port}/{endpoint}"
if not args.silent:
print("Sending request...")
async with ClientSession() as session:
try:
command = getattr(session, type_.lower())
async with command(url, json=data) as resp:
try:
result = await resp.json()
except ContentTypeError:
result = {"Error": await resp.text()}
except Exception as e:
return False, f"coco-client: Sending request failed: {e}"
else:
return True, result
async def request_and_wait():
"""
Send the request and while waiting get and print queue fill level metric.
Returns
-------
Done task for request result.
"""
main_request = asyncio.create_task(send_request())
if not args.silent:
metric_request_count = 0
while True:
metric_request_count = metric_request_count + 1
queue_size = asyncio.create_task(
print_queue_size(metric_request_count)
)
# Wait until either main request or request for metric is done
done, _ = await asyncio.wait(
{main_request, queue_size}, return_when=asyncio.FIRST_COMPLETED
)
if main_request in done:
queue_size.cancel()
print("\n")
break
# Wait a moment before getting metric again
wait = asyncio.create_task(asyncio.sleep(args.client_refresh_time))
# Cancel waiting in case main request is done
done, _ = await asyncio.wait(
{main_request, wait}, return_when=asyncio.FIRST_COMPLETED
)
if main_request in done:
wait.cancel()
print("\n")
break
return await main_request
return asyncio.run(request_and_wait())
@staticmethod
def _parse_container_arg(key, type_, arg):
if type_ in (list, dict):
try:
value = json.loads(arg)
except json.JSONDecodeError as e:
raise InvalidUsage(f"Failure parsing argument '{key}': {e}") from e
return value
return arg
class LocalEndpoint:
"""An endpoint that will execute a callable solely within coco.
Parameters
----------
name
Endpoint name.
type_
Type of request to accept. Either a string ("POST", ...) or a list of
strings.
callable
A callable that will be called to execute the endpoint.
"""
call_on_start = False
def __init__(
self,
name: str,
type_: Union[str, List[str]],
callable: Callable[[sanic.request.Request], Optional[dict]],
):
self.name = name
self.type = type_
self.callable = callable
self.schedule = None
async def call(self, request, **_):
"""Call the local endpoint."""
return await self.callable(request)
message: Similar lines in 4 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint:418
==coco.endpoint_BASE_1678491:413
==coco.endpoint_LOCAL_1678491:419
==coco.endpoint_REMOTE_1678491:412
if params is None:
params = []
if self.enforce_group:
hosts = None
result = Result(self.name)
if self.before:
for forward in self.before:
result_forward = await forward.trigger(self.type, {}, hosts)
result.embed(forward.name, result_forward)
# TODO: run these concurrently?
# Only forward values we expect
filtered_request = copy(self.values)
if request is None:
request = dict()
if filtered_request:
for key, value in filtered_request.items():
try:
if not isinstance(request[key], value):
msg = (
f"{self.name} received value '{key}'' of type "
f"{type(request[key]).__name__} (expected {value.__name__})."
)
self.logger.warning(msg)
raise InvalidUsage(msg)
except KeyError as e:
msg = f"{self.name} requires value '{key}'."
self.logger.warning(msg)
raise InvalidUsage(msg) from e
# save the state change:
if self.save_state:
for path in self.save_state:
self.state.write(path, request.get(key), key)
filtered_request[key] = request.pop(key)
message: Similar lines in 4 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint:2
==coco.endpoint_BASE_1678491:2
==coco.endpoint_LOCAL_1678491:2
==coco.endpoint_REMOTE_1678491:2
import asyncio
import logging
from copy import copy
import time
from typing import Optional, Callable, Union, List, Dict
import json
from pydoc import locate
from aiohttp import (
ClientSession,
ContentTypeError,
)
import sanic
from .result import Result
from .request_forwarder import ExternalForward, CocoForward
from . import metric
from .check import (
Check,
ValueReplyCheck,
TypeReplyCheck,
IdenticalReplyCheck,
StateHashReplyCheck,
StateReplyCheck,
)
message: Similar lines in 4 files
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
==coco.endpoint:271
==coco.endpoint_BASE_1678491:267
==coco.endpoint_LOCAL_1678491:272
==coco.endpoint_REMOTE_1678491:266
self.forwards_external.append(
ExternalForward(
name,
self.forwarder,
self.group,
None,
self._load_checks(f),
message: Too few public methods (1/2)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
105:
106:
> 107: class Timer:
108: """Asynchronous timer."""
109:
message: Too few public methods (0/2)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
107:
108:
> 109: class DefaultValue:
110: """Tag a config node with a default value."""
111:
message: Too few public methods (0/2)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
114:
115:
> 116: class RequiredValue:
117: """Tag a config node as being required."""
118:
message: Too few public methods (1/2)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
212:
213:
> 214: class SlackLogFilter(logging.Filter):
215: """Filter to allow slack messaging only when requested.
216:
message: Too few public methods (1/2)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
670:
671:
> 672: class LocalEndpoint:
673: """An endpoint that will execute a callable solely within coco.
674:
message: Too few public methods (1/2)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
675:
676:
> 677: class LocalEndpoint:
678: """An endpoint that will execute a callable solely within coco.
679:
message: Too few public methods (1/2)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
676:
677:
> 678: class LocalEndpoint:
679: """An endpoint that will execute a callable solely within coco.
680:
message: Too few public methods (1/2)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
681:
682:
> 683: class LocalEndpoint:
684: """An endpoint that will execute a callable solely within coco.
685:
message: Too many branches (25/12)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
36: class Endpoint:
...
41: """
42:
> 43: def __init__(self, name, conf, forwarder, state):
44: logger.debug(f"Loading {name}.conf")
45: self.name = name
message: Too many branches (25/12)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
37: class Endpoint:
...
42: """
43:
> 44: def __init__(self, name, conf, forwarder, state):
45: logger.debug(f"Loading {name}.conf")
46: self.name = name
message: Too many branches (25/12)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
37: class Endpoint:
...
42: """
43:
> 44: def __init__(self, name, conf, forwarder, state):
45: logger.debug(f"Loading {name}.conf")
46: self.name = name
message: Too many branches (25/12)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
38: class Endpoint:
...
43: """
44:
> 45: def __init__(self, name, conf, forwarder, state):
46: logger.debug(f"Loading {name}.conf")
47: self.name = name
message: Too many branches (19/12)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
54: def main_loop(
...
68: """
69:
> 70: async def go():
71:
72: # start the prometheus server for forwarded requests
message: Too many branches (22/12)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
10: class Result:
...
249: self._state.update(state)
250:
> 251: def report(self, report_type=None):
252: """
253: Generate a report.
message: Too many branches (16/12)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
36: class Endpoint:
...
280: self._load_internal_forward(forward_to_coco, self.forwards_internal)
281:
> 282: def _load_checks(self, check_dict: Dict) -> List[Check]:
283: checks = list()
284: if not check_dict:
message: Too many branches (16/12)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
37: class Endpoint:
...
281: self._load_internal_forward(forward_to_coco, self.forwards_internal)
282:
> 283: def _load_checks(self, check_dict: Dict) -> List[Check]:
284: checks = list()
285: if not check_dict:
message: Too many branches (16/12)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
37: class Endpoint:
...
286: self._load_internal_forward(forward_to_coco, self.forwards_internal)
287:
> 288: def _load_checks(self, check_dict: Dict) -> List[Check]:
289: checks = list()
290: if not check_dict:
message: Too many branches (16/12)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
38: class Endpoint:
...
287: self._load_internal_forward(forward_to_coco, self.forwards_internal)
288:
> 289: def _load_checks(self, check_dict: Dict) -> List[Check]:
290: checks = list()
291: if not check_dict:
message: Too many branches (18/12)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
334: class StateReplyCheck(ReplyCheck):
...
380: )
381:
> 382: async def run(self, result: Result):
383: """
384: Run the check on the given reply.
message: Too many branches (27/12)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
36: class Endpoint:
...
400: return checks
401:
> 402: async def call(self, request, hosts=None, params=None):
403: """
404: Call the endpoint.
message: Too many branches (25/12)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
37: class Endpoint:
...
401: return checks
402:
> 403: async def call(self, request, hosts=None, params=None):
404: """
405: Call the endpoint.
message: Too many branches (27/12)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
37: class Endpoint:
...
406: return checks
407:
> 408: async def call(self, request, hosts=None, params=None):
409: """
410: Call the endpoint.
message: Too many branches (25/12)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
38: class Endpoint:
...
407: return checks
408:
> 409: async def call(self, request, hosts=None, params=None):
410: """
411: Call the endpoint.
message: Too many statements (69/50)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
36: class Endpoint:
...
41: """
42:
> 43: def __init__(self, name, conf, forwarder, state):
44: logger.debug(f"Loading {name}.conf")
45: self.name = name
message: Too many statements (69/50)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
37: class Endpoint:
...
42: """
43:
> 44: def __init__(self, name, conf, forwarder, state):
45: logger.debug(f"Loading {name}.conf")
46: self.name = name
message: Too many statements (69/50)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
37: class Endpoint:
...
42: """
43:
> 44: def __init__(self, name, conf, forwarder, state):
45: logger.debug(f"Loading {name}.conf")
46: self.name = name
message: Too many statements (69/50)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
38: class Endpoint:
...
43: """
44:
> 45: def __init__(self, name, conf, forwarder, state):
46: logger.debug(f"Loading {name}.conf")
47: self.name = name
message: Too many statements (74/50)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
52:
53:
> 54: def main_loop(
55: endpoints, forwarder, coco_port, metrics_port, log_level, frontend_timeout
56: ):
message: Too many statements (67/50)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
54: def main_loop(
...
68: """
69:
> 70: async def go():
71:
72: # start the prometheus server for forwarded requests
message: Too many statements (58/50)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
36: class Endpoint:
...
400: return checks
401:
> 402: async def call(self, request, hosts=None, params=None):
403: """
404: Call the endpoint.
message: Too many statements (54/50)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
37: class Endpoint:
...
401: return checks
402:
> 403: async def call(self, request, hosts=None, params=None):
404: """
405: Call the endpoint.
message: Too many statements (58/50)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
37: class Endpoint:
...
406: return checks
407:
> 408: async def call(self, request, hosts=None, params=None):
409: """
410: Call the endpoint.
message: Too many statements (54/50)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
38: class Endpoint:
...
407: return checks
408:
> 409: async def call(self, request, hosts=None, params=None):
410: """
411: Call the endpoint.
message: Too many nested blocks (6/5)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
43: def __init__(self, name, conf, forwarder, state):
...
95: )
96:
> 97: if self.save_state:
98: if isinstance(self.save_state, str):
99: self.save_state = [self.save_state]
message: Too many nested blocks (6/5)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
44: def __init__(self, name, conf, forwarder, state):
...
96: )
97:
> 98: if self.save_state:
99: if isinstance(self.save_state, str):
100: self.save_state = [self.save_state]
message: Too many nested blocks (6/5)
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
44: def __init__(self, name, conf, forwarder, state):
...
96: )
97:
> 98: if self.save_state:
99: if isinstance(self.save_state, str):
100: self.save_state = [self.save_state]
message: Too many nested blocks (6/5)
author : Rick Nitsche <[email protected]>
date : 2021-06-18T12:19:05
45: def __init__(self, name, conf, forwarder, state):
...
97: )
98:
> 99: if self.save_state:
100: if isinstance(self.save_state, str):
101: self.save_state = [self.save_state]
message: Consider using sys.exit()
author : Rick Nitsche <[email protected]>
date : 2021-04-15T18:11:09
54: def main_loop(
...
86: if name == "coco_shutdown":
87: logger.info("coco.worker: Received shutdown command. Exiting...")
> 88: exit(0)
89:
90: # Use the name to get all info on the call and delete from redis.