Open in Colab

Use a federated learning strategy#

Welcome to the next part of the federated learning tutorial. In previous parts of this tutorial, we introduced federated learning with PyTorch and Flower (part 1).

Dans ce carnet, nous allons commencer à personnaliser le système d’apprentissage fédéré que nous avons construit dans le carnet d’introduction (toujours en utilisant Flower et PyTorch).

Star Flower on GitHub ⭐️ et rejoignez la communauté Flower sur Slack pour vous connecter, poser des questions et obtenir de l’aide : Join Slack 🌼 Nous serions ravis d’avoir de vos nouvelles dans le canal #introductions ! Et si quelque chose n’est pas clair, rendez-vous sur le canal #questions.

Let’s move beyond FedAvg with Flower strategies!

Préparation#

Avant de commencer le code proprement dit, assurons-nous que nous disposons de tout ce dont nous avons besoin.

Installation des dépendances#

Tout d’abord, nous installons les paquets nécessaires :

[ ]:
!pip install -q flwr[simulation] torch torchvision

Maintenant que toutes les dépendances sont installées, nous pouvons importer tout ce dont nous avons besoin pour ce tutoriel :

[ ]:
from collections import OrderedDict
from typing import Dict, List, Optional, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import CIFAR10

import flwr as fl

DEVICE = torch.device("cpu")  # Try "cuda" to train on GPU
print(
    f"Training on {DEVICE} using PyTorch {torch.__version__} and Flower {fl.__version__}"
)

Il est possible de passer à un runtime dont l’accélération GPU est activée (sur Google Colab : Runtime > Change runtime type > Hardware acclerator : GPU > Save). Note cependant que Google Colab n’est pas toujours en mesure de proposer l’accélération GPU. Si tu vois une erreur liée à la disponibilité du GPU dans l’une des sections suivantes, envisage de repasser à une exécution basée sur le CPU en définissant DEVICE = torch.device("cpu"). Si le runtime a activé l’accélération GPU, tu devrais voir apparaître le résultat Training on cuda, sinon il dira Training on cpu.

Chargement des données#

Chargeons maintenant les ensembles d’entraînement et de test CIFAR-10, divisons-les en dix ensembles de données plus petits (chacun divisé en ensemble d’entraînement et de validation), et enveloppons le tout dans leur propre DataLoader. Nous introduisons un nouveau paramètre num_clients qui nous permet d’appeler load_datasets avec différents nombres de clients.

[ ]:
NUM_CLIENTS = 10


def load_datasets(num_clients: int):
    # Download and transform CIFAR-10 (train and test)
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )
    trainset = CIFAR10("./dataset", train=True, download=True, transform=transform)
    testset = CIFAR10("./dataset", train=False, download=True, transform=transform)

    # Split training set into `num_clients` partitions to simulate different local datasets
    partition_size = len(trainset) // num_clients
    lengths = [partition_size] * num_clients
    datasets = random_split(trainset, lengths, torch.Generator().manual_seed(42))

    # Split each partition into train/val and create DataLoader
    trainloaders = []
    valloaders = []
    for ds in datasets:
        len_val = len(ds) // 10  # 10 % validation set
        len_train = len(ds) - len_val
        lengths = [len_train, len_val]
        ds_train, ds_val = random_split(ds, lengths, torch.Generator().manual_seed(42))
        trainloaders.append(DataLoader(ds_train, batch_size=32, shuffle=True))
        valloaders.append(DataLoader(ds_val, batch_size=32))
    testloader = DataLoader(testset, batch_size=32)
    return trainloaders, valloaders, testloader


trainloaders, valloaders, testloader = load_datasets(NUM_CLIENTS)

Formation/évaluation du modèle#

Continuons avec la définition habituelle du modèle (y compris set_parameters et get_parameters), les fonctions d’entraînement et de test :

[ ]:
class Net(nn.Module):
    def __init__(self) -> None:
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]


def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)


def train(net, trainloader, epochs: int):
    """Train the network on the training set."""
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters())
    net.train()
    for epoch in range(epochs):
        correct, total, epoch_loss = 0, 0, 0.0
        for images, labels in trainloader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(net(images), labels)
            loss.backward()
            optimizer.step()
            # Metrics
            epoch_loss += loss
            total += labels.size(0)
            correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
        epoch_loss /= len(trainloader.dataset)
        epoch_acc = correct / total
        print(f"Epoch {epoch+1}: train loss {epoch_loss}, accuracy {epoch_acc}")


def test(net, testloader):
    """Evaluate the network on the entire test set."""
    criterion = torch.nn.CrossEntropyLoss()
    correct, total, loss = 0, 0, 0.0
    net.eval()
    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            outputs = net(images)
            loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    loss /= len(testloader.dataset)
    accuracy = correct / total
    return loss, accuracy

Client de Flower#

Pour mettre en œuvre le client Flower, nous créons (à nouveau) une sous-classe de flwr.client.NumPyClient et mettons en œuvre les trois méthodes get_parameters, fit et evaluate. Ici, nous transmettons également le cid au client et l’utilisons pour consigner des détails supplémentaires :

[ ]:
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, cid, net, trainloader, valloader):
        self.cid = cid
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        print(f"[Client {self.cid}] get_parameters")
        return get_parameters(self.net)

    def fit(self, parameters, config):
        print(f"[Client {self.cid}] fit, config: {config}")
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=1)
        return get_parameters(self.net), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        print(f"[Client {self.cid}] evaluate, config: {config}")
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}


def client_fn(cid) -> FlowerClient:
    net = Net().to(DEVICE)
    trainloader = trainloaders[int(cid)]
    valloader = valloaders[int(cid)]
    return FlowerClient(cid, net, trainloader, valloader)

Personnalisation de la stratégie#

Jusqu’à présent, tout devrait te sembler familier si tu as travaillé sur le cahier d’introduction. Avec cela, nous sommes prêts à présenter un certain nombre de nouvelles fonctionnalités.

Paramètres côté serveur initialisation#

Flower, par défaut, initialise le modèle global en demandant à un client aléatoire les paramètres initiaux. Dans de nombreux cas, nous voulons cependant avoir plus de contrôle sur l’initialisation des paramètres. Flower te permet donc de passer directement les paramètres initiaux à la Stratégie :

[ ]:
# Create an instance of the model and get the parameters
params = get_parameters(Net())

# Pass parameters to the Strategy for server-side parameter initialization
strategy = fl.server.strategy.FedAvg(
    fraction_fit=0.3,
    fraction_evaluate=0.3,
    min_fit_clients=3,
    min_evaluate_clients=3,
    min_available_clients=NUM_CLIENTS,
    initial_parameters=fl.common.ndarrays_to_parameters(params),
)

# Specify client resources if you need GPU (defaults to 1 CPU and 0 GPU)
client_resources = None
if DEVICE.type == "cuda":
    client_resources = {"num_gpus": 1}

# Start simulation
fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=3),  # Just three rounds
    strategy=strategy,
    client_resources=client_resources,
)

Le fait de passer initial_parameters à la stratégie FedAvg empêche Flower de demander les paramètres initiaux à l’un des clients. Si nous regardons de près, nous pouvons voir que les journaux ne montrent aucun appel à la méthode FlowerClient.get_parameters.

Commencer par une stratégie personnalisée#

Elle accepte un certain nombre d’arguments, parmi lesquels le client_fn utilisé pour créer les instances de FlowerClient, le nombre de clients à simuler num_clients, le nombre de rounds num_rounds, et la stratégie.

La stratégie englobe l’approche/l’algorithme d’apprentissage fédéré, par exemple, FedAvg ou FedAdagrad. Essayons d’utiliser une stratégie différente cette fois-ci :

[ ]:
# Create FedAdam strategy
strategy = fl.server.strategy.FedAdagrad(
    fraction_fit=0.3,
    fraction_evaluate=0.3,
    min_fit_clients=3,
    min_evaluate_clients=3,
    min_available_clients=NUM_CLIENTS,
    initial_parameters=fl.common.ndarrays_to_parameters(get_parameters(Net())),
)

# Start simulation
fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=3),  # Just three rounds
    strategy=strategy,
    client_resources=client_resources,
)

Paramètre côté serveur évaluation#

Flower peut évaluer le modèle agrégé côté serveur ou côté client. Les évaluations côté client et côté serveur sont similaires à certains égards, mais différentes à d’autres.

L’évaluation centralisée (ou évaluation côté serveur) est conceptuellement simple : elle fonctionne de la même manière que l’évaluation dans l’apprentissage automatique centralisé. S’il existe un ensemble de données côté serveur qui peut être utilisé à des fins d’évaluation, alors c’est parfait. Nous pouvons évaluer le modèle nouvellement agrégé après chaque cycle de formation sans avoir à envoyer le modèle aux clients. Nous avons également la chance que l’ensemble de notre ensemble de données d’évaluation soit disponible à tout moment.

L’évaluation fédérée (ou évaluation côté client) est plus complexe, mais aussi plus puissante : elle ne nécessite pas d’ensemble de données centralisé et nous permet d’évaluer les modèles sur un plus grand ensemble de données, ce qui donne souvent des résultats d’évaluation plus réalistes. En fait, de nombreux scénarios exigent que nous utilisions l’évaluation fédérée** si nous voulons obtenir des résultats d’évaluation représentatifs. Mais cette puissance a un coût : une fois que nous commençons à évaluer côté client, nous devons savoir que notre ensemble de données d’évaluation peut changer au cours des cycles d’apprentissage consécutifs si ces clients ne sont pas toujours disponibles. De plus, l’ensemble de données détenu par chaque client peut également changer au cours des cycles consécutifs. Cela peut conduire à des résultats d’évaluation qui ne sont pas stables, donc même si nous ne changions pas le modèle, nous verrions nos résultats d’évaluation fluctuer au cours des cycles consécutifs.

Nous avons vu comment l’évaluation fédérée fonctionne du côté client (c’est-à-dire en implémentant la méthode evaluate dans FlowerClient). Voyons maintenant comment nous pouvons évaluer les paramètres du modèle agrégé du côté serveur :

[ ]:
# The `evaluate` function will be by Flower called after every round
def evaluate(
    server_round: int,
    parameters: fl.common.NDArrays,
    config: Dict[str, fl.common.Scalar],
) -> Optional[Tuple[float, Dict[str, fl.common.Scalar]]]:
    net = Net().to(DEVICE)
    valloader = valloaders[0]
    set_parameters(net, parameters)  # Update model with the latest parameters
    loss, accuracy = test(net, valloader)
    print(f"Server-side evaluation loss {loss} / accuracy {accuracy}")
    return loss, {"accuracy": accuracy}
[ ]:
strategy = fl.server.strategy.FedAvg(
    fraction_fit=0.3,
    fraction_evaluate=0.3,
    min_fit_clients=3,
    min_evaluate_clients=3,
    min_available_clients=NUM_CLIENTS,
    initial_parameters=fl.common.ndarrays_to_parameters(get_parameters(Net())),
    evaluate_fn=evaluate,  # Pass the evaluation function
)

fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=3),  # Just three rounds
    strategy=strategy,
    client_resources=client_resources,
)

Envoi/réception de valeurs arbitraires vers/depuis les clients#

In some situations, we want to configure client-side execution (training, evaluation) from the server-side. One example for that is the server asking the clients to train for a certain number of local epochs. Flower provides a way to send configuration values from the server to the clients using a dictionary. Let’s look at an example where the clients receive values from the server through the config parameter in fit (config is also available in evaluate). The fit method receives the configuration dictionary through the config parameter and can then read values from this dictionary. In this example, it reads server_round and local_epochs and uses those values to improve the logging and configure the number of local training epochs:

[ ]:
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, cid, net, trainloader, valloader):
        self.cid = cid
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        print(f"[Client {self.cid}] get_parameters")
        return get_parameters(self.net)

    def fit(self, parameters, config):
        # Read values from config
        server_round = config["server_round"]
        local_epochs = config["local_epochs"]

        # Use values provided by the config
        print(f"[Client {self.cid}, round {server_round}] fit, config: {config}")
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=local_epochs)
        return get_parameters(self.net), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        print(f"[Client {self.cid}] evaluate, config: {config}")
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}


def client_fn(cid) -> FlowerClient:
    net = Net().to(DEVICE)
    trainloader = trainloaders[int(cid)]
    valloader = valloaders[int(cid)]
    return FlowerClient(cid, net, trainloader, valloader)

Comment pouvons-nous donc envoyer ce dictionnaire de configuration du serveur aux clients ? Les stratégies de Flower intégrées fournissent un moyen de le faire, et cela fonctionne de la même façon que l’évaluation côté serveur. Nous fournissons une fonction à la stratégie, et la stratégie appelle cette fonction pour chaque cycle d’apprentissage fédéré :

[ ]:
def fit_config(server_round: int):
    """Return training configuration dict for each round.

    Perform two rounds of training with one local epoch, increase to two local
    epochs afterwards.
    """
    config = {
        "server_round": server_round,  # The current round of federated learning
        "local_epochs": 1 if server_round < 2 else 2,  #
    }
    return config

Ensuite, nous allons simplement passer cette fonction à la stratégie FedAvg avant de commencer la simulation :

[ ]:
strategy = fl.server.strategy.FedAvg(
    fraction_fit=0.3,
    fraction_evaluate=0.3,
    min_fit_clients=3,
    min_evaluate_clients=3,
    min_available_clients=NUM_CLIENTS,
    initial_parameters=fl.common.ndarrays_to_parameters(get_parameters(Net())),
    evaluate_fn=evaluate,
    on_fit_config_fn=fit_config,  # Pass the fit_config function
)

fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=3),  # Just three rounds
    strategy=strategy,
    client_resources=client_resources,
)

Comme nous pouvons le voir, les journaux des clients incluent maintenant le cycle actuel d’apprentissage fédéré (qu’ils lisent depuis le dictionnaire config). Nous pouvons également configurer l’apprentissage local pour qu’il s’exécute pendant une époque au cours du premier et du deuxième cycle d’apprentissage fédéré, puis pendant deux époques au cours du troisième cycle.

Les clients peuvent également renvoyer des valeurs arbitraires au serveur. Pour ce faire, ils renvoient un dictionnaire depuis fit et/ou evaluate. Nous avons vu et utilisé ce concept tout au long de ce carnet sans le mentionner explicitement : notre FlowerClient renvoie un dictionnaire contenant une paire clé/valeur personnalisée en tant que troisième valeur de retour dans evaluate.

Mise à l’échelle de l’apprentissage fédéré#

Comme dernière étape de ce carnet, voyons comment nous pouvons utiliser Flower pour expérimenter avec un grand nombre de clients.

[ ]:
NUM_CLIENTS = 1000

trainloaders, valloaders, testloader = load_datasets(NUM_CLIENTS)

Nous avons maintenant 1000 partitions, chacune contenant 45 exemples d’entraînement et 5 exemples de validation. Etant donné que le nombre d’exemples d’entraînement sur chaque client est assez faible, nous devrions probablement entraîner le modèle un peu plus longtemps, nous configurons donc les clients pour qu’ils effectuent 3 époques d’entraînement local. Nous devrions également ajuster la fraction de clients sélectionnés pour l’entraînement à chaque tour (nous ne voulons pas que les 1000 clients participent à chaque tour), nous ajustons donc fraction_fit à 0.05, ce qui signifie que seulement 5% des clients disponibles (donc 50 clients) seront sélectionnés pour l’entraînement à chaque tour :

[ ]:
def fit_config(server_round: int):
    config = {
        "server_round": server_round,
        "local_epochs": 3,
    }
    return config


strategy = fl.server.strategy.FedAvg(
    fraction_fit=0.025,  # Train on 25 clients (each round)
    fraction_evaluate=0.05,  # Evaluate on 50 clients (each round)
    min_fit_clients=20,
    min_evaluate_clients=40,
    min_available_clients=NUM_CLIENTS,
    initial_parameters=fl.common.ndarrays_to_parameters(get_parameters(Net())),
    on_fit_config_fn=fit_config,
)

fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=3),  # Just three rounds
    strategy=strategy,
    client_resources=client_resources,
)

Récapitulation#

Dans ce carnet, nous avons vu comment nous pouvons progressivement améliorer notre système en personnalisant la stratégie, en initialisant les paramètres côté serveur, en choisissant une stratégie différente et en évaluant les modèles côté serveur. C’est une sacrée flexibilité avec si peu de code, n’est-ce pas ?

Dans les sections ultérieures, nous avons vu comment nous pouvons communiquer des valeurs arbitraires entre le serveur et les clients pour personnaliser entièrement l’exécution côté client. Grâce à cette capacité, nous avons construit une simulation d’apprentissage fédéré à grande échelle en utilisant le moteur de client virtuel Flower et nous avons mené une expérience impliquant 1000 clients dans la même charge de travail - le tout dans un carnet Jupyter !

Prochaines étapes#

Avant de continuer, n’oublie pas de rejoindre la communauté Flower sur Slack : Join Slack

Il existe un canal dédié aux questions si vous avez besoin d’aide, mais nous aimerions aussi savoir qui vous êtes dans #introductions !

The Flower Federated Learning Tutorial - Part 3 shows how to build a fully custom Strategy from scratch.


Open in Colab