Open in Colab

Use a federated learning strategy

Welcome to the next part of the federated learning tutorial. In previous parts of this tutorial, we introduced federated learning with PyTorch and Flower (part 1).

In this notebook, we’ll begin to customize the federated learning system we built in the introductory notebook again, using the Flower framework, Flower Datasets, and PyTorch.

Star Flower on GitHub ⭐️ and join the Flower community on Flower Discuss and the Flower Slack to connect, ask questions, and get help: - Join Flower Discuss We’d love to hear from you in the Introduction topic! If anything is unclear, post in Flower Help - Beginners. - Join Flower Slack We’d love to hear from you in the #introductions channel! If anything is unclear, head over to the #questions channel.

Let’s move beyond FedAvg with Flower strategies! 🌼

Préparation

Avant de commencer le code proprement dit, assurons-nous que nous disposons de tout ce dont nous avons besoin.

Installation des dépendances

Tout d’abord, nous installons les paquets nécessaires :

[ ]:
!pip install -q flwr[simulation] flwr-datasets[vision] torch torchvision

Maintenant que toutes les dépendances sont installées, nous pouvons importer tout ce dont nous avons besoin pour ce tutoriel :

[ ]:
from collections import OrderedDict
from typing import Dict, List, Optional, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

import flwr
from flwr.client import Client, ClientApp, NumPyClient
from flwr.server import ServerApp, ServerConfig, ServerAppComponents
from flwr.server.strategy import FedAvg, FedAdagrad
from flwr.simulation import run_simulation
from flwr_datasets import FederatedDataset
from flwr.common import ndarrays_to_parameters, NDArrays, Scalar, Context

DEVICE = torch.device("cpu")  # Try "cuda" to train on GPU
print(f"Training on {DEVICE}")
print(f"Flower {flwr.__version__} / PyTorch {torch.__version__}")

Il est possible de passer à un runtime dont l’accélération GPU est activée (sur Google Colab : Runtime > Change runtime type > Hardware acclerator : GPU > Save). Note cependant que Google Colab n’est pas toujours en mesure de proposer l’accélération GPU. Si tu vois une erreur liée à la disponibilité du GPU dans l’une des sections suivantes, envisage de repasser à une exécution basée sur le CPU en définissant DEVICE = torch.device("cpu"). Si le runtime a activé l’accélération GPU, tu devrais voir apparaître le résultat Training on cuda, sinon il dira Training on cpu.

Chargement des données

Let’s now load the CIFAR-10 training and test set, partition them into ten smaller datasets (each split into training and validation set), and wrap everything in their own DataLoader. We introduce a new parameter num_partitions which allows us to call load_datasets with different numbers of partitions.

[ ]:
NUM_PARTITIONS = 10
BATCH_SIZE = 32


def load_datasets(partition_id: int, num_partitions: int):
    fds = FederatedDataset(dataset="cifar10", partitioners={"train": num_partitions})
    partition = fds.load_partition(partition_id)
    # Divide data on each node: 80% train, 20% test
    partition_train_test = partition.train_test_split(test_size=0.2, seed=42)
    pytorch_transforms = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )

    def apply_transforms(batch):
        # Instead of passing transforms to CIFAR10(..., transform=transform)
        # we will use this function to dataset.with_transform(apply_transforms)
        # The transforms object is exactly the same
        batch["img"] = [pytorch_transforms(img) for img in batch["img"]]
        return batch

    partition_train_test = partition_train_test.with_transform(apply_transforms)
    trainloader = DataLoader(
        partition_train_test["train"], batch_size=BATCH_SIZE, shuffle=True
    )
    valloader = DataLoader(partition_train_test["test"], batch_size=BATCH_SIZE)
    testset = fds.load_split("test").with_transform(apply_transforms)
    testloader = DataLoader(testset, batch_size=BATCH_SIZE)
    return trainloader, valloader, testloader

Formation/évaluation du modèle

Continuons avec la définition habituelle du modèle (y compris set_parameters et get_parameters), les fonctions d’entraînement et de test :

[ ]:
class Net(nn.Module):
    def __init__(self) -> None:
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]


def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)


def train(net, trainloader, epochs: int):
    """Train the network on the training set."""
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters())
    net.train()
    for epoch in range(epochs):
        correct, total, epoch_loss = 0, 0, 0.0
        for batch in trainloader:
            images, labels = batch["img"], batch["label"]
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(net(images), labels)
            loss.backward()
            optimizer.step()
            # Metrics
            epoch_loss += loss
            total += labels.size(0)
            correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
        epoch_loss /= len(trainloader.dataset)
        epoch_acc = correct / total
        print(f"Epoch {epoch+1}: train loss {epoch_loss}, accuracy {epoch_acc}")


def test(net, testloader):
    """Evaluate the network on the entire test set."""
    criterion = torch.nn.CrossEntropyLoss()
    correct, total, loss = 0, 0, 0.0
    net.eval()
    with torch.no_grad():
        for batch in testloader:
            images, labels = batch["img"], batch["label"]
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            outputs = net(images)
            loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    loss /= len(testloader.dataset)
    accuracy = correct / total
    return loss, accuracy

Client de Flower

To implement the Flower client, we (again) create a subclass of flwr.client.NumPyClient and implement the three methods get_parameters, fit, and evaluate. Here, we also pass the partition_id to the client and use it log additional details. We then create an instance of ClientApp and pass it the client_fn.

[ ]:
class FlowerClient(NumPyClient):
    def __init__(self, partition_id, net, trainloader, valloader):
        self.partition_id = partition_id
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        print(f"[Client {self.partition_id}] get_parameters")
        return get_parameters(self.net)

    def fit(self, parameters, config):
        print(f"[Client {self.partition_id}] fit, config: {config}")
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=1)
        return get_parameters(self.net), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        print(f"[Client {self.partition_id}] evaluate, config: {config}")
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}


def client_fn(context: Context) -> Client:
    net = Net().to(DEVICE)

    # Read the node_config to fetch data partition associated to this node
    partition_id = context.node_config["partition-id"]
    num_partitions = context.node_config["num-partitions"]

    trainloader, valloader, _ = load_datasets(partition_id, num_partitions)
    return FlowerClient(partition_id, net, trainloader, valloader).to_client()


# Create the ClientApp
client = ClientApp(client_fn=client_fn)

Personnalisation de la stratégie

Jusqu’à présent, tout devrait te sembler familier si tu as travaillé sur le cahier d’introduction. Avec cela, nous sommes prêts à présenter un certain nombre de nouvelles fonctionnalités.

Paramètres côté serveur initialisation

Flower, by default, initializes the global model by asking one random client for the initial parameters. In many cases, we want more control over parameter initialization though. Flower therefore allows you to directly pass the initial parameters to the Strategy. We create an instance of Net() and get the paramaters as follows:

[ ]:
# Create an instance of the model and get the parameters
params = get_parameters(Net())

Next, we create a server_fn that returns the components needed for the server. Within server_fn, we create a Strategy that uses the initial parameters.

[ ]:
def server_fn(context: Context) -> ServerAppComponents:
    # Create FedAvg strategy
    strategy = FedAvg(
        fraction_fit=0.3,
        fraction_evaluate=0.3,
        min_fit_clients=3,
        min_evaluate_clients=3,
        min_available_clients=NUM_PARTITIONS,
        initial_parameters=ndarrays_to_parameters(
            params
        ),  # Pass initial model parameters
    )

    # Configure the server for 3 rounds of training
    config = ServerConfig(num_rounds=3)
    return ServerAppComponents(strategy=strategy, config=config)

Passing initial_parameters to the FedAvg strategy prevents Flower from asking one of the clients for the initial parameters. In server_fn, we pass this new strategy and a ServerConfig for defining the number of federated learning rounds (num_rounds).

Similar to the ClientApp, we now create the ServerApp using the server_fn:

[ ]:
# Create ServerApp
server = ServerApp(server_fn=server_fn)

Last but not least, we specify the resources for each client and run the simulation.

[ ]:
# Specify the resources each of your clients need
# If set to none, by default, each client will be allocated 2x CPU and 0x GPUs
backend_config = {"client_resources": None}
if DEVICE.type == "cuda":
    backend_config = {"client_resources": {"num_gpus": 1}}

# Run simulation
run_simulation(
    server_app=server,
    client_app=client,
    num_supernodes=NUM_PARTITIONS,
    backend_config=backend_config,
)

If we look closely, we can see that the logs do not show any calls to the FlowerClient.get_parameters method.

Commencer par une stratégie personnalisée

We’ve seen the function run_simulation before. It accepts a number of arguments, amongst them the server_app which wraps around the strategy and number of training rounds, client_app which wraps around the client_fn used to create FlowerClient instances, and the number of clients to simulate which equals num_supernodes.

La stratégie englobe l’approche/l’algorithme d’apprentissage fédéré, par exemple, FedAvg ou FedAdagrad. Essayons d’utiliser une stratégie différente cette fois-ci :

[ ]:
def server_fn(context: Context) -> ServerAppComponents:
    # Create FedAdagrad strategy
    strategy = FedAdagrad(
        fraction_fit=0.3,
        fraction_evaluate=0.3,
        min_fit_clients=3,
        min_evaluate_clients=3,
        min_available_clients=NUM_PARTITIONS,
        initial_parameters=ndarrays_to_parameters(params),
    )
    # Configure the server for 3 rounds of training
    config = ServerConfig(num_rounds=3)
    return ServerAppComponents(strategy=strategy, config=config)


# Create the ServerApp
server = ServerApp(server_fn=server_fn)

# Run simulation
run_simulation(
    server_app=server,
    client_app=client,
    num_supernodes=NUM_PARTITIONS,
    backend_config=backend_config,
)

Paramètre côté serveur évaluation

Flower peut évaluer le modèle agrégé côté serveur ou côté client. Les évaluations côté client et côté serveur sont similaires à certains égards, mais différentes à d’autres.

L’évaluation centralisée (ou évaluation côté serveur) est conceptuellement simple : elle fonctionne de la même manière que l’évaluation dans l’apprentissage automatique centralisé. S’il existe un ensemble de données côté serveur qui peut être utilisé à des fins d’évaluation, alors c’est parfait. Nous pouvons évaluer le modèle nouvellement agrégé après chaque cycle de formation sans avoir à envoyer le modèle aux clients. Nous avons également la chance que l’ensemble de notre ensemble de données d’évaluation soit disponible à tout moment.

L’évaluation fédérée (ou évaluation côté client) est plus complexe, mais aussi plus puissante : elle ne nécessite pas d’ensemble de données centralisé et nous permet d’évaluer les modèles sur un plus grand ensemble de données, ce qui donne souvent des résultats d’évaluation plus réalistes. En fait, de nombreux scénarios exigent que nous utilisions l’évaluation fédérée** si nous voulons obtenir des résultats d’évaluation représentatifs. Mais cette puissance a un coût : une fois que nous commençons à évaluer côté client, nous devons savoir que notre ensemble de données d’évaluation peut changer au cours des cycles d’apprentissage consécutifs si ces clients ne sont pas toujours disponibles. De plus, l’ensemble de données détenu par chaque client peut également changer au cours des cycles consécutifs. Cela peut conduire à des résultats d’évaluation qui ne sont pas stables, donc même si nous ne changions pas le modèle, nous verrions nos résultats d’évaluation fluctuer au cours des cycles consécutifs.

Nous avons vu comment l’évaluation fédérée fonctionne du côté client (c’est-à-dire en implémentant la méthode evaluate dans FlowerClient). Voyons maintenant comment nous pouvons évaluer les paramètres du modèle agrégé du côté serveur :

[ ]:
# The `evaluate` function will be called by Flower after every round
def evaluate(
    server_round: int,
    parameters: NDArrays,
    config: Dict[str, Scalar],
) -> Optional[Tuple[float, Dict[str, Scalar]]]:
    net = Net().to(DEVICE)
    _, _, testloader = load_datasets(0, NUM_PARTITIONS)
    set_parameters(net, parameters)  # Update model with the latest parameters
    loss, accuracy = test(net, testloader)
    print(f"Server-side evaluation loss {loss} / accuracy {accuracy}")
    return loss, {"accuracy": accuracy}

We create a FedAvg strategy and pass evaluate_fn to it. Then, we create a ServerApp that uses this strategy.

[ ]:
def server_fn(context: Context) -> ServerAppComponents:
    # Create the FedAvg strategy
    strategy = FedAvg(
        fraction_fit=0.3,
        fraction_evaluate=0.3,
        min_fit_clients=3,
        min_evaluate_clients=3,
        min_available_clients=NUM_PARTITIONS,
        initial_parameters=ndarrays_to_parameters(params),
        evaluate_fn=evaluate,  # Pass the evaluation function
    )
    # Configure the server for 3 rounds of training
    config = ServerConfig(num_rounds=3)
    return ServerAppComponents(strategy=strategy, config=config)


# Create the ServerApp
server = ServerApp(server_fn=server_fn)

Finally, we run the simulation.

[ ]:
# Run simulation
run_simulation(
    server_app=server,
    client_app=client,
    num_supernodes=NUM_PARTITIONS,
    backend_config=backend_config,
)

Envoi/réception de valeurs arbitraires vers/depuis les clients

In some situations, we want to configure client-side execution (training, evaluation) from the server-side. One example for that is the server asking the clients to train for a certain number of local epochs. Flower provides a way to send configuration values from the server to the clients using a dictionary. Let’s look at an example where the clients receive values from the server through the config parameter in fit (config is also available in evaluate). The fit method receives the configuration dictionary through the config parameter and can then read values from this dictionary. In this example, it reads server_round and local_epochs and uses those values to improve the logging and configure the number of local training epochs:

[ ]:
class FlowerClient(NumPyClient):
    def __init__(self, pid, net, trainloader, valloader):
        self.pid = pid  # partition ID of a client
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        print(f"[Client {self.pid}] get_parameters")
        return get_parameters(self.net)

    def fit(self, parameters, config):
        # Read values from config
        server_round = config["server_round"]
        local_epochs = config["local_epochs"]

        # Use values provided by the config
        print(f"[Client {self.pid}, round {server_round}] fit, config: {config}")
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=local_epochs)
        return get_parameters(self.net), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        print(f"[Client {self.pid}] evaluate, config: {config}")
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}


def client_fn(context: Context) -> Client:
    net = Net().to(DEVICE)
    partition_id = context.node_config["partition-id"]
    num_partitions = context.node_config["num-partitions"]
    trainloader, valloader, _ = load_datasets(partition_id, num_partitions)
    return FlowerClient(partition_id, net, trainloader, valloader).to_client()


# Create the ClientApp
client = ClientApp(client_fn=client_fn)

Comment pouvons-nous donc envoyer ce dictionnaire de configuration du serveur aux clients ? Les stratégies de Flower intégrées fournissent un moyen de le faire, et cela fonctionne de la même façon que l’évaluation côté serveur. Nous fournissons une fonction à la stratégie, et la stratégie appelle cette fonction pour chaque cycle d’apprentissage fédéré :

[ ]:
def fit_config(server_round: int):
    """Return training configuration dict for each round.

    Perform two rounds of training with one local epoch, increase to two local
    epochs afterwards.
    """
    config = {
        "server_round": server_round,  # The current round of federated learning
        "local_epochs": 1 if server_round < 2 else 2,
    }
    return config

Next, we’ll pass this function to the FedAvg strategy before starting the simulation:

[ ]:
def server_fn(context: Context) -> ServerAppComponents:
    # Create FedAvg strategy
    strategy = FedAvg(
        fraction_fit=0.3,
        fraction_evaluate=0.3,
        min_fit_clients=3,
        min_evaluate_clients=3,
        min_available_clients=NUM_PARTITIONS,
        initial_parameters=ndarrays_to_parameters(params),
        evaluate_fn=evaluate,
        on_fit_config_fn=fit_config,  # Pass the fit_config function
    )
    config = ServerConfig(num_rounds=3)
    return ServerAppComponents(strategy=strategy, config=config)


# Create the ServerApp
server = ServerApp(server_fn=server_fn)

# Run simulation
run_simulation(
    server_app=server,
    client_app=client,
    num_supernodes=NUM_PARTITIONS,
    backend_config=backend_config,
)

Comme nous pouvons le voir, les journaux des clients incluent maintenant le cycle actuel d’apprentissage fédéré (qu’ils lisent depuis le dictionnaire config). Nous pouvons également configurer l’apprentissage local pour qu’il s’exécute pendant une époque au cours du premier et du deuxième cycle d’apprentissage fédéré, puis pendant deux époques au cours du troisième cycle.

Les clients peuvent également renvoyer des valeurs arbitraires au serveur. Pour ce faire, ils renvoient un dictionnaire depuis fit et/ou evaluate. Nous avons vu et utilisé ce concept tout au long de ce carnet sans le mentionner explicitement : notre FlowerClient renvoie un dictionnaire contenant une paire clé/valeur personnalisée en tant que troisième valeur de retour dans evaluate.

Mise à l’échelle de l’apprentissage fédéré

Comme dernière étape de ce carnet, voyons comment nous pouvons utiliser Flower pour expérimenter avec un grand nombre de clients.

[ ]:
NUM_PARTITIONS = 1000

Note that we can reuse the ClientApp for different num-partitions since the Context is defined by the num_supernodes argument in run_simulation().

We now have 1000 partitions, each holding 45 training and 5 validation examples. Given that the number of training examples on each client is quite small, we should probably train the model a bit longer, so we configure the clients to perform 3 local training epochs. We should also adjust the fraction of clients selected for training during each round (we don’t want all 1000 clients participating in every round), so we adjust fraction_fit to 0.025, which means that only 2.5% of available clients (so 25 clients) will be selected for training each round:

[ ]:
def fit_config(server_round: int):
    config = {
        "server_round": server_round,
        "local_epochs": 3,
    }
    return config


def server_fn(context: Context) -> ServerAppComponents:
    # Create FedAvg strategy
    strategy = FedAvg(
        fraction_fit=0.025,  # Train on 25 clients (each round)
        fraction_evaluate=0.05,  # Evaluate on 50 clients (each round)
        min_fit_clients=20,
        min_evaluate_clients=40,
        min_available_clients=NUM_PARTITIONS,
        initial_parameters=ndarrays_to_parameters(params),
        on_fit_config_fn=fit_config,
    )
    config = ServerConfig(num_rounds=3)
    return ServerAppComponents(strategy=strategy, config=config)


# Create the ServerApp
server = ServerApp(server_fn=server_fn)

# Run simulation
run_simulation(
    server_app=server,
    client_app=client,
    num_supernodes=NUM_PARTITIONS,
    backend_config=backend_config,
)

Récapitulation

Dans ce carnet, nous avons vu comment nous pouvons progressivement améliorer notre système en personnalisant la stratégie, en initialisant les paramètres côté serveur, en choisissant une stratégie différente et en évaluant les modèles côté serveur. C’est une sacrée flexibilité avec si peu de code, n’est-ce pas ?

Dans les sections ultérieures, nous avons vu comment nous pouvons communiquer des valeurs arbitraires entre le serveur et les clients pour personnaliser entièrement l’exécution côté client. Grâce à cette capacité, nous avons construit une simulation d’apprentissage fédéré à grande échelle en utilisant le moteur de client virtuel Flower et nous avons mené une expérience impliquant 1000 clients dans la même charge de travail - le tout dans un carnet Jupyter !

Prochaines étapes

Before you continue, make sure to join the Flower community on Flower Discuss (Join Flower Discuss) and on Slack (Join Slack).

Il existe un canal dédié aux questions si vous avez besoin d’aide, mais nous aimerions aussi savoir qui vous êtes dans #introductions !

The Flower Federated Learning Tutorial - Part 3 shows how to build a fully custom Strategy from scratch.


Open in Colab