Use a federated learning strategy#
Welcome to the next part of the federated learning tutorial. In previous parts of this tutorial, we introduced federated learning with PyTorch and Flower (part 1).
In this notebook, we’ll begin to customize the federated learning system we built in the introductory notebook again, using the Flower framework, Flower Datasets, and PyTorch.
Star Flower on GitHub ⭐️ and join the Flower community on Flower Discuss and the Flower Slack to connect, ask questions, and get help: - Join Flower Discuss We’d love to hear from you in the
Introduction
topic! If anything is unclear, post inFlower Help - Beginners
. - Join Flower Slack We’d love to hear from you in the#introductions
channel! If anything is unclear, head over to the#questions
channel.
Let’s move beyond FedAvg with Flower strategies! 🌼
Préparation#
Avant de commencer le code proprement dit, assurons-nous que nous disposons de tout ce dont nous avons besoin.
Installation des dépendances#
Tout d’abord, nous installons les paquets nécessaires :
[ ]:
!pip install -q flwr[simulation] flwr-datasets[vision] torch torchvision
Maintenant que toutes les dépendances sont installées, nous pouvons importer tout ce dont nous avons besoin pour ce tutoriel :
[ ]:
from collections import OrderedDict
from typing import Dict, List, Optional, Tuple
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import flwr
from flwr.client import Client, ClientApp, NumPyClient
from flwr.server import ServerApp, ServerConfig, ServerAppComponents
from flwr.server.strategy import FedAvg, FedAdagrad
from flwr.simulation import run_simulation
from flwr_datasets import FederatedDataset
from flwr.common import ndarrays_to_parameters, NDArrays, Scalar, Context
DEVICE = torch.device("cpu") # Try "cuda" to train on GPU
print(f"Training on {DEVICE}")
print(f"Flower {flwr.__version__} / PyTorch {torch.__version__}")
Il est possible de passer à un runtime dont l’accélération GPU est activée (sur Google Colab : Runtime > Change runtime type > Hardware acclerator : GPU > Save
). Note cependant que Google Colab n’est pas toujours en mesure de proposer l’accélération GPU. Si tu vois une erreur liée à la disponibilité du GPU dans l’une des sections suivantes, envisage de repasser à une exécution basée sur le CPU en définissant DEVICE = torch.device("cpu")
. Si le runtime a activé l’accélération GPU, tu devrais voir apparaître le résultat Training on cuda
, sinon il dira Training on cpu
.
Chargement des données#
Let’s now load the CIFAR-10 training and test set, partition them into ten smaller datasets (each split into training and validation set), and wrap everything in their own DataLoader
. We introduce a new parameter num_partitions
which allows us to call load_datasets
with different numbers of partitions.
[ ]:
NUM_PARTITIONS = 10
BATCH_SIZE = 32
def load_datasets(partition_id: int, num_partitions: int):
fds = FederatedDataset(dataset="cifar10", partitioners={"train": num_partitions})
partition = fds.load_partition(partition_id)
# Divide data on each node: 80% train, 20% test
partition_train_test = partition.train_test_split(test_size=0.2, seed=42)
pytorch_transforms = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)
def apply_transforms(batch):
# Instead of passing transforms to CIFAR10(..., transform=transform)
# we will use this function to dataset.with_transform(apply_transforms)
# The transforms object is exactly the same
batch["img"] = [pytorch_transforms(img) for img in batch["img"]]
return batch
partition_train_test = partition_train_test.with_transform(apply_transforms)
trainloader = DataLoader(
partition_train_test["train"], batch_size=BATCH_SIZE, shuffle=True
)
valloader = DataLoader(partition_train_test["test"], batch_size=BATCH_SIZE)
testset = fds.load_split("test").with_transform(apply_transforms)
testloader = DataLoader(testset, batch_size=BATCH_SIZE)
return trainloader, valloader, testloader
Formation/évaluation du modèle#
Continuons avec la définition habituelle du modèle (y compris set_parameters
et get_parameters
), les fonctions d’entraînement et de test :
[ ]:
class Net(nn.Module):
def __init__(self) -> None:
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
def get_parameters(net) -> List[np.ndarray]:
return [val.cpu().numpy() for _, val in net.state_dict().items()]
def set_parameters(net, parameters: List[np.ndarray]):
params_dict = zip(net.state_dict().keys(), parameters)
state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
net.load_state_dict(state_dict, strict=True)
def train(net, trainloader, epochs: int):
"""Train the network on the training set."""
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters())
net.train()
for epoch in range(epochs):
correct, total, epoch_loss = 0, 0, 0.0
for batch in trainloader:
images, labels = batch["img"], batch["label"]
images, labels = images.to(DEVICE), labels.to(DEVICE)
optimizer.zero_grad()
outputs = net(images)
loss = criterion(net(images), labels)
loss.backward()
optimizer.step()
# Metrics
epoch_loss += loss
total += labels.size(0)
correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
epoch_loss /= len(trainloader.dataset)
epoch_acc = correct / total
print(f"Epoch {epoch+1}: train loss {epoch_loss}, accuracy {epoch_acc}")
def test(net, testloader):
"""Evaluate the network on the entire test set."""
criterion = torch.nn.CrossEntropyLoss()
correct, total, loss = 0, 0, 0.0
net.eval()
with torch.no_grad():
for batch in testloader:
images, labels = batch["img"], batch["label"]
images, labels = images.to(DEVICE), labels.to(DEVICE)
outputs = net(images)
loss += criterion(outputs, labels).item()
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
loss /= len(testloader.dataset)
accuracy = correct / total
return loss, accuracy
Client de Flower#
To implement the Flower client, we (again) create a subclass of flwr.client.NumPyClient
and implement the three methods get_parameters
, fit
, and evaluate
. Here, we also pass the partition_id
to the client and use it log additional details. We then create an instance of ClientApp
and pass it the client_fn
.
[ ]:
class FlowerClient(NumPyClient):
def __init__(self, partition_id, net, trainloader, valloader):
self.partition_id = partition_id
self.net = net
self.trainloader = trainloader
self.valloader = valloader
def get_parameters(self, config):
print(f"[Client {self.partition_id}] get_parameters")
return get_parameters(self.net)
def fit(self, parameters, config):
print(f"[Client {self.partition_id}] fit, config: {config}")
set_parameters(self.net, parameters)
train(self.net, self.trainloader, epochs=1)
return get_parameters(self.net), len(self.trainloader), {}
def evaluate(self, parameters, config):
print(f"[Client {self.partition_id}] evaluate, config: {config}")
set_parameters(self.net, parameters)
loss, accuracy = test(self.net, self.valloader)
return float(loss), len(self.valloader), {"accuracy": float(accuracy)}
def client_fn(context: Context) -> Client:
net = Net().to(DEVICE)
# Read the node_config to fetch data partition associated to this node
partition_id = context.node_config["partition-id"]
num_partitions = context.node_config["num-partitions"]
trainloader, valloader, _ = load_datasets(partition_id, num_partitions)
return FlowerClient(partition_id, net, trainloader, valloader).to_client()
# Create the ClientApp
client = ClientApp(client_fn=client_fn)
Personnalisation de la stratégie#
Jusqu’à présent, tout devrait te sembler familier si tu as travaillé sur le cahier d’introduction. Avec cela, nous sommes prêts à présenter un certain nombre de nouvelles fonctionnalités.
Paramètres côté serveur initialisation#
Flower, by default, initializes the global model by asking one random client for the initial parameters. In many cases, we want more control over parameter initialization though. Flower therefore allows you to directly pass the initial parameters to the Strategy. We create an instance of Net()
and get the paramaters as follows:
[ ]:
# Create an instance of the model and get the parameters
params = get_parameters(Net())
Next, we create a server_fn
that returns the components needed for the server. Within server_fn
, we create a Strategy that uses the initial parameters.
[ ]:
def server_fn(context: Context) -> ServerAppComponents:
# Create FedAvg strategy
strategy = FedAvg(
fraction_fit=0.3,
fraction_evaluate=0.3,
min_fit_clients=3,
min_evaluate_clients=3,
min_available_clients=NUM_PARTITIONS,
initial_parameters=ndarrays_to_parameters(
params
), # Pass initial model parameters
)
# Configure the server for 3 rounds of training
config = ServerConfig(num_rounds=3)
return ServerAppComponents(strategy=strategy, config=config)
Passing initial_parameters
to the FedAvg
strategy prevents Flower from asking one of the clients for the initial parameters. In server_fn
, we pass this new strategy
and a ServerConfig
for defining the number of federated learning rounds (num_rounds
).
Similar to the ClientApp
, we now create the ServerApp
using the server_fn
:
[ ]:
# Create ServerApp
server = ServerApp(server_fn=server_fn)
Last but not least, we specify the resources for each client and run the simulation.
[ ]:
# Specify the resources each of your clients need
# If set to none, by default, each client will be allocated 2x CPU and 0x GPUs
backend_config = {"client_resources": None}
if DEVICE.type == "cuda":
backend_config = {"client_resources": {"num_gpus": 1}}
# Run simulation
run_simulation(
server_app=server,
client_app=client,
num_supernodes=NUM_PARTITIONS,
backend_config=backend_config,
)
If we look closely, we can see that the logs do not show any calls to the FlowerClient.get_parameters
method.
Commencer par une stratégie personnalisée#
We’ve seen the function run_simulation
before. It accepts a number of arguments, amongst them the server_app
which wraps around the strategy and number of training rounds, client_app
which wraps around the client_fn
used to create FlowerClient
instances, and the number of clients to simulate which equals num_supernodes
.
La stratégie englobe l’approche/l’algorithme d’apprentissage fédéré, par exemple, FedAvg
ou FedAdagrad
. Essayons d’utiliser une stratégie différente cette fois-ci :
[ ]:
def server_fn(context: Context) -> ServerAppComponents:
# Create FedAdagrad strategy
strategy = FedAdagrad(
fraction_fit=0.3,
fraction_evaluate=0.3,
min_fit_clients=3,
min_evaluate_clients=3,
min_available_clients=NUM_PARTITIONS,
initial_parameters=ndarrays_to_parameters(params),
)
# Configure the server for 3 rounds of training
config = ServerConfig(num_rounds=3)
return ServerAppComponents(strategy=strategy, config=config)
# Create the ServerApp
server = ServerApp(server_fn=server_fn)
# Run simulation
run_simulation(
server_app=server,
client_app=client,
num_supernodes=NUM_PARTITIONS,
backend_config=backend_config,
)
Paramètre côté serveur évaluation#
Flower peut évaluer le modèle agrégé côté serveur ou côté client. Les évaluations côté client et côté serveur sont similaires à certains égards, mais différentes à d’autres.
L’évaluation centralisée (ou évaluation côté serveur) est conceptuellement simple : elle fonctionne de la même manière que l’évaluation dans l’apprentissage automatique centralisé. S’il existe un ensemble de données côté serveur qui peut être utilisé à des fins d’évaluation, alors c’est parfait. Nous pouvons évaluer le modèle nouvellement agrégé après chaque cycle de formation sans avoir à envoyer le modèle aux clients. Nous avons également la chance que l’ensemble de notre ensemble de données d’évaluation soit disponible à tout moment.
L’évaluation fédérée (ou évaluation côté client) est plus complexe, mais aussi plus puissante : elle ne nécessite pas d’ensemble de données centralisé et nous permet d’évaluer les modèles sur un plus grand ensemble de données, ce qui donne souvent des résultats d’évaluation plus réalistes. En fait, de nombreux scénarios exigent que nous utilisions l’évaluation fédérée** si nous voulons obtenir des résultats d’évaluation représentatifs. Mais cette puissance a un coût : une fois que nous commençons à évaluer côté client, nous devons savoir que notre ensemble de données d’évaluation peut changer au cours des cycles d’apprentissage consécutifs si ces clients ne sont pas toujours disponibles. De plus, l’ensemble de données détenu par chaque client peut également changer au cours des cycles consécutifs. Cela peut conduire à des résultats d’évaluation qui ne sont pas stables, donc même si nous ne changions pas le modèle, nous verrions nos résultats d’évaluation fluctuer au cours des cycles consécutifs.
Nous avons vu comment l’évaluation fédérée fonctionne du côté client (c’est-à-dire en implémentant la méthode evaluate
dans FlowerClient
). Voyons maintenant comment nous pouvons évaluer les paramètres du modèle agrégé du côté serveur :
[ ]:
# The `evaluate` function will be called by Flower after every round
def evaluate(
server_round: int,
parameters: NDArrays,
config: Dict[str, Scalar],
) -> Optional[Tuple[float, Dict[str, Scalar]]]:
net = Net().to(DEVICE)
_, _, testloader = load_datasets(0, NUM_PARTITIONS)
set_parameters(net, parameters) # Update model with the latest parameters
loss, accuracy = test(net, testloader)
print(f"Server-side evaluation loss {loss} / accuracy {accuracy}")
return loss, {"accuracy": accuracy}
We create a FedAvg
strategy and pass evaluate_fn
to it. Then, we create a ServerApp
that uses this strategy.
[ ]:
def server_fn(context: Context) -> ServerAppComponents:
# Create the FedAvg strategy
strategy = FedAvg(
fraction_fit=0.3,
fraction_evaluate=0.3,
min_fit_clients=3,
min_evaluate_clients=3,
min_available_clients=NUM_PARTITIONS,
initial_parameters=ndarrays_to_parameters(params),
evaluate_fn=evaluate, # Pass the evaluation function
)
# Configure the server for 3 rounds of training
config = ServerConfig(num_rounds=3)
return ServerAppComponents(strategy=strategy, config=config)
# Create the ServerApp
server = ServerApp(server_fn=server_fn)
Finally, we run the simulation.
[ ]:
# Run simulation
run_simulation(
server_app=server,
client_app=client,
num_supernodes=NUM_PARTITIONS,
backend_config=backend_config,
)
Envoi/réception de valeurs arbitraires vers/depuis les clients#
In some situations, we want to configure client-side execution (training, evaluation) from the server-side. One example for that is the server asking the clients to train for a certain number of local epochs. Flower provides a way to send configuration values from the server to the clients using a dictionary. Let’s look at an example where the clients receive values from the server through the config
parameter in fit
(config
is also available in evaluate
). The fit
method
receives the configuration dictionary through the config
parameter and can then read values from this dictionary. In this example, it reads server_round
and local_epochs
and uses those values to improve the logging and configure the number of local training epochs:
[ ]:
class FlowerClient(NumPyClient):
def __init__(self, pid, net, trainloader, valloader):
self.pid = pid # partition ID of a client
self.net = net
self.trainloader = trainloader
self.valloader = valloader
def get_parameters(self, config):
print(f"[Client {self.pid}] get_parameters")
return get_parameters(self.net)
def fit(self, parameters, config):
# Read values from config
server_round = config["server_round"]
local_epochs = config["local_epochs"]
# Use values provided by the config
print(f"[Client {self.pid}, round {server_round}] fit, config: {config}")
set_parameters(self.net, parameters)
train(self.net, self.trainloader, epochs=local_epochs)
return get_parameters(self.net), len(self.trainloader), {}
def evaluate(self, parameters, config):
print(f"[Client {self.pid}] evaluate, config: {config}")
set_parameters(self.net, parameters)
loss, accuracy = test(self.net, self.valloader)
return float(loss), len(self.valloader), {"accuracy": float(accuracy)}
def client_fn(context: Context) -> Client:
net = Net().to(DEVICE)
partition_id = context.node_config["partition-id"]
num_partitions = context.node_config["num-partitions"]
trainloader, valloader, _ = load_datasets(partition_id, num_partitions)
return FlowerClient(partition_id, net, trainloader, valloader).to_client()
# Create the ClientApp
client = ClientApp(client_fn=client_fn)
Comment pouvons-nous donc envoyer ce dictionnaire de configuration du serveur aux clients ? Les stratégies de Flower intégrées fournissent un moyen de le faire, et cela fonctionne de la même façon que l’évaluation côté serveur. Nous fournissons une fonction à la stratégie, et la stratégie appelle cette fonction pour chaque cycle d’apprentissage fédéré :
[ ]:
def fit_config(server_round: int):
"""Return training configuration dict for each round.
Perform two rounds of training with one local epoch, increase to two local
epochs afterwards.
"""
config = {
"server_round": server_round, # The current round of federated learning
"local_epochs": 1 if server_round < 2 else 2,
}
return config
Next, we’ll pass this function to the FedAvg strategy before starting the simulation:
[ ]:
def server_fn(context: Context) -> ServerAppComponents:
# Create FedAvg strategy
strategy = FedAvg(
fraction_fit=0.3,
fraction_evaluate=0.3,
min_fit_clients=3,
min_evaluate_clients=3,
min_available_clients=NUM_PARTITIONS,
initial_parameters=ndarrays_to_parameters(params),
evaluate_fn=evaluate,
on_fit_config_fn=fit_config, # Pass the fit_config function
)
config = ServerConfig(num_rounds=3)
return ServerAppComponents(strategy=strategy, config=config)
# Create the ServerApp
server = ServerApp(server_fn=server_fn)
# Run simulation
run_simulation(
server_app=server,
client_app=client,
num_supernodes=NUM_PARTITIONS,
backend_config=backend_config,
)
Comme nous pouvons le voir, les journaux des clients incluent maintenant le cycle actuel d’apprentissage fédéré (qu’ils lisent depuis le dictionnaire config
). Nous pouvons également configurer l’apprentissage local pour qu’il s’exécute pendant une époque au cours du premier et du deuxième cycle d’apprentissage fédéré, puis pendant deux époques au cours du troisième cycle.
Les clients peuvent également renvoyer des valeurs arbitraires au serveur. Pour ce faire, ils renvoient un dictionnaire depuis fit
et/ou evaluate
. Nous avons vu et utilisé ce concept tout au long de ce carnet sans le mentionner explicitement : notre FlowerClient
renvoie un dictionnaire contenant une paire clé/valeur personnalisée en tant que troisième valeur de retour dans evaluate
.
Mise à l’échelle de l’apprentissage fédéré#
Comme dernière étape de ce carnet, voyons comment nous pouvons utiliser Flower pour expérimenter avec un grand nombre de clients.
[ ]:
NUM_PARTITIONS = 1000
Note that we can reuse the ClientApp
for different num-partitions
since the Context is defined by the num_supernodes
argument in run_simulation()
.
We now have 1000 partitions, each holding 45 training and 5 validation examples. Given that the number of training examples on each client is quite small, we should probably train the model a bit longer, so we configure the clients to perform 3 local training epochs. We should also adjust the fraction of clients selected for training during each round (we don’t want all 1000 clients participating in every round), so we adjust fraction_fit
to 0.025
, which means that only 2.5% of available
clients (so 25 clients) will be selected for training each round:
[ ]:
def fit_config(server_round: int):
config = {
"server_round": server_round,
"local_epochs": 3,
}
return config
def server_fn(context: Context) -> ServerAppComponents:
# Create FedAvg strategy
strategy = FedAvg(
fraction_fit=0.025, # Train on 25 clients (each round)
fraction_evaluate=0.05, # Evaluate on 50 clients (each round)
min_fit_clients=20,
min_evaluate_clients=40,
min_available_clients=NUM_PARTITIONS,
initial_parameters=ndarrays_to_parameters(params),
on_fit_config_fn=fit_config,
)
config = ServerConfig(num_rounds=3)
return ServerAppComponents(strategy=strategy, config=config)
# Create the ServerApp
server = ServerApp(server_fn=server_fn)
# Run simulation
run_simulation(
server_app=server,
client_app=client,
num_supernodes=NUM_PARTITIONS,
backend_config=backend_config,
)
Récapitulation#
Dans ce carnet, nous avons vu comment nous pouvons progressivement améliorer notre système en personnalisant la stratégie, en initialisant les paramètres côté serveur, en choisissant une stratégie différente et en évaluant les modèles côté serveur. C’est une sacrée flexibilité avec si peu de code, n’est-ce pas ?
Dans les sections ultérieures, nous avons vu comment nous pouvons communiquer des valeurs arbitraires entre le serveur et les clients pour personnaliser entièrement l’exécution côté client. Grâce à cette capacité, nous avons construit une simulation d’apprentissage fédéré à grande échelle en utilisant le moteur de client virtuel Flower et nous avons mené une expérience impliquant 1000 clients dans la même charge de travail - le tout dans un carnet Jupyter !
Prochaines étapes#
Before you continue, make sure to join the Flower community on Flower Discuss (Join Flower Discuss) and on Slack (Join Slack).
Il existe un canal dédié aux questions
si vous avez besoin d’aide, mais nous aimerions aussi savoir qui vous êtes dans #introductions
!
The Flower Federated Learning Tutorial - Part 3 shows how to build a fully custom Strategy
from scratch.