Federated Evaluation (évaluation fédérée)¶
Il existe deux approches principales pour évaluer les modèles dans les systèmes d’apprentissage fédérés : l’évaluation centralisée (ou côté serveur) et l’évaluation fédérée (ou côté client).
Évaluation centralisée¶
Stratégies intégrées¶
Toutes les stratégies intégrées prennent en charge l’évaluation centralisée en fournissant une fonction d’évaluation lors de l’initialisation. Une fonction d’évaluation est une fonction qui peut prendre les paramètres du modèle global actuel comme entrée et renvoyer les résultats de l’évaluation :
from flwr.app import Context
from flwr.common import NDArrays, Scalar
from flwr.server import ServerAppComponents, ServerConfig
from flwr.server.strategy import FedAvg
from flwr.serverapp import ServerApp
from typing import Dict, Optional, Tuple
def get_evaluate_fn(model):
"""Return an evaluation function for server-side evaluation."""
# Load data and model here to avoid the overhead of doing it in `evaluate` itself
(x_train, y_train), _ = tf.keras.datasets.cifar10.load_data()
# Use the last 5k training examples as a validation set
x_val, y_val = x_train[45000:50000], y_train[45000:50000]
# The `evaluate` function will be called after every round
def evaluate(
server_round: int, parameters: NDArrays, config: Dict[str, Scalar]
) -> Optional[Tuple[float, Dict[str, Scalar]]]:
model.set_weights(parameters) # Update model with the latest parameters
loss, accuracy = model.evaluate(x_val, y_val)
return loss, {"accuracy": accuracy}
return evaluate
def server_fn(context: Context):
# Read from config
num_rounds = context.run_config["num-server-rounds"]
config = ServerConfig(num_rounds=num_rounds)
# Load and compile model for server-side parameter evaluation
model = tf.keras.applications.EfficientNetB0(
input_shape=(32, 32, 3), weights=None, classes=10
)
model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"])
# Create strategy
strategy = FedAvg(
# ... other FedAvg arguments
evaluate_fn=get_evaluate_fn(model),
)
return ServerAppComponents(strategy=strategy, config=config)
# Create ServerApp
app = ServerApp(server_fn=server_fn)
Stratégies personnalisées¶
L’abstraction Strategy fournit une méthode appelée evaluate qui peut être utilisée directement pour évaluer les paramètres du modèle global actuel. La mise en œuvre serveur appelle ensuite evaluate après l’agrégation des paramètres et avant l’évaluation fédérée (voir le paragraphe suivant).
Federated Evaluation (évaluation fédérée)¶
Mise en œuvre de l’évaluation fédérée¶
L’évaluation côté client se produit dans la méthode Client.evaluate et peut être configurée depuis le serveur.
from flwr.client import NumPyClient
class FlowerClient(NumPyClient):
def __init__(self, model, x_train, y_train, x_test, y_test):
self.model = model
self.x_train, self.y_train = x_train, y_train
self.x_test, self.y_test = x_test, y_test
def fit(self, parameters, config):
# ...
pass
def evaluate(self, parameters, config):
"""Evaluate parameters on the locally held test set."""
# Update local model with global parameters
self.model.set_weights(parameters)
# Get config values
steps: int = config["val_steps"]
# Evaluate global model parameters on the local test data and return results
loss, accuracy = self.model.evaluate(self.x_test, self.y_test, 32, steps=steps)
num_examples_test = len(self.x_test)
return loss, num_examples_test, {"accuracy": accuracy}
Configuration de l’évaluation fédérée¶
L’évaluation fédérée peut être configurée du côté du serveur. Les stratégies intégrées prennent en charge les arguments suivants :
fraction_evaluate: unefloatdéfinissant la fraction de clients qui seront sélectionnés pour l’évaluation. Sifraction_evaluateest fixé à0.1et que100clients sont connectés au serveur, alors10sera sélectionné aléatoirement pour l’évaluation. Sifraction_evaluateest fixé à0.0, l’évaluation fédérée sera désactivée.min_evaluate_clients: unint: le nombre minimum de clients qui doivent être sélectionnés pour l’évaluation. Sifraction_evaluateest fixé à0.1,min_evaluate_clientsest fixé à 20 et que100clients sont connectés au serveur, alors20clients seront sélectionnés pour l’évaluation.min_available_clients: unintqui définit le nombre minimum de clients qui doivent être connectés au serveur avant de pouvoir démarrer une ronde d’évaluation fédérée. Si moins demin_available_clientssont connectés au serveur, le serveur attendra jusqu’à ce que plus de clients soient connectés avant de continuer à sélectionner des clients pour l’évaluation.on_evaluate_config_fn: une fonction qui retourne un dictionnaire de configuration qui sera envoyé aux clients sélectionnés. La fonction sera appelée lors de chaque ronde et fournit un moyen pratique de personnaliser l’évaluation côté client depuis le serveur, par exemple pour configurer le nombre d’étapes de validation effectuées.
from flwr.app import Context
from flwr.server import ServerAppComponents, ServerConfig
from flwr.server.strategy import FedAvg
from flwr.serverapp import ServerApp
def evaluate_config(server_round: int):
"""Return evaluation configuration dict for each round.
Perform five local evaluation steps on each client (i.e., use five
batches) during rounds, one to three, then increase to ten local
evaluation steps.
"""
val_steps = 5 if server_round < 4 else 10
return {"val_steps": val_steps}
# Create strategy
strategy = FedAvg(
# ... other FedAvg arguments
fraction_evaluate=0.2,
min_evaluate_clients=2,
min_available_clients=10,
on_evaluate_config_fn=evaluate_config,
)
def server_fn(context: Context):
num_rounds = context.run_config["num-server-rounds"]
config = ServerConfig(num_rounds=num_rounds)
return ServerAppComponents(strategy=strategy, config=config)
# Create ServerApp
app = ServerApp(server_fn=server_fn)
Évaluer les mises à jour du modèle local pendant la formation¶
Les paramètres du modèle peuvent également être évalués pendant l’entraînement. Client.fit peut retourner des résultats d’évaluation arbitraires sous forme de dictionnaire :
from flwr.client import NumPyClient
class FlowerClient(NumPyClient):
def __init__(self, model, x_train, y_train, x_test, y_test):
self.model = model
self.x_train, self.y_train = x_train, y_train
self.x_test, self.y_test = x_test, y_test
def fit(self, parameters, config):
"""Train parameters on the locally held training set."""
# Update local model parameters
self.model.set_weights(parameters)
# Train the model using hyperparameters from config
history = self.model.fit(
self.x_train, self.y_train, batch_size=32, epochs=2, validation_split=0.1
)
# Return updated model parameters and validation results
parameters_prime = self.model.get_weights()
num_examples_train = len(self.x_train)
results = {
"loss": history.history["loss"][0],
"accuracy": history.history["accuracy"][0],
"val_loss": history.history["val_loss"][0],
"val_accuracy": history.history["val_accuracy"][0],
}
return parameters_prime, num_examples_train, results
def evaluate(self, parameters, config):
# ...
pass
Exemple de code complet¶
Pour un exemple de code complet qui utilise à la fois l’évaluation centralisée et fédérée, voir le Advanced PyTorch Example (la même approche peut être appliquée aux charges de travail implémentées dans tout autre cadre).