From f3211e7296be0310f51ed4deabe1359adf4c228c Mon Sep 17 00:00:00 2001
From: Shailesh Pant
Date: Thu, 9 Jan 2025 20:03:59 +0530
Subject: [PATCH] - add --selected_task_group option to aggregator cli, default
 to "learning"
 - enhance Aggregator to take selected_task_group attribute to enable fedeval
   or learning switching at aggregator level
 - rebase 16.Jan.1
 - fix aggregator cli test cases as per new "selected_task_group" field in start
 - changed default assigner task_group name to "learning" and "evaluation"
 - updated workspaces to use new task_group names - learning / evaluation
 - updated as per review comments
 - update the FedEval documentation with e2e usage steps
 - Rebased 15-Jan.1
 - Fixed docs indentation issue, reduced the verbosity in doc

Signed-off-by: Shailesh Pant
---
 docs/about/features_index/fed_eval.rst        | 482 +++++++++++++++++-
 .../torch_cnn_mnist/plan/plan.yaml            |   2 +-
 .../workspace/plan/defaults/assigner.yaml     |   2 +-
 .../federated-evaluation/assigner.yaml        |   2 +-
 openfl/component/aggregator/aggregator.py     |  10 +-
 openfl/interface/aggregator.py                |  49 +-
 tests/openfl/interface/test_aggregator_api.py |  44 +-
 7 files changed, 560 insertions(+), 31 deletions(-)

diff --git a/docs/about/features_index/fed_eval.rst b/docs/about/features_index/fed_eval.rst
index e35c0f5afa..3f8e6b8637 100644
--- a/docs/about/features_index/fed_eval.rst
+++ b/docs/about/features_index/fed_eval.rst
@@ -24,34 +24,486 @@ In general pipeline is as follows:
 
 Example Using the Task Runner API (Aggregator-based Workflow)
 --------------------------------------------------------------
 
-To demonstrate usage of the task runner API (aggregator-based workflow) for federated evaluation, consider the `Hello Federation example `_. This sample script creates a simple federation with two collaborator nodes and one aggregator node, and executes based on a user specified workspace template. We provide a ``torch_cnn_mnist_fed_eval`` template, which is a federated evaluation template adapted from ``torch_cnn_mnist``.
+The following steps demonstrate practical end-to-end (e2e) usage of FedEval.
 
-This script can be directly executed as follows:
+*N.B.*: We will use the ``torch_cnn_mnist`` plan itself for training and, with some minor changes, for evaluation as well.
+
+*Prerequisites*: Please ensure that OpenFL version 1.7 is installed, or install the latest from source.
+
+With OpenFL 1.7, the ``fx aggregator start`` command is enhanced with an optional ``--task_group`` argument which, as the help text suggests, selects the task group whose tasks the assigner hands to the collaborator(s) for execution; it defaults to 'learning'.
+
+.. code-block:: shell
+
+    Usage: fx aggregator start [OPTIONS]
+
+      Start the aggregator service.
+
+      Args: plan (str): Path to plan config file authorized_cols (str): Path to
+      authorized collaborators file task_group (str): Selected task-group for
+      assignment - defaults to 'learning'
+
+    Options:
+      -p, --plan PATH             Federated learning plan [plan/plan.yaml]
+      -c, --authorized_cols PATH  Authorized collaborator list [plan/cols.yaml]
+      --task_group TEXT           Selected task-group for assignment - defaults
+                                  to learning
+      --help                      Show this message and exit.
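+For readers curious how the flag reaches the aggregator: the CLI simply parses the plan and injects the selected group into the aggregator settings before serving. A condensed, slightly simplified sketch of what ``fx aggregator start`` does under the hood (mirroring the ``start_`` implementation added later in this patch; run from a workspace root):
+
+.. code-block:: python
+
+    from pathlib import Path
+
+    from openfl.federated import Plan
+
+    # Parse the plan exactly as `fx aggregator start` does
+    parsed_plan = Plan.parse(
+        plan_config_path=Path("plan/plan.yaml").absolute(),
+        cols_config_path=Path("plan/cols.yaml").absolute(),
+    )
+
+    # Inject the selected task group; the Aggregator receives it
+    # as its `task_group` constructor argument
+    parsed_plan.config["aggregator"].setdefault("settings", {})
+    parsed_plan.config["aggregator"]["settings"]["task_group"] = "evaluation"
+
+    parsed_plan.get_server().serve()
+
+This is also why the option requires no plan edits of its own: the override happens at parse time, before the server is built.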
+1. **Setup**
+
+We will use the `torch_cnn_mnist` workspace for training.
+
+Let's first configure a workspace with all necessary certificates:
+
 .. code-block:: shell
 
-    $ python test_hello_federation.py --template torch_cnn_mnist_fed_eval
+    fx workspace create --prefix ./cnn_train_eval --template torch_cnn_mnist
+    cd cnn_train_eval
+    fx workspace certify
+    fx aggregator generate-cert-request
+    fx aggregator certify --silent
+
+A successful run will show in the console both the FL plan details and the certificate generation output:
+
+.. code-block:: shell
+
+    INFO     Parsing Federated Learning Plan : SUCCESS :
+
+    aggregator:
+      settings:
+        best_state_path: save/best.pbuf
+        db_store_rounds: 2
+        init_state_path: save/init.pbuf
+        last_state_path: save/last.pbuf
+        rounds_to_train: 2
+        write_logs: false
+      template: openfl.component.aggregator.Aggregator
+    assigner:
+      settings:
+        task_groups:
+          - name: learning
+            percentage: 1.0
+            tasks:
+              - aggregated_model_validation
+              - train
+              - locally_tuned_model_validation
+      template: openfl.component.RandomGroupedAssigner
+    collaborator:
+      settings:
+        db_store_rounds: 1
+        delta_updates: false
+        opt_treatment: RESET
+      template: openfl.component.collaborator.Collaborator
+    compression_pipeline:
+      settings: {}
+      template: openfl.pipelines.NoCompressionPipeline
+    data_loader:
+      settings:
+        batch_size: 64
+        collaborator_count: 2
+      template: src.dataloader.PyTorchMNISTInMemory
+    network:
+      settings:
+        agg_addr: devvm###.com
+        agg_port: 55529
+        cert_folder: cert
+        client_reconnect_interval: 5
+        hash_salt: auto
+        require_client_auth: true
+        use_tls: true
+      template: openfl.federation.Network
+    task_runner:
+      settings: {}
+      template: src.taskrunner.TemplateTaskRunner
+    tasks:
+      aggregated_model_validation:
+        function: validate_task
+        kwargs:
+          apply: global
+          metrics:
+            - acc
+      locally_tuned_model_validation:
+        function: validate_task
+        kwargs:
+          apply: local
+          metrics:
+            - acc
+      settings: {}
+      train:
+        function: train_task
+        kwargs:
+          epochs: 1
+          metrics:
+            - loss
+
+    New workspace directory structure:
+    cnn_train_eval
+    ├── requirements.txt
+    ├── .workspace
+    ├── logs
+    ├── data
+    ├── cert
+    ├── README.md
+    ├── src
+    │   ├── __init__.py
+    │   ├── taskrunner.py
+    │   ├── cnn_model.py
+    │   └── dataloader.py
+    ├── plan
+    │   ├── cols.yaml
+    │   ├── plan.yaml
+    │   ├── data.yaml
+    │   └── defaults
+    └── save
+
+    6 directories, 11 files
+
+    ✔️ OK
+    Setting Up Certificate Authority...
+
+    Done.
+
+    ✔️ OK
+    Creating AGGREGATOR certificate key pair with following settings: CN=devvm###.com, SAN=DNS:devvm###.com
+
+    ✔️ OK
+    The CSR Hash for file server/agg_devvm###.com.csr = 3affa56ce391a084961c5f1ba634f223536173665daa6191e705e13557f36d58c844133758f804d1f85d93bfc113fd7b
+
+    Signing AGGREGATOR certificate
+
+    ✔️ OK
+
+2. **Initialize the plan**
+
+.. code-block:: shell
+
+    cd ~/src/clean/openfl/cnn_train_eval
+    fx plan initialize >~/plan.log 2>&1 &
+    tail -f ~/plan.log
+
+This should initialize the plan with random initial weights in ``init.pbuf``:
+
+.. code-block:: shell
+
+    WARNING  Following parameters omitted from global initial model, local initialization will determine values: []   plan.py:186
+    INFO     Creating Initial Weights File 🠆 save/init.pbuf   plan.py:196
+
+    ✔️ OK
+
+3. **Next, run the 'learning' federation with two collaborators**
+
+.. code-block:: shell
+
+    ## Create two collaborators
+    cd ~/src/clean/openfl/cnn_train_eval
+    fx collaborator create -n collaborator1 -d 1
+    fx collaborator generate-cert-request -n collaborator1
+    fx collaborator certify -n collaborator1 --silent
+    fx collaborator create -n collaborator2 -d 2
+    fx collaborator generate-cert-request -n collaborator2
+    fx collaborator certify -n collaborator2 --silent
+
+    ## start the learning federation
+    fx aggregator start > ~/fx_aggregator.log 2>&1 &
+    fx collaborator start -n collaborator1 > ~/collab1.log 2>&1 &
+    fx collaborator start -n collaborator2 > ~/collab2.log 2>&1 &
+    cd ~
+    tail -f plan.log fx_aggregator.log collab1.log collab2.log
+
+This will start two collaborators and the aggregator with the default `--task_group` 'learning'.
+
+The same task group is defined in the assigner section of the plan, which comes from the defaults:
+
+.. code-block:: yaml
+
+    assigner:
+      settings:
+        task_groups:
+          - name: learning
+            percentage: 1.0
+            tasks:
+              - aggregated_model_validation
+              - train
+              - locally_tuned_model_validation
+
+This will run two rounds of training across both collaborators:
+
+.. code-block:: shell
+
+    ==> fx_aggregator.log <==
+    INFO     Sending tasks to collaborator collaborator2 for round 0   aggregator.py:409
+
+    ==> collab2.log <==
+    INFO     Received Tasks: [name: "aggregated_model_validation"   collaborator.py:184
+    , name: "train"
+    , name: "locally_tuned_model_validation"
+    ]
+
+At the end of the learning federation we can note the best model accuracy reported and save the ``best.pbuf`` file for the next step: evaluation.
+
+.. code-block:: shell
+
+    ==> fx_aggregator.log <==
+    [06:09:27] INFO     Collaborator collaborator1 is sending task results for train, round 1
+    [06:09:28] INFO     Collaborator collaborator1 is sending task results for locally_tuned_model_validation, round 1   aggregator.py:629
+               INFO     Round 1: Collaborators that have completed all tasks: ['collaborator2', 'collaborator1']   aggregator.py:1049
+               INFO     Round 1: saved the best model with score 0.960096
+               INFO     Saving round 1 model...
+               INFO     Experiment Completed. Cleaning up...
+
+In this case we can confirm that after the two rounds of training the model reported an accuracy of 0.960096:
+
+.. code-block:: shell
+
+    Round 1: saved the best model with score 0.960096   aggregator.py:955
+
+Let's save this model (``best.pbuf``) for later use:
+
+.. code-block:: shell
+
+    cp cnn_train_eval/save/best.pbuf ~/trained_model.pbuf
+
+Now let's create another workspace (here named ``cnn_eval``) using the same plan and the same setup steps as in the learning run above.
+
+After that we will initialize the plan, replace the ``init.pbuf`` with the previously saved ``best.pbuf``, and re-adjust the plan to use "evaluation" defaults.
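+If you prefer to script those plan edits rather than make them by hand (the full resulting plan is shown below), a minimal sketch using PyYAML; the workspace path is an assumption from this walkthrough:
+
+.. code-block:: python
+
+    import yaml  # PyYAML
+
+    plan_path = "cnn_eval/plan/plan.yaml"  # assumed evaluation workspace
+    with open(plan_path) as f:
+        plan = yaml.safe_load(f)
+
+    # Evaluation needs only a single federation round
+    plan["aggregator"]["settings"]["rounds_to_train"] = 1
+
+    # Keep only the evaluation task group with the validation task
+    plan["assigner"]["settings"]["task_groups"] = [
+        {
+            "name": "evaluation",
+            "percentage": 1.0,
+            "tasks": ["aggregated_model_validation"],
+        }
+    ]
+
+    with open(plan_path, "w") as f:
+        yaml.safe_dump(plan, f, sort_keys=False)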
+Once all the pieces are in place, we then run the aggregator in evaluation mode by supplying `--task_group` as "evaluation", validating the accuracy of the previously trained model.
+
+The updated plan, post initialization and with the edits needed to make it evaluation-ready, is as follows:
+
+.. code-block:: yaml
+
+    aggregator:
+      settings:
+        best_state_path: save/best.pbuf
+        db_store_rounds: 2
+        init_state_path: save/init.pbuf
+        last_state_path: save/last.pbuf
+        rounds_to_train: 1
+        write_logs: false
+      template: openfl.component.aggregator.Aggregator
+    assigner:
+      settings:
+        task_groups:
+          - name: evaluation
+            percentage: 1.0
+            tasks:
+              - aggregated_model_validation
+      template: openfl.component.RandomGroupedAssigner
+    collaborator:
+      settings:
+        db_store_rounds: 1
+        delta_updates: false
+        opt_treatment: RESET
+      template: openfl.component.collaborator.Collaborator
+    compression_pipeline:
+      settings: {}
+      template: openfl.pipelines.NoCompressionPipeline
+    data_loader:
+      settings:
+        batch_size: 64
+        collaborator_count: 2
+      template: src.dataloader.PyTorchMNISTInMemory
+    network:
+      settings:
+        agg_addr: devvm###.com
+        agg_port: 55529
+        cert_folder: cert
+        client_reconnect_interval: 5
+        hash_salt: auto
+        require_client_auth: true
+        use_tls: true
+      template: openfl.federation.Network
+    task_runner:
+      settings: {}
+      template: src.taskrunner.TemplateTaskRunner
+    tasks:
+      aggregated_model_validation:
+        function: validate_task
+        kwargs:
+          apply: global
+          metrics:
+            - acc
+      locally_tuned_model_validation:
+        function: validate_task
+        kwargs:
+          apply: local
+          metrics:
+            - acc
+      settings: {}
+      train:
+        function: train_task
+        kwargs:
+          epochs: 1
+          metrics:
+            - loss
+
+We have made the following changes to the initialized ``torch_cnn_mnist`` plan in the new workspace:
+
+- Set ``rounds_to_train`` to 1, as evaluation needs just one round of the federation run across the collaborators
+- Removed all training-related tasks from the assigner settings, keeping only "aggregated_model_validation"
+
+Now let's replace the ``init.pbuf`` with the previously saved ``trained_model.pbuf``:
+
+.. code-block:: shell
+
+    ll cnn_eval/save/init.pbuf
+    -rw------- 1 devuser devuser 1722958 Jan 14 09:44 cnn_eval/save/init.pbuf
+    (venv) devuser@devvm:~/src/clean/openfl$ cp ~/trained_model.pbuf cnn_eval/save/init.pbuf
+    (venv) devuser@devvm:~/src/clean/openfl$ ll cnn_eval/save/init.pbuf
+    -rw------- 1 devuser devuser 1722974 Jan 14 09:52 cnn_eval/save/init.pbuf
+
+Notice the size change in ``init.pbuf`` as it is replaced by the trained model we saved from the training run of the federation.
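+To sanity-check the swap before launching, a small optional sketch (the file locations are assumptions from this walkthrough):
+
+.. code-block:: python
+
+    import hashlib
+    from pathlib import Path
+
+    init = Path("cnn_eval/save/init.pbuf")
+    trained = Path.home() / "trained_model.pbuf"  # saved after the learning run
+
+    def digest(path: Path) -> str:
+        return hashlib.sha256(path.read_bytes()).hexdigest()
+
+    # After the copy, init.pbuf should be byte-identical to the trained model
+    assert digest(init) == digest(trained), "init.pbuf does not match the trained model"
+    print(f"init.pbuf OK: {init.stat().st_size} bytes")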
+Now, finally, let's run the federation, this time launching the aggregator with the default value of `--task_group` overridden to "evaluation":
+
+.. code-block:: shell
+
+    ## Create two collaborators
+    cd ~/src/clean/openfl/cnn_eval
+    fx collaborator create -n collaborator1 -d 1
+    fx collaborator generate-cert-request -n collaborator1
+    fx collaborator certify -n collaborator1 --silent
+    fx collaborator create -n collaborator2 -d 2
+    fx collaborator generate-cert-request -n collaborator2
+    fx collaborator certify -n collaborator2 --silent
+
+    ## start the fedeval federation
+    fx aggregator start --task_group evaluation > ~/fx_aggregator.log 2>&1 &
+    fx collaborator start -n collaborator1 > ~/collab1.log 2>&1 &
+    fx collaborator start -n collaborator2 > ~/collab2.log 2>&1 &
+    cd ~
+    tail -f plan.log fx_aggregator.log collab1.log collab2.log
+
+Notice that the only change in the federation run steps from the previous training round is the additional `--task_group` argument to ``fx aggregator start``.
+
+Since the aggregator's task_group is set to "evaluation", it will skip the round_number check and use the supplied init model for evaluation only:
+
+.. code-block:: shell
+
+    INFO     Setting aggregator to assign: evaluation task_group   aggregator.py:101
+    INFO     🧿 Starting the Aggregator Service.   aggregator.py:103
+
+    INFO     Skipping round_number check for evaluation task_group   aggregator.py:215
+    INFO     Starting Aggregator gRPC Server
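+That log line comes from the guard this patch adds in ``Aggregator._load_initial_tensors``. A tiny self-contained mirror of that logic (the helper function name is illustrative, not part of OpenFL):
+
+.. code-block:: python
+
+    import logging
+
+    logging.basicConfig(level=logging.INFO)
+    logger = logging.getLogger("aggregator")
+
+    def maybe_update_round(task_group: str, current_round: int, saved_round: int) -> int:
+        """Mirror of the round-number guard added in this patch."""
+        if task_group == "evaluation":
+            # An evaluation run scores the supplied init model as-is,
+            # so the round number stored in the model proto is ignored
+            logger.info(f"Skipping round_number check for {task_group} task_group")
+            return current_round
+        if saved_round > current_round:
+            logger.info(f"Starting training from round {saved_round} of previously saved model")
+            return saved_round
+        return current_round
+
+    # The trained model proto carries round 2, but evaluation keeps round 0
+    assert maybe_update_round("evaluation", 0, 2) == 0
+    assert maybe_update_round("learning", 0, 2) == 2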
+In each collaborator's logs we can see that the only assigned task is the evaluation task:
+
+.. code-block:: shell
+
+    ==> collab1.log <==
+    INFO     Waiting for tasks...   collaborator.py:234
+    INFO     Received Tasks: [name: "aggregated_model_validation"   collaborator.py:184
+    ]
+
+    ==> collab2.log <==
+    INFO     Waiting for tasks...   collaborator.py:234
+    INFO     Received Tasks: [name: "aggregated_model_validation"   collaborator.py:184
+    ]
+
+After the federation run, since this is an evaluation-only run, the collaborators report the accuracy of the init model which, for a successful evaluation, is the same as the previously trained best model's accuracy; in our case that was 0.960096:
+
+.. code-block:: shell
+
+    ==> fx_aggregator.log <==
+    [10:00:15] INFO     Collaborator collaborator2 is sending task results for aggregated_model_validation, round 0   aggregator.py:629
+               INFO     Round 0: Collaborators that have completed all tasks: ['collaborator2']   aggregator.py:1049
+               INFO     Collaborator collaborator1 is sending task results for aggregated_model_validation, round 0   aggregator.py:629
+               INFO     Round 0: Collaborators that have completed all tasks: ['collaborator2', 'collaborator1']   aggregator.py:1049
+               INFO     Round 0: saved the best model with score 0.960096   aggregator.py:955
+               INFO     Saving round 0 model...   aggregator.py:994
+               INFO     Experiment Completed. Cleaning up...   aggregator.py:1005
+               INFO     Sending signal to collaborator collaborator1 to shutdown...   aggregator.py:356
+
 ---
-Congratulations, you have successfully performed federated evaluation across two decentralized collaborator nodes with minor default reference changes to plan
\ No newline at end of file
+Congratulations, you have successfully performed federated evaluation across two decentralized collaborator nodes using the same plan with minor evaluation-related changes, leveraging a previously trained OpenFL model protobuf as input.
\ No newline at end of file
diff --git a/openfl-workspace/torch_cnn_mnist/plan/plan.yaml b/openfl-workspace/torch_cnn_mnist/plan/plan.yaml
index 7ad310a3e9..cae2fd0028 100644
--- a/openfl-workspace/torch_cnn_mnist/plan/plan.yaml
+++ b/openfl-workspace/torch_cnn_mnist/plan/plan.yaml
@@ -13,7 +13,7 @@ aggregator:
 assigner:
   settings:
     task_groups:
-      - name: train_and_validate
+      - name: learning
         percentage: 1.0
         tasks:
           - aggregated_model_validation
diff --git a/openfl-workspace/workspace/plan/defaults/assigner.yaml b/openfl-workspace/workspace/plan/defaults/assigner.yaml
index 0b7e744475..6a5903794f 100644
--- a/openfl-workspace/workspace/plan/defaults/assigner.yaml
+++ b/openfl-workspace/workspace/plan/defaults/assigner.yaml
@@ -1,7 +1,7 @@
 template : openfl.component.RandomGroupedAssigner
 settings :
     task_groups :
-      - name : train_and_validate
+      - name : learning
        percentage : 1.0
        tasks :
          - aggregated_model_validation
diff --git a/openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml b/openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml
index 9d583fa0c4..c660659e83 100644
--- a/openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml
+++ b/openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml
@@ -1,7 +1,7 @@
 template : openfl.component.RandomGroupedAssigner
 settings :
     task_groups :
-      - name : validate
+      - name : evaluation
        percentage : 1.0
        tasks :
          - aggregated_model_validation
\ No newline at end of file
diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py
index 61d253bcff..4b3ff2680c 100644
--- a/openfl/component/aggregator/aggregator.py
+++ b/openfl/component/aggregator/aggregator.py
@@ -1,7 +1,6 @@
 # Copyright 2020-2024 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
-
 """Aggregator module."""
 
 import logging
@@ -85,6 +84,7 @@ def __init__(
         callbacks: Optional[List] = None,
         persist_checkpoint=True,
         persistent_db_path=None,
+        task_group: str = "learning",
     ):
         """Initializes the Aggregator.
 
@@ -111,7 +111,9 @@ def __init__(
                 Defaults to 1.
             initial_tensor_dict (dict, optional): Initial tensor dictionary.
             callbacks: List of callbacks to be used during the experiment.
+            task_group (str, optional): Selected task_group for assignment.
""" + self.task_group = task_group self.round_number = 0 self.next_model_round_number = 0 @@ -298,9 +300,13 @@ def _load_initial_tensors(self): self.model, compression_pipeline=self.compression_pipeline ) - if round_number > self.round_number: + # Check selected task_group before updating round number + if self.task_group == "evaluation": + logger.info(f"Skipping round_number check for {self.task_group} task_group") + elif round_number > self.round_number: logger.info(f"Starting training from round {round_number} of previously saved model") self.round_number = round_number + tensor_key_dict = { TensorKey(k, self.uuid, self.round_number, False, ("model",)): v for k, v in tensor_dict.items() diff --git a/openfl/interface/aggregator.py b/openfl/interface/aggregator.py index 930dcd43be..2297cc4d2b 100644 --- a/openfl/interface/aggregator.py +++ b/openfl/interface/aggregator.py @@ -1,18 +1,33 @@ # Copyright 2020-2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 - """Aggregator module.""" import sys from logging import getLogger from pathlib import Path -from click import Path as ClickPath -from click import confirm, echo, group, option, pass_context, style +from click import ( + Path as ClickPath, +) +from click import ( + confirm, + echo, + group, + option, + pass_context, + style, +) from openfl.cryptography.ca import sign_certificate -from openfl.cryptography.io import get_csr_hash, read_crt, read_csr, read_key, write_crt, write_key +from openfl.cryptography.io import ( + get_csr_hash, + read_crt, + read_csr, + read_key, + write_crt, + write_key, +) from openfl.cryptography.participant import generate_csr from openfl.federated import Plan from openfl.interface.cli_helper import CERT_DIR @@ -52,9 +67,20 @@ def aggregator(context): default="plan/cols.yaml", type=ClickPath(exists=True), ) -def start_(plan, authorized_cols): - """Start the aggregator service.""" +@option( + "--task_group", + required=False, + default="learning", + help="Selected task-group for assignment - defaults to learning", +) +def start_(plan, authorized_cols, task_group): + """Start the aggregator service. 
+
+    Args:
+        plan (str): Path to plan config file
+        authorized_cols (str): Path to authorized collaborators file
+        task_group (str): Selected task-group for assignment - defaults to 'learning'
+    """
     if is_directory_traversal(plan):
         echo("Federated learning plan path is out of the openfl workspace scope.")
         sys.exit(1)
@@ -62,14 +88,21 @@ def start_(plan, authorized_cols):
         echo("Authorized collaborator list file path is out of the openfl workspace scope.")
         sys.exit(1)
 
-    plan = Plan.parse(
+    # Parse plan and override mode if specified
+    parsed_plan = Plan.parse(
         plan_config_path=Path(plan).absolute(),
         cols_config_path=Path(authorized_cols).absolute(),
     )
 
+    # Set task_group in aggregator settings
+    if "settings" not in parsed_plan.config["aggregator"]:
+        parsed_plan.config["aggregator"]["settings"] = {}
+    parsed_plan.config["aggregator"]["settings"]["task_group"] = task_group
+    logger.info(f"Setting aggregator to assign: {task_group} task_group")
+
     logger.info("🧿 Starting the Aggregator Service.")
 
-    plan.get_server().serve()
+    parsed_plan.get_server().serve()
 
 
 @aggregator.command(name="generate-cert-request")
diff --git a/tests/openfl/interface/test_aggregator_api.py b/tests/openfl/interface/test_aggregator_api.py
index 14572cf8ab..7986634368 100644
--- a/tests/openfl/interface/test_aggregator_api.py
+++ b/tests/openfl/interface/test_aggregator_api.py
@@ -17,7 +17,19 @@ def test_aggregator_start(mock_parse):
     plan_config = plan_path.joinpath('plan.yaml')
     cols_config = plan_path.joinpath('cols.yaml')
 
-    mock_parse.return_value = mock.Mock()
+    # Create a mock plan with the required fields
+    mock_plan = mock.MagicMock()
+    mock_plan.__getitem__.side_effect = {'task_group': 'learning'}.get
+    mock_plan.get = {'task_group': 'learning'}.get
+    # Add the config attribute with proper nesting
+    mock_plan.config = {
+        'aggregator': {
+            'settings': {
+                'task_group': 'learning'
+            }
+        }
+    }
+    mock_parse.return_value = mock_plan
 
     ret = start_(['-p', plan_config, '-c', cols_config], standalone_mode=False)
 
@@ -32,7 +44,20 @@ def test_aggregator_start_illegal_plan(mock_parse, mock_is_directory_traversal):
     plan_config = plan_path.joinpath('plan.yaml')
     cols_config = plan_path.joinpath('cols.yaml')
 
-    mock_parse.return_value = mock.Mock()
+    # Create a mock plan with the required fields
+    mock_plan = mock.MagicMock()
+    mock_plan.__getitem__.side_effect = {'task_group': 'learning'}.get
+    mock_plan.get = {'task_group': 'learning'}.get
+    # Add the config attribute with proper nesting
+    mock_plan.config = {
+        'aggregator': {
+            'settings': {
+                'task_group': 'learning'
+            }
+        }
+    }
+    mock_parse.return_value = mock_plan
+
     mock_is_directory_traversal.side_effect = [True, False]
 
     with TestCase.assertRaises(test_aggregator_start_illegal_plan, SystemExit):
@@ -48,7 +73,20 @@ def test_aggregator_start_illegal_cols(mock_parse, mock_is_directory_traversal):
     plan_config = plan_path.joinpath('plan.yaml')
     cols_config = plan_path.joinpath('cols.yaml')
 
-    mock_parse.return_value = mock.Mock()
+    # Create a mock plan with the required fields
+    mock_plan = mock.MagicMock()
+    mock_plan.__getitem__.side_effect = {'task_group': 'learning'}.get
+    mock_plan.get = {'task_group': 'learning'}.get
+    # Add the config attribute with proper nesting
+    mock_plan.config = {
+        'aggregator': {
+            'settings': {
+                'task_group': 'learning'
+            }
+        }
+    }
+    mock_parse.return_value = mock_plan
+
     mock_is_directory_traversal.side_effect = [False, True]
 
     with TestCase.assertRaises(test_aggregator_start_illegal_cols, SystemExit):