Skip to content

Commit

Permalink
Dynamic logging with admin commands (#3127)
Browse files Browse the repository at this point in the history
depends on #3126
- Add dynamic logging mechanism with options to provide a file,
levelname/level for root level, or to reload
- Add log_config argument for simulator
- `configure_site_log target config` under operate command category
- `configure_job_log job_id target config` under manage_job command
category

### Types of changes
<!--- Put an `x` in all the boxes that apply, and remove the not
applicable items -->
- [x] Non-breaking change (fix or new feature that would not break
existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing
functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Quick tests passed locally by running `./runtest.sh`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated.
  • Loading branch information
SYangster authored Jan 15, 2025
1 parent 58cd502 commit 1be401c
Show file tree
Hide file tree
Showing 18 changed files with 358 additions and 15 deletions.
2 changes: 2 additions & 0 deletions nvflare/apis/fl_constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ class AdminCommandNames(object):
SHELL_TAIL = "tail"
SHELL_GREP = "grep"
APP_COMMAND = "app_command"
CONFIGURE_JOB_LOG = "configure_job_log"
CONFIGURE_SITE_LOG = "configure_site_log"


class ServerCommandNames(object):
Expand Down
36 changes: 36 additions & 0 deletions nvflare/fuel/utils/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,42 @@ def apply_log_config(dict_config, dir_path: str = "", file_prefix: str = ""):
logging.config.dictConfig(dict_config)


def dynamic_log_config(config: str, workspace: Workspace, job_id: str = None):
# Dynamically configure log given a config (filepath, levelname, levelnumber, 'reload'), apply the config to the proper locations.
if not isinstance(config, str):
raise ValueError(
f"Unsupported config type. Expect config to be string filepath, levelname, levelnumber, or 'reload' but got {type(config)}"
)

if config == "reload":
config = workspace.get_log_config_file_path()

if os.path.isfile(config):
# Read confg file
with open(config, "r") as f:
dict_config = json.load(f)

if job_id:
dir_path = workspace.get_run_dir(job_id)
else:
dir_path = workspace.get_root_dir()

apply_log_config(dict_config, dir_path)

else:
# Set level of root logger based on levelname or levelnumber
if config.isdigit():
level = int(config)
if not (0 <= level <= 50):
raise ValueError(f"Invalid logging level: {level}")
else:
level = getattr(logging, config.upper(), None)
if level is None:
raise ValueError(f"Invalid logging level: {config}")

logging.getLogger().setLevel(level)


def add_log_file_handler(log_file_name):
root_logger = logging.getLogger()
main_handler = root_logger.handlers[0]
Expand Down
6 changes: 5 additions & 1 deletion nvflare/job_config/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,9 @@ def export_job(self, job_root: str):
self._set_all_apps()
self.job.generate_job_config(job_root)

def simulator_run(self, workspace: str, n_clients: int = None, threads: int = None, gpu: str = None):
def simulator_run(
self, workspace: str, n_clients: int = None, threads: int = None, gpu: str = None, log_config: str = None
):
"""Run the job with the simulator with the `workspace` using `clients` and `threads`.
For end users.
Expand All @@ -531,6 +533,7 @@ def simulator_run(self, workspace: str, n_clients: int = None, threads: int = No
n_clients: number of clients.
threads: number of threads.
gpu: gpu assignments for simulating clients, comma separated
log_config: log config json file path
Returns:
Expand All @@ -556,6 +559,7 @@ def simulator_run(self, workspace: str, n_clients: int = None, threads: int = No
n_clients=n_clients,
threads=threads,
gpu=gpu,
log_config=log_config,
)

def as_id(self, obj: Any) -> str:
Expand Down
4 changes: 3 additions & 1 deletion nvflare/job_config/fed_job_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def generate_job_config(self, job_root):

self._generate_meta(job_dir)

def simulator_run(self, workspace, clients=None, n_clients=None, threads=None, gpu=None):
def simulator_run(self, workspace, clients=None, n_clients=None, threads=None, gpu=None, log_config=None):
with TemporaryDirectory() as job_root:
self.generate_job_config(job_root)

Expand All @@ -157,6 +157,8 @@ def simulator_run(self, workspace, clients=None, n_clients=None, threads=None, g
if gpu:
gpu = self._trim_whitespace(gpu)
command += " -gpu " + str(gpu)
if log_config:
command += " -l" + str(log_config)

new_env = os.environ.copy()
process = subprocess.Popen(shlex.split(command, True), preexec_fn=os.setsid, env=new_env)
Expand Down
2 changes: 2 additions & 0 deletions nvflare/private/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class TrainingTopic(object):
START_JOB = "train.start_job"
GET_SCOPES = "train.get_scopes"
NOTIFY_JOB_STATUS = "train.notify_job_status"
CONFIGURE_JOB_LOG = "train.configure_job_log"


class RequestHeader(object):
Expand Down Expand Up @@ -98,6 +99,7 @@ class SysCommandTopic(object):
SHELL = "sys.shell"
REPORT_RESOURCES = "resource_manager.report_resources"
REPORT_ENV = "sys.report_env"
CONFIGURE_SITE_LOG = "sys.configure_site_log"


class ControlCommandTopic(object):
Expand Down
2 changes: 2 additions & 0 deletions nvflare/private/fed/app/simulator/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def define_simulator_parser(simulator_parser):
simulator_parser.add_argument("-c", "--clients", type=str, help="client names list")
simulator_parser.add_argument("-t", "--threads", type=int, help="number of parallel running clients")
simulator_parser.add_argument("-gpu", "--gpu", type=str, help="list of GPU Device Ids, comma separated")
simulator_parser.add_argument("-l", "--log_config", type=str, help="log config file path")
simulator_parser.add_argument("-m", "--max_clients", type=int, default=100, help="max number of clients")
simulator_parser.add_argument(
"--end_run_for_all",
Expand All @@ -46,6 +47,7 @@ def run_simulator(simulator_args):
n_clients=simulator_args.n_clients,
threads=simulator_args.threads,
gpu=simulator_args.gpu,
log_config=simulator_args.log_config,
max_clients=simulator_args.max_clients,
end_run_for_all=simulator_args.end_run_for_all,
)
Expand Down
46 changes: 37 additions & 9 deletions nvflare/private/fed/app/simulator/simulator_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def __init__(
n_clients=None,
threads=None,
gpu=None,
log_config=None,
max_clients=100,
end_run_for_all=False,
):
Expand All @@ -99,6 +100,7 @@ def __init__(
self.n_clients = n_clients
self.threads = threads
self.gpu = gpu
self.log_config = log_config
self.max_clients = max_clients
self.end_run_for_all = end_run_for_all

Expand Down Expand Up @@ -126,7 +128,15 @@ def __init__(
self.workspace = os.path.join(running_dir, self.workspace)

def _generate_args(
self, job_folder: str, workspace: str, clients=None, n_clients=None, threads=None, gpu=None, max_clients=100
self,
job_folder: str,
workspace: str,
clients=None,
n_clients=None,
threads=None,
gpu=None,
log_config=None,
max_clients=100,
):
args = Namespace(
job_folder=job_folder,
Expand All @@ -135,14 +145,22 @@ def _generate_args(
n_clients=n_clients,
threads=threads,
gpu=gpu,
log_config=log_config,
max_clients=max_clients,
)
args.set = []
return args

def setup(self):
self.args = self._generate_args(
self.job_folder, self.workspace, self.clients, self.n_clients, self.threads, self.gpu, self.max_clients
self.job_folder,
self.workspace,
self.clients,
self.n_clients,
self.threads,
self.gpu,
self.log_config,
self.max_clients,
)

if self.args.clients:
Expand All @@ -152,14 +170,19 @@ def setup(self):
for i in range(self.args.n_clients):
self.client_names.append("site-" + str(i + 1))

log_config_file_path = os.path.join(self.args.workspace, "local", WorkspaceConstants.LOGGING_CONFIG)
if not os.path.isfile(log_config_file_path):
log_config_file_path = os.path.join(os.path.dirname(__file__), WorkspaceConstants.LOGGING_CONFIG)
if self.args.log_config:
log_config_file_path = self.args.log_config
if not os.path.isfile(log_config_file_path):
self.logger.error(f"log_config: {log_config_file_path} is not a valid file path")
return False
else:
log_config_file_path = os.path.join(self.args.workspace, "local", WorkspaceConstants.LOGGING_CONFIG)
if not os.path.isfile(log_config_file_path):
log_config_file_path = os.path.join(os.path.dirname(__file__), WorkspaceConstants.LOGGING_CONFIG)

with open(log_config_file_path, "r") as f:
dict_config = json.load(f)

self.args.log_config = None
self.args.config_folder = "config"
self.args.job_id = SimulatorConstants.JOB_NAME
self.args.client_config = os.path.join(self.args.config_folder, JobConstants.CLIENT_JOB_CONFIG)
Expand Down Expand Up @@ -669,9 +692,14 @@ def _pick_next_client(self):
def do_one_task(self, client, num_of_threads, gpu, lock, timeout=60.0, task_name=RunnerTask.TASK_EXEC):
open_port = get_open_ports(1)[0]
client_workspace = os.path.join(self.args.workspace, client.client_name)
logging_config = os.path.join(
self.args.workspace, client.client_name, "local", WorkspaceConstants.LOGGING_CONFIG
)
if self.args.log_config:
logging_config = self.args.log_config
if not os.path.isfile(logging_config):
raise ValueError(f"log_config: {logging_config} is not a valid file path")
else:
logging_config = os.path.join(
self.args.workspace, client.client_name, "local", WorkspaceConstants.LOGGING_CONFIG
)
decomposer_module = ConfigService.get_str_var(
name=ConfigVarName.DECOMPOSER_MODULE, conf=SystemConfigs.RESOURCES_CONF
)
Expand Down
31 changes: 31 additions & 0 deletions nvflare/private/fed/client/admin_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
from nvflare.apis.fl_constant import AdminCommandNames, FLContextKey
from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import Shareable
from nvflare.fuel.utils.log_utils import dynamic_log_config
from nvflare.private.fed.client.client_status import get_status_message
from nvflare.security.logging import secure_format_exception
from nvflare.widgets.info_collector import InfoCollector
from nvflare.widgets.widget import WidgetID

Expand Down Expand Up @@ -246,6 +248,34 @@ def process(self, data: Shareable, fl_ctx: FLContext):
return None


class ConfigureJobLogCommand(CommandProcessor):
"""To implement the configure_job_log command."""

def get_command_name(self) -> str:
"""To get the command name.
Returns: AdminCommandNames.CONFIGURE_JOB_LOG
"""
return AdminCommandNames.CONFIGURE_JOB_LOG

def process(self, data: Shareable, fl_ctx: FLContext):
"""Called to process the configure_job_log command.
Args:
data: process data
fl_ctx: FLContext
Returns: configure_job_log command message
"""
engine = fl_ctx.get_engine()
try:
dynamic_log_config(data, engine.get_workspace(), fl_ctx.get_job_id())
except Exception as e:
return secure_format_exception(e)


class AdminCommands(object):
"""AdminCommands contains all the commands for processing the commands from the parent process."""

Expand All @@ -257,6 +287,7 @@ class AdminCommands(object):
ShowStatsCommand(),
ShowErrorsCommand(),
ResetErrorsCommand(),
ConfigureJobLogCommand(),
]

@staticmethod
Expand Down
3 changes: 3 additions & 0 deletions nvflare/private/fed/client/client_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,9 @@ def get_current_run_info(self, job_id) -> ClientRunInfo:
def get_errors(self, job_id):
return self.client_executor.get_errors(job_id)

def configure_job_log(self, job_id, config):
return self.client_executor.configure_job_log(job_id, config)

def reset_errors(self, job_id):
self.client_executor.reset_errors(job_id)

Expand Down
31 changes: 31 additions & 0 deletions nvflare/private/fed/client/client_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,37 @@ def get_errors(self, job_id):
secure_log_traceback()
return None

def configure_job_log(self, job_id, config):
"""Configure the job log.
Args:
job_id: the job_id
config: log config
Returns:
configure_job_log command message
"""
try:
request = new_cell_message({}, config)
return_data = self.client.cell.send_request(
target=self._job_fqcn(job_id),
channel=CellChannel.CLIENT_COMMAND,
topic=AdminCommandNames.CONFIGURE_JOB_LOG,
request=request,
optional=True,
timeout=self.job_query_timeout,
)
return_code = return_data.get_header(MessageHeaderKey.RETURN_CODE)
if return_code == ReturnCode.OK:
return return_data.payload
else:
return f"failed to configure_job_log with return code: {return_code}"
except Exception as e:
err = f"configure_job_log execution exception: {secure_format_exception(e)}."
self.logger.error(err)
secure_log_traceback()
return err

def reset_errors(self, job_id):
"""Resets the error information.
Expand Down
5 changes: 4 additions & 1 deletion nvflare/private/fed/client/client_req_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
from .info_coll_cmd import ClientInfoProcessor
from .scheduler_cmds import CancelResourceProcessor, CheckResourceProcessor, ReportResourcesProcessor, StartJobProcessor
from .shell_cmd import ShellCommandProcessor
from .sys_cmd import ReportEnvProcessor, SysInfoProcessor
from .sys_cmd import ConfigureSiteLogProcessor, ReportEnvProcessor, SysInfoProcessor
from .training_cmds import ( # StartClientMGpuProcessor,; SetRunNumberProcessor,
AbortAppProcessor,
AbortTaskProcessor,
ClientStatusProcessor,
ConfigureJobLogProcessor,
DeleteRunNumberProcessor,
DeployProcessor,
NotifyJobStatusProcessor,
Expand Down Expand Up @@ -52,6 +53,8 @@ class ClientRequestProcessors:
ReportResourcesProcessor(),
ReportEnvProcessor(),
NotifyJobStatusProcessor(),
ConfigureJobLogProcessor(),
ConfigureSiteLogProcessor(),
]

@staticmethod
Expand Down
22 changes: 21 additions & 1 deletion nvflare/private/fed/client/sys_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@

from nvflare.apis.fl_constant import FLContextKey, SystemComponents
from nvflare.apis.fl_context import FLContext
from nvflare.private.admin_defs import Message
from nvflare.fuel.utils.log_utils import dynamic_log_config
from nvflare.private.admin_defs import Message, error_reply, ok_reply
from nvflare.private.defs import SysCommandTopic
from nvflare.private.fed.client.admin import RequestProcessor
from nvflare.security.logging import secure_format_exception


class SysInfoProcessor(RequestProcessor):
Expand Down Expand Up @@ -80,3 +82,21 @@ def process(self, req: Message, app_ctx) -> Message:
}
message = Message(topic="reply_" + req.topic, body=json.dumps(env))
return message


class ConfigureSiteLogProcessor(RequestProcessor):
def get_topics(self) -> List[str]:
return [SysCommandTopic.CONFIGURE_SITE_LOG]

def process(self, req: Message, app_ctx) -> Message:
engine = app_ctx
fl_ctx = engine.new_context()
site_name = fl_ctx.get_identity_name()
workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)

try:
dynamic_log_config(req.body, workspace)
except Exception as e:
return error_reply(secure_format_exception(e))

return ok_reply(topic=f"reply_{req.topic}", body=f"successfully configured {site_name} log")
Loading

0 comments on commit 1be401c

Please sign in to comment.