Skip to content

Commit

Permalink
Fixing things up in preparation for larger changes
Browse files Browse the repository at this point in the history
+ Extends are now concurrent by default
+ Color codes are stripped from log files
+ Broker now respects its own debug setting
+ Session command timeouts are now flexible
Fixes #133
Fixes #135
  • Loading branch information
JacobCallahan committed Sep 29, 2021
1 parent ffe354c commit 136bb56
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 29 deletions.
8 changes: 8 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
History
=======

0.1.25 (2021-09-30)
------------------

+ Extends are now concurrent by default
+ Color codes are stripped from log files
+ Broker now respects its own debug setting
+ Session command timeouts are now flexible

0.1.24 (2021-09-16)
------------------

Expand Down
56 changes: 38 additions & 18 deletions broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def checkin(self, sequential=False, host=None):
hosts = [hosts]

if not hosts:
logger.debug('Checkin called with no hosts, taking no action')
logger.debug("Checkin called with no hosts, taking no action")
return

with ProcessPoolExecutor(
Expand All @@ -228,31 +228,51 @@ def checkin(self, sequential=False, host=None):
)
helpers.update_inventory(remove=[h.hostname for h in hosts])

def extend(self, host=None):
def _extend(self, host):
"""extend a single VM"""
logger.info(f"Extending host {host.hostname}")
provider = PROVIDERS[host._broker_provider]
self._kwargs["target_vm"] = host
self._act(provider, "extend_vm", checkout=False)
return host

def extend(self, sequential=False, host=None):
"""extend one or more VMs
:param host: can be one of:
None - Will use the contents of self._hosts
A single host object
A list of host objects
A dictionary mapping host types to one or more host objects
:param sequential: boolean whether to run checkins sequentially
"""
if host is None:
host = self._hosts
logger.debug(host)
if isinstance(host, dict):
for _host in host.values():
self.extend(_host)
elif isinstance(host, list):
# reversing over a copy of the list to avoid skipping
for _host in host[::-1]:
self.extend(_host)
elif host:
logger.info(f"Extending host {host.hostname}")
provider = PROVIDERS[host._broker_provider]
self._kwargs["target_vm"] = host
logger.debug(f"Executing extend with provider {provider.__name__}")
self._act(provider, "extend_vm", checkout=False)
# default to hosts listed on the instance
hosts = host or self._hosts
logger.debug(
f"Extend called with: {hosts}, "
f'running {"sequential" if sequential else "concurrent"}'
)
# normalize the type since the function accepts multiple types
if isinstance(hosts, dict):
# flatten the lists of hosts from the values of the dict
hosts = [host for host_list in hosts.values() for host in host_list]
if not isinstance(hosts, list):
hosts = [hosts]

if not hosts:
logger.debug("Extend called with no hosts, taking no action")
return

with ProcessPoolExecutor(
max_workers=1 if sequential else len(hosts)
) as workers:
completed_extends = as_completed(
workers.submit(self._extend, _host) for _host in hosts
)
for completed in completed_extends:
_host = completed.result()
logger.info(f"Completed extend for {_host.hostname or _host.name}")

@staticmethod
def sync_inventory(provider):
Expand Down
12 changes: 8 additions & 4 deletions broker/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def provider_cmd(*args, **kwargs): # the actual subcommand
@click.option(
"--log-level",
type=click.Choice(["info", "warning", "error", "critical", "debug", "silent"]),
default="info",
default="debug" if settings.settings.debug else "info",
callback=helpers.update_log_level,
is_eager=True,
expose_value=False,
Expand Down Expand Up @@ -183,7 +183,7 @@ def providers():
@click.argument("vm", type=str, nargs=-1)
@click.option("-b", "--background", is_flag=True, help="Run checkin in the background")
@click.option("--all", "all_", is_flag=True, help="Select all VMs")
@click.option("--sequential", is_flag=True, default=False, help="Run checkins sequentially")
@click.option("--sequential", is_flag=True, help="Run checkins sequentially")
@click.option(
"--filter", type=str, help="Checkin only what matches the specified filter"
)
Expand Down Expand Up @@ -246,11 +246,13 @@ def inventory(details, sync, filter):
@click.argument("vm", type=str, nargs=-1)
@click.option("-b", "--background", is_flag=True, help="Run extend in the background")
@click.option("--all", "all_", is_flag=True, help="Select all VMs")

@click.option("--sequential", is_flag=True, help="Run extends sequentially")
@click.option(
"--filter", type=str, help="Extend only what matches the specified filter"
)
@provider_options
def extend(vm, background, all_, filter, **kwargs):
def extend(vm, background, all_, sequential, filter, **kwargs):
"""Extend a host's lease time
COMMAND: broker extend <vm hostname>|<vm name>|<local id>
Expand All @@ -261,6 +263,8 @@ def extend(vm, background, all_, filter, **kwargs):
:param all_: Click option all
:param sequential: Flag for whether to run extends sequentially
:param filter: a filter string matching broker's specification
"""
broker_args = helpers.clean_dict(kwargs)
Expand All @@ -272,7 +276,7 @@ def extend(vm, background, all_, filter, **kwargs):
if str(num) in vm or host["hostname"] in vm or host["name"] in vm or all_:
to_extend.append(VMBroker().reconstruct_host(host))
broker_inst = VMBroker(hosts=to_extend, **broker_args)
broker_inst.extend()
broker_inst.extend(sequential=sequential)


@cli.command()
Expand Down
26 changes: 24 additions & 2 deletions broker/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def resolve_file_args(broker_args):
if isinstance(val, Path) or (
isinstance(val, str) and val[-4:] in ("json", "yaml", ".yml")
):
if (data := load_file(val)) :
if (data := load_file(val)):
if key == "args_file":
if isinstance(data, dict):
final_args.update(data)
Expand Down Expand Up @@ -361,4 +361,26 @@ def handle_keyboardinterrupt(*args):
if choice.lower()[0] == "y":
fork_broker()
else:
raise exceptions.BrokerError("Broker killed by user.")
raise exceptions.BrokerError("Broker killed by user.")


def translate_timeout(timeout):
"""Allows for flexible timeout definitions, converts other units to ms
acceptable units are (s)econds, (m)inutes, (h)ours, (d)ays
"""
if isinstance(timeout, str):
timeout, unit = int(timeout[:-1]), timeout[-1]
if unit == "d":
timeout *= 24
unit = "h"
if unit == "h":
timeout *= 60
unit = "m"
if unit == "m":
timeout *= 60
unit = "s"
if unit == "s":
timeout *= 1000
return timeout if isinstance(timeout, int) else 0
11 changes: 7 additions & 4 deletions broker/logger.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
# -*- encoding: utf-8 -*-
"""Module handling internal and dependency logging."""
import logging
from pathlib import Path
import logzero
from broker.settings import BROKER_DIRECTORY
from broker.settings import BROKER_DIRECTORY, settings


def setup_logzero(level="info", path="logs/broker.log", silent=False):
Expand All @@ -15,15 +14,19 @@ def setup_logzero(level="info", path="logs/broker.log", silent=False):
"%(end_color)s %(message)s"
)
log_level = getattr(logging, level.upper(), logging.INFO)

# formatter for terminal
formatter = logzero.LogFormatter(
fmt=debug_fmt if log_level is logging.DEBUG else log_fmt
)
logzero.setup_default_logger(formatter=formatter, disableStderrLogger=silent)
logzero.loglevel(log_level)
# formatter for file
formatter = logzero.LogFormatter(
fmt=debug_fmt if log_level is logging.DEBUG else log_fmt, color=False
)
logzero.logfile(
path, loglevel=log_level, maxBytes=1e9, backupCount=3, formatter=formatter
)


setup_logzero()
setup_logzero(level="debug" if settings.debug else "info")
3 changes: 2 additions & 1 deletion broker/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path
from ssh2.session import Session as ssh2_Session
from ssh2 import sftp as ssh2_sftp
from broker.helpers import translate_timeout

SESSIONS = {}

Expand Down Expand Up @@ -61,7 +62,7 @@ def _read(channel):

def run(self, command, timeout=0):
"""run a command on the host and return the results"""
self.session.set_timeout(timeout)
self.session.set_timeout(translate_timeout(timeout))
channel = self.session.open_session()
channel.execute(
command,
Expand Down

0 comments on commit 136bb56

Please sign in to comment.