[WIP] Support hpo in distributed #524

Open
wants to merge 3 commits into master
63 changes: 63 additions & 0 deletions federatedscope/autotune/baseline/distributed_xgb_client_1_hpo.yaml
@@ -0,0 +1,63 @@
use_gpu: True
device: 0
early_stop:
patience: 5
seed: 12345
federate:
client_num: 2
mode: 'distributed'
make_global_eval: False
online_aggr: False
total_round_num: 20
distribute:
use: True
server_host: '127.0.0.1'
server_port: 50051
client_host: '127.0.0.1'
client_port: 50052
role: 'client'
data_idx: 1
model:
type: xgb_tree
lambda_: 0.1
gamma: 0
num_of_trees: 10
train:
optimizer:
lr: 0.5
# learning rate for xgb model
eta: 0.5
data:
root: data/
type: adult
splits: [1.0, 0.0]
args: [{normalization: False, standardization: True}]
feat_engr:
scenario: vfl
dataloader:
type: raw
batch_size: 50
criterion:
type: CrossEntropyLoss
trainer:
type: verticaltrainer
vertical:
use: True
key_size: 256
dims: [7, 14]
algo: 'xgb'
eval:
freq: 5
best_res_update_round_wise_key: test_loss
hpo:
use: True
scheduler: sha
num_workers: 0
init_cand_num: 9
ss: 'federatedscope/autotune/baseline/vfl_ss.yaml'
sha:
budgets: [ 3, 9 ]
elim_rate: 3
iter: 1
metric: 'server_global_eval.test_loss'
working_folder: sha
63 changes: 63 additions & 0 deletions federatedscope/autotune/baseline/distributed_xgb_client_2_hpo.yaml
@@ -0,0 +1,63 @@
use_gpu: True
device: 0
early_stop:
patience: 5
seed: 12345
federate:
client_num: 2
mode: 'distributed'
make_global_eval: False
online_aggr: False
total_round_num: 20
distribute:
use: True
server_host: '127.0.0.1'
server_port: 50051
client_host: '127.0.0.1'
client_port: 50053
role: 'client'
data_idx: 2
model:
type: xgb_tree
lambda_: 0.1
gamma: 0
num_of_trees: 10
train:
optimizer:
lr: 0.5
# learning rate for xgb model
eta: 0.5
data:
root: data/
type: adult
splits: [1.0, 0.0]
args: [{normalization: False, standardization: True}]
feat_engr:
scenario: vfl
dataloader:
type: raw
batch_size: 50
criterion:
type: CrossEntropyLoss
trainer:
type: verticaltrainer
vertical:
use: True
key_size: 256
dims: [7, 14]
algo: 'xgb'
eval:
freq: 5
best_res_update_round_wise_key: test_loss
hpo:
use: True
scheduler: sha
num_workers: 0
init_cand_num: 9
ss: 'federatedscope/autotune/baseline/vfl_ss.yaml'
sha:
budgets: [ 3, 9 ]
elim_rate: 3
iter: 1
metric: 'server_global_eval.test_loss'
working_folder: sha
61 changes: 61 additions & 0 deletions federatedscope/autotune/baseline/distributed_xgb_server_hpo.yaml
@@ -0,0 +1,61 @@
use_gpu: True
device: 0
early_stop:
patience: 5
seed: 12345
federate:
client_num: 2
mode: 'distributed'
make_global_eval: False
online_aggr: False
total_round_num: 20
distribute:
use: True
server_host: '127.0.0.1'
server_port: 50051
role: 'server'
data_idx: 0
model:
type: xgb_tree
lambda_: 0.1
gamma: 0
num_of_trees: 10
train:
optimizer:
lr: 0.5
# learning rate for xgb model
eta: 0.5
data:
root: data/
type: adult
splits: [1.0, 0.0]
args: [{normalization: False, standardization: True}]
feat_engr:
scenario: vfl
dataloader:
type: raw
batch_size: 50
criterion:
type: CrossEntropyLoss
trainer:
type: verticaltrainer
vertical:
use: True
key_size: 256
dims: [7, 14]
algo: 'xgb'
eval:
freq: 5
best_res_update_round_wise_key: test_loss
hpo:
use: True
scheduler: sha
num_workers: 0
init_cand_num: 9
ss: 'federatedscope/autotune/baseline/vfl_ss.yaml'
sha:
budgets: [ 3, 9 ]
elim_rate: 3
iter: 1
metric: 'server_global_eval.test_loss'
working_folder: sha
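
For reference, one way to read the SHA (successive halving) settings shared by the three configs above — a sketch of the schedule implied by `init_cand_num: 9`, `elim_rate: 3`, and `budgets: [3, 9]`, assuming the usual successive-halving rule rather than quoting code from this PR:

```python
# Sketch: the candidate pool implied by successive halving with these settings.
# Assumes each stage keeps 1/elim_rate of the candidates.
init_cand_num = 9
elim_rate = 3
budgets = [3, 9]  # training rounds granted to each surviving candidate per stage

candidates = init_cand_num
for stage, budget in enumerate(budgets):
    print(f"stage {stage}: {candidates} candidate(s), {budget} round(s) each")
    candidates = max(1, candidates // elim_rate)
# stage 0: 9 candidate(s), 3 round(s) each
# stage 1: 3 candidate(s), 9 round(s) each
# -> 1 configuration survives, ranked by 'server_global_eval.test_loss'
```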
1 change: 1 addition & 0 deletions federatedscope/autotune/baseline/fedhpo_vfl.yaml
@@ -39,6 +39,7 @@ eval:
freq: 5
best_res_update_round_wise_key: test_loss
hpo:
use: True
scheduler: sha
num_workers: 0
init_cand_num: 9
13 changes: 13 additions & 0 deletions federatedscope/autotune/utils.py
@@ -1,3 +1,5 @@
from typing import MutableMapping

import yaml
import logging
import pandas as pd
@@ -312,3 +314,14 @@ def log2wandb(trial, config, results, trial_cfg):
trial_cfg.hpo.metric: results[key1][key2],
}
wandb.log(log_res)


def flatten_dict(d, parent_key='', sep='.'):
items = []
for key, value in d.items():
new_key = parent_key + sep + key if parent_key else key
if isinstance(value, MutableMapping):
items.extend(flatten_dict(value, new_key, sep=sep).items())
else:
items.append((new_key, value))
return dict(items)
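
A helper of this shape is typically used so that a nested results dict can be looked up by a dotted key such as the configured metric `server_global_eval.test_loss`. A minimal usage sketch with made-up values:

```python
# Illustrative values only; the keys mirror the 'server_global_eval.test_loss'
# metric configured in the yaml files above.
nested = {'server_global_eval': {'test_loss': 0.31, 'test_acc': 0.84}}
flat = flatten_dict(nested)
print(flat['server_global_eval.test_loss'])  # 0.31
```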
1 change: 0 additions & 1 deletion federatedscope/core/auxiliaries/worker_builder.py
@@ -144,7 +144,6 @@ def get_server_cls(cfg):
if cfg.hpo.fedex.use:
from federatedscope.autotune.fedex import FedExServer
return FedExServer

if cfg.hpo.fts.use:
from federatedscope.autotune.fts import FTSServer
return FTSServer
1 change: 1 addition & 0 deletions federatedscope/core/configs/cfg_hpo.py
@@ -13,6 +13,7 @@ def extend_hpo_cfg(cfg):
# hpo related options
# ---------------------------------------------------------------------- #
cfg.hpo = CN()
cfg.hpo.use = False
cfg.hpo.trial_index = 0
cfg.hpo.working_folder = 'hpo'
cfg.hpo.ss = ''
6 changes: 3 additions & 3 deletions federatedscope/core/configs/config.py
@@ -158,12 +158,12 @@ def assert_cfg(self, check_cfg=True):
def clean_unused_sub_cfgs(self):
"""
Clean the un-used secondary-level CfgNode, whose ``.use`` \
attribute is ``True``
attribute is ``True`` except `hpo`
"""
for v in self.values():
for key, v in self.items():
if isinstance(v, CfgNode) or isinstance(v, CN):
# sub-config
if hasattr(v, "use") and v.use is False:
if hasattr(v, "use") and v.use is False and key != 'hpo':
for k in copy.deepcopy(v).keys():
# delete the un-used attributes
if k == "use":
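
To make the effect of the `key != 'hpo'` exception concrete, here is a simplified, self-contained illustration of the cleaning rule using plain dicts in place of `CfgNode` (illustrative only, not the project's API):

```python
def clean_unused_sub_cfgs_sketch(cfg):
    """Drop options of sub-configs whose 'use' flag is False, except for 'hpo'."""
    for key, sub in cfg.items():
        if isinstance(sub, dict) and sub.get('use') is False and key != 'hpo':
            for k in list(sub.keys()):
                if k != 'use':
                    del sub[k]  # discard options of the unused sub-config
    return cfg


cfg = {'vertical': {'use': False, 'key_size': 256},
       'hpo': {'use': False, 'scheduler': 'sha'}}
print(clean_unused_sub_cfgs_sketch(cfg))
# {'vertical': {'use': False}, 'hpo': {'use': False, 'scheduler': 'sha'}}
```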
9 changes: 4 additions & 5 deletions federatedscope/core/feature/vfl/preprocess/instance_norm.py
@@ -17,9 +17,9 @@ def wrap_instance_norm_server(worker):
Returns:
Wrap vfl server with instance norm.
"""
def trigger_for_feat_engr(self,
trigger_train_func,
kwargs_for_trigger_train_func={}):
def trigger_for_train(self,
trigger_train_func,
kwargs_for_trigger_train_func={}):
# broadcast_model_para_func after feature engineering finishing
self.trigger_train_func = trigger_train_func
self.kwargs_for_trigger_train_func = \
@@ -108,8 +108,7 @@ def callback_func_for_ss_instance_sum_norm_square(self, message: Message):
self.trigger_train_func(**self.kwargs_for_trigger_train_func)

# Bind method to instance
worker.trigger_for_feat_engr = types.MethodType(trigger_for_feat_engr,
worker)
worker.trigger_for_train = types.MethodType(trigger_for_train, worker)
worker.callback_func_for_ss_instance_sum = types.MethodType(
callback_func_for_ss_instance_sum, worker)
worker.callback_func_for_ss_instance_sum_norm_square = types.MethodType(
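
The renamed hook is attached with `types.MethodType`, the same binding pattern used across these wrappers. A minimal, self-contained sketch of that pattern (the worker class and callback below are placeholders, not FederatedScope classes):

```python
import types


class Worker:
    """Placeholder standing in for a vfl server worker."""


def trigger_for_train(self, trigger_train_func, kwargs_for_trigger_train_func={}):
    # 'self' is the instance the function gets bound to below.
    self.trigger_train_func = trigger_train_func
    self.kwargs_for_trigger_train_func = kwargs_for_trigger_train_func


worker = Worker()
# Bind the plain function as a method of this particular instance.
worker.trigger_for_train = types.MethodType(trigger_for_train, worker)
worker.trigger_for_train(print, {'sep': ' '})
print(worker.trigger_train_func is print)  # True
```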
@@ -18,9 +18,9 @@ def wrap_correlation_filter_server(worker):
Returns:
Wrap vfl server with correlation_filter.
"""
def trigger_for_feat_engr(self,
trigger_train_func,
kwargs_for_trigger_train_func={}):
def trigger_for_train(self,
trigger_train_func,
kwargs_for_trigger_train_func={}):
logger.info('Start to execute correlation_filter, which requires FHE.')

self.msg_buffer['feat_dim'] = {}
@@ -96,8 +96,7 @@ def callbacks_funcs_for_feat_dim(self, message: Message):
self.trigger_train_func(**self.kwargs_for_trigger_train_func)

# Bind method to instance
worker.trigger_for_feat_engr = types.MethodType(trigger_for_feat_engr,
worker)
worker.trigger_for_train = types.MethodType(trigger_for_train, worker)
worker.callback_funcs_for_en_feat_corrcoef = types.MethodType(
callback_funcs_for_en_feat_corrcoef, worker)
worker.callbacks_funcs_for_feat_dim = types.MethodType(
9 changes: 4 additions & 5 deletions federatedscope/core/feature/vfl/selection/iv_filter.py
@@ -19,9 +19,9 @@ def wrap_iv_filter_server(worker):
Returns:
Wrap vfl server with iv_filter.
"""
def trigger_for_feat_engr(self,
trigger_train_func,
kwargs_for_trigger_train_func={}):
def trigger_for_train(self,
trigger_train_func,
kwargs_for_trigger_train_func={}):
logger.info('Start to execute woe_filter, which requires HE.')
self.trigger_train_func = trigger_train_func
self.kwargs_for_trigger_train_func = \
@@ -78,8 +78,7 @@ def callbacks_funcs_for_feat_dim(self, message: Message):
self.trigger_train_func(**self.kwargs_for_trigger_train_func)

# Bind method to instance
worker.trigger_for_feat_engr = types.MethodType(trigger_for_feat_engr,
worker)
worker.trigger_for_train = types.MethodType(trigger_for_train, worker)
worker.callbacks_funcs_for_feat_dim = types.MethodType(
callbacks_funcs_for_feat_dim, worker)

21 changes: 19 additions & 2 deletions federatedscope/core/fed_runner.py
@@ -170,7 +170,16 @@ def _setup_server(self, resource_info=None, client_resource_info=None):
from federatedscope.core.workers.wrapper import wrap_swa_server
server = wrap_swa_server(server)
logger.info('Server has been set up ... ')
return self.feat_engr_wrapper_server(server)

if self.cfg.feat_engr.type:
server = self.feat_engr_wrapper_server(server)

if self.cfg.federate.mode == 'distributed' and self.cfg.hpo.use:
from federatedscope.core.workers.wrapper import \
wrap_autotune_server
server = wrap_autotune_server(server)

return server

def _setup_client(self,
client_id=-1,
@@ -223,7 +232,15 @@ def _setup_client(self,
else:
logger.info(f'Client {client_id} has been set up ... ')

return self.feat_engr_wrapper_client(client)
if self.cfg.feat_engr.type:
client = self.feat_engr_wrapper_client(client)

if self.cfg.federate.mode == 'distributed' and self.cfg.hpo.use:
from federatedscope.core.workers.wrapper import \
wrap_autotune_client
client = wrap_autotune_client(client)

return client

def check(self):
"""
2 changes: 2 additions & 0 deletions federatedscope/core/workers/client.py
@@ -71,6 +71,8 @@ def __init__(self,
if config is None:
return

self.args, self.kwargs = args, kwargs

# the unseen_client indicates that whether this client contributes to
# FL process by training on its local data and uploading the local
# model update, which is useful for check the participation