Open in Colab

Get started with Flower#

Bienvenue au tutoriel sur l’apprentissage fédéré de la fleur !

In this notebook, we’ll build a federated learning system using Flower, Flower Datasets and PyTorch. In part 1, we use PyTorch for the model training pipeline and data loading. In part 2, we continue to federate the PyTorch-based pipeline using Flower.

Star Flower on GitHub ⭐️ et rejoignez la communauté Flower sur Slack pour vous connecter, poser des questions et obtenir de l’aide : Join Slack 🌼 Nous serions ravis d’avoir de vos nouvelles dans le canal #introductions ! Et si quelque chose n’est pas clair, rendez-vous sur le canal #questions.

Let’s get started!

Étape 0 : Préparation#

Avant de commencer à coder, assurons-nous que nous disposons de tout ce dont nous avons besoin.

Installation des dépendances#

Next, we install the necessary packages for PyTorch (torch and torchvision), Flower Datasets (flwr-datasets) and Flower (flwr):

[ ]:
!pip install -q flwr[simulation] flwr_datasets[vision] torch torchvision matplotlib

Maintenant que toutes les dépendances sont installées, nous pouvons importer tout ce dont nous avons besoin pour ce tutoriel :

[ ]:
from collections import OrderedDict
from typing import List, Tuple

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from datasets.utils.logging import disable_progress_bar
from torch.utils.data import DataLoader

import flwr as fl
from flwr.common import Metrics
from flwr_datasets import FederatedDataset

DEVICE = torch.device("cpu")  # Try "cuda" to train on GPU
print(
    f"Training on {DEVICE} using PyTorch {torch.__version__} and Flower {fl.__version__}"
)
disable_progress_bar()

It is possible to switch to a runtime that has GPU acceleration enabled (on Google Colab: Runtime > Change runtime type > Hardware accelerator: GPU > Save). Note, however, that Google Colab is not always able to offer GPU acceleration. If you see an error related to GPU availability in one of the following sections, consider switching back to CPU-based execution by setting DEVICE = torch.device("cpu"). If the runtime has GPU acceleration enabled, you should see the output Training on cuda, otherwise it’ll say Training on cpu.

Chargement des données#

Federated learning can be applied to many different types of tasks across different domains. In this tutorial, we introduce federated learning by training a simple convolutional neural network (CNN) on the popular CIFAR-10 dataset. CIFAR-10 can be used to train image classifiers that distinguish between images from ten different classes: “airplane”, “automobile”, “bird”, “cat”, “deer”, “dog”, “frog”, “horse”, “ship”, and “truck”.

Nous simulons le fait d’avoir plusieurs ensembles de données provenant de plusieurs organisations (également appelé le paramètre « cross-silo » dans l’apprentissage fédéré) en divisant l’ensemble de données CIFAR-10 original en plusieurs partitions. Chaque partition représentera les données d’une seule organisation. Nous faisons cela purement à des fins d’expérimentation, dans le monde réel, il n’y a pas besoin de diviser les données parce que chaque organisation a déjà ses propres données (les données sont donc naturellement partitionnées).

Each organization will act as a client in the federated learning system. So having ten organizations participate in a federation means having ten clients connected to the federated learning server.

Let’s now create the Federated Dataset abstraction that from flwr-datasets that partitions the CIFAR-10. We will create small training and test set for each edge device and wrap each of them into a PyTorch DataLoader:

[ ]:
NUM_CLIENTS = 10
BATCH_SIZE = 32


def load_datasets():
    fds = FederatedDataset(dataset="cifar10", partitioners={"train": NUM_CLIENTS})

    def apply_transforms(batch):
        # Instead of passing transforms to CIFAR10(..., transform=transform)
        # we will use this function to dataset.with_transform(apply_transforms)
        # The transforms object is exactly the same
        transform = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
            ]
        )
        batch["img"] = [transform(img) for img in batch["img"]]
        return batch

    # Create train/val for each partition and wrap it into DataLoader
    trainloaders = []
    valloaders = []
    for partition_id in range(NUM_CLIENTS):
        partition = fds.load_partition(partition_id, "train")
        partition = partition.with_transform(apply_transforms)
        partition = partition.train_test_split(train_size=0.8, seed=42)
        trainloaders.append(DataLoader(partition["train"], batch_size=BATCH_SIZE))
        valloaders.append(DataLoader(partition["test"], batch_size=BATCH_SIZE))
    testset = fds.load_split("test").with_transform(apply_transforms)
    testloader = DataLoader(testset, batch_size=BATCH_SIZE)
    return trainloaders, valloaders, testloader


trainloaders, valloaders, testloader = load_datasets()

Nous avons maintenant une liste de dix ensembles de formation et dix ensembles de validation (trainloaders et valloaders) représentant les données de dix organisations différentes. Chaque paire trainloader`/``valloader contient 4500 exemples de formation et 500 exemples de validation. Il y a également un seul testloader (nous n’avons pas divisé l’ensemble de test). Encore une fois, cela n’est nécessaire que pour construire des systèmes de recherche ou d’éducation, les systèmes d’apprentissage fédérés actuels ont leurs données naturellement distribuées à travers plusieurs partitions.

Jetons un coup d’œil au premier lot d’images et d’étiquettes du premier ensemble d’entraînement (c’est-à-dire trainloaders[0]) avant de poursuivre :

[ ]:
batch = next(iter(trainloaders[0]))
images, labels = batch["img"], batch["label"]
# Reshape and convert images to a NumPy array
# matplotlib requires images with the shape (height, width, 3)
images = images.permute(0, 2, 3, 1).numpy()
# Denormalize
images = images / 2 + 0.5

# Create a figure and a grid of subplots
fig, axs = plt.subplots(4, 8, figsize=(12, 6))

# Loop over the images and plot them
for i, ax in enumerate(axs.flat):
    ax.imshow(images[i])
    ax.set_title(trainloaders[0].dataset.features["label"].int2str([labels[i]])[0])
    ax.axis("off")

# Show the plot
fig.tight_layout()
plt.show()

La sortie ci-dessus montre un lot aléatoire d’images provenant du premier chargeur de formation de notre liste de dix chargeurs de formation. Elle imprime également les étiquettes associées à chaque image (c’est-à-dire l’une des dix étiquettes possibles que nous avons vues ci-dessus). Si tu exécutes à nouveau la cellule, tu devrais voir un autre lot d’images.

Étape 1 : Formation centralisée avec PyTorch#

Ensuite, nous allons utiliser PyTorch pour définir un simple réseau neuronal convolutif. Cette introduction suppose une familiarité de base avec PyTorch, elle ne couvre donc pas en détail les aspects liés à PyTorch. Si tu veux plonger plus profondément dans PyTorch, nous te recommandons DEEP LEARNING WITH PYTORCH : A 60 MINUTE BLITZ.

Définir le modèle#

Nous utilisons le CNN simple décrit dans le tutoriel PyTorch :

[ ]:
class Net(nn.Module):
    def __init__(self) -> None:
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

Poursuivons avec les fonctions habituelles de formation et de test :

[ ]:
def train(net, trainloader, epochs: int, verbose=False):
    """Train the network on the training set."""
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters())
    net.train()
    for epoch in range(epochs):
        correct, total, epoch_loss = 0, 0, 0.0
        for batch in trainloader:
            images, labels = batch["img"].to(DEVICE), batch["label"].to(DEVICE)
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # Metrics
            epoch_loss += loss
            total += labels.size(0)
            correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
        epoch_loss /= len(trainloader.dataset)
        epoch_acc = correct / total
        if verbose:
            print(f"Epoch {epoch+1}: train loss {epoch_loss}, accuracy {epoch_acc}")


def test(net, testloader):
    """Evaluate the network on the entire test set."""
    criterion = torch.nn.CrossEntropyLoss()
    correct, total, loss = 0, 0, 0.0
    net.eval()
    with torch.no_grad():
        for batch in testloader:
            images, labels = batch["img"].to(DEVICE), batch["label"].to(DEVICE)
            outputs = net(images)
            loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    loss /= len(testloader.dataset)
    accuracy = correct / total
    return loss, accuracy

Entraîne le modèle#

Nous avons maintenant tous les éléments de base dont nous avons besoin : un ensemble de données, un modèle, une fonction d’entraînement et une fonction de test. Assemblons-les pour entraîner le modèle sur l’ensemble de données de l’une de nos organisations (trainloaders[0]). Cela simule la réalité de la plupart des projets d’apprentissage automatique aujourd’hui : chaque organisation possède ses propres données et entraîne les modèles uniquement sur ces données internes :

[ ]:
trainloader = trainloaders[0]
valloader = valloaders[0]
net = Net().to(DEVICE)

for epoch in range(5):
    train(net, trainloader, 1)
    loss, accuracy = test(net, valloader)
    print(f"Epoch {epoch+1}: validation loss {loss}, accuracy {accuracy}")

loss, accuracy = test(net, testloader)
print(f"Final test set performance:\n\tloss {loss}\n\taccuracy {accuracy}")

L’entraînement du CNN simple sur notre fractionnement CIFAR-10 pendant 5 époques devrait se traduire par une précision de l’ensemble de test d’environ 41 %, ce qui n’est pas bon, mais en même temps, cela n’a pas vraiment d’importance pour les besoins de ce tutoriel. L’intention était juste de montrer un pipeline d’entraînement centralisé simpliste qui prépare le terrain pour ce qui vient ensuite - l’apprentissage fédéré !

Étape 2 : Apprentissage fédéré avec Flower#

L’étape 1 a montré un simple pipeline de formation centralisé. Toutes les données étaient au même endroit (c’est-à-dire un seul trainloader et un seul valloader). Ensuite, nous allons simuler une situation où nous avons plusieurs ensembles de données dans plusieurs organisations et où nous formons un modèle sur ces organisations à l’aide de l’apprentissage fédéré.

Mise à jour des paramètres du modèle#

Dans l’apprentissage fédéré, le serveur envoie les paramètres du modèle global au client, et le client met à jour le modèle local avec les paramètres reçus du serveur. Il entraîne ensuite le modèle sur les données locales (ce qui modifie les paramètres du modèle localement) et renvoie les paramètres du modèle mis à jour/changés au serveur (ou, alternativement, il renvoie seulement les gradients au serveur, et non pas les paramètres complets du modèle).

Nous avons besoin de deux fonctions d’aide pour mettre à jour le modèle local avec les paramètres reçus du serveur et pour obtenir les paramètres mis à jour du modèle local : set_parameters et get_parameters. Les deux fonctions suivantes font exactement cela pour le modèle PyTorch ci-dessus.

Les détails de ce fonctionnement ne sont pas vraiment importants ici (n’hésite pas à consulter la documentation PyTorch si tu veux en savoir plus). En substance, nous utilisons state_dict pour accéder aux tenseurs de paramètres du modèle PyTorch. Les tenseurs de paramètres sont ensuite convertis en/depuis une liste de ndarray NumPy (que Flower sait sérialiser/désérialiser) :

[ ]:
def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)


def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]

Mise en place d’un client Flower#

Ceci étant dit, passons à la partie intéressante. Les systèmes d’apprentissage fédérés se composent d’un serveur et de plusieurs clients. Dans Flower, nous créons des clients en mettant en œuvre des sous-classes de flwr.client.Client ou de flwr.client.NumPyClient. Nous utilisons NumPyClient dans ce tutoriel parce qu’il est plus facile à mettre en œuvre et qu’il nous oblige à rédiger moins de modèles de chaudière.

Pour mettre en œuvre le client Flower, nous créons une sous-classe de flwr.client.NumPyClient et mettons en œuvre les trois méthodes get_parameters, fit et evaluate :

  • get_parameters : renvoie les paramètres du modèle local actuel

  • fit : reçoit les paramètres du modèle du serveur, entraîne les paramètres du modèle sur les données locales et renvoie les paramètres du modèle (mis à jour) au serveur

  • evaluate : reçoit les paramètres du modèle du serveur, évalue les paramètres du modèle sur les données locales et renvoie le résultat de l’évaluation au serveur

Nous avons mentionné que nos clients utiliseront les composants PyTorch définis précédemment pour la formation et l’évaluation des modèles. Voyons une simple mise en œuvre du client Flower qui réunit tout cela :

[ ]:
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, net, trainloader, valloader):
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        return get_parameters(self.net)

    def fit(self, parameters, config):
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=1)
        return get_parameters(self.net), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}

Our class FlowerClient defines how local training/evaluation will be performed and allows Flower to call the local training/evaluation through fit and evaluate. Each instance of FlowerClient represents a single client in our federated learning system. Federated learning systems have multiple clients (otherwise, there’s not much to federate), so each client will be represented by its own instance of FlowerClient. If we have, for example, three clients in our workload, then we’d have three instances of FlowerClient. Flower calls FlowerClient.fit on the respective instance when the server selects a particular client for training (and FlowerClient.evaluate for evaluation).

Utilisation du moteur du client virtuel#

Dans ce carnet, nous voulons simuler un système d’apprentissage fédéré avec 10 clients sur une seule machine. Cela signifie que le serveur et les 10 clients vivront sur une seule machine et partageront des ressources telles que le CPU, le GPU et la mémoire. Avoir 10 clients signifierait avoir 10 instances de FlowerClient en mémoire. Faire cela sur une seule machine peut rapidement épuiser les ressources mémoire disponibles, même si seulement un sous-ensemble de ces clients participe à un seul tour d’apprentissage fédéré.

In addition to the regular capabilities where server and clients run on multiple machines, Flower, therefore, provides special simulation capabilities that create FlowerClient instances only when they are actually necessary for training or evaluation. To enable the Flower framework to create clients when necessary, we need to implement a function called client_fn that creates a FlowerClient instance on demand. Flower calls client_fn whenever it needs an instance of one particular client to call fit or evaluate (those instances are usually discarded after use, so they should not keep any local state). Clients are identified by a client ID, or short cid. The cid can be used, for example, to load different local data partitions for different clients, as can be seen below:

[ ]:
def client_fn(cid: str) -> FlowerClient:
    """Create a Flower client representing a single organization."""

    # Load model
    net = Net().to(DEVICE)

    # Load data (CIFAR-10)
    # Note: each client gets a different trainloader/valloader, so each client
    # will train and evaluate on their own unique data
    trainloader = trainloaders[int(cid)]
    valloader = valloaders[int(cid)]

    # Create a  single Flower client representing a single organization
    return FlowerClient(net, trainloader, valloader).to_client()

Commencer la formation#

Nous avons maintenant la classe FlowerClient qui définit l’entraînement/évaluation côté client et client_fn qui permet à Flower de créer des instances de FlowerClient chaque fois qu’il a besoin d’appeler fit ou evaluate sur un client particulier. La dernière étape consiste à démarrer la simulation réelle en utilisant flwr.simulation.start_simulation.

La fonction start_simulation accepte un certain nombre d’arguments, parmi lesquels le client_fn utilisé pour créer les instances FlowerClient, le nombre de clients à simuler (num_clients), le nombre de tours d’apprentissage fédéré (num_rounds), et la stratégie. La stratégie encapsule l’approche/algorithme d’apprentissage fédéré, par exemple, Federated Averaging (FedAvg).

Flower dispose d’un certain nombre de stratégies intégrées, mais nous pouvons également utiliser nos propres implémentations de stratégies pour personnaliser presque tous les aspects de l’approche de l’apprentissage fédéré. Pour cet exemple, nous utilisons l’implémentation intégrée FedAvg et nous la personnalisons en utilisant quelques paramètres de base. La dernière étape est l’appel à start_simulation qui - tu l’as deviné - démarre la simulation :

[ ]:
# Create FedAvg strategy
strategy = fl.server.strategy.FedAvg(
    fraction_fit=1.0,  # Sample 100% of available clients for training
    fraction_evaluate=0.5,  # Sample 50% of available clients for evaluation
    min_fit_clients=10,  # Never sample less than 10 clients for training
    min_evaluate_clients=5,  # Never sample less than 5 clients for evaluation
    min_available_clients=10,  # Wait until all 10 clients are available
)

# Specify the resources each of your clients need. By default, each
# client will be allocated 1x CPU and 0x GPUs
client_resources = {"num_cpus": 1, "num_gpus": 0.0}
if DEVICE.type == "cuda":
    # here we are assigning an entire GPU for each client.
    client_resources = {"num_cpus": 1, "num_gpus": 1.0}
    # Refer to our documentation for more details about Flower Simulations
    # and how to setup these `client_resources`.

# Start simulation
fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=5),
    strategy=strategy,
    client_resources=client_resources,
)

Dans les coulisses#

Alors, comment cela fonctionne-t-il ? Comment Flower exécute-t-il cette simulation ?

Lorsque nous appelons start_simulation, nous disons à Flower qu’il y a 10 clients (num_clients=10). Flower demande alors à la stratégie FedAvg de sélectionner des clients. FedAvg` sait qu'il doit sélectionner 100% des clients disponibles (``fraction_fit=1.0), alors il choisit 10 clients au hasard (c’est à dire 100% de 10).

Flower demande ensuite aux 10 clients sélectionnés d’entraîner le modèle. Lorsque le serveur reçoit les mises à jour des paramètres du modèle de la part des clients, il les transmet à la stratégie (FedAvg) pour qu’elle les agrège. La stratégie agrège ces mises à jour et renvoie le nouveau modèle global, qui est ensuite utilisé dans le prochain cycle d’apprentissage fédéré.

Où est la précision ?#

Tu as peut-être remarqué que toutes les mesures, à l’exception de pertes_distribuées, sont vides. Où est passée la {"précision" : float(précision)} ?

Flower peut automatiquement agréger les pertes renvoyées par les clients individuels, mais il ne peut pas faire la même chose pour les mesures dans le dictionnaire de mesures générique (celui avec la clé accuracy). Les dictionnaires de mesures peuvent contenir des types de mesures très différents et même des paires clé/valeur qui ne sont pas des mesures du tout, donc le cadre ne sait pas (et ne peut pas) savoir comment les gérer automatiquement.

En tant qu’utilisateurs, nous devons indiquer au framework comment gérer/agréger ces métriques personnalisées, et nous le faisons en passant des fonctions d’agrégation de métriques à la stratégie. La stratégie appellera alors ces fonctions chaque fois qu’elle recevra des métriques d’ajustement ou d’évaluation de la part des clients. Les deux fonctions possibles sont fit_metrics_aggregation_fn et evaluate_metrics_aggregation_fn.

Créons une simple fonction de calcul de la moyenne pondérée pour agréger la mesure de « précision » que nous renvoie evaluate :

[ ]:
def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
    # Multiply accuracy of each client by number of examples used
    accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
    examples = [num_examples for num_examples, _ in metrics]

    # Aggregate and return custom metric (weighted average)
    return {"accuracy": sum(accuracies) / sum(examples)}

La seule chose qui reste à faire est d’indiquer à la stratégie d’appeler cette fonction chaque fois qu’elle reçoit des dictionnaires de métriques d’évaluation de la part des clients :

[ ]:
# Create FedAvg strategy
strategy = fl.server.strategy.FedAvg(
    fraction_fit=1.0,
    fraction_evaluate=0.5,
    min_fit_clients=10,
    min_evaluate_clients=5,
    min_available_clients=10,
    evaluate_metrics_aggregation_fn=weighted_average,  # <-- pass the metric aggregation function
)

# Start simulation
fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=5),
    strategy=strategy,
    client_resources=client_resources,
)

Nous avons maintenant un système complet qui effectue la formation fédérée et l’évaluation fédérée. Il utilise la fonction moyenne pondérée pour agréger les mesures d’évaluation personnalisées et calcule une seule mesure de précision pour tous les clients du côté du serveur.

Les deux autres catégories de mesures (pertes_centralisées et métriques_centralisées) sont toujours vides car elles ne s’appliquent que lorsque l’évaluation centralisée est utilisée. La deuxième partie du tutoriel sur les fleurs couvrira l’évaluation centralisée.

Remarques finales#

Félicitations, tu viens d’entraîner un réseau neuronal convolutif, fédéré sur 10 clients ! Avec ça, tu comprends les bases de l’apprentissage fédéré avec Flower. La même approche que tu as vue peut être utilisée avec d’autres cadres d’apprentissage automatique (pas seulement PyTorch) et d’autres tâches (pas seulement la classification des images CIFAR-10), par exemple le NLP avec Hugging Face Transformers ou la parole avec SpeechBrain.

Dans le prochain cahier, nous allons aborder des concepts plus avancés. Tu veux personnaliser ta stratégie ? Initialiser des paramètres côté serveur ? Ou évaluer le modèle agrégé côté serveur ? Nous aborderons tout cela et bien plus encore dans le prochain tutoriel.

Prochaines étapes#

Avant de continuer, n’oublie pas de rejoindre la communauté Flower sur Slack : Join Slack

Il existe un canal dédié aux questions si vous avez besoin d’aide, mais nous aimerions aussi savoir qui vous êtes dans #introductions !

The Flower Federated Learning Tutorial - Part 2 goes into more depth about strategies and all the advanced things you can build with them.


Open in Colab