Jobs

Classes

FARequestJob

FARequestJob(experiment_id, fa_id, federated_dataset, stats_args, stats, dataset_schema, **kwargs)

Bases: Job

Federated Analytics (FA) request job.

Sends an FA request to all participating nodes and collects their replies.

Parameters:

Name Type Description Default
experiment_id str

ID of the experiment this job belongs to.

required
fa_id str

Unique identifier for this FA job.

required
federated_dataset FederatedDataSet

Federated dataset mapping node IDs to dataset metadata.

required
stats_args Optional[dict]

Keyword arguments passed to the statistics functions.

required
stats Optional[list]

List of statistic names to compute.

required
dataset_schema Optional[list]

Optional schema descriptor used to validate the dataset.

required
**kwargs

Forwarded to the base Job constructor.

{}
Source code in fedbiomed/researcher/federated_workflows/jobs/_fa_request_job.py
def __init__(
    self,
    experiment_id: str,
    fa_id: str,
    federated_dataset: FederatedDataSet,
    stats_args: Optional[dict],
    stats: Optional[list],
    dataset_schema: Optional[list],
    **kwargs,
) -> None:
    """Initialize the FARequestJob.

    Args:
        experiment_id: ID of the experiment this job belongs to.
        fa_id: Unique identifier for this FA job.
        federated_dataset: Federated dataset mapping node IDs to dataset metadata.
        stats_args: Keyword arguments passed to the statistics functions.
        stats: List of statistic names to compute.
        dataset_schema: Optional schema descriptor used to validate the dataset.
        **kwargs: Forwarded to the base `Job` constructor.
    """
    super().__init__(**kwargs)

    # Validate that exactly one stats specification is provided and that schema is not mixed with stats_args.
    if not stats and not stats_args:
        raise FedbiomedError(
            "At least one of 'stats' or 'stats_args' must be provided."
        )
    if stats_args and stats:
        raise FedbiomedError(
            "'stats_args' and 'stats' are mutually exclusive. "
            "Use 'stats_args' for fine-grained control or 'stats' for a flat list."
        )
    if stats_args and dataset_schema is not None:
        raise FedbiomedError(
            "'stats_args' and 'dataset_schema' are mutually exclusive. "
            "Schema selection is encoded in the structure of 'stats_args'."
        )

    self._experiment_id = experiment_id
    self._fa_id = fa_id
    self._federated_dataset = federated_dataset
    self._stats_args = stats_args
    self._dataset_schema = dataset_schema
    self._stats = stats
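
The three checks in the constructor can be read as a small decision table. The following is a minimal standalone sketch of that logic, not the Fed-BioMed implementation: `FAConfigError` and `check_fa_arguments` are hypothetical names, and the example `stats_args` structure is only an assumption for illustration.

```python
from typing import Optional


class FAConfigError(ValueError):
    """Hypothetical stand-in for FedbiomedError."""


def check_fa_arguments(
    stats: Optional[list],
    stats_args: Optional[dict],
    dataset_schema: Optional[list],
) -> str:
    """Return which specification style is in use, or raise on an invalid mix."""
    # Empty containers count as "not provided", as in the truthiness checks above.
    if not stats and not stats_args:
        raise FAConfigError("At least one of 'stats' or 'stats_args' must be provided.")
    if stats and stats_args:
        raise FAConfigError("'stats_args' and 'stats' are mutually exclusive.")
    # At this point exactly one of the two is set.
    if stats_args and dataset_schema is not None:
        raise FAConfigError("'stats_args' and 'dataset_schema' are mutually exclusive.")
    return "stats_args" if stats_args else "stats"


# A flat list of statistics may be combined with a schema ...
print(check_fa_arguments(["mean"], None, ["age"]))
# ... whereas 'stats_args' must be used on its own (structure is illustrative).
print(check_fa_arguments(None, {"mean": {}}, None))
```

Note that an empty list or dict is treated the same as `None` by the first two checks, while the schema check tests `is not None` explicitly.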

Functions

execute
execute()

Send the FA request to all nodes and collect replies.

Raises:

Type Description
FedbiomedError

If any node returns an error, or if no replies are received.

Returns:

Type Description
Dict[str, FAReply]

A dictionary mapping node IDs to their FAReply.

Source code in fedbiomed/researcher/federated_workflows/jobs/_fa_request_job.py
def execute(self) -> Dict[str, FAReply]:
    """Send the FA request to all nodes and collect replies.

    Raises:
        FedbiomedError: If any node returns an error, or if no replies are received.

    Returns:
        A dictionary mapping node IDs to their `FAReply`.
    """
    fa_request = dict(
        researcher_id=self._researcher_id,
        experiment_id=self._experiment_id,
        stats=self._stats,
        fa_id=self._fa_id,
        stats_args=self._stats_args,
        dataset_schema=self._dataset_schema,
    )

    requests = MessagesByNode()
    for node in self._nodes:
        requests[node] = FARequest(
            **{
                **fa_request,
                "dataset_id": self._federated_dataset.data()[node]["dataset_id"],
            }
        )

    with self._reqs.send(requests, self._nodes, self._policies) as responses:
        errors: Dict[str, ErrorMessage] = responses.errors()
        replies: Dict[str, FAReply] = responses.replies()

    if errors:
        # Log errors encountered during the execution of the FA request
        for node_id, error in errors.items():
            logger.error(
                f"Node {node_id} analytics error [{error.errnum}]: {error.extra_msg}"
            )
        # Treat any node error as fatal
        raise FedbiomedError(
            f"FA request execution failed with errors from nodes: {', '.join(errors.keys())}"
        )

    # Ensure there is at least one successful reply
    if not replies:
        raise FedbiomedError("No successful replies received.")

    return replies
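
The two ideas in `execute` (one shared payload plus a per-node `dataset_id`, and a fail-fast policy on node errors) can be sketched without any Fed-BioMed imports. Plain dicts stand in for `FARequest`, `MessagesByNode` and the reply objects, `RuntimeError` stands in for `FedbiomedError`, and both helper names are hypothetical.

```python
from typing import Any, Dict, List


def build_requests(
    shared: Dict[str, Any],
    dataset_ids: Dict[str, str],
    nodes: List[str],
) -> Dict[str, Dict[str, Any]]:
    """Merge the shared payload with the dataset ID of each node."""
    return {node: {**shared, "dataset_id": dataset_ids[node]} for node in nodes}


def require_clean_replies(
    errors: Dict[str, str],
    replies: Dict[str, Any],
) -> Dict[str, Any]:
    """Treat any node error, or an empty reply set, as fatal."""
    if errors:
        raise RuntimeError(
            f"FA request execution failed with errors from nodes: {', '.join(errors)}"
        )
    if not replies:
        raise RuntimeError("No successful replies received.")
    return replies


requests = build_requests(
    {"fa_id": "fa_1", "stats": ["mean"]},
    {"node_1": "dataset_a", "node_2": "dataset_b"},
    ["node_1", "node_2"],
)
print(requests["node_2"]["dataset_id"])
```

Because the merge creates a new dict per node, the shared payload itself is never modified.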

Job

Job(*, researcher_id, requests, nodes)

Bases: ABC

Job represents a task to be executed on the node.

This is a base class that provides the basic functionality necessary to establish communication with the remote nodes. Actual tasks should inherit from Job to implement their own domain logic.

Functional life-cycle

Jobs must follow a "functional" life-cycle, meaning that they should be created just before the execution of the task, and destroyed shortly after. Jobs should not persist outside the scope of the function that requested the execution of the task.
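
As an illustration of this life-cycle, here is a minimal sketch with toy classes. `ToyJob`, `EchoJob` and `run_round` are hypothetical names that only mimic the shape of `Job`; no communication with nodes takes place.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class ToyJob(ABC):
    """Hypothetical, simplified counterpart of Job (no Requests object)."""

    def __init__(self, *, researcher_id: str, nodes: Optional[List[str]]):
        self._researcher_id = researcher_id
        self._nodes: List[str] = nodes or []

    @abstractmethod
    def execute(self) -> Any:
        """Payload of the job."""


class EchoJob(ToyJob):
    """Toy task: returns a canned reply per node."""

    def execute(self) -> Dict[str, str]:
        return {node: f"hello from {node}" for node in self._nodes}


def run_round(nodes: List[str]) -> Dict[str, str]:
    # The job is created right before the task is executed ...
    job = EchoJob(researcher_id="researcher_1", nodes=nodes)
    # ... and goes out of scope as soon as the function returns.
    return job.execute()


print(run_round(["node_1", "node_2"]))
```

Only the return value of `execute()` leaves the function; the job instance itself is not stored anywhere.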

Attributes:

Name Type Description
requests Requests

read-only Requests object handling communication with remote nodes

nodes List[str]

node IDs participating in the task

Parameters:

Name Type Description Default
researcher_id str

Unique ID of the researcher

required
requests Requests

Object for handling communications

required
nodes List[str] | None

List of IDs of the nodes participating in the task; None is treated as an empty list.

required
Source code in fedbiomed/researcher/federated_workflows/jobs/_job.py
def __init__(
    self,
    *,
    researcher_id: str,
    requests: Requests,
    nodes: List[str] | None,
):
    """Constructor of the class

    Args:
        researcher_id: Unique ID of the researcher
        requests: Object for handling communications
        nodes: List of IDs of the nodes participating in the task; `None` is treated as an empty list.

    """

    self._researcher_id = researcher_id
    self._reqs = requests
    self._nodes: List[str] = (
        nodes or []
    )  # List of node ids participating in this task
    self._policies: List[RequestPolicy] = []

Attributes

nodes property
nodes
requests property
requests

Classes

RequestTimer
RequestTimer(nodes)

Context manager that computes the processing time elapsed for the request and the reply

Usage:

nodes = ['node_1', 'node_2']
with Job.RequestTimer(nodes) as my_timer:
    ...  # send some request

my_timer
# {'node_1': 2.22, 'node_2': 2.21}  # request time for each node, in seconds

Parameters:

Name Type Description Default
nodes List[str]

existing nodes that will be requested for the Job

required
Source code in fedbiomed/researcher/federated_workflows/jobs/_job.py
def __init__(self, nodes: List[str]):
    """
    Constructor of RequestTimer

    Args:
        nodes: existing nodes that will be requested for the Job
    """
    self._timer = {node_id: 0.0 for node_id in nodes}
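
Only the constructor is listed here. As a rough idea of how such a context manager could work, the sketch below uses a hypothetical `ToyRequestTimer` class; it is an assumption about the general pattern and not the actual `RequestTimer` code. It records one wall-clock duration for the whole `with` block and assigns it to every node.

```python
import time
from typing import Dict, List


class ToyRequestTimer:
    """Hypothetical timer: same elapsed time reported for every node."""

    def __init__(self, nodes: List[str]):
        self._timer: Dict[str, float] = {node_id: 0.0 for node_id in nodes}
        self._start = 0.0

    def __enter__(self) -> Dict[str, float]:
        self._start = time.perf_counter()
        # The dict is handed to the caller and filled in on exit.
        return self._timer

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        elapsed = time.perf_counter() - self._start
        for node_id in self._timer:
            self._timer[node_id] = elapsed
        # Returning None lets any exception raised in the block propagate.


with ToyRequestTimer(["node_1", "node_2"]) as timings:
    time.sleep(0.01)  # stands in for sending a request

print(sorted(timings))
```

The values only become meaningful after the `with` block has exited, which matches the way the usage example above reads `my_timer` after the block.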

Functions

execute abstractmethod
execute()

Payload of the job.

Completes a request to the job's nodes and collects replies.

Returns:

Type Description
Any

values specific to the type of job

Source code in fedbiomed/researcher/federated_workflows/jobs/_job.py
@abstractmethod
def execute(self) -> Any:
    """Payload of the job.

    Completes a request to the job's nodes and collects replies.

    Returns:
        values specific to the type of job
    """

PreprocRequestJob

PreprocRequestJob(experiment_id, preproc_type, preproc_step, preproc_id, federated_dataset, preproc_args, state_id=None, **kwargs)

Bases: Job

Preprocessing Request Job class.

This class represents a preprocessing request job in the Fed-BioMed framework. It inherits from the base Job class and is used to handle preprocessing requests.

Parameters:

Name Type Description Default
experiment_id str

The experiment ID associated with this preprocessing job.

required
preproc_type PreprocType

The type of preprocessing to be performed.

required
preproc_step PreprocStep

The step of preprocessing to be executed.

required
preproc_id str

The unique identifier for the preprocessing task.

required
federated_dataset FederatedDataset

The federated dataset on which preprocessing is to be performed.

required
preproc_args dict

The arguments required for the preprocessing task.

required
state_id dict[str, str] | None

Optional dictionary mapping node IDs to their respective state IDs.

None
**kwargs

Named arguments of parent class. Please see Job

{}
Source code in fedbiomed/researcher/federated_workflows/jobs/_preproc_request_job.py
def __init__(
    self,
    experiment_id: str,
    preproc_type: PreprocType,
    preproc_step: PreprocStep,
    preproc_id: str,
    federated_dataset: FederatedDataset,
    preproc_args: dict,
    state_id: dict[str, str] | None = None,
    **kwargs,
) -> None:
    """Initialize the PreprocRequestJob.

    Args:
        experiment_id: The experiment ID associated with this preprocessing job.
        preproc_type: The type of preprocessing to be performed.
        preproc_step: The step of preprocessing to be executed.
        preproc_id: The unique identifier for the preprocessing task.
        federated_dataset: The federated dataset on which preprocessing is to be performed.
        preproc_args: The arguments required for the preprocessing task.
        state_id: Optional dictionary mapping node IDs to their respective state IDs.
        **kwargs: Named arguments of parent class. Please see
            [`Job`][fedbiomed.researcher.federated_workflows.jobs.Job]
    """
    super().__init__(**kwargs)

    self._experiment_id = experiment_id
    self._preproc_type = preproc_type.value
    self._preproc_step = preproc_step.value
    self._preproc_id = preproc_id
    self._federated_dataset = federated_dataset
    self._preproc_args = preproc_args
    self._state_id = state_id or {}

Functions

execute
execute()

Executes preprocessing request

Returns:

Type Description
Dict[str, PreprocReply]

A dictionary mapping node IDs to their respective preprocessing replies.

Source code in fedbiomed/researcher/federated_workflows/jobs/_preproc_request_job.py
def execute(self) -> Dict[str, PreprocReply]:
    """Executes preprocessing request

    Returns:
        A dictionary mapping node IDs to their respective preprocessing replies.
    """
    preproc_request = dict(
        researcher_id=self._researcher_id,
        experiment_id=self._experiment_id,
        preproc_type=self._preproc_type,
        preproc_step=self._preproc_step,
        preproc_id=self._preproc_id,
    )

    requests = MessagesByNode()
    for node in self._nodes:
        # Deep-copy the arguments so that each node gets an independent copy and
        # mutations cannot leak across nodes. This may be reconsidered later,
        # depending on the size and content of preproc_args.
        preproc_args = deepcopy(self._preproc_args)

        requests[node] = PreprocRequest(
            **{
                **preproc_request,
                "dataset_id": self._federated_dataset.data()[node]["dataset_id"],
                "state_id": self._state_id.get(node),
                "preproc_args": preproc_args,
            }
        )

    with self._reqs.send(requests, self._nodes, self._policies) as responses:
        errors: Dict[str, ErrorMessage] = responses.errors()
        replies: Dict[str, PreprocReply] = responses.replies()

    if errors:
        # Log errors reported by nodes; they are not raised, so only successful replies are returned
        for node_id, error in errors.items():
            logger.error(
                "Error message received during preprocessing "
                f"in node_id={node_id}: {error.errnum}. {error.extra_msg}"
            )

    logger.debug(f"Replies are: {replies} for step {self._preproc_step}")

    return replies
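
The per-node `deepcopy` in the loop above guards against one request's arguments being modified through another. A small sketch of the effect, with plain dicts in place of `PreprocRequest` and a hypothetical helper name:

```python
from copy import deepcopy
from typing import Any, Dict, List


def build_preproc_requests(
    preproc_args: Dict[str, Any],
    nodes: List[str],
    state_ids: Dict[str, str],
) -> Dict[str, Dict[str, Any]]:
    """Give every node its own copy of the arguments and its own state ID."""
    requests = {}
    for node in nodes:
        requests[node] = {
            # .get() yields None for nodes without a saved state.
            "state_id": state_ids.get(node),
            "preproc_args": deepcopy(preproc_args),
        }
    return requests


args = {"columns": ["age", "weight"]}
requests = build_preproc_requests(args, ["node_1", "node_2"], {"node_1": "state_42"})

# Mutating one node's arguments leaves the other node and the original untouched.
requests["node_1"]["preproc_args"]["columns"].append("height")
print(requests["node_2"]["preproc_args"], args)
```

With a shallow copy (or no copy at all) the nested `columns` list would be shared, and the `append` would be visible in every request.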

TrainingJob

TrainingJob(experiment_id, round_, training_plan, training_args, model_args, data, nodes_state_ids, aggregator_args, keep_files_dir, secagg_arguments=None, do_training=True, optim_aux_var=None, **kwargs)

Bases: Job

TrainingJob is a task for training an ML model on the nodes by executing a TrainingPlan.

Parameters:

Name Type Description Default
experiment_id str

unique ID of this experiment

required
round_ int

Number of the round currently being performed (a round consists of all the training steps of a federated model between two aggregations).

required
training_plan BaseTrainingPlan

TrainingPlan with properly initialized model and optimizer

required
training_args TrainingArgs

arguments for training

required
model_args Optional[dict]

arguments for the model

required
data FederatedDataset

metadata of the federated data set

required
nodes_state_ids Dict[str, str]

unique IDs of the node states saved remotely

required
aggregator_args Dict[str, Dict[str, Any]]

aggregator arguments required for remote execution

required
keep_files_dir str

Directory for storing files created by the job that we want to keep beyond the execution of the job.

required
secagg_arguments Union[Dict, None]

Secure aggregation arguments, some depending on scheme used

None
do_training bool

if False, skip training in this round (do only validation). Defaults to True.

True
optim_aux_var Optional[Dict[str, AuxVar]]

Auxiliary variables of the researcher-side Optimizer, if any. Note that such variables may only be used if both the Experiment and node-side training plan hold a declearn-based Optimizer, and their plug-ins are coherent with each other as to expected information exchange.

None
**kwargs

Named arguments of parent class. Please see Job

{}
Source code in fedbiomed/researcher/federated_workflows/jobs/_training_job.py
def __init__(
    self,
    experiment_id: str,
    round_: int,
    training_plan: BaseTrainingPlan,
    training_args: TrainingArgs,
    model_args: Optional[dict],
    data: FederatedDataset,
    nodes_state_ids: Dict[str, str],
    aggregator_args: Dict[str, Dict[str, Any]],
    keep_files_dir: str,
    secagg_arguments: Union[Dict, None] = None,
    do_training: bool = True,
    optim_aux_var: Optional[Dict[str, AuxVar]] = None,
    **kwargs,
):
    """Constructor of the class

    Args:
        experiment_id: unique ID of this experiment
        round_: number of the round currently being performed (a round consists of all the
            training steps of a federated model between two aggregations).
        training_plan: TrainingPlan with properly initialized model and optimizer
        training_args: arguments for training
        model_args: arguments for the model
        data: metadata of the federated data set
        nodes_state_ids: unique IDs of the node states saved remotely
        aggregator_args: aggregator arguments required for remote execution
        keep_files_dir: Directory for storing files created by the job that we want to keep beyond the execution
            of the job.
        secagg_arguments: Secure aggregation arguments, some depending on scheme used
        do_training: if False, skip training in this round (do only validation). Defaults to True.
        optim_aux_var: Auxiliary variables of the researcher-side Optimizer, if any.
            Note that such variables may only be used if both the Experiment and node-side training plan
            hold a declearn-based [Optimizer][fedbiomed.common.optimizers.Optimizer], and their plug-ins
            are coherent with each other as to expected information exchange.
        **kwargs: Named arguments of parent class. Please see
            [`Job`][fedbiomed.researcher.federated_workflows.jobs.Job]
    """
    super().__init__(**kwargs)
    # to be used for `execute()`
    self._experiment_id = experiment_id
    self._round_ = round_
    self._training_plan = training_plan
    self._training_args = training_args
    self._model_args = model_args
    self._data = data
    self._nodes_state_ids = nodes_state_ids
    self._aggregator_args = aggregator_args
    self._secagg_arguments = (
        secagg_arguments or {}
    )  # Assign empty dict to secagg arguments if it is None
    self._do_training = do_training
    self._optim_aux_var = optim_aux_var
    self._keep_files_dir = keep_files_dir

Functions

execute
execute()

Sends training request to nodes and waits for the responses

Returns:

Type Description
Tuple[Dict[str, Dict[str, Any]], Union[Dict[str, Dict[str, AuxVar]], Dict[str, EncryptedAuxVar]]]

A tuple of (1) the training replies for this round and (2) the node-wise optimizer auxiliary variables, as a dict with format {node_name: encrypted_aux_var} if secagg is used, and {node_name: {module_name: module_aux_var}} otherwise.

Source code in fedbiomed/researcher/federated_workflows/jobs/_training_job.py
def execute(
    self,
) -> Tuple[
    Dict[str, Dict[str, Any]],  # inner dicts are TrainReply dumps
    Union[Dict[str, Dict[str, AuxVar]], Dict[str, EncryptedAuxVar]],
]:
    """Sends training request to nodes and waits for the responses

    Returns:
        A tuple of
          * training replies for this round
          * node-wise optimizer auxiliary variables, as a dict with format
            `{node_name: encrypted_aux_var}` if secagg is used, and
            `{node_name: {module_name: module_aux_var}}` otherwise.
    """

    # Populate request message
    msg = {
        "researcher_id": self._researcher_id,
        "experiment_id": self._experiment_id,
        "training_args": self._training_args.dict(),
        "training": self._do_training,
        "model_args": self._model_args if self._model_args is not None else {},
        "round": self._round_,
        "training_plan": self._training_plan.source(),
        "training_plan_class": self._training_plan.__class__.__name__,
        "params": self._training_plan.get_model_params(
            exclude_buffers=not self._training_args.dict()[
                "share_persistent_buffers"
            ],
            local_params=self._training_plan.local_params,
        ),
        "secagg_arguments": self._secagg_arguments,
        "aggregator_args": {},
        "optim_aux_var": self._optim_aux_var,
    }

    # Loop over nodes, add node specific data and send train request
    messages = MessagesByNode()

    for node in self._nodes:
        msg["dataset_id"] = self._data.data()[node]["dataset_id"]
        msg["state_id"] = self._nodes_state_ids.get(node)

        # add aggregator parameters to message header
        msg["aggregator_args"] = (
            self._aggregator_args.get(node, {}) if self._aggregator_args else {}
        )

        self._log_round_info(node=node, training=self._do_training)

        messages.update({node: TrainRequest(**msg)})  # register the request for this node

    with self.RequestTimer(self._nodes) as timer:  # compute request time
        # Send training request
        with self._reqs.send(
            messages, self._nodes, self._policies
        ) as federated_req:
            errors = federated_req.errors()
            replies = federated_req.replies()

    # Loop over errors
    for node_id, error in errors.items():
        logger.warning(
            f"Error message received during training: {error.errnum}. {error.extra_msg}"
        )
        self._nodes.remove(node_id)

    training_replies = self._get_training_results(replies=replies)

    timing_results = self._get_timing_results(replies, timer)
    # `training_replies` can be empty if there were no replies
    for node_id in replies:
        if training_replies.get(node_id):
            training_replies[node_id].update({"timing": timing_results[node_id]})

    # Extract aux variables from training replies.
    if self._do_training:
        aux_vars = self._extract_received_optimizer_aux_var_from_round(replies)
    else:
        aux_vars = {}
    return training_replies, aux_vars
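
The post-processing at the end of `execute` (dropping nodes that returned an error, then attaching the timing to each non-empty reply) can be sketched in isolation. `finalize_round` is a hypothetical helper operating on plain dicts; unlike the listing above, it returns a new node list instead of mutating one in place.

```python
from typing import Any, Dict, List, Tuple


def finalize_round(
    nodes: List[str],
    errors: Dict[str, Any],
    training_replies: Dict[str, Dict[str, Any]],
    timings: Dict[str, float],
) -> Tuple[List[str], Dict[str, Dict[str, Any]]]:
    """Drop errored nodes and add a 'timing' entry to each non-empty reply."""
    remaining = [node for node in nodes if node not in errors]
    merged: Dict[str, Dict[str, Any]] = {}
    for node_id, reply in training_replies.items():
        if reply:
            merged[node_id] = {**reply, "timing": timings[node_id]}
        else:
            # Empty replies are passed through without timing information.
            merged[node_id] = reply
    return remaining, merged


remaining, merged = finalize_round(
    nodes=["node_1", "node_2", "node_3"],
    errors={"node_2": "out of memory"},
    training_replies={"node_1": {"loss": 0.1}, "node_3": {}},
    timings={"node_1": 2.0},
)
print(remaining, merged)
```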

TrainingPlanApproveJob

TrainingPlanApproveJob(training_plan, description, **kwargs)

Bases: Job

Task for requesting the nodes' approval to run a given TrainingPlan on them.

Parameters:

Name Type Description Default
training_plan BaseTrainingPlan

an instance of a TrainingPlan object

required
description str

human-readable description of the TrainingPlan for the reviewer on the node

required
**kwargs

Named arguments of parent class. Please see Job

{}
Source code in fedbiomed/researcher/federated_workflows/jobs/_training_plan_approval_job.py
def __init__(self, training_plan: BaseTrainingPlan, description: str, **kwargs):
    """Constructor of the class.

    Args:
        training_plan: an instance of a TrainingPlan object
        description: human-readable description of the TrainingPlan for the reviewer on the node
        **kwargs: Named arguments of parent class. Please see
            [`Job`][fedbiomed.researcher.federated_workflows.jobs.Job]
    """
    super().__init__(**kwargs)
    self._policies = [DiscardOnTimeout(5)]  # specific policy for TrainingApproval
    self._training_plan = training_plan
    self._description = description

Functions

execute
execute()

Requests the approval of the provided TrainingPlan.

Returns:

Type Description
Dict

A dictionary of pairs (node_id: status), where status indicates to the researcher that the training plan has been correctly downloaded on the node side. Warning: status does not mean that the training plan is approved, only that it has been added to the "approval queue" on the node side.

Source code in fedbiomed/researcher/federated_workflows/jobs/_training_plan_approval_job.py
def execute(self) -> Dict:
    """Requests the approval of the provided TrainingPlan.

    Returns:
        a dictionary of pairs (node_id: status), where status indicates to the researcher
        that the training plan has been correctly downloaded on the node side.
        Warning: status does not mean that the training plan is approved, only that it has been added
        to the "approval queue" on the node side.
    """
    return self._reqs.training_plan_approve(
        self._training_plan, self._description, self._nodes, self._policies
    )

TrainingPlanCheckJob

TrainingPlanCheckJob(experiment_id, training_plan, **kwargs)

Bases: Job

Task for checking if nodes accept running a given TrainingPlan.

Parameters:

Name Type Description Default
experiment_id str

unique ID of this experiment

required
training_plan BaseTrainingPlan

an instance of a TrainingPlan object

required
**kwargs

Named arguments of parent class. Please see Job

{}
Source code in fedbiomed/researcher/federated_workflows/jobs/_training_plan_approval_job.py
def __init__(self, experiment_id: str, training_plan: BaseTrainingPlan, **kwargs):
    """Constructor of the class.

    Args:
        experiment_id: unique ID of this experiment
        training_plan: an instance of a TrainingPlan object
        **kwargs: Named arguments of parent class. Please see
            [`Job`][fedbiomed.researcher.federated_workflows.jobs.Job]

    """
    super().__init__(**kwargs)
    self._policies = [DiscardOnTimeout(5)]  # specific policy for TrainingApproval
    self._experiment_id = experiment_id
    self._training_plan = training_plan

Functions

execute
execute()

Checks whether model is approved or not.

This method sends a training-plan-status request to the nodes. It should be run before running the experiment, so that researchers can find out whether their training plan has been approved.

Returns:

Type Description
Dict

A dict of Message objects indexed by node ID, one for each of the job's nodes that replied

Source code in fedbiomed/researcher/federated_workflows/jobs/_training_plan_approval_job.py
def execute(self) -> Dict:
    """Checks whether model is approved or not.

    This method sends a `training-plan-status` request to the nodes. It should be run before running the
    experiment, so that researchers can find out whether their training plan has been approved.

    Returns:
        A dict of `Message` objects indexed by node ID, one for each of the job's nodes that replied
    """

    message = TrainingPlanStatusRequest(
        **{
            "researcher_id": self._researcher_id,
            "experiment_id": self._experiment_id,
            "training_plan": self._training_plan.source(),
        }
    )

    # Send message to each node that has been found after dataset search request
    # TODO: add timer to compute request time
    with self._reqs.send(
        message, self._nodes, policies=self._policies
    ) as federated_req:
        replies = federated_req.replies()

        for node_id, reply in replies.items():
            if reply.success is True:
                if reply.approval_obligation is True:
                    if reply.status == TrainingPlanApprovalStatus.APPROVED.value:
                        logger.info(
                            f"Training plan has been approved by the node: {node_id}"
                        )
                    else:
                        logger.warning(
                            f"Training plan has NOT been approved by the node: {node_id}. "
                            f"Training plan status: {reply.status}"
                        )
                else:
                    logger.info(
                        f"Training plan approval is not required by the node: {node_id}"
                    )
            else:
                logger.warning(f"Node : {node_id} : {reply.msg}")

    # Get the nodes that haven't replied training-plan-status request
    non_replied_nodes = list(set(self._nodes) - set(replies.keys()))
    if non_replied_nodes:
        logger.warning(
            "Request for checking training plan status hasn't been replied "
            f"by the nodes: {non_replied_nodes}. You might get an error "
            "while running your experiment."
        )

    return replies
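
The nested conditions in `execute` sort every reply into one of four outcomes, and a set difference finds the nodes that never answered. Below is a standalone sketch of that classification. `ToyStatusReply`, `classify` and `summarize` are hypothetical, and the `APPROVED` constant is a placeholder, since the actual value of `TrainingPlanApprovalStatus.APPROVED.value` is not shown on this page.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple

APPROVED = "APPROVED"  # placeholder value, assumed for this example only


@dataclass
class ToyStatusReply:
    """Hypothetical reply carrying only the fields read by execute()."""

    success: bool
    approval_obligation: bool = True
    status: str = ""
    msg: str = ""


def classify(reply: ToyStatusReply) -> str:
    """Mirror the branch order of the logging code above."""
    if not reply.success:
        return "failed"
    if not reply.approval_obligation:
        return "approval_not_required"
    return "approved" if reply.status == APPROVED else "not_approved"


def summarize(
    nodes: List[str], replies: Dict[str, ToyStatusReply]
) -> Tuple[Dict[str, str], List[str]]:
    """Return the outcome per replying node and the sorted list of silent nodes."""
    outcomes = {node_id: classify(reply) for node_id, reply in replies.items()}
    non_replied = sorted(set(nodes) - set(replies))
    return outcomes, non_replied


outcomes, non_replied = summarize(
    ["node_1", "node_2", "node_3", "node_4"],
    {
        "node_1": ToyStatusReply(success=True, status=APPROVED),
        "node_2": ToyStatusReply(success=True, approval_obligation=False),
        "node_3": ToyStatusReply(success=False, msg="training plan not found"),
    },
)
print(outcomes, non_replied)
```

Sorting the set difference is a choice made here for reproducible output; the listing above keeps the unordered `list(set(...))`.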