Open in Colab

开始使用Flower#

欢迎阅读Flower联邦学习教程!

In this notebook, we’ll build a federated learning system using Flower, Flower Datasets and PyTorch. In part 1, we use PyTorch for the model training pipeline and data loading. In part 2, we continue to federate the PyTorch-based pipeline using Flower.

Star Flower on GitHub ⭐️ 并加入 Slack 上的 Flower 社区,进行交流、提问并获得帮助: 加入 Slack <https://flower.ai/join-slack>`__ 🌼 我们希望在 #introductions 频道听到您的声音!如果有任何不清楚的地方,请访问 #questions 频道。

让我们开始吧!

步骤 0:准备工作#

在开始编写实际代码之前,让我们先确保我们已经准备好了所需的一切。

安装依赖项#

Next, we install the necessary packages for PyTorch (torch and torchvision), Flower Datasets (flwr-datasets) and Flower (flwr):

[ ]:
!pip install -q flwr[simulation] flwr_datasets[vision] torch torchvision matplotlib

现在我们已经安装了所有依赖项,可以导入本教程所需的所有内容:

[ ]:
from collections import OrderedDict
from typing import List, Tuple

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from datasets.utils.logging import disable_progress_bar
from torch.utils.data import DataLoader

import flwr as fl
from flwr.common import Metrics
from flwr_datasets import FederatedDataset

DEVICE = torch.device("cpu")  # Try "cuda" to train on GPU
print(
    f"Training on {DEVICE} using PyTorch {torch.__version__} and Flower {fl.__version__}"
)
disable_progress_bar()

It is possible to switch to a runtime that has GPU acceleration enabled (on Google Colab: Runtime > Change runtime type > Hardware accelerator: GPU > Save). Note, however, that Google Colab is not always able to offer GPU acceleration. If you see an error related to GPU availability in one of the following sections, consider switching back to CPU-based execution by setting DEVICE = torch.device("cpu"). If the runtime has GPU acceleration enabled, you should see the output Training on cuda, otherwise it’ll say Training on cpu.

加载数据#

Federated learning can be applied to many different types of tasks across different domains. In this tutorial, we introduce federated learning by training a simple convolutional neural network (CNN) on the popular CIFAR-10 dataset. CIFAR-10 can be used to train image classifiers that distinguish between images from ten different classes: 'airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', and 'truck'.

我们通过将原始 CIFAR-10 数据集拆分成多个分区来模拟来自多个组织的多个数据集(也称为联邦学习中的 "跨分区 "设置)。每个分区代表一个组织的数据。我们这样做纯粹是为了实验目的,在现实世界中不需要拆分数据,因为每个组织都已经有了自己的数据(所以数据是自然分区的)。

Each organization will act as a client in the federated learning system. So having ten organizations participate in a federation means having ten clients connected to the federated learning server.

Let’s now create the Federated Dataset abstraction that from flwr-datasets that partitions the CIFAR-10. We will create small training and test set for each edge device and wrap each of them into a PyTorch DataLoader:

[ ]:
NUM_CLIENTS = 10
BATCH_SIZE = 32


def load_datasets():
    fds = FederatedDataset(dataset="cifar10", partitioners={"train": NUM_CLIENTS})

    def apply_transforms(batch):
        # Instead of passing transforms to CIFAR10(..., transform=transform)
        # we will use this function to dataset.with_transform(apply_transforms)
        # The transforms object is exactly the same
        transform = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
            ]
        )
        batch["img"] = [transform(img) for img in batch["img"]]
        return batch

    # Create train/val for each partition and wrap it into DataLoader
    trainloaders = []
    valloaders = []
    for partition_id in range(NUM_CLIENTS):
        partition = fds.load_partition(partition_id, "train")
        partition = partition.with_transform(apply_transforms)
        partition = partition.train_test_split(train_size=0.8, seed=42)
        trainloaders.append(DataLoader(partition["train"], batch_size=BATCH_SIZE))
        valloaders.append(DataLoader(partition["test"], batch_size=BATCH_SIZE))
    testset = fds.load_split("test").with_transform(apply_transforms)
    testloader = DataLoader(testset, batch_size=BATCH_SIZE)
    return trainloaders, valloaders, testloader


trainloaders, valloaders, testloader = load_datasets()

现在,我们有一个包含十个训练集和十个验证集(trainloaders` 和`valloaders``)的列表,代表十个不同组织的数据。每对 trainloader/valloader 都包含 4500 个训练示例和 500 个验证数据。还有一个单独的 ``测试加载器``(我们没有拆分测试集)。同样,这只有在构建研究或教育系统时才有必要,实际的联邦学习系统的数据自然分布在多个分区中。

在继续之前,让我们先看看第一个训练集中的第一批图像和标签(即 trainloaders[0]):

[ ]:
batch = next(iter(trainloaders[0]))
images, labels = batch["img"], batch["label"]
# Reshape and convert images to a NumPy array
# matplotlib requires images with the shape (height, width, 3)
images = images.permute(0, 2, 3, 1).numpy()
# Denormalize
images = images / 2 + 0.5

# Create a figure and a grid of subplots
fig, axs = plt.subplots(4, 8, figsize=(12, 6))

# Loop over the images and plot them
for i, ax in enumerate(axs.flat):
    ax.imshow(images[i])
    ax.set_title(trainloaders[0].dataset.features["label"].int2str([labels[i]])[0])
    ax.axis("off")

# Show the plot
fig.tight_layout()
plt.show()

上面的输出显示了来自十个 "trainloader "列表中第一个 "trainloader "的随机图像。它还打印了与每幅图像相关的标签(即我们上面看到的十个可能标签之一)。如果您再次运行该单元,应该会看到另一批图像。

步骤 1:使用 PyTorch 进行集中训练#

接下来,我们将使用 PyTorch 来定义一个简单的卷积神经网络。本介绍假定您对 PyTorch 有基本的了解,因此不会详细介绍与 PyTorch 相关的内容。如果你想更深入地了解 PyTorch,我们推荐你阅读 DEEP LEARNING WITH PYTORCH: a 60 minute blitz

定义模型#

我们使用` PyTorch 教程 <https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html#define-a-convolutional-neural-network>`__ 中描述的简单 CNN:

[ ]:
class Net(nn.Module):
    def __init__(self) -> None:
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

让我们继续进行常规的训练和测试功能:

[ ]:
def train(net, trainloader, epochs: int, verbose=False):
    """Train the network on the training set."""
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters())
    net.train()
    for epoch in range(epochs):
        correct, total, epoch_loss = 0, 0, 0.0
        for batch in trainloader:
            images, labels = batch["img"].to(DEVICE), batch["label"].to(DEVICE)
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # Metrics
            epoch_loss += loss
            total += labels.size(0)
            correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
        epoch_loss /= len(trainloader.dataset)
        epoch_acc = correct / total
        if verbose:
            print(f"Epoch {epoch+1}: train loss {epoch_loss}, accuracy {epoch_acc}")


def test(net, testloader):
    """Evaluate the network on the entire test set."""
    criterion = torch.nn.CrossEntropyLoss()
    correct, total, loss = 0, 0, 0.0
    net.eval()
    with torch.no_grad():
        for batch in testloader:
            images, labels = batch["img"].to(DEVICE), batch["label"].to(DEVICE)
            outputs = net(images)
            loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    loss /= len(testloader.dataset)
    accuracy = correct / total
    return loss, accuracy

训练模型#

现在我们拥有了所需的所有基本构件:数据集、模型、训练函数和测试函数。让我们把它们放在一起,在我们其中一个组织的数据集(trainloaders[0])上训练模型。这模拟了当今大多数机器学习项目的实际情况:每个组织都有自己的数据,并且只在这些内部数据上训练模型:

[ ]:
trainloader = trainloaders[0]
valloader = valloaders[0]
net = Net().to(DEVICE)

for epoch in range(5):
    train(net, trainloader, 1)
    loss, accuracy = test(net, valloader)
    print(f"Epoch {epoch+1}: validation loss {loss}, accuracy {accuracy}")

loss, accuracy = test(net, testloader)
print(f"Final test set performance:\n\tloss {loss}\n\taccuracy {accuracy}")

在我们的 CIFAR-10 分片上对简单 CNN 进行 5 个遍历的训练后,测试集的准确率应为 41%,这并不理想,但同时对本教程而言也并不重要。我们只是想展示一个简单的集中式训练流程,为接下来的联邦学习做好铺垫!

步骤 2:使用 Flower 联邦学习#

步骤 1 演示了一个简单的集中式训练流程。所有数据都在一个地方(即一个 "trainloader "和一个 "valloader")。接下来,我们将模拟在多个组织中拥有多个数据集的情况,并使用联邦学习在这些组织中训练一个模型。

更新模型参数#

在联邦学习中,服务器将全局模型参数发送给客户端,客户端根据从服务器接收到的参数更新本地模型。然后,客户端根据本地数据对模型进行训练(在本地更改模型参数),并将更新/更改后的模型参数发回服务器(或者,客户端只将梯度参数发回服务器,而不是全部模型参数)。

我们需要两个辅助函数,用从服务器接收到的参数更新本地模型,并从本地模型获取更新后的模型参数:`` set_parameters```和`get_parameters``。下面两个函数就是为上面的 PyTorch 模型做这些工作的。

在这里,如何工作的细节并不重要(如果你想了解更多,请随时查阅 PyTorch 文档)。本质上,我们使用 state_dict 访问 PyTorch 模型参数张量。然后,参数张量会被转换成/转换成 NumPy ndarray 列表(Flower 知道如何序列化/反序列化):

[ ]:
def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)


def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]

实现 Flower 客户端#

说完这些,让我们进入有趣的部分。联邦学习系统由一个服务器和多个客户端组成。在 Flower 中,我们通过实现 flwr.client.Clientflwr.client.NumPyClient 的子类来创建客户端。在本教程中,我们使用``NumPyClient``,因为它更容易实现,需要我们编写的模板也更少。

为实现 Flower 客户端,我们创建了 flwr.client.NumPyClient 的子类,并实现了 get_parametersfit 和``evaluate`` 三个方法:

  • get_parameters: 返回当前本地模型参数

  • fit: 从服务器接收模型参数,在本地数据上训练模型参数,并将(更新的)模型参数返回服务器

  • ``evaluate ``: 从服务器接收模型参数,在本地数据上评估模型参数,并将评估结果返回服务器

我们提到,我们的客户端将使用之前定义的 PyTorch 组件进行模型训练和评估。让我们来看看一个简单的 Flower 客户端实现,它将一切都整合在一起:

[ ]:
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, net, trainloader, valloader):
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        return get_parameters(self.net)

    def fit(self, parameters, config):
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=1)
        return get_parameters(self.net), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}

我们的类 FlowerClient 定义了本地训练/评估的执行方式,并允许 Flower 通过 fitevaluate 调用本地训练/评估。每个 FlowerClient 实例都代表联邦学习系统中的*单个客户端*。联邦学习系统有多个客户端(否则就没有什么可联邦的),因此每个客户端都将由自己的 FlowerClient 实例来代表。例如,如果我们的工作负载中有三个客户端,那么我们就会有三个 FlowerClient 实例。当服务器选择特定客户端进行训练时,Flower 会调用相应实例上的 FlowerClient.fit (评估时调用 FlowerClient.evaluate)。

使用虚拟客户端引擎#

在本笔记中,我们要模拟一个联邦学习系统,在一台机器上有 10 个客户端。这意味着服务器和所有 10 个客户端都将位于一台机器上,并共享 CPU、GPU 和内存等资源。有 10 个客户端就意味着内存中有 10 个 FlowerClient 实例。在单台机器上这样做会很快耗尽可用的内存资源,即使这些客户端中只有一个子集参与了一轮联邦学习。

除了服务器和客户端在多台机器上运行的常规功能外,Flower 还提供了特殊的模拟功能,即只有在训练或评估实际需要时才创建 FlowerClient 实例。为了让 Flower 框架能在必要时创建客户端,我们需要实现一个名为 client_fn 的函数,它能按需创建一个 FlowerClient 实例。每当 Flower 需要一个特定的客户端实例来调用 fitevaluate 时,它就会调用 client_fn``(这些实例在使用后通常会被丢弃,因此它们不应保留任何本地状态)。客户端由一个客户端 ID 或简短的 ``cid 标识。例如,可以使用 cid 为不同的客户端加载不同的本地数据分区,如下所示:

[ ]:
def client_fn(cid: str) -> FlowerClient:
    """Create a Flower client representing a single organization."""

    # Load model
    net = Net().to(DEVICE)

    # Load data (CIFAR-10)
    # Note: each client gets a different trainloader/valloader, so each client
    # will train and evaluate on their own unique data
    trainloader = trainloaders[int(cid)]
    valloader = valloaders[int(cid)]

    # Create a  single Flower client representing a single organization
    return FlowerClient(net, trainloader, valloader).to_client()

开始训练#

现在我们有了定义客户端训练/评估的类 FlowerClient 和允许 Flower 在需要调用某个客户端的 fit` ``evaluate` 时创建 ``FlowerClient 实例的 client_fn` 类。最后一步是使用 ``flwr.simulation.start_simulation 启动实际模拟。

函数 start_simulation 接受许多参数,其中包括用于创建 FlowerClient 实例的 client_fn、要模拟的客户端数量(num_clients)、联邦学习轮数(num_rounds)和策略。策略封装了联邦学习方法/算法,例如*联邦平均* (FedAvg)。

Flower 有许多内置策略,但我们也可以使用自己的策略实现来定制联邦学习方法的几乎所有方面。在本例中,我们使用内置的 FedAvg 实现,并使用一些基本参数对其进行定制。最后一步是实际调用 ``start_simulation``开始模拟:

[ ]:
# Create FedAvg strategy
strategy = fl.server.strategy.FedAvg(
    fraction_fit=1.0,  # Sample 100% of available clients for training
    fraction_evaluate=0.5,  # Sample 50% of available clients for evaluation
    min_fit_clients=10,  # Never sample less than 10 clients for training
    min_evaluate_clients=5,  # Never sample less than 5 clients for evaluation
    min_available_clients=10,  # Wait until all 10 clients are available
)

# Specify the resources each of your clients need. By default, each
# client will be allocated 1x CPU and 0x GPUs
client_resources = {"num_cpus": 1, "num_gpus": 0.0}
if DEVICE.type == "cuda":
    # here we are assigning an entire GPU for each client.
    client_resources = {"num_cpus": 1, "num_gpus": 1.0}
    # Refer to our documentation for more details about Flower Simulations
    # and how to setup these `client_resources`.

# Start simulation
fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=5),
    strategy=strategy,
    client_resources=client_resources,
)

幕后#

那么它是如何工作的呢?Flower 如何进行模拟?

当我们调用 start_simulation 时,我们会告诉 Flower 有 10 个客户(num_clients=10`)。然后,Flower 会要求 FedAvg 策略选择客户。FedAvg 知道它应该选择 100%的可用客户(fraction_fit=1.0),所以它会随机选择 10 个客户(即 10 的 100%)。

然后,Flower 会要求选定的 10 个客户端对模型进行训练。服务器收到客户端的模型参数更新后,会将这些更新交给策略(FedAvg)进行聚合。策略会聚合这些更新并返回新的全局模型,然后将其用于下一轮联邦学习。

准确度在哪里找?#

您可能已经注意到,除了 losses_distributed 以外,所有指标都是空的。{"准确度": float(准确度)}``去哪儿了?

Flower 可以自动汇总单个客户端返回的损失值,但无法对通用度量字典中的度量进行同样的处理(即带有 "准确度 "键的度量字典)。度量值字典可以包含非常不同种类的度量值,甚至包含根本不是度量值的键/值对,因此框架不知道(也无法知道)如何自动处理这些度量值。

作为用户,我们需要告诉框架如何处理/聚合这些自定义指标,为此,我们将指标聚合函数传递给策略。然后,只要从客户端接收到拟合或评估指标,策略就会调用这些函数。两个可能的函数是 fit_metrics_aggregation_fnevaluate_metrics_aggregation_fn

让我们创建一个简单的加权平均函数来汇总从 evaluate 返回的 accuracy 指标:

[ ]:
def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
    # Multiply accuracy of each client by number of examples used
    accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
    examples = [num_examples for num_examples, _ in metrics]

    # Aggregate and return custom metric (weighted average)
    return {"accuracy": sum(accuracies) / sum(examples)}

剩下要做的就是告诉策略,每当它从客户端接收到评估度量字典时,都要调用这个函数:

[ ]:
# Create FedAvg strategy
strategy = fl.server.strategy.FedAvg(
    fraction_fit=1.0,
    fraction_evaluate=0.5,
    min_fit_clients=10,
    min_evaluate_clients=5,
    min_available_clients=10,
    evaluate_metrics_aggregation_fn=weighted_average,  # <-- pass the metric aggregation function
)

# Start simulation
fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=5),
    strategy=strategy,
    client_resources=client_resources,
)

我们现在有了一个完整的系统,可以执行联邦训练和联邦评估。它使用 weighted_average 函数汇总自定义评估指标,并在服务器端计算所有客户端的单一 accuracy 指标。

其他两类指标(losses_centralized`metrics_centralized)仍然是空的,因为它们只适用于集中评估。Flower 教程的第二部分将介绍集中式评估。

结束语#

恭喜您,你刚刚训练了一个由 10 个客户端组成的卷积神经网络!这样,你就了解了使用 Flower 进行联邦学习的基础知识。你所看到的方法同样适用于其他机器学习框架(不只是 PyTorch)和任务(不只是 CIFAR-10 图像分类),例如使用 Hugging Face Transformers 的 NLP 或使用 SpeechBrain 的语音。

在下一个笔记中,我们将介绍一些更先进的概念。想定制你的策略吗?在服务器端初始化参数?或者在服务器端评估聚合模型?我们将在下一个教程中介绍所有这些内容以及更多。

接下来的步骤#

在继续之前,请务必加入 Slack 上的 Flower 社区:Join Slack

如果您需要帮助,我们有专门的 #questions 频道,但我们也很乐意在 #introductions 中了解您是谁!

Flower 联邦学习教程 - 第 2 部分 更深入地介绍了策略以及可以使用策略构建的所有高级功能。


Open in Colab