Skip to content

Node

Core code of the node component.

Attributes

Classes

Node

Node(config, node_args=None)

Core code of the node component.

Defines the behaviour of the node: it communicates with the researcher through the Messaging, parses the messages received from the researcher (either treating them instantly or queuing them), and executes the tasks requested by the researcher that are stored in the queue.

Attributes:

Name Type Description
config

Node configuration

node_args

Command line arguments for node.

Source code in fedbiomed/node/node.py
def __init__(
    self,
    config: NodeConfig,
    node_args: Union[dict, None] = None,
):
    """Build a node from its configuration.

    Creates the tasks queue, the gRPC controller, the node to node router,
    the dataset manager and the training plan security manager, then sets up
    security audit logging and records the `node_initialized` event.

    Args:
        config: Node configuration
        node_args: Command line arguments for node.
    """
    self.node_args = node_args or {}

    # Debug is enabled by the `debug` node argument or, failing that, by the
    # FBM_DEBUG environment variable.
    debug_enabled = bool(self.node_args.get("debug", False))
    if not debug_enabled:
        env_value = os.environ.get("FBM_DEBUG", "")
        debug_enabled = env_value.lower() in ("1", "true", "yes")
    self._debug = debug_enabled

    self._config = config
    self._node_id = self._config.get("default", "id")
    self._node_name = self._config.get("default", "name")

    # Tasks queue lives under `<root>/var`, with a shared temporary folder
    queue_dir = os.path.join(self._config.root, "var", f"queue_{self._node_id}")
    tmp_dir = str(os.path.join(self._config.root, "var", "tmp"))
    self._tasks_queue = TasksQueue(queue_dir, tmp_dir)

    # Single researcher endpoint, described by the `researcher` config section
    researcher = ResearcherCredentials(
        port=self._config.get("researcher", "port"),
        host=self._config.get("researcher", "ip"),
        certificate=self._config.get("researcher", "certificate", fallback=None),
    )
    self._grpc_client = GrpcController(
        node_id=self._node_id,
        researchers=[researcher],
        on_message=self.on_message,
    )

    relative_db_path = os.path.join(
        self._config.root, CONFIG_FOLDER_NAME, self._config.get("default", "db")
    )
    self._db_path = os.path.abspath(relative_db_path)

    self._pending_requests = EventWaitExchange(remove_delivered=True)
    self._controller_data = EventWaitExchange(remove_delivered=False)
    self._n2n_router = NodeToNodeRouter(
        self._node_id,
        self._db_path,
        self._grpc_client,
        self._pending_requests,
        self._controller_data,
    )

    minimum_samples = self._config.getint("security", "minimum_samples")
    self.dataset_manager = DatasetManager(
        path=self._db_path,
        min_samples=minimum_samples,
    )

    hashing_algorithm = self._config.get("security", "hashing_algorithm")
    approval_required = self._config.getbool("security", "training_plan_approval")
    self.tp_security_manager = TrainingPlanSecurityManager(
        db=self._db_path,
        node_id=self._node_id,
        node_name=self._node_name,
        hashing=hashing_algorithm,
        tp_approval=approval_required,
    )

    # Security audit logging: set up the log files, then the component identity
    logger.set_security_logs(root_path=self._config.root)
    logger.configure_security(
        component_id=self._node_id,
        component_name=ComponentType.NODE,
        fedbiomed_version=__version__,
    )

    # Record the creation of the node in the audit log
    logger.security_event(
        operation="node_initialized",
        status="success",
        researcher_id=None,
        node_name=self._node_name,
        config_path=self._config.root,
    )

Attributes

config property
config

Return node config

dataset_manager instance-attribute
dataset_manager = DatasetManager(path=(_db_path), min_samples=(getint('security', 'minimum_samples')))
node_args instance-attribute
node_args = node_args or {}
node_id property
node_id

Returns id of the node

node_name property
node_name

Returns name of the node

tp_security_manager instance-attribute
tp_security_manager = TrainingPlanSecurityManager(db=(_db_path), node_id=(_node_id), node_name=(_node_name), hashing=(get('security', 'hashing_algorithm')), tp_approval=(getbool('security', 'training_plan_approval')))

Functions

add_task
add_task(task)

Adds a task to the pending tasks queue.

Parameters:

Name Type Description Default
task dict

A Message object describing a training task

required
Source code in fedbiomed/node/node.py
def add_task(self, task: "Message"):
    """Adds a task to the pending tasks queue.

    The task is first added to the queue, then a `task_queued` security
    event is logged with the identifiers found on the task.

    Args:
        task: A `Message` object describing a training task
    """
    self._tasks_queue.add(task)
    # The task is already queued at this point, so building the audit entry
    # must not raise: identifiers are read with `getattr`, and the message
    # type falls back to the class name when the object has no `__name__`.
    logger.security_event(
        operation="task_queued",
        status="queued",
        researcher_id=getattr(task, "researcher_id", None),
        message_type=getattr(task, "__name__", type(task).__name__),
        experiment_id=getattr(task, "experiment_id", None),
        request_id=getattr(task, "request_id", None),
    )
is_connected
is_connected()

Checks if node is ready for communication with researcher

Returns:

Type Description
bool

True if node is ready, False if node is not ready

Source code in fedbiomed/node/node.py
def is_connected(self) -> bool:
    """Checks if node is ready for communication with researcher.

    The answer is delegated to the gRPC client of the node.

    Returns:
        True if node is ready, False if node is not ready
    """
    grpc_ready = self._grpc_client.is_connected()
    return grpc_ready
on_message
on_message(msg)

Handler to be used with Messaging class (ie the messager).

Called when a message arrives through the Messaging. It reads and triggers instructions received by node from Researcher, mainly:

- ping requests,
- train requests (then a new task will be added on node's task queue),
- search requests (for searching data in node's database).

Parameters:

Name Type Description Default
msg dict

Incoming message from Researcher.

required
Source code in fedbiomed/node/node.py
def on_message(self, msg: dict):
    """Handler to be used with `Messaging` class (ie the messager).

    Called when a  message arrives through the `Messaging`.
    It reads and triggers instructions received by node from Researcher,
    mainly:
    - ping requests,
    - train requests (then a new task will be added on node's task queue),
    - search requests (for searching data in node's database).

    Args:
        msg: Incoming message from Researcher.
    """
    message: Message
    try:
        message = Message.from_dict(msg)
    except FedbiomedError as e:
        logger.error(e)  # Message was not properly formatted
        resid = msg.get("researcher_id", "unknown_researcher_id")
        self.send_error(
            ErrorNumbers.FB301,
            extra_msg="Message was not properly formatted",
            researcher_id=resid,
        )
    else:
        logger.debug(
            "Received researcher message type=%s req=%s researcher=%s experiment=%s dataset=%s round=%s",
            message.__name__,
            getattr(message, "request_id", None),
            getattr(message, "researcher_id", None),
            getattr(message, "experiment_id", None),
            getattr(message, "dataset_id", None),
            getattr(message, "round", None),
        )

        # Set security context for all logs related to this message
        with logger.security_context(
            researcher_id=getattr(message, "researcher_id", None),
            message_type=message.__name__,
            request_id=getattr(message, "request_id", None),
            experiment_id=getattr(message, "experiment_id", None),
        ):
            # Log incoming message
            logger.security_event(
                operation="message_received_from_researcher",
                status="received",
            )

            match message.__name__:
                case (
                    TrainRequest.__name__
                    | SecaggRequest.__name__
                    | AdditiveSSSetupRequest.__name__
                    | FARequest.__name__
                    | PreprocRequest.__name__
                ):
                    logger.debug(
                        "Queueing node task type=%s req=%s experiment=%s",
                        message.__name__,
                        getattr(message, "request_id", None),
                        getattr(message, "experiment_id", None),
                    )
                    self.add_task(message)
                case SecaggDeleteRequest.__name__:
                    self._task_secagg_delete(message)
                case OverlayMessage.__name__:
                    self._n2n_router.submit(message)
                case SearchRequest.__name__:
                    databases = self.dataset_manager.dataset_table.search_by_tags(
                        message.tags
                    )
                    if len(databases) != 0:
                        databases = (
                            self.dataset_manager.obfuscate_private_information(
                                databases
                            )
                        )
                    reply = SearchReply(
                        request_id=message.request_id,
                        node_id=self._node_id,
                        node_name=self._node_name,
                        researcher_id=message.researcher_id,
                        databases=databases,
                        count=len(databases),
                    )
                    self._grpc_client.send(reply)
                    # Log outgoing reply
                    logger.security_event(
                        operation="SearchReply_sent",
                        status="sent",
                    )
                case ListRequest.__name__:
                    # Get list of all datasets
                    databases = self.dataset_manager.list_my_datasets(verbose=False)
                    databases = self.dataset_manager.obfuscate_private_information(
                        databases
                    )
                    reply = ListReply(
                        success=True,
                        request_id=message.request_id,
                        node_id=self._node_id,
                        node_name=self._node_name,
                        researcher_id=message.researcher_id,
                        databases=databases,
                        count=len(databases),
                    )
                    self._grpc_client.send(reply)
                    # Log outgoing reply
                    logger.security_event(
                        operation="ListReply_sent",
                        status="sent",
                    )

                case PingRequest.__name__:
                    reply = PingReply(
                        request_id=message.request_id,
                        researcher_id=message.researcher_id,
                        node_id=self._node_id,
                        node_name=self._node_name,
                    )
                    self._grpc_client.send(reply)
                    # Log outgoing reply
                    logger.security_event(
                        operation="PingReply_sent",
                        status="sent",
                    )
                case ApprovalRequest.__name__:
                    reply = self.tp_security_manager.reply_training_plan_approval_request(
                        message
                    )
                    self._grpc_client.send(reply)
                    # Log outgoing reply
                    logger.security_event(
                        operation="ApprovalReply_sent",
                        status="sent",
                    )
                case TrainingPlanStatusRequest.__name__:
                    reply = (
                        self.tp_security_manager.reply_training_plan_status_request(
                            message
                        )
                    )
                    self._grpc_client.send(reply)
                    # Log outgoing reply
                    logger.security_event(
                        operation="TrainingPlanStatusReply_sent",
                        status="sent",
                    )
                case _:
                    resid = msg.get("researcher_id", "unknown_researcher_id")
                    self.send_error(
                        ErrorNumbers.FB301,
                        extra_msg="This request handler is not implemented "
                        f"{message.__class__.__name__} is not implemented",
                        researcher_id=resid,
                    )
parser_task_train
parser_task_train(msg)

Parses a given training task message to create a round instance

Parameters:

Name Type Description Default
msg TrainRequest

TrainRequest message object to parse

required

Returns:

Type Description
Union[Round, None]

a Round object for the training to perform, or None if no training

Source code in fedbiomed/node/node.py
def parser_task_train(self, msg: TrainRequest) -> Union[Round, None]:
    """Builds the training round described by a training task message.

    The dataset targeted by the request is looked up in the local dataset
    table, a `Round` is created from the request parameters and its
    arguments are initialized. Whenever one of these steps fails, an error
    is sent to the researcher and no round is returned.

    Args:
        msg: `TrainRequest` message object to parse

    Returns:
        a `Round` object for the training to perform, or None if no training
    """
    history_monitor = HistoryMonitor(
        node_id=self._node_id,
        node_name=self._node_name,
        experiment_id=msg.experiment_id,
        researcher_id=msg.researcher_id,
        send=self._grpc_client.send,
    )
    dataset_id = msg.get_param("dataset_id")
    dataset_entry = self.dataset_manager.dataset_table.get_by_id(dataset_id)

    # Guard: without a matching local dataset there is nothing to train on
    if dataset_entry is None:
        missing_dataset = (
            f"{ErrorNumbers.FB313.value}: Did not find proper data in local datasets "
            f"on node={self._node_id} for dataset_id={dataset_id}"
        )
        self.send_error(
            extra_msg=missing_dataset,
            request_id=msg.request_id,
            researcher_id=msg.researcher_id,
            errnum=ErrorNumbers.FB313,
        )
        return None

    logger.debug(
        "Preparing training round req=%s experiment=%s round=%s dataset=%s training_plan=%s training=%s state_id=%s has_aux_var=%s",
        msg.request_id,
        msg.experiment_id,
        msg.round,
        dataset_id,
        msg.get_param("training_plan_class"),
        bool(msg.get_param("training")),
        msg.get_param("state_id"),
        msg.get_param("optim_aux_var") is not None,
    )

    # Data loading plan metadata is only fetched when the dataset entry
    # references a data loading plan
    if "dlp_id" not in dataset_entry:
        logger.debug("No data loading plan metadata for dataset=%s", dataset_id)
        dlp_metadata = None
    else:
        dlp_metadata = self.dataset_manager.get_dlp_by_id(dataset_entry["dlp_id"])

    training_round = Round(
        root_dir=self._config.root,
        db=self._db_path,
        node_id=self._node_id,
        node_name=self._node_name,
        training_plan=msg.get_param("training_plan"),
        training_plan_class=msg.get_param("training_plan_class"),
        model_kwargs=msg.get_param("model_args") or {},
        training_kwargs=msg.get_param("training_args") or {},
        training=msg.get_param("training") or False,
        dataset=dataset_entry,
        params=msg.get_param("params"),
        experiment_id=msg.get_param("experiment_id"),
        researcher_id=msg.get_param("researcher_id"),
        history_monitor=history_monitor,
        aggregator_args=msg.get_param("aggregator_args") or None,
        node_args=self.node_args,
        tp_security_manager=self.tp_security_manager,
        round_number=msg.get_param("round"),
        dlp_and_loading_block_metadata=dlp_metadata,
        aux_vars=msg.get_param("optim_aux_var"),
    )

    # Initialization reports a failure either by raising or by returning
    # an error message; both cases are answered with an FB300 error
    try:
        init_error = training_round.initialize_arguments(msg.get_param("state_id"))
    except Exception:
        self.send_error(
            errnum=ErrorNumbers.FB300,
            extra_msg=f"{ErrorNumbers.FB300.value}: Could not initialize arguments",
            researcher_id=msg.researcher_id,
            request_id=msg.request_id,
        )
        logger.debug(
            f"Training round initialize arguments error. Details are: {traceback.format_exc()}"
        )
        return None

    if init_error is None:
        return training_round

    self.send_error(
        errnum=ErrorNumbers.FB300,
        extra_msg=(
            f"{ErrorNumbers.FB300.value}: Could not initialize arguments for training round: {init_error}"
        ),
        researcher_id=msg.researcher_id,
        request_id=msg.request_id,
    )
    return None
send_error
send_error(errnum=ErrorNumbers.FB300, extra_msg='', researcher_id='<unknown>', broadcast=False, request_id=None)

Sends an error message.

Parameters:

Name Type Description Default
errnum ErrorNumbers

Code of the error.

FB300
extra_msg str

Additional human readable error message.

''
researcher_id str

Destination researcher.

'<unknown>'
broadcast bool

Broadcast the message to all available researchers, regardless of the specified researcher.

False
request_id str

Optional request id, used to reply with an error to a specific request.

None
Source code in fedbiomed/node/node.py
def send_error(
    self,
    errnum: ErrorNumbers = ErrorNumbers.FB300,
    extra_msg: str = "",
    researcher_id: str = "<unknown>",
    broadcast: bool = False,
    request_id: str = None,
):
    """Sends an error message.

    The error is logged (console and security audit log) and then sent as
    an `ErrorMessage` through the gRPC client. Any exception raised while
    doing so is caught and logged as an `FB601` error.

    Args:
        errnum: Code of the error.
        extra_msg: Additional human readable error message.
        researcher_id: Destination researcher.
        broadcast: Broadcast the message to all available researchers
            regardless of specific researcher.
        request_id: Optional request id, to reply as error to a request.
    """
    destination_host = self._config.get("researcher", "ip")
    destination_port = self._config.get("researcher", "port")

    # The connection state is only reported in the debug logs below, so a
    # failure to read it is treated as "not connected"
    try:
        connected = self.is_connected()
    except Exception:
        connected = False

    try:
        error_code = errnum.name
        logger.debug(
            "Preparing error reply errnum=%s req=%s researcher=%s broadcast=%s connected=%s destination=%s:%s msg_len=%d",
            error_code,
            request_id,
            researcher_id,
            broadcast,
            connected,
            destination_host,
            destination_port,
            len(extra_msg),
            stack_info=True,
        )

        # One call logs the error to the console and to the security audit log
        audit_fields = {
            "is_security": True,
            "operation": "error_sent",
            "status": "error",
            "request_id": request_id,
            "error_code": error_code,
            "error_message": extra_msg,
            "broadcast": broadcast,
        }
        # The placeholder researcher id is not reported to the logger
        known_researcher = None if researcher_id == "<unknown>" else researcher_id
        logger.error(
            extra_msg,
            extra=audit_fields,
            researcher_id=known_researcher,
            broadcast=broadcast,
        )

        error_reply = ErrorMessage(
            request_id=request_id,
            errnum=error_code,
            node_id=self._node_id,
            node_name=self._node_name,
            extra_msg=extra_msg,
            researcher_id=researcher_id,
        )
        self._grpc_client.send(error_reply, broadcast=broadcast)

        logger.debug(
            "Error reply dispatched errnum=%s req=%s researcher=%s broadcast=%s connected=%s",
            error_code,
            request_id,
            researcher_id,
            broadcast,
            connected,
        )
    except Exception as e:
        logger.error(
            f"{ErrorNumbers.FB601.value}: Cannot send error message: {e}",
            exc_info=True,
        )
start_messaging
start_messaging(on_finish=None)

Calls the start method of messaging class.

Parameters:

Name Type Description Default
on_finish Optional[Callable]

Called when the tasks for handling all known researchers have finished. Callable has no argument.

None
Source code in fedbiomed/node/node.py
def start_messaging(self, on_finish: Optional[Callable] = None):
    """Calls the start method of messaging class.

    A `node_started` security event is logged before the gRPC client is
    started.

    Args:
        on_finish: Called when the tasks for handling all known researchers have finished.
            Callable has no argument.
    """
    # Audit entry recording the start of the node
    startup_event = {
        "operation": "node_started",
        "status": "success",
        "researcher_id": None,
        "node_name": self._node_name,
    }
    logger.security_event(**startup_event)

    self._grpc_client.start(on_finish)
start_protocol
start_protocol()

Start the node to node router thread, for handling node to node messages

Source code in fedbiomed/node/node.py
def start_protocol(self) -> None:
    """Start the node to node router thread, for handling node to node messages."""
    router = self._n2n_router
    router.start()
task_manager
task_manager()

Manages training tasks in the queue.

Source code in fedbiomed/node/node.py
def task_manager(self):
    """Manages training tasks in the queue."""

    while True:
        item: Message = self._tasks_queue.get()
        # don't want to treat again in case of failure
        self._tasks_queue.task_done()

        logger.info(
            f"[TASKS QUEUE] Task received by task manager: "
            f"Researcher: {item.researcher_id} "
            f"Experiment: {item.experiment_id}"
        )

        # Set security context for all logs in this task
        with logger.security_context(
            researcher_id=item.researcher_id,
            experiment_id=item.experiment_id,
            request_id=item.request_id,
        ):
            try:
                match item.__name__:
                    case TrainRequest.__name__:
                        round_ = self.parser_task_train(item)
                        # once task is out of queue, initiate training rounds
                        if round_ is not None:
                            # Capture start time
                            start_time = time.time()

                            # Log training round start
                            logger.security_event(
                                operation="training_round_start",
                                status="initiated",
                                dataset_id=round_.dataset.get("dataset_id"),
                                training_plan_id=item.get_param(
                                    "training_plan_class"
                                ),
                                round_number=item.round,
                            )
                            logger.debug(
                                "Starting node training req=%s experiment=%s round=%s dataset=%s plan=%s",
                                item.request_id,
                                item.experiment_id,
                                item.round,
                                round_.dataset.get("dataset_id"),
                                item.get_param("training_plan_class"),
                            )
                            msg = round_.run_model_training(
                                tp_approval=self._config.getbool(
                                    "security", "training_plan_approval"
                                ),
                                secagg_insecure_validation=self._config.getbool(
                                    "security", "secagg_insecure_validation"
                                ),
                                secagg_active=self._config.getbool(
                                    "security", "secure_aggregation"
                                ),
                                force_secagg=self._config.getbool(
                                    "security", "force_secure_aggregation"
                                ),
                                secagg_arguments=item.get_param("secagg_arguments"),
                            )
                            msg.request_id = item.request_id
                            self._grpc_client.send(msg)

                            # Calculate duration
                            duration_seconds = time.time() - start_time

                            # Log training round completion
                            logger.security_event(
                                operation="training_round_complete",
                                status="success",
                                dataset_id=round_.dataset.get("dataset_id"),
                                training_plan_id=item.get_param(
                                    "training_plan_class"
                                ),
                                round_number=item.round,
                                duration_seconds=round(duration_seconds, 2),
                            )
                            logger.debug(
                                "Finished node training req=%s experiment=%s round=%s reply_type=%s success=%s duration_s=%.2f",
                                item.request_id,
                                item.experiment_id,
                                item.round,
                                msg.__class__.__name__,
                                getattr(msg, "success", None),
                                duration_seconds,
                            )
                            del round_

                    case SecaggRequest.__name__ | AdditiveSSSetupRequest.__name__:
                        # Log secagg setup start
                        logger.security_event(
                            operation="secagg_setup_start",
                            status="initiated",
                            secagg_id=getattr(item, "secagg_id", None),
                        )
                        self._task_secagg(item)
                        # Log secagg setup complete
                        logger.security_event(
                            operation="secagg_setup_complete",
                            status="success",
                            secagg_id=getattr(item, "secagg_id", None),
                        )
                    case FARequest.__name__:
                        # Log federated analytics start
                        logger.security_event(
                            operation="federated_analytics_start",
                            status="initiated",
                        )
                        fa_job = FAJob(
                            root_dir=self._config.root,
                            dataset_manager=self.dataset_manager,
                            node_id=self._node_id,
                            node_name=self._node_name,
                            request=item,
                            allow_fa=self.config.getbool(
                                "security", "allow_federated_analytics"
                            ),
                        )
                        response = fa_job.run()
                        self._grpc_client.send(response)
                        # Log federated analytics complete
                        logger.security_event(
                            operation="federated_analytics_complete",
                            status="success",
                        )
                    case PreprocRequest.__name__:
                        # Log preprocessing start
                        logger.security_event(
                            operation="preprocessing_start",
                            status="initiated",
                        )
                        preproc_job = PreprocJob(
                            root_dir=self._config.root,
                            dataset_manager=self.dataset_manager,
                            node_id=self._node_id,
                            node_name=self._node_name,
                            request=item,
                            allow_preproc=self.config.getbool(
                                "security", "allow_preproc"
                            ),
                        )
                        response = preproc_job.run()
                        self._grpc_client.send(response)
                        # Log preprocessing complete
                        logger.security_event(
                            operation="preprocessing_complete",
                            status="success",
                        )
                    case _:
                        errmess = (
                            f"{ErrorNumbers.FB319.value}: Undefined request message"
                        )
                        self.send_error(
                            errnum=ErrorNumbers.FB319, extra_msg=errmess
                        )

            # TODO: Test exception
            except Exception as e:
                self.send_error(
                    request_id=item.request_id,
                    researcher_id=item.researcher_id,
                    errnum=ErrorNumbers.FB300,
                    extra_msg="Round error: " + str(e),
                )