Implementation of the Round class of the node component.
Attributes¶
Classes¶
Round ¶
Round(root_dir, db, node_id, node_name, training_plan, training_plan_class, model_kwargs, training_kwargs, training, dataset, params, experiment_id, researcher_id, history_monitor, aggregator_args, node_args, tp_security_manager, round_number=0, dlp_and_loading_block_metadata=None, aux_vars=None)
This class represents the training part executed by a node in a given round.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
root_dir | str | Root fedbiomed directory where node instance files will be stored. | required |
db | str | Path to node database file. | required |
node_id | str | Node id | required |
node_name | str | Node name (Hospital name) | required |
training_plan | str | code of the training plan for this round | required |
training_plan_class | str | class name of the training plan | required |
model_kwargs | dict | contains model args. | required |
training_kwargs | dict | contains training arguments. | required |
training | bool | whether to perform a model training or just to perform a validation check (model inferring) | required |
dataset | dict | dataset details to use in this round. It contains the dataset name, dataset's id, data path, its shape, its description... | required |
params | str | parameters of the model | required |
experiment_id | str | experiment id | required |
researcher_id | str | researcher id | required |
history_monitor | HistoryMonitor | Sends real-time feed-back to end-user during training | required |
aggregator_args | Dict[str, Any] | Arguments managed by and shared with the researcher-side aggregator. | required |
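node_args | Dict | command line arguments for node. Can include: `gpu (bool)`: propose use of a GPU device if any is available; `gpu_num (Union[int, None])`: if not None, use the specified GPU device instead of the default GPU device if this GPU device is available; `gpu_only (bool)`: force use of a GPU device if any is available, even if the researcher doesn't request a GPU. | required |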
tp_security_manager | TrainingPlanSecurityManager | Training plan security manager instance. | required |
dlp_and_loading_block_metadata | Optional[Tuple[dict, List[dict]]] | Data loading plan to apply, or None if no DLP for this round. | None |
round_number | int | number of the iteration for this experiment | 0 |
aux_vars | Optional[Dict[str, AuxVar]] | Optional optimizer auxiliary variables. | None |
Source code in fedbiomed/node/round.py
def __init__(
    self,
    root_dir: str,
    db: str,
    node_id: str,
    node_name: str,
    training_plan: str,
    training_plan_class: str,
    model_kwargs: dict,
    training_kwargs: dict,
    training: bool,
    dataset: dict,
    params: str,
    experiment_id: str,
    researcher_id: str,
    history_monitor: HistoryMonitor,
    aggregator_args: Dict[str, Any],
    node_args: Dict,
    tp_security_manager: TrainingPlanSecurityManager,
    round_number: int = 0,
    dlp_and_loading_block_metadata: Optional[Tuple[dict, List[dict]]] = None,
    aux_vars: Optional[Dict[str, AuxVar]] = None,
) -> None:
    """Store everything a node needs to execute one training/validation round.

    Args:
        root_dir: Root fedbiomed directory where node instance files will be stored.
        db: Path to node database file.
        node_id: Node id
        node_name: Node name (Hospital name)
        training_plan: source code of the training plan for this round
        training_plan_class: class name of the training plan
        model_kwargs: model arguments.
        training_kwargs: training arguments.
        training: whether to perform a model training, or only a validation
            check (model inferring)
        dataset: details of the dataset used in this round (dataset name,
            dataset's id, data path, its shape, its description...).
        params: parameters of the model
        experiment_id: experiment id
        researcher_id: researcher id
        history_monitor: sends real-time feed-back to end-user during training
        aggregator_args: arguments managed by and shared with the
            researcher-side aggregator.
        node_args: command line arguments for node. Can include:
            - `gpu (bool)`: propose use a GPU device if any is available.
            - `gpu_num (Union[int, None])`: if not None, use the specified GPU
              device instead of default GPU device if this GPU device is available.
            - `gpu_only (bool)`: force use of a GPU device if any available, even
              if researcher doesn't request for using a GPU.
        tp_security_manager: Training plan security manager instance.
        round_number: number of the iteration for this experiment
        dlp_and_loading_block_metadata: Data loading plan to apply, or None if
            no DLP for this round.
        aux_vars: Optional optimizer auxiliary variables; a falsy value is
            replaced by an empty dict.
    """
    # Node identity and storage locations.
    self._dir = root_dir
    self._db = db
    self._node_id = node_id
    self._node_name = node_name
    self._round = round_number

    # Experiment context received with the request.
    self.experiment_id = experiment_id
    self.researcher_id = researcher_id
    self.history_monitor = history_monitor
    self.node_args = node_args
    self.training = training

    # Training plan definition and the arguments that parameterize it.
    self.training_plan_source = training_plan
    self.training_plan_class = training_plan_class
    self.model_arguments = model_kwargs
    self.training_kwargs = training_kwargs
    self.aggregator_args = aggregator_args
    self.params = params
    self.aux_vars = {} if not aux_vars else aux_vars

    # Data selection for this round.
    self.dataset = dataset
    self._dlp_and_loading_block_metadata = dlp_and_loading_block_metadata

    # Collaborators, plus attributes that are only filled in later in the round.
    self.tp_security_manager = tp_security_manager
    self.training_plan = None
    self.training_arguments = None
    self.testing_arguments = None
    self.loader_arguments = None
    self._secure_aggregation = None
    self._persistent_model_weights = None
    self.is_test_data_shuffled: bool = False
    self._testing_indexes: Dict = {
        "testing_index": [],
        "training_index": [],
        "test_ratio": None,
    }

    # Node state persistence, then a scratch directory where the training plan
    # source is written to disk during the round.
    self._node_state_manager: NodeStateManager = NodeStateManager(
        self._dir, self._node_id, self._db
    )
    scratch_dir = tempfile.TemporaryDirectory()
    self._temp_dir = scratch_dir
    self._keep_files_dir = scratch_dir.name
Attributes¶
Functions¶
collect_optim_aux_var ¶
collect_optim_aux_var()
Collect auxiliary variables from the wrapped Optimizer, if any, and remove auxiliary variables related to local parameters, if any.
If the TrainingPlan does not use a Fed-BioMed Optimizer, return an empty dict. If it does not hold any BaseOptimizer however, raise a FedbiomedRoundError.
Returns:
| Type | Description |
|---|---|
Dict[str, AuxVar] | Auxiliary variables, as a `{module_name: module_auxvar}` dict. |
Source code in fedbiomed/node/round.py
def collect_optim_aux_var(
    self,
) -> Dict[str, AuxVar]:
    """Gather the wrapped Optimizer's auxiliary variables for the round reply.

    The BaseOptimizer is obtained through `_get_base_optimizer`, which raises
    a FedbiomedRoundError when the TrainingPlan holds none. If the object it
    wraps is not a Fed-BioMed `Optimizer`, nothing is collected. Auxiliary
    variables tied to parameters tagged as "local" are filtered out.

    Returns:
        Auxiliary variables, as a `{module_name: module_auxvar}` dict; an empty
        dict when the TrainingPlan does not use a Fed-BioMed Optimizer.
    """
    wrapped_optim = self._get_base_optimizer().optimizer
    if not isinstance(wrapped_optim, Optimizer):
        return {}

    collected = wrapped_optim.get_aux()

    # Local parameters are not shared, so their auxiliary variables are dropped.
    every_weight = self.training_plan.get_model_params(
        only_trainable=False, exclude_buffers=False, local_params=None
    )
    local_weights = self.training_plan.filter_model_params_by_tags(
        every_weight, required_tags={"local"}
    )
    local_names = list(local_weights.keys())

    return wrapped_optim.filter_aux(collected, local_names)
initialize_arguments ¶
initialize_arguments(previous_state_id=None)
Initializes arguments for training and testing and the NodeStateManager, the latter handling Node state loading and saving.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
previous_state_id | Optional[str] | previous Node state id. Defaults to None (which is the state_id default value for the first Round). | None |
Returns:
| Type | Description |
|---|---|
Optional[Dict[str, Any]] | A dictionary containing the error message if an error is triggered while parsing training and testing |
Optional[Dict[str, Any]] | arguments, None otherwise. |
Note: If secure aggregation is activated, model weights will be encrypted as well as the optimizer's auxiliary variables (only if the optimizer used is a DeclearnOptimizer).
Source code in fedbiomed/node/round.py
def initialize_arguments(
    self, previous_state_id: Optional[str] = None
) -> Optional[Dict[str, Any]]:
    """Set up Node state handling, then parse the training/testing arguments.

    The NodeStateManager is initialized first. It runs in testing mode when
    this round does not perform training. Training and testing arguments are
    then parsed and validated.

    Args:
        previous_state_id: identifier of the previous Node state. Defaults to
            None, which is the state_id value used for the first Round.

    Returns:
        A dictionary containing the error message if an error is triggered
        while parsing training and testing arguments, None otherwise.

    !!! note
        If secure aggregation is activated, model weights will be encrypted as
        well as the optimizer's auxiliary variables (only if the optimizer used
        is a `DeclearnOptimizer`).
    """
    validation_only = not self.training
    self._node_state_manager.initialize(
        previous_state_id=previous_state_id, testing=validation_only
    )
    return self._initialize_validate_training_arguments()
process_optim_aux_var ¶
process_optim_aux_var()
Process researcher-emitted Optimizer auxiliary variables, if any. Reset auxiliary variables to zero vectors for local parameters.
Returns:
| Type | Description |
|---|---|
Optional[str] | Error message, or None if the operation was successful. |
Source code in fedbiomed/node/round.py
def process_optim_aux_var(self) -> Optional[str]:
    """Process researcher-emitted Optimizer auxiliary variables, if any.

    The received auxiliary variables are restored against the full set of
    model weights, which resets auxiliary variables to zero vectors for
    parameters tagged as "local". The result is then handed to the wrapped
    Fed-BioMed Optimizer.

    Returns:
        An error message if processing failed, or None if it succeeded (or if
        there were no auxiliary variables to process).
    """
    # Early-exit if there are no auxiliary variables to process.
    if not self.aux_vars:
        return None
    # Fetch the training plan's BaseOptimizer.
    try:
        optimizer = self._get_base_optimizer()
    except FedbiomedRoundError as exc:
        return str(exc)
    # Verify that the BaseOptimizer wraps an Optimizer.
    if not isinstance(optimizer.optimizer, Optimizer):
        return (
            "Received Optimizer auxiliary variables, but the "
            "TrainingPlan does not manage a compatible Optimizer."
        )
    # Pass auxiliary variables to the Optimizer.
    try:
        full_model_weights = self.training_plan.get_model_params(
            only_trainable=False, exclude_buffers=False, local_params=None
        )
        local_params = list(
            self.training_plan.filter_model_params_by_tags(
                full_model_weights, required_tags={"local"}
            ).keys()
        )
        aux_vars = optimizer.optimizer.restore_aux(
            self.aux_vars, full_model_weights, local_params
        )
        optimizer.optimizer.set_aux(aux_vars)
    except FedbiomedOptimizerError as exc:
        return (
            "TrainingPlan Optimizer failed to ingest the provided "
            f"auxiliary variables: {repr(exc)}"
        )
    # Early stop if secagg is activated and the optimizer has more than one
    # module that accepts auxiliary variables. `_secure_aggregation` is only
    # configured by `run_model_training`; when it is still None, secure
    # aggregation is not in use and the check does not apply.
    secagg = self._secure_aggregation
    if (
        optimizer.count_nb_auxvar() > 1
        and secagg is not None
        and secagg.use_secagg
    ):
        return (
            "Can not parse more than one `declearn` module requiring auxiliary variables while"
            " Secure Aggregation activated. Aborting..."
        )
    return None
run_model_training ¶
run_model_training(tp_approval, secagg_insecure_validation, secagg_active, force_secagg, secagg_arguments=None)
Runs one round of model training
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
tp_approval | bool | True if training plan approval by node is requested | required |
secagg_insecure_validation | bool | True if (potentially insecure) consistency check is enabled | required |
secagg_active | bool | True if secure aggregation is enabled on node | required |
force_secagg | bool | True if secure aggregation is mandatory on node | required |
secagg_arguments | Union[Dict[str, Any], None] | arguments for secure aggregation, some are specific to the scheme | None |
Returns:
| Type | Description |
|---|---|
TrainReply | Returns the corresponding node message, training reply instance |
Source code in fedbiomed/node/round.py
def run_model_training(
    self,
    tp_approval: bool,
    secagg_insecure_validation: bool,
    secagg_active: bool,
    force_secagg: bool,
    secagg_arguments: Union[Dict[str, Any], None] = None,
) -> TrainReply:
    """Runs one round of model training.

    The round runs these steps in order:
      1. Configure secure aggregation.
      2. Check training plan approval, if requested.
      3. Import, save, reload and initialize the training plan.
      4. Restore the previous node state and the model parameters.
      5. Process optimizer auxiliary variables.
      6. Build the training/testing data loaders.
      7. Optionally validate on global updates.
      8. If `self.training` is True, train, optionally validate on local
         updates, and assemble the reply (encrypting it when secure
         aggregation is used).

    Args:
        tp_approval: True if training plan approval by node is requested
        secagg_insecure_validation: True if (potentially insecure) consistency check is enabled
        secagg_active: True if secure aggregation is enabled on node
        force_secagg: True if secure aggregation is mandatory on node
        secagg_arguments: arguments for secure aggregation, some are specific to the scheme

    Returns:
        Returns the corresponding node message, training reply instance. Most
        failures are reported as a reply with `success=False` rather than raised.
    """
    dataset_id = (
        self.dataset.get("dataset_id") if isinstance(self.dataset, dict) else None
    )
    dp_active = (
        self.training_arguments.get("dp_args") is not None
        if self.training_arguments is not None
        else None
    )
    logger.debug(
        f"Starting round execution: node_id={self._node_id} "
        f"experiment={self.experiment_id} round={self._round} "
        f"training={self.training} dataset={dataset_id} "
        f"secagg_active={secagg_active} force_secagg={force_secagg} "
        f"dp_active={dp_active} secagg_args_keys={sorted((secagg_arguments or {}).keys())}"
    )
    # Validate secagg status. Raises error if the training request is not compatible with
    # secure aggregation settings
    try:
        self._secure_aggregation = SecaggRound(
            db=self._db,
            node_id=self._node_id,
            secagg_arguments=secagg_arguments,
            secagg_active=secagg_active,
            force_secagg=force_secagg,
            experiment_id=self.experiment_id,
        )
    except FedbiomedSecureAggregationError as e:
        logger.error(repr(e))
        logger.debug(
            f"Secure aggregation configuration error details: {traceback.format_exc()}"
        )
        return self._send_round_reply(
            success=False, message="Could not configure secure aggregation on node"
        )
    # Validate and load training plan
    if tp_approval:
        approved, training_plan_ = (
            self.tp_security_manager.check_training_plan_status(
                self.training_plan_source, TrainingPlanApprovalStatus.APPROVED
            )
        )
        if not approved:
            if training_plan_ is None:
                logger.info("Training plan is not registered on this node.")
                status_msg = "not registered"
            else:
                logger.info(
                    f"Training plan '{training_plan_['name']}' is not approved by this node."
                )
                status_msg = "not approved"
            return self._send_round_reply(
                False,
                f"Training plan is {status_msg} on node id={self._node_id} name={self._node_name}",
            )
        else:
            logger.info(
                f"Training plan has been approved by the node "
                f"{training_plan_['name']} researcher_id={self.researcher_id}"
            )
    # Import training plan, save to file, reload, instantiate a training plan
    try:
        CurrentTPModule, CurrentTrainingPlan = utils.import_class_from_spec(
            code=self.training_plan_source, class_name=self.training_plan_class
        )
        self.training_plan = CurrentTrainingPlan()
    except Exception as e:
        error_message = "Cannot instantiate training plan object."
        logger.error(f"{error_message} Details: {repr(e)}")
        logger.debug(
            f"Training plan instantiation error details: {traceback.format_exc()}"
        )
        return self._send_round_reply(success=False, message=error_message)
    # save and load training plan to a file to be sure
    # 1. a file is associated to training plan so we can read its source, etc.
    # 2. all dependencies are applied
    training_plan_module = "model_" + str(uuid.uuid4())
    training_plan_file = os.path.join(
        self._keep_files_dir, training_plan_module + ".py"
    )
    try:
        self.training_plan.save_code(
            training_plan_file, from_code=self.training_plan_source
        )
    except Exception as e:
        error_message = "Cannot save the training plan to a local tmp dir"
        logger.error(f"Cannot save the training plan to a local tmp dir : {e}")
        logger.debug(f"Training plan save error details: {traceback.format_exc()}")
        return self._send_round_reply(success=False, message=error_message)
    del CurrentTrainingPlan
    del CurrentTPModule
    try:
        CurrentTPModule, self.training_plan = utils.import_class_object_from_file(
            training_plan_file, self.training_plan_class
        )
    except Exception as e:
        error_message = "Cannot load training plan object from file."
        logger.error(f"{error_message} Details: {repr(e)}")
        logger.debug(f"Training plan load error details: {traceback.format_exc()}")
        return self._send_round_reply(success=False, message=error_message)
    try:
        self.training_plan.post_init(
            model_args=self.model_arguments,
            training_args=self.training_arguments,
            aggregator_args=self.aggregator_args,
            node_id=self._node_id,
        )
        logger.debug(
            f"Training plan initialized for round: experiment={self.experiment_id} "
            f"round={self._round} plan={self.training_plan.__class__.__name__} "
            f"training={self.training} dp_active={self.training_arguments.get('dp_args') is not None} "
            f"aggregator={self.aggregator_args.get('aggregator_name') if self.aggregator_args else None}",
        )
    except Exception as e:
        error_message = "Can't initialize training plan with the arguments."
        logger.error(f"{error_message} Details: {repr(e)}")
        logger.debug(
            f"Training plan initialization error details: {traceback.format_exc()}"
        )
        return self._send_round_reply(success=False, message=error_message)
    # load all initial models weights
    initial_model_weights = self.training_plan.get_model_params(
        only_trainable=False, exclude_buffers=False, local_params=None
    )
    # load node state
    previous_state_id = self._node_state_manager.previous_state_id
    if previous_state_id is not None:
        try:
            self._load_round_state(previous_state_id)
        except Exception as e:
            logger.error(f"Can't read previous node state. Details: {repr(e)}")
            logger.debug(
                f"Previous node state load error details: {traceback.format_exc()}"
            )
            # don't send error details to researcher
            return self._send_round_reply(
                success=False, message="Can't read previous node state."
            )
    # Reconstruct full parameters
    full_model_weights = self._reconstruct_full_params(
        initial_model_weights, self.params, self._persistent_model_weights
    )
    # Load full model parameters
    try:
        self.training_plan.set_model_params(full_model_weights, local_params=None)
    except Exception as e:
        error_message = "Cannot initialize model parameters."
        logger.error(f"{error_message} Details: {repr(e)}")
        logger.debug(
            f"Model parameters initialization error details: {traceback.format_exc()}"
        )
        return self._send_round_reply(success=False, message=error_message)
    # ---------------------------------------------------------------------
    # Process Optimizer auxiliary variables, if any.
    try:
        error_message = self.process_optim_aux_var()
    except Exception as e:
        # `error_message` is not bound when the call above raises, so build it here.
        error_message = (
            f"Error while processing optimizer auxiliary variables: Details: {repr(e)}"
        )
        logger.error(error_message)
        logger.debug(
            f"Optimizer auxiliary variables processing error details: {traceback.format_exc()}"
        )
        return self._send_round_reply(success=False, message=error_message)
    if error_message is not None:
        logger.error(
            f"Error while processing optimizer auxiliary variables: {error_message}"
        )
        return self._send_round_reply(success=False, message=error_message)
    # Split training and validation data -------------------------------------
    try:
        self._set_training_testing_data_loaders()
    except FedbiomedError as fe:
        error_message = f"Can not create validation/train data: {repr(fe)}"
        logger.error(error_message)
        logger.debug(
            f"Validation/train data creation error details: {traceback.format_exc()}"
        )
        return self._send_round_reply(success=False, message=error_message)
    except Exception as e:
        error_message = (
            f"Undetermined error while creating data for training/validation. Can not create "
            f"validation/train data: {repr(e)}"
        )
        logger.error(error_message)
        logger.debug(
            f"Validation/train data creation error details: {traceback.format_exc()}"
        )
        return self._send_round_reply(success=False, message=error_message)
    # ------------------------------------------------------------------------
    # Validation Before Training
    if self.testing_arguments.get("test_on_global_updates", False) is not False:
        # Last control to make sure validation data loader is set.
        if self.training_plan.testing_data_loader is not None:
            try:
                self.training_plan.testing_routine(
                    metric=self.testing_arguments.get("test_metric", None),
                    metric_args=self.testing_arguments.get("test_metric_args", {}),
                    history_monitor=self.history_monitor,
                    before_train=True,
                )
            except FedbiomedError as e:
                logger.error(
                    f"{ErrorNumbers.FB314.value}: During the validation phase on global parameter updates; "
                    f"{repr(e)}",
                    researcher_id=self.researcher_id,
                )
                logger.debug(
                    f"Validation on global parameter updates error details: {traceback.format_exc()}"
                )
            except Exception as e:
                logger.error(
                    f"Undetermined error during the testing phase on global parameter updates: "
                    f"{repr(e)}",
                    researcher_id=self.researcher_id,
                )
                logger.debug(
                    f"Validation on global parameter updates error details: {traceback.format_exc()}"
                )
        else:
            logger.error(
                f"{ErrorNumbers.FB314.value}: Can not execute validation routine due to missing testing dataset. "
                f"Please make sure that `test_ratio` has been set correctly",
                researcher_id=self.researcher_id,
            )
    # If training is activated.
    if self.training:
        logger.debug(
            f"Executing training phase for round: experiment={self.experiment_id} "
            f"round={self._round} dataset={dataset_id} "
            f"has_testing_loader={getattr(self.training_plan, 'testing_data_loader', None) is not None} "
            f"has_training_loader={getattr(self.training_plan, 'training_data_loader', None) is not None}"
        )
        # Without a training data loader there is nothing to train on, and the
        # sample size (and timings) of the reply cannot be computed.
        if self.training_plan.training_data_loader is None:
            error_message = (
                "Cannot train model in round: no training data loader is available."
            )
            logger.error(error_message)
            return self._send_round_reply(success=False, message=error_message)
        results: Dict[str, Any] = {}
        # Perform the training round.
        try:
            rtime_before = time.perf_counter()
            ptime_before = time.process_time()
            self.training_plan.training_routine(
                history_monitor=self.history_monitor, node_args=self.node_args
            )
            rtime_after = time.perf_counter()
            ptime_after = time.process_time()
        except Exception as exc:
            error_message = f"Cannot train model in round: {repr(exc)}"
            logger.error(error_message)
            logger.debug(f"Training error details: {traceback.format_exc()}")
            return self._send_round_reply(success=False, message=error_message)
        # Collect Optimizer auxiliary variables, if any.
        try:
            results["optim_aux_var"] = self.collect_optim_aux_var()
        except (FedbiomedOptimizerError, FedbiomedRoundError) as exc:
            error_message = (
                f"Cannot collect Optimizer auxiliary variables: {repr(exc)}"
            )
            logger.error(error_message)
            logger.debug(
                f"Optimizer auxiliary variables collecting error details: {traceback.format_exc()}"
            )
            return self._send_round_reply(success=False, message=error_message)
        # Validation after training
        if self.testing_arguments.get("test_on_local_updates", False) is not False:
            if self.training_plan.testing_data_loader is not None:
                try:
                    self.training_plan.testing_routine(
                        metric=self.testing_arguments.get("test_metric", None),
                        metric_args=self.testing_arguments.get(
                            "test_metric_args", {}
                        ),
                        history_monitor=self.history_monitor,
                        before_train=False,
                    )
                except FedbiomedError as e:
                    logger.error(
                        f"{ErrorNumbers.FB314.value}: During the validation phase on local parameter updates; "
                        f"{repr(e)}",
                        researcher_id=self.researcher_id,
                    )
                    logger.debug(
                        f"Validation on local parameter updates error details: {traceback.format_exc()}"
                    )
                except Exception as e:
                    logger.error(
                        f"Undetermined error during the validation phase on local parameter updates: "
                        f"{repr(e)}",
                        researcher_id=self.researcher_id,
                    )
                    logger.debug(
                        f"Validation on local parameter updates error details: {traceback.format_exc()}"
                    )
            else:
                logger.error(
                    f"{ErrorNumbers.FB314.value}: Can not execute validation routine due to missing testing "
                    f"dataset. Please make sure that test_ratio has been set correctly",
                    researcher_id=self.researcher_id,
                )
        # The training data loader is guaranteed non-None by the guard above.
        results["sample_size"] = len(
            self.training_plan.training_data_loader.dataset
        )
        logger.debug(
            f"Collected round outputs before reply assembly: experiment={self.experiment_id} "
            f"round={self._round} sample_size={results['sample_size']} "
            f"has_aux_var={results['optim_aux_var'] is not None} "
            f"dp_active={self.training_arguments.get('dp_args') is not None} "
            f"flatten_for_secagg={self._secure_aggregation.use_secagg}"
        )
        results["encrypted"] = False
        # Parameters are flattened when they are going to be encrypted.
        model_weights = self.training_plan.after_training_params(
            flatten=self._secure_aggregation.use_secagg
        )
        logger.debug(
            f"Collected training parameters for round reply: experiment={self.experiment_id} "
            f"round={self._round} parameter_count={len(model_weights)} "
            f"secagg_enabled={self._secure_aggregation.use_secagg}"
        )
        if self._secure_aggregation.use_secagg:
            logger.debug(
                f'SecAgg active: encrypting model parameters with the secure aggregation scheme "{self._secure_aggregation.scheme.__class__.__name__}" for round {self._round}'
            )
            if results["optim_aux_var"]:
                logger.debug(
                    "Optimizer Auxiliary variables found, they will also be encrypted."
                )
            model_weights, enc_factor, aux_var = self._encrypt_weights_and_auxvar(
                model_weights=model_weights,
                optim_aux_var=results["optim_aux_var"],
                sample_size=results["sample_size"],
                secagg_insecure_validation=secagg_insecure_validation,
            )
            results["encrypted"] = True
            results["encryption_factor"] = enc_factor
            logger.debug(
                f"Model parameters encrypted for round {self._round} , with encryption factor {enc_factor} and scheme {self._secure_aggregation.scheme.__class__.__name__}."
            )
            if aux_var is not None:
                results["optim_aux_var"] = aux_var.to_dict()
        results["params"] = model_weights
        results["optimizer_args"] = self.training_plan.optimizer_args()
        results["state_id"] = self._node_state_manager.state_id
        try:
            self._save_round_state()
        except Exception as e:
            # don't send details to researcher
            logger.error(f"Error while saving round state: {repr(e)}")
            logger.debug(
                f"Round state saving error details: {traceback.format_exc()}"
            )
            return self._send_round_reply(
                success=False, message="Can't save new node state."
            )
        # end : clean the namespace
        try:
            del self.training_plan
            del CurrentTPModule
        except Exception as e:
            logger.error(
                f"Exception raised while deleting training plan instance: {repr(e)}"
            )
            logger.debug(
                f"Training plan instance deletion error details: {traceback.format_exc()}"
            )
        return self._send_round_reply(
            success=True,
            timing={
                "rtime_training": rtime_after - rtime_before,
                "ptime_training": ptime_after - ptime_before,
            },
            extend_with=results,
        )
    else:
        # Only for validation
        logger.debug(
            f"Skipping training execution for round: experiment={self.experiment_id} "
            f"round={self._round} dataset={dataset_id} reason=training_disabled"
        )
        return self._send_round_reply(success=True)