Open in Colab

使用联邦学习策略

欢迎来到联邦学习教程的下一部分。在本教程的前几部分,我们介绍了使用 PyTorch 和 Flower 进行联邦学习(`第 1 部分 <https://flower.ai/docs/framework/tutorial-get-started-with-flower-pytorch.html>`___)。

In this notebook, we’ll begin to customize the federated learning system we built in the introductory notebook again, using the Flower framework, Flower Datasets, and PyTorch.

Star Flower on GitHub ⭐️ and join the Flower community on Flower Discuss and the Flower Slack to connect, ask questions, and get help: - Join Flower Discuss We’d love to hear from you in the Introduction topic! If anything is unclear, post in Flower Help - Beginners. - Join Flower Slack We’d love to hear from you in the #introductions channel! If anything is unclear, head over to the #questions channel.

Let’s move beyond FedAvg with Flower strategies! 🌼

准备工作

在开始实际代码之前,让我们先确保我们已经准备好了所需的一切。

安装依赖项

首先,我们安装必要的软件包:

[ ]:
!pip install -q flwr[simulation] flwr-datasets[vision] torch torchvision

现在我们已经安装了所有依赖项,可以导入本教程所需的所有内容:

[ ]:
from collections import OrderedDict
from typing import Dict, List, Optional, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

import flwr
from flwr.client import Client, ClientApp, NumPyClient
from flwr.server import ServerApp, ServerConfig, ServerAppComponents
from flwr.server.strategy import FedAvg, FedAdagrad
from flwr.simulation import run_simulation
from flwr_datasets import FederatedDataset
from flwr.common import ndarrays_to_parameters, NDArrays, Scalar, Context

DEVICE = torch.device("cpu")  # Try "cuda" to train on GPU
print(f"Training on {DEVICE}")
print(f"Flower {flwr.__version__} / PyTorch {torch.__version__}")

可以切换到已启用 GPU 加速的运行时(在 Google Colab 上: 运行时 > 更改运行时类型 > 硬件加速: GPU > 保存``)。但请注意,Google Colab 并非总能提供 GPU 加速。如果在以下部分中看到与 GPU 可用性相关的错误,请考虑通过设置 DEVICE = torch.device("cpu") 切回基于 CPU 的执行。如果运行时已启用 GPU 加速,你应该会看到输出``Training on cuda``,否则会显示``Training on cpu``。

数据加载

Let’s now load the CIFAR-10 training and test set, partition them into ten smaller datasets (each split into training and validation set), and wrap everything in their own DataLoader. We introduce a new parameter num_partitions which allows us to call load_datasets with different numbers of partitions.

[ ]:
NUM_PARTITIONS = 10
BATCH_SIZE = 32


def load_datasets(partition_id: int, num_partitions: int):
    fds = FederatedDataset(dataset="cifar10", partitioners={"train": num_partitions})
    partition = fds.load_partition(partition_id)
    # Divide data on each node: 80% train, 20% test
    partition_train_test = partition.train_test_split(test_size=0.2, seed=42)
    pytorch_transforms = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )

    def apply_transforms(batch):
        # Instead of passing transforms to CIFAR10(..., transform=transform)
        # we will use this function to dataset.with_transform(apply_transforms)
        # The transforms object is exactly the same
        batch["img"] = [pytorch_transforms(img) for img in batch["img"]]
        return batch

    partition_train_test = partition_train_test.with_transform(apply_transforms)
    trainloader = DataLoader(
        partition_train_test["train"], batch_size=BATCH_SIZE, shuffle=True
    )
    valloader = DataLoader(partition_train_test["test"], batch_size=BATCH_SIZE)
    testset = fds.load_split("test").with_transform(apply_transforms)
    testloader = DataLoader(testset, batch_size=BATCH_SIZE)
    return trainloader, valloader, testloader

模型培训/评估

让我们继续使用常见的模型定义(包括 set_parametersget_parameters)、训练和测试函数:

[ ]:
class Net(nn.Module):
    def __init__(self) -> None:
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]


def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)


def train(net, trainloader, epochs: int):
    """Train the network on the training set."""
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters())
    net.train()
    for epoch in range(epochs):
        correct, total, epoch_loss = 0, 0, 0.0
        for batch in trainloader:
            images, labels = batch["img"], batch["label"]
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(net(images), labels)
            loss.backward()
            optimizer.step()
            # Metrics
            epoch_loss += loss
            total += labels.size(0)
            correct += (torch.max(outputs.data, 1)[1] == labels).sum().item()
        epoch_loss /= len(trainloader.dataset)
        epoch_acc = correct / total
        print(f"Epoch {epoch+1}: train loss {epoch_loss}, accuracy {epoch_acc}")


def test(net, testloader):
    """Evaluate the network on the entire test set."""
    criterion = torch.nn.CrossEntropyLoss()
    correct, total, loss = 0, 0, 0.0
    net.eval()
    with torch.no_grad():
        for batch in testloader:
            images, labels = batch["img"], batch["label"]
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            outputs = net(images)
            loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    loss /= len(testloader.dataset)
    accuracy = correct / total
    return loss, accuracy

Flower 客户端

To implement the Flower client, we (again) create a subclass of flwr.client.NumPyClient and implement the three methods get_parameters, fit, and evaluate. Here, we also pass the partition_id to the client and use it log additional details. We then create an instance of ClientApp and pass it the client_fn.

[ ]:
class FlowerClient(NumPyClient):
    def __init__(self, partition_id, net, trainloader, valloader):
        self.partition_id = partition_id
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        print(f"[Client {self.partition_id}] get_parameters")
        return get_parameters(self.net)

    def fit(self, parameters, config):
        print(f"[Client {self.partition_id}] fit, config: {config}")
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=1)
        return get_parameters(self.net), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        print(f"[Client {self.partition_id}] evaluate, config: {config}")
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}


def client_fn(context: Context) -> Client:
    net = Net().to(DEVICE)

    # Read the node_config to fetch data partition associated to this node
    partition_id = context.node_config["partition-id"]
    num_partitions = context.node_config["num-partitions"]

    trainloader, valloader, _ = load_datasets(partition_id, num_partitions)
    return FlowerClient(partition_id, net, trainloader, valloader).to_client()


# Create the ClientApp
client = ClientApp(client_fn=client_fn)

策略定制

到目前为止,如果您已经阅读过入门笔记本,那么一切都应该很熟悉了。接下来,我们将介绍一些新功能。

服务器端参数 初始化

Flower, by default, initializes the global model by asking one random client for the initial parameters. In many cases, we want more control over parameter initialization though. Flower therefore allows you to directly pass the initial parameters to the Strategy. We create an instance of Net() and get the paramaters as follows:

[ ]:
# Create an instance of the model and get the parameters
params = get_parameters(Net())

Next, we create a server_fn that returns the components needed for the server. Within server_fn, we create a Strategy that uses the initial parameters.

[ ]:
def server_fn(context: Context) -> ServerAppComponents:
    # Create FedAvg strategy
    strategy = FedAvg(
        fraction_fit=0.3,
        fraction_evaluate=0.3,
        min_fit_clients=3,
        min_evaluate_clients=3,
        min_available_clients=NUM_PARTITIONS,
        initial_parameters=ndarrays_to_parameters(
            params
        ),  # Pass initial model parameters
    )

    # Configure the server for 3 rounds of training
    config = ServerConfig(num_rounds=3)
    return ServerAppComponents(strategy=strategy, config=config)

Passing initial_parameters to the FedAvg strategy prevents Flower from asking one of the clients for the initial parameters. In server_fn, we pass this new strategy and a ServerConfig for defining the number of federated learning rounds (num_rounds).

Similar to the ClientApp, we now create the ServerApp using the server_fn:

[ ]:
# Create ServerApp
server = ServerApp(server_fn=server_fn)

Last but not least, we specify the resources for each client and run the simulation.

[ ]:
# Specify the resources each of your clients need
# If set to none, by default, each client will be allocated 2x CPU and 0x GPUs
backend_config = {"client_resources": None}
if DEVICE.type == "cuda":
    backend_config = {"client_resources": {"num_gpus": 1}}

# Run simulation
run_simulation(
    server_app=server,
    client_app=client,
    num_supernodes=NUM_PARTITIONS,
    backend_config=backend_config,
)

If we look closely, we can see that the logs do not show any calls to the FlowerClient.get_parameters method.

从定制战略开始

We’ve seen the function run_simulation before. It accepts a number of arguments, amongst them the server_app which wraps around the strategy and number of training rounds, client_app which wraps around the client_fn used to create FlowerClient instances, and the number of clients to simulate which equals num_supernodes.

该策略封装了联邦学习方法/算法,例如`FedAvg``或`FedAdagrad``。这次让我们尝试使用不同的策略:

[ ]:
def server_fn(context: Context) -> ServerAppComponents:
    # Create FedAdagrad strategy
    strategy = FedAdagrad(
        fraction_fit=0.3,
        fraction_evaluate=0.3,
        min_fit_clients=3,
        min_evaluate_clients=3,
        min_available_clients=NUM_PARTITIONS,
        initial_parameters=ndarrays_to_parameters(params),
    )
    # Configure the server for 3 rounds of training
    config = ServerConfig(num_rounds=3)
    return ServerAppComponents(strategy=strategy, config=config)


# Create the ServerApp
server = ServerApp(server_fn=server_fn)

# Run simulation
run_simulation(
    server_app=server,
    client_app=client,
    num_supernodes=NUM_PARTITIONS,
    backend_config=backend_config,
)

服务器端参数**评估**

Flower 可以在服务器端或客户端评估聚合模型。客户端和服务器端评估在某些方面相似,但也有不同之处。

**集中评估**(或*服务器端评估*)在概念上很简单:它的工作方式与集中式机器学习中的评估方式相同。如果有一个服务器端数据集可用于评估目的,那就太好了。我们可以在每一轮训练后对新聚合的模型进行评估,而无需将模型发送给客户端。我们也很幸运,因为我们的整个评估数据集随时可用。

联邦评估**(或*客户端评估*)更为复杂,但也更为强大:它不需要集中的数据集,允许我们在更大的数据集上对模型进行评估,这通常会产生更真实的评估结果。事实上,如果我们想得到有代表性的评估结果,很多情况下都需要使用**联邦评估。但是,这种能力是有代价的:一旦我们开始在客户端进行评估,我们就应该意识到,如果这些客户端并不总是可用,我们的评估数据集可能会在连续几轮学习中发生变化。此外,每个客户端所拥有的数据集也可能在连续几轮学习中发生变化。这可能会导致评估结果不稳定,因此即使我们不改变模型,也会看到评估结果在连续几轮中波动。

我们已经了解了联邦评估如何在客户端工作(即通过在 FlowerClient 中实现 evaluate 方法)。现在让我们看看如何在服务器端评估聚合模型参数:

[ ]:
# The `evaluate` function will be called by Flower after every round
def evaluate(
    server_round: int,
    parameters: NDArrays,
    config: Dict[str, Scalar],
) -> Optional[Tuple[float, Dict[str, Scalar]]]:
    net = Net().to(DEVICE)
    _, _, testloader = load_datasets(0, NUM_PARTITIONS)
    set_parameters(net, parameters)  # Update model with the latest parameters
    loss, accuracy = test(net, testloader)
    print(f"Server-side evaluation loss {loss} / accuracy {accuracy}")
    return loss, {"accuracy": accuracy}

We create a FedAvg strategy and pass evaluate_fn to it. Then, we create a ServerApp that uses this strategy.

[ ]:
def server_fn(context: Context) -> ServerAppComponents:
    # Create the FedAvg strategy
    strategy = FedAvg(
        fraction_fit=0.3,
        fraction_evaluate=0.3,
        min_fit_clients=3,
        min_evaluate_clients=3,
        min_available_clients=NUM_PARTITIONS,
        initial_parameters=ndarrays_to_parameters(params),
        evaluate_fn=evaluate,  # Pass the evaluation function
    )
    # Configure the server for 3 rounds of training
    config = ServerConfig(num_rounds=3)
    return ServerAppComponents(strategy=strategy, config=config)


# Create the ServerApp
server = ServerApp(server_fn=server_fn)

Finally, we run the simulation.

[ ]:
# Run simulation
run_simulation(
    server_app=server,
    client_app=client,
    num_supernodes=NUM_PARTITIONS,
    backend_config=backend_config,
)

向/从客户端发送/接收任意值

在某些情况下,我们希望从服务器端配置客户端的执行(训练、评估)。其中一个例子就是服务器要求客户端训练一定数量的本地遍历。Flower 提供了一种使用字典从服务器向客户端发送配置值的方法。让我们来看一个例子:客户端通过 fit 中的 config 参数从服务器接收配置值(evaluate 中也有 config 参数)。fit 方法通过 config 参数接收配置字典,然后从字典中读取值。在本例中,它读取了 server_roundlocal_epochs,并使用这些值来改进日志记录和配置本地训练遍历的数量:

[ ]:
class FlowerClient(NumPyClient):
    def __init__(self, pid, net, trainloader, valloader):
        self.pid = pid  # partition ID of a client
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        print(f"[Client {self.pid}] get_parameters")
        return get_parameters(self.net)

    def fit(self, parameters, config):
        # Read values from config
        server_round = config["server_round"]
        local_epochs = config["local_epochs"]

        # Use values provided by the config
        print(f"[Client {self.pid}, round {server_round}] fit, config: {config}")
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=local_epochs)
        return get_parameters(self.net), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        print(f"[Client {self.pid}] evaluate, config: {config}")
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}


def client_fn(context: Context) -> Client:
    net = Net().to(DEVICE)
    partition_id = context.node_config["partition-id"]
    num_partitions = context.node_config["num-partitions"]
    trainloader, valloader, _ = load_datasets(partition_id, num_partitions)
    return FlowerClient(partition_id, net, trainloader, valloader).to_client()


# Create the ClientApp
client = ClientApp(client_fn=client_fn)

那么,如何将配置字典从服务器发送到客户端呢?内置的 "Flower策略"(Flower Strategies)提供了这样的方法,其工作原理与服务器端评估的工作原理类似。我们为策略提供一个函数,策略会在每一轮联邦学习中调用这个函数:

[ ]:
def fit_config(server_round: int):
    """Return training configuration dict for each round.

    Perform two rounds of training with one local epoch, increase to two local
    epochs afterwards.
    """
    config = {
        "server_round": server_round,  # The current round of federated learning
        "local_epochs": 1 if server_round < 2 else 2,
    }
    return config

Next, we’ll pass this function to the FedAvg strategy before starting the simulation:

[ ]:
def server_fn(context: Context) -> ServerAppComponents:
    # Create FedAvg strategy
    strategy = FedAvg(
        fraction_fit=0.3,
        fraction_evaluate=0.3,
        min_fit_clients=3,
        min_evaluate_clients=3,
        min_available_clients=NUM_PARTITIONS,
        initial_parameters=ndarrays_to_parameters(params),
        evaluate_fn=evaluate,
        on_fit_config_fn=fit_config,  # Pass the fit_config function
    )
    config = ServerConfig(num_rounds=3)
    return ServerAppComponents(strategy=strategy, config=config)


# Create the ServerApp
server = ServerApp(server_fn=server_fn)

# Run simulation
run_simulation(
    server_app=server,
    client_app=client,
    num_supernodes=NUM_PARTITIONS,
    backend_config=backend_config,
)

我们可以看到,客户端日志现在包含了当前一轮的联邦学习(从 config 字典中读取)。我们还可以将本地训练配置为在第一轮和第二轮联邦学习期间运行一个遍历,然后在第三轮联邦学习期间运行两个遍历。

客户端还可以向服务器返回任意值。为此,它们会从 fit 和/或 evaluate 返回一个字典。我们在本笔记中看到并使用了这一概念,但并未明确提及:我们的 FlowerClient 返回一个包含自定义键/值对的字典,作为 evaluate 中的第三个返回值。

扩大联邦学习的规模

作为本笔记的最后一步,让我们看看如何使用 Flower 对大量客户端进行实验。

[ ]:
NUM_PARTITIONS = 1000

Note that we can reuse the ClientApp for different num-partitions since the Context is defined by the num_supernodes argument in run_simulation().

We now have 1000 partitions, each holding 45 training and 5 validation examples. Given that the number of training examples on each client is quite small, we should probably train the model a bit longer, so we configure the clients to perform 3 local training epochs. We should also adjust the fraction of clients selected for training during each round (we don’t want all 1000 clients participating in every round), so we adjust fraction_fit to 0.025, which means that only 2.5% of available clients (so 25 clients) will be selected for training each round:

[ ]:
def fit_config(server_round: int):
    config = {
        "server_round": server_round,
        "local_epochs": 3,
    }
    return config


def server_fn(context: Context) -> ServerAppComponents:
    # Create FedAvg strategy
    strategy = FedAvg(
        fraction_fit=0.025,  # Train on 25 clients (each round)
        fraction_evaluate=0.05,  # Evaluate on 50 clients (each round)
        min_fit_clients=20,
        min_evaluate_clients=40,
        min_available_clients=NUM_PARTITIONS,
        initial_parameters=ndarrays_to_parameters(params),
        on_fit_config_fn=fit_config,
    )
    config = ServerConfig(num_rounds=3)
    return ServerAppComponents(strategy=strategy, config=config)


# Create the ServerApp
server = ServerApp(server_fn=server_fn)

# Run simulation
run_simulation(
    server_app=server,
    client_app=client,
    num_supernodes=NUM_PARTITIONS,
    backend_config=backend_config,
)

回顾

在本笔记中,我们看到了如何通过自定义策略、在服务器端初始化参数、选择不同的策略以及在服务器端评估模型来逐步增强我们的系统。用这么少的代码就能实现这么大的灵活性,不是吗?

在后面的章节中,我们将看到如何在服务器和客户端之间传递任意值,以完全自定义客户端执行。有了这种能力,我们使用 Flower 虚拟客户端引擎构建了一个大规模的联邦学习模拟,并在 Jupyter Notebook 中进行了一次实验,在相同的工作负载中运行了 1000 个客户端!

接下来的步骤

Before you continue, make sure to join the Flower community on Flower Discuss (Join Flower Discuss) and on Slack (Join Slack).

如果您需要帮助,我们有专门的 #questions 频道,但我们也很乐意在 #introductions 中了解您是谁!

Flower 联邦学习教程 - 第 3 部分 展示了如何从头开始构建完全自定义的 "策略"。


Open in Colab