
Enhance aggregator to skip round check for evaluation only flow #1258

Merged

Conversation

Collaborator

@ishaileshpant commented on Jan 10, 2025

  • add --task_group option to the aggregator CLI, defaulting to "learning"
  • enhance Aggregator to take a selected_task_group attribute, enabling switching between FedEval and learning at the aggregator level
  • rebase 16.Jan.1
  • fix aggregator CLI test cases as per the new "selected_task_group" field in start
  • changed default assigner task_group names to "learning" and "evaluation"
  • updated workspaces to use the new task_group names - learning / evaluation
  • updated as per review comments
  • update the FedEval documentation with e2e usage steps
  • fixed docs indentation issue, reduced verbosity in the docs

Testing steps / results

We will use the same plan for both training and evaluation, relying on the aggregator mode to skip the round check in evaluation mode. This can be done in either of the following ways, taking the torch_cnn_mnist and torch_cnn_mnist_fed_eval workspaces as examples:

  1. Use torch_cnn_mnist workspace for training
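For reference, this step uses the standard TaskRunner CLI. A minimal sketch of the commands is shown below; the workspace path, collaborator names, and log redirection are illustrative, and the PKI/certificate exchange between aggregator and collaborators is omitted for brevity:

$ fx workspace create --prefix cnn_train_eval --template torch_cnn_mnist
$ cd cnn_train_eval
$ fx plan initialize                                  # creates save/init.pbuf with random weights
# ... certify the workspace, aggregator and collaborators (omitted) ...
$ fx aggregator start > fx_aggregator.log 2>&1 &
$ fx collaborator start -n collaborator1 > collab1.log 2>&1 &
$ fx collaborator start -n collaborator2 > collab2.log 2>&1 &

The plan parsing / initialization output and the tailed run logs follow: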
INFO     Parsing Federated Learning Plan : SUCCESS : /home/perfuser/src/clean/openfl/cnn_train_eval/plan/plan.yaml.  plan.py:193
INFO     aggregator:  plan.py:198
           settings:
             best_state_path: save/best.pbuf
             db_store_rounds: 2
             init_state_path: save/init.pbuf
             last_state_path: save/last.pbuf
             rounds_to_train: 3
             write_logs: false
           template: openfl.component.aggregator.Aggregator
         assigner:
           settings:
             task_groups:
             - name: train_and_validate
               percentage: 1.0
               tasks:
               - aggregated_model_validation
               - train
               - locally_tuned_model_validation
           template: openfl.component.RandomGroupedAssigner
         collaborator:
           settings:
             db_store_rounds: 1
             delta_updates: false
             opt_treatment: RESET
           template: openfl.component.collaborator.Collaborator
         compression_pipeline:
           settings: {}
           template: openfl.pipelines.NoCompressionPipeline
         data_loader:
           settings:
             batch_size: 64
             collaborator_count: 2
           template: src.dataloader.PyTorchMNISTInMemory
         network:
           settings:
             agg_addr: PerfTestProd12.amr.corp.intel.com
             agg_port: 60674
             cert_folder: cert
             client_reconnect_interval: 5
             hash_salt: auto
             require_client_auth: true
             use_tls: true
           template: openfl.federation.Network
         task_runner:
           settings: {}
           template: src.taskrunner.TemplateTaskRunner
         tasks:
           aggregated_model_validation:
             function: validate_task
             kwargs:
               apply: global
               metrics:
               - acc
           locally_tuned_model_validation:
             function: validate_task
             kwargs:
               apply: local
               metrics:
               - acc
           settings: {}
           train:
             function: train_task
             kwargs:
               epochs: 1
               metrics:
               - loss
WARNING  Following parameters omitted from global initial model, local initialization will determine values: []  plan.py:186
INFO     Creating Initial Weights File    🠆 save/init.pbuf  plan.py:196
INFO     FL-Plan hash is 9ac97b35acea0baaebe9080256800670e3755757e63e8073c656a48f95b42860786357582ff032b5af0ce9e70f8b9a05  plan.py:288
INFO     ['plan_9ac97b35']  plan.py:223

==> fx_aggregator.log <==
[11:11:38] INFO     Sending tasks to collaborator collaborator2 for round 0  aggregator.py:414

==> collab1.log <==
           INFO     Building `openfl.pipelines.NoCompressionPipeline` Module.  plan.py:226

==> collab2.log <==
           INFO     Received Tasks: [name: "aggregated_model_validation"  collaborator.py:184
                    , name: "train"
                    , name: "locally_tuned_model_validation"
                    ]

==> collab1.log <==
           INFO     Building `openfl.component.collaborator.Collaborator` Module.  plan.py:226
           INFO     Waiting for tasks...  collaborator.py:234

==> fx_aggregator.log <==
           INFO     Sending tasks to collaborator collaborator1 for round 0  aggregator.py:414

==> collab1.log <==
           INFO     Received Tasks: [name: "aggregated_model_validation"  collaborator.py:184
                    , name: "train"
                    , name: "locally_tuned_model_validation"

==> fx_aggregator.log <==
[11:13:28] INFO     Collaborator collaborator1 is sending task results for train, round 2  aggregator.py:634
           INFO     Collaborator collaborator1 is sending task results for locally_tuned_model_validation, round 2  aggregator.py:634
           INFO     Round 2: Collaborators that have completed all tasks: ['collaborator2', 'collaborator1']  aggregator.py:1054
           INFO     Round 2: saved the best model with score 0.969897  aggregator.py:960
           INFO     Saving round 2 model...  aggregator.py:999
           INFO     Experiment Completed. Cleaning up...  aggregator.py:1010

==> collab1.log <==
[11:13:28] INFO     Waiting for tasks...  collaborator.py:234

==> fx_aggregator.log <==
           INFO     Sending signal to collaborator collaborator1 to shutdown...  aggregator.py:361

==> collab1.log <==
           INFO     Received shutdown signal. Exiting...  collaborator.py:199

 ✔️ OK

==> collab2.log <==
[11:13:37] INFO     Waiting for tasks...  collaborator.py:234

==> fx_aggregator.log <==
[11:13:37] INFO     Sending signal to collaborator collaborator2 to shutdown...  aggregator.py:361

==> collab2.log <==
           INFO     Received shutdown signal. Exiting...  collaborator.py:199

 ✔️ OK

==> fx_aggregator.log <==

 ✔️ OK


(venv) perfuser@PerfTestProd12:~/src/clean/openfl$ ll cnn_train_eval/save/
total 5060
drwx------ 2 perfuser perfuser    4096 Jan 10 11:12 .
drwx------ 8 perfuser perfuser    4096 Jan 10 11:11 ..
-rw------- 1 perfuser perfuser 1722974 Jan 10 11:13 best.pbuf
-rw------- 1 perfuser perfuser 1722958 Jan 10 11:10 init.pbuf
-rw------- 1 perfuser perfuser 1722974 Jan 10 11:13 last.pbuf
(venv) perfuser@PerfTestProd12:~/src/clean/openfl$ 

  2. Save the best model
(venv) perfuser@PerfTestProd12:~/src/clean/openfl$ ll cnn_train_eval/save/
total 5060
drwx------ 2 perfuser perfuser    4096 Jan 10 11:12 .
drwx------ 8 perfuser perfuser    4096 Jan 10 11:11 ..
-rw------- 1 perfuser perfuser 1722974 Jan 10 11:13 best.pbuf
-rw------- 1 perfuser perfuser 1722958 Jan 10 11:10 init.pbuf
-rw------- 1 perfuser perfuser 1722974 Jan 10 11:13 last.pbuf
(venv) perfuser@PerfTestProd12:~/src/clean/openfl$ cp cnn_train_eval/save/best.pbuf ~/trained_model.pbuf
(venv) perfuser@PerfTestProd12:~/src/clean/openfl$ cd ~
(venv) perfuser@PerfTestProd12:~$ ll *.pbuf
-rw------- 1 perfuser perfuser 1722974 Jan  9 06:46 best.pbuf
-rw------- 1 perfuser perfuser 1722974 Jan 10 10:44 torch_cnn_mnist_init.pbuf
-rw------- 1 perfuser perfuser 1722974 Jan 10 11:15 trained_model.pbuf
(venv) perfuser@PerfTestProd12:~$ 

  3. Reset the plan defaults to the federated-evaluation defaults, using the torch_cnn_mnist_fed_eval plan.yaml as a reference,
    or
    alternatively, change the following settings in the plan to make it ready for evaluation:
    • rename the task_group to "evaluation"
    • in the aggregator settings section, set rounds_to_train to 1
    • in the assigner settings section, delete all tasks from the task_group except "aggregated_model_validation"
      In this test we take the second approach:
+++ b/openfl-workspace/torch_cnn_mnist/plan/plan.yaml
@@ -7,7 +7,7 @@ aggregator:
     db_store_rounds: 2
     init_state_path: save/init.pbuf
     last_state_path: save/last.pbuf
-    rounds_to_train: 2
+    rounds_to_train: 1
     write_logs: false
   template: openfl.component.aggregator.Aggregator
 assigner:
@@ -17,8 +17,6 @@ assigner:
       percentage: 1.0
       tasks:
       - aggregated_model_validation
-      - train
-      - locally_tuned_model_validation
   template: openfl.component.RandomGroupedAssigner
 collaborator:
   settings:
  4. Create a fresh workspace using torch_cnn_mnist, with init.pbuf created via plan initialize (i.e. random weights)
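A sketch of the commands for this step (same workspace prefix and template as above; the exact invocations are illustrative):

$ fx workspace create --prefix cnn_train_eval --template torch_cnn_mnist
$ cd cnn_train_eval
# edit plan/plan.yaml as shown in the diff above (rounds_to_train: 1, evaluation-only task list)
$ fx plan initialize        # regenerates save/init.pbuf with random weights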
INFO     Parsing Federated Learning Plan : SUCCESS : /home/perfuser/src/clean/openfl/cnn_train_eval/plan/plan.yaml.  plan.py:193
INFO     aggregator:  plan.py:198
           settings:
             best_state_path: save/best.pbuf
             db_store_rounds: 2
             init_state_path: save/init.pbuf
             last_state_path: save/last.pbuf
             rounds_to_train: 1
             write_logs: false
           template: openfl.component.aggregator.Aggregator
         assigner:
           settings:
             task_groups:
             - name: train_and_validate
               percentage: 1.0
               tasks:
               - aggregated_model_validation
           template: openfl.component.RandomGroupedAssigner
         collaborator:
           settings:
             db_store_rounds: 1
             delta_updates: false

INFO     Building `openfl.pipelines.NoCompressionPipeline` Module.  plan.py:226
WARNING  tried to remove tensor: __opt_state_needed not present in the tensor dict  split.py:94
WARNING  Following parameters omitted from global initial model, local initialization will determine values: []  plan.py:186
INFO     Creating Initial Weights File    🠆 save/init.pbuf  plan.py:196
INFO     FL-Plan hash is 101573a09ad2c40271ffc1d4b8116f6da0f159cfaf53826f6816ed7c1a74bc123075e478221ebd2322dac4a35c6142cb  plan.py:288
INFO     ['plan_101573a0']  plan.py:223

 ✔️ OK
  • in the workspace's save directory, replace init.pbuf with the previously saved best.pbuf model file
cd cnn_train_eval/save/
(venv) perfuser@PerfTestProd12:~/src/clean/openfl/cnn_train_eval/save$ ll
total 1692
drwx------ 2 perfuser perfuser    4096 Jan 10 11:23 .
drwx------ 8 perfuser perfuser    4096 Jan 10 11:20 ..
-rw------- 1 perfuser perfuser 1722958 Jan 10 11:23 init.pbuf
(venv) perfuser@PerfTestProd12:~/src/clean/openfl/cnn_train_eval/save$ cp ~/trained_model.pbuf init.pbuf 
(venv) perfuser@PerfTestProd12:~/src/clean/openfl/cnn_train_eval/save$ ll
total 1692
drwx------ 2 perfuser perfuser    4096 Jan 10 11:23 .
drwx------ 8 perfuser perfuser    4096 Jan 10 11:20 ..
-rw------- 1 perfuser perfuser 1722974 Jan 10 11:24 init.pbuf
(venv) perfuser@PerfTestProd12:~/src/clean/openfl/cnn_train_eval/save$ 
  • run the aggregator with the selected task group set to "evaluation" (via the new --task_group CLI option / selected_task_group attribute)
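A sketch of the evaluation run; the flag name below follows the --task_group option described in this PR (earlier revisions of the branch may have exposed it as --selected_task_group), and the collaborator names and log redirection are illustrative:

$ fx aggregator start --task_group evaluation > fx_aggregator.log 2>&1 &
$ fx collaborator start -n collaborator1 > collab1.log 2>&1 &
$ fx collaborator start -n collaborator2 > collab2.log 2>&1 &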
==> fx_aggregator.log <==
           WARNING  CutoffTimeBasedStragglerHandling is disabled as straggler_cutoff_time is set to np.inf.  cutoff_time_based_straggler_handling.py:46
           INFO     Building `openfl.component.aggregator.Aggregator` Module.  plan.py:226
           INFO     Skipping round_number check for evaluation task_group  aggregator.py:220
           INFO     Starting Aggregator gRPC Server  aggregator_server.py:347

==> fx_aggregator.log <==
[11:26:44] INFO     Sending tasks to collaborator collaborator1 for round 0  aggregator.py:414

==> collab1.log <==
           INFO     Received Tasks: [name: "aggregated_model_validation"  collaborator.py:184
                    ]

==> fx_aggregator.log <==
           INFO     Sending tasks to collaborator collaborator2 for round 0  aggregator.py:414

==> collab2.log <==
           INFO     Received Tasks: [name: "aggregated_model_validation"  collaborator.py:184
                    ]

==> fx_aggregator.log <==
[11:26:46] INFO     Collaborator collaborator2 is sending task results for aggregated_model_validation, round 0  aggregator.py:634
[11:26:47] INFO     Round 0: Collaborators that have completed all tasks: ['collaborator2']  aggregator.py:1054
           INFO     Collaborator collaborator1 is sending task results for aggregated_model_validation, round 0  aggregator.py:634
           INFO     Round 0: Collaborators that have completed all tasks: ['collaborator2', 'collaborator1']  aggregator.py:1054

==> collab2.log <==
[11:26:47] INFO     Waiting for tasks...  collaborator.py:234

==> fx_aggregator.log <==
           INFO     Round 0: saved the best model with score 0.969897  aggregator.py:960
           INFO     Saving round 0 model...  aggregator.py:999
           INFO     Experiment Completed. Cleaning up...  aggregator.py:1010

==> collab1.log <==
[11:26:47] INFO     Waiting for tasks...  collaborator.py:234

==> fx_aggregator.log <==
           INFO     Sending signal to collaborator collaborator1 to shutdown...  aggregator.py:361

==> collab1.log <==
           INFO     Received shutdown signal. Exiting...  collaborator.py:199

 ✔️ OK

==> collab2.log <==
[11:26:57] INFO     Waiting for tasks...  collaborator.py:234

==> fx_aggregator.log <==
[11:26:57] INFO     Sending signal to collaborator collaborator2 to shutdown...  aggregator.py:361

==> collab2.log <==
           INFO     Received shutdown signal. Exiting...  collaborator.py:199

 ✔️ OK

==> fx_aggregator.log <==

 ✔️ OK

Since the only task executed was evaluation (aggregated_model_validation), the best and last pbuf files seen below still contain the random initial weights, while init.pbuf is the trained model that was swapped in, and that model is what was used for reporting the accuracy.

ll cnn_train_eval/save/
total 5060
drwx------ 2 perfuser perfuser    4096 Jan 10 11:26 .
drwx------ 8 perfuser perfuser    4096 Jan 10 11:26 ..
-rw------- 1 perfuser perfuser 1722958 Jan 10 11:26 best.pbuf
-rw------- 1 perfuser perfuser 1722974 Jan 10 11:24 init.pbuf
-rw------- 1 perfuser perfuser 1722958 Jan 10 11:26 last.pbuf
(venv) perfuser@PerfTestProd12:~/src/clean/openfl$ 
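One quick way to verify this (not shown in the original transcript) is to compare checksums: init.pbuf should match the copied trained model, while best.pbuf and last.pbuf should not.

$ md5sum cnn_train_eval/save/*.pbuf ~/trained_model.pbuf
# expected: init.pbuf matches trained_model.pbuf; best.pbuf and last.pbuf differ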

@ishaileshpant force-pushed the mode_switching_aggregator branch from 1c23479 to c7fecc5 on January 10, 2025
@ishaileshpant force-pushed the mode_switching_aggregator branch 6 times, most recently from fb94d77 to df65195, on January 10, 2025
@ishaileshpant force-pushed the mode_switching_aggregator branch 5 times, most recently from 1008b8f to 7c7a82a, on January 10, 2025
@ishaileshpant changed the title from "[Please don't merge] Enhance aggregator to skip round check for evaluation only flow" to "Enhance aggregator to skip round check for evaluation only flow" on Jan 12, 2025
@ishaileshpant force-pushed the mode_switching_aggregator branch 2 times, most recently from 6b75d7b to ded0375, on January 12, 2025
@ishaileshpant force-pushed the mode_switching_aggregator branch from ded0375 to 17f4d99 on January 13, 2025
Collaborator

@teoparvanov left a comment

Thanks for addressing this, @ishaileshpant! This way, OpenFL 1.7+ users would be able to use FedEval in a meaningful way (more improvements coming in 1.8).

PS: I just have one more suggestion, in addition to the latest set of comments by @MasterSkepticista

@ishaileshpant requested a review from psfoley on January 13, 2025
Collaborator

@kta-intel left a comment

Thanks @ishaileshpant - very glad to see you were able to run an evaluation flow on a pretrained model. I have some comments that I think will make this PR more "standalone"

@kta-intel self-requested a review on January 13, 2025
Collaborator

@kta-intel left a comment

As noted offline, this PR successfully adds an aggregator CLI flag to set the task group and enables the ability to run an FL round on a pretrained model.

My other comments can be addressed in #1226 or related PR

Thanks @ishaileshpant !

@ishaileshpant force-pushed the mode_switching_aggregator branch from 17f4d99 to 52453e4 on January 14, 2025
@ishaileshpant force-pushed the mode_switching_aggregator branch 8 times, most recently from b8f8e49 to 11692ab, on January 15, 2025
Collaborator

@teoparvanov left a comment

Nice work, @ishaileshpant ! I've just caught a couple of typos to correct in the tutorial document. Other than that, the PR should be good to go!

@ishaileshpant force-pushed the mode_switching_aggregator branch 3 times, most recently from 22441a1 to 5d4f3a2, on January 15, 2025
@ishaileshpant
Collaborator Author

Nice work, @ishaileshpant ! I've just caught a couple of typos to correct in the tutorial document. Other than that, the PR should be good to go!

Fixed all the typos and nits, @teoparvanov

@ishaileshpant force-pushed the mode_switching_aggregator branch 2 times, most recently from 823599b to 05fc4ce, on January 15, 2025
…rning"

- enhance Aggregator to take selected_task_group attribute to enable fedeval or learning switching at aggregator level
- rebase 16.Jan.1
- fix aggregator cli test cases as per new "selected_task_group" field in start
- changed default assigner task_group name to "learning" and "evaluation"
- updated worspaces to use new task_group names - learning / evaluation
- updated as per review comments
- update the FedEval documentation with e2e usage steps
- Rebased 15-Jan.1
- Fixed docs indentation issue,reduced the verbosity in doc
Signed-off-by: Shailesh Pant <[email protected]>
@ishaileshpant force-pushed the mode_switching_aggregator branch from 05fc4ce to f3211e7 on January 16, 2025
@teoparvanov merged commit 69a2ceb into securefederatedai:develop on Jan 16, 2025
21 checks passed