Implement strategies#

L’abstraction de la stratégie permet de mettre en œuvre des stratégies entièrement personnalisées. Une stratégie est essentiellement l’algorithme d’apprentissage fédéré qui s’exécute sur le serveur. Les stratégies décident comment échantillonner les clients, comment configurer les clients pour la formation, comment agréger les mises à jour et comment évaluer les modèles. Flower fournit quelques stratégies intégrées qui sont basées sur la même API que celle décrite ci-dessous.

L’abstraction Stratégie#

Toutes les implémentations de stratégies sont dérivées de la classe de base abstraite flwr.server.strategy.Strategy, qu’il s’agisse d’implémentations intégrées ou d’implémentations tierces. Cela signifie que les implémentations de stratégies personnalisées ont exactement les mêmes capacités à leur disposition que les implémentations intégrées.

L’abstraction de la stratégie définit quelques méthodes abstraites qui doivent être mises en œuvre :

class Strategy(ABC):
    """Abstract base class for server strategy implementations."""

    @abstractmethod
    def initialize_parameters(
        self, client_manager: ClientManager
    ) -> Optional[Parameters]:
        """Initialize the (global) model parameters."""

    @abstractmethod
    def configure_fit(
        self,
        server_round: int,
        parameters: Parameters,
        client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, FitIns]]:
        """Configure the next round of training."""

    @abstractmethod
    def aggregate_fit(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, FitRes]],
        failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
        """Aggregate training results."""

    @abstractmethod
    def configure_evaluate(
        self,
        server_round: int,
        parameters: Parameters,
        client_manager: ClientManager
    ) -> List[Tuple[ClientProxy, EvaluateIns]]:
        """Configure the next round of evaluation."""

    @abstractmethod
    def aggregate_evaluate(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, EvaluateRes]],
        failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
    ) -> Tuple[Optional[float], Dict[str, Scalar]]:
        """Aggregate evaluation results."""

    @abstractmethod
    def evaluate(
        self, parameters: Parameters
    ) -> Optional[Tuple[float, Dict[str, Scalar]]]:
        """Evaluate the current model parameters."""

La création d’une nouvelle stratégie implique la mise en œuvre d’une nouvelle classe (dérivée de la classe de base abstraite Stratégie) qui met en œuvre les méthodes abstraites présentées précédemment :

class SotaStrategy(Strategy):
    def initialize_parameters(self, client_manager):
        # Your implementation here

    def configure_fit(self, server_round, parameters, client_manager):
        # Your implementation here

    def aggregate_fit(self, server_round, results, failures):
        # Your implementation here

    def configure_evaluate(self, server_round, parameters, client_manager):
        # Your implementation here

    def aggregate_evaluate(self, server_round, results, failures):
        # Your implementation here

    def evaluate(self, parameters):
        # Your implementation here

Le serveur Flower appelle ces méthodes dans l’ordre suivant :

sequenceDiagram participant Strategy participant S as Flower Server<br/>start_server participant C1 as Flower Client participant C2 as Flower Client Note left of S: Get initial <br/>model parameters S->>Strategy: initialize_parameters activate Strategy Strategy-->>S: Parameters deactivate Strategy Note left of S: Federated<br/>Training rect rgb(249, 219, 130) S->>Strategy: configure_fit activate Strategy Strategy-->>S: List[Tuple[ClientProxy, FitIns]] deactivate Strategy S->>C1: FitIns activate C1 S->>C2: FitIns activate C2 C1-->>S: FitRes deactivate C1 C2-->>S: FitRes deactivate C2 S->>Strategy: aggregate_fit<br/>List[FitRes] activate Strategy Strategy-->>S: Aggregated model parameters deactivate Strategy end Note left of S: Centralized<br/>Evaluation rect rgb(249, 219, 130) S->>Strategy: evaluate activate Strategy Strategy-->>S: Centralized evaluation result deactivate Strategy end Note left of S: Federated<br/>Evaluation rect rgb(249, 219, 130) S->>Strategy: configure_evaluate activate Strategy Strategy-->>S: List[Tuple[ClientProxy, EvaluateIns]] deactivate Strategy S->>C1: EvaluateIns activate C1 S->>C2: EvaluateIns activate C2 C1-->>S: EvaluateRes deactivate C1 C2-->>S: EvaluateRes deactivate C2 S->>Strategy: aggregate_evaluate<br/>List[EvaluateRes] activate Strategy Strategy-->>S: Aggregated evaluation results deactivate Strategy end Note left of S: Next round, continue<br/>with federated training

Les sections suivantes décrivent chacune de ces méthodes plus en détail.

La méthode initialize_parameters (initialisation des paramètres)#

initialize_parameters n’est appelé qu’une seule fois, au tout début d’une exécution. Il est chargé de fournir les paramètres initiaux du modèle global sous une forme sérialisée (c’est-à-dire sous la forme d’un objet Parameters).

Les stratégies intégrées renvoient les paramètres initiaux fournis par l’utilisateur. L’exemple suivant montre comment les paramètres initiaux peuvent être transmis à FedAvg :

import flwr as fl
import tensorflow as tf

# Load model for server-side parameter initialization
model = tf.keras.applications.EfficientNetB0(
    input_shape=(32, 32, 3), weights=None, classes=10
)
model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"])

# Get model weights as a list of NumPy ndarray's
weights = model.get_weights()

# Serialize ndarrays to `Parameters`
parameters = fl.common.ndarrays_to_parameters(weights)

# Use the serialized parameters as the initial global parameters
strategy = fl.server.strategy.FedAvg(
    initial_parameters=parameters,
)
fl.server.start_server(config=fl.server.ServerConfig(num_rounds=3), strategy=strategy)

Le serveur Flower appelle initialize_parameters, qui renvoie les paramètres passés à initial_parameters, ou None. Si aucun paramètre n’est renvoyé par initialize_parameters (c’est-à-dire None), le serveur sélectionne au hasard un client et lui demande de fournir ses paramètres. Il s’agit d’une fonction de commodité qui n’est pas recommandée dans la pratique, mais qui peut être utile pour le prototypage. Dans la pratique, il est recommandé de toujours utiliser l’initialisation des paramètres du côté du serveur.

Note

L’initialisation des paramètres côté serveur est un mécanisme puissant. Elle peut être utilisée, par exemple, pour reprendre l’entraînement à partir d’un point de contrôle précédemment sauvegardé. C’est également la capacité fondamentale nécessaire pour mettre en œuvre des approches hybrides, par exemple, pour affiner un modèle pré-entraîné à l’aide de l’apprentissage fédéré.

La méthode configure_fit#

configure_fit est chargé de configurer le prochain tour de formation. Que signifie configurer dans ce contexte ? Configurer un tour signifie sélectionner des clients et décider des instructions à leur envoyer. La signature de configure_fit l’indique clairement :

@abstractmethod
def configure_fit(
    self,
    server_round: int,
    parameters: Parameters,
    client_manager: ClientManager
) -> List[Tuple[ClientProxy, FitIns]]:
    """Configure the next round of training."""

La valeur de retour est une liste de tuples, chacun représentant les instructions qui seront envoyées à un client particulier. Les implémentations de stratégies effectuent généralement les étapes suivantes dans configure_fit :

  • Utilise le client_manager pour échantillonner au hasard tous les clients disponibles (ou un sous-ensemble d’entre eux) (chacun représenté par un objet ClientProxy)

  • Associe chaque ClientProxy au même FitIns contenant le modèle global actuel parameters et config dict

More sophisticated implementations can use configure_fit to implement custom client selection logic. A client will only participate in a round if the corresponding ClientProxy is included in the list returned from configure_fit.

Note

La structure de cette valeur de retour offre beaucoup de souplesse à l’utilisateur. Comme les instructions sont définies par client, des instructions différentes peuvent être envoyées à chaque client, ce qui permet d’élaborer des stratégies personnalisées pour former, par exemple, différents modèles sur différents clients, ou utiliser différents hyperparamètres sur différents clients (via le dict config).

La méthode aggregate_fit (agrégation)#

aggregate_fit est chargé d’agréger les résultats renvoyés par les clients qui ont été sélectionnés et à qui on a demandé de s’entraîner dans configure_fit.

@abstractmethod
def aggregate_fit(
    self,
    server_round: int,
    results: List[Tuple[ClientProxy, FitRes]],
    failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
    """Aggregate training results."""

Bien sûr, des échecs peuvent se produire, il n’y a donc aucune garantie que le serveur obtienne des résultats de tous les clients auxquels il a envoyé des instructions (via configure_fit). aggregate_fit reçoit donc une liste de résultats, mais aussi une liste de échecs.

aggregate_fit renvoie un objet Parameters facultatif et un dictionnaire de métriques agrégées. La valeur de retour Parameters est facultative car aggregate_fit peut décider que les résultats fournis ne sont pas suffisants pour l’agrégation (par exemple, trop d’échecs).

La méthode configure_evaluate (en anglais)#

configure_evaluate est chargé de configurer le prochain tour d’évaluation. Que signifie configurer dans ce contexte ? Configurer un tour signifie sélectionner des clients et décider des instructions à leur envoyer. La signature de configure_evaluate l’indique clairement :

@abstractmethod
def configure_evaluate(
    self,
    server_round: int,
    parameters: Parameters,
    client_manager: ClientManager
) -> List[Tuple[ClientProxy, EvaluateIns]]:
    """Configure the next round of evaluation."""

La valeur de retour est une liste de tuples, chacun représentant les instructions qui seront envoyées à un client particulier. Les implémentations de stratégies effectuent généralement les étapes suivantes dans configure_evaluate :

  • Utilise le client_manager pour échantillonner au hasard tous les clients disponibles (ou un sous-ensemble d’entre eux) (chacun représenté par un objet ClientProxy)

  • Associe chaque ClientProxy au même EvaluateIns contenant le modèle global actuel parameters et config dict

More sophisticated implementations can use configure_evaluate to implement custom client selection logic. A client will only participate in a round if the corresponding ClientProxy is included in the list returned from configure_evaluate.

Note

La structure de cette valeur de retour offre beaucoup de souplesse à l’utilisateur. Comme les instructions sont définies par client, des instructions différentes peuvent être envoyées à chaque client. Cela permet aux stratégies personnalisées d’évaluer, par exemple, différents modèles sur différents clients, ou d’utiliser différents hyperparamètres sur différents clients (via le dict config).

La méthode aggregate_evaluate (agréger_évaluer)#

aggregate_evaluate est chargé d’agréger les résultats renvoyés par les clients qui ont été sélectionnés et à qui l’on a demandé d’évaluer dans configure_evaluate.

@abstractmethod
def aggregate_evaluate(
    self,
    server_round: int,
    results: List[Tuple[ClientProxy, EvaluateRes]],
    failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
) -> Tuple[Optional[float], Dict[str, Scalar]]:
    """Aggregate evaluation results."""

Bien sûr, des échecs peuvent se produire, il n’y a donc aucune garantie que le serveur obtienne des résultats de tous les clients auxquels il a envoyé des instructions (via configure_evaluate). aggregate_evaluate reçoit donc une liste de résultats, mais aussi une liste d” échecs.

aggregate_evaluate renvoie un float facultatif (perte) et un dictionnaire de mesures agrégées. La valeur de retour float est facultative car aggregate_evaluate peut décider que les résultats fournis ne sont pas suffisants pour l’agrégation (par exemple, trop d’échecs).

La méthode évaluer#

le fait d’avoir evaluate en plus de configure_evaluate/aggregate_evaluate permet aux stratégies d’effectuer des évaluations à la fois côté serveur et côté client (fédéré).

@abstractmethod
def evaluate(
    self, parameters: Parameters
) -> Optional[Tuple[float, Dict[str, Scalar]]]:
    """Evaluate the current model parameters."""

La valeur de retour est à nouveau facultative parce que la stratégie peut ne pas avoir besoin de mettre en œuvre l’évaluation côté serveur ou parce que la méthode evaluate définie par l’utilisateur peut ne pas se terminer avec succès (par exemple, elle peut échouer à charger les données de l’évaluation côté serveur).