快速入门 MXNet#

警告

MXNet is no longer maintained and has been moved into Attic. As a result, we would encourage you to use other ML frameworks alongise Flower, for example, PyTorch. This tutorial might be removed in future versions of Flower.

在本教程中,我们将学习如何使用 Flower 和 MXNet 在 MNIST 上训练 Sequential 模型。

建议创建一个虚拟环境,并在此 virtualenv 中运行所有内容。

我们的例子包括一个*服务器*和两个*客户端*,它们都有相同的模型。

客户端*负责根据其本地数据集为模型生成单独的模型参数更新。然后,这些参数更新将被发送到*服务器,由*服务器*汇总后生成一个更新的全局模型。最后,服务器*将这一改进版模型发回给每个*客户端。一个完整的参数更新周期称为一*轮*。

现在,我们已经有了一个大致的概念,让我们开始吧。首先,我们需要安装 Flower。运行:

$ pip install flwr

既然我们要使用 MXNet,那就继续安装吧:

$ pip install mxnet

Flower 客户端#

现在,我们已经安装了所有依赖项,让我们用两个客户端和一个服务器来运行一个简单的分布式训练。我们的训练程序和网络架构基于 MXNet 的 手写数字识别教程"。

在名为 client.py 的文件中,导入 Flower 和 MXNet 相关软件包:

import flwr as fl

import numpy as np

import mxnet as mx
from mxnet import nd
from mxnet import gluon
from mxnet.gluon import nn
from mxnet import autograd as ag
import mxnet.ndarray as F

此外,还可以在 MXNet 中定义设备分配:

DEVICE = [mx.gpu() if mx.test_utils.list_gpus() else mx.cpu()]

我们使用 MXNet 加载 MNIST,这是一个用于机器学习的流行手写数字图像分类数据集。MXNet 工具 mx.test_utils.get_mnist() 会下载训练和测试数据。

def load_data():
    print("Download Dataset")
    mnist = mx.test_utils.get_mnist()
    batch_size = 100
    train_data = mx.io.NDArrayIter(
        mnist["train_data"], mnist["train_label"], batch_size, shuffle=True
    )
    val_data = mx.io.NDArrayIter(mnist["test_data"], mnist["test_label"], batch_size)
    return train_data, val_data

用 MXNet 定义训练和损失值。我们在数据集上循环训练模型,测量相应的损失值,并对其进行优化。

def train(net, train_data, epoch):
    trainer = gluon.Trainer(net.collect_params(), "sgd", {"learning_rate": 0.03})
    trainer = gluon.Trainer(net.collect_params(), "sgd", {"learning_rate": 0.01})
    accuracy_metric = mx.metric.Accuracy()
    loss_metric = mx.metric.CrossEntropy()
    metrics = mx.metric.CompositeEvalMetric()
    for child_metric in [accuracy_metric, loss_metric]:
        metrics.add(child_metric)
    softmax_cross_entropy_loss = gluon.loss.SoftmaxCrossEntropyLoss()
    for i in range(epoch):
        train_data.reset()
        num_examples = 0
        for batch in train_data:
            data = gluon.utils.split_and_load(
                batch.data[0], ctx_list=DEVICE, batch_axis=0
            )
            label = gluon.utils.split_and_load(
                batch.label[0], ctx_list=DEVICE, batch_axis=0
            )
            outputs = []
            with ag.record():
                for x, y in zip(data, label):
                    z = net(x)
                    loss = softmax_cross_entropy_loss(z, y)
                    loss.backward()
                    outputs.append(z.softmax())
                    num_examples += len(x)
            metrics.update(label, outputs)
            trainer.step(batch.data[0].shape[0])
        trainings_metric = metrics.get_name_value()
        print("Accuracy & loss at epoch %d: %s" % (i, trainings_metric))
    return trainings_metric, num_examples

接下来,我们定义机器学习模型的验证。我们在测试集上循环,测量测试集上的损失值和准确率。

def test(net, val_data):
    accuracy_metric = mx.metric.Accuracy()
    loss_metric = mx.metric.CrossEntropy()
    metrics = mx.metric.CompositeEvalMetric()
    for child_metric in [accuracy_metric, loss_metric]:
        metrics.add(child_metric)
    val_data.reset()
    num_examples = 0
    for batch in val_data:
        data = gluon.utils.split_and_load(batch.data[0], ctx_list=DEVICE, batch_axis=0)
        label = gluon.utils.split_and_load(
            batch.label[0], ctx_list=DEVICE, batch_axis=0
        )
        outputs = []
        for x in data:
            outputs.append(net(x).softmax())
            num_examples += len(x)
        metrics.update(label, outputs)
    return metrics.get_name_value(), num_examples

在定义了 MXNet 机器学习模型的训练和测试后,我们使用这些函数实现了 Flower 客户端。

我们的 Flower 客户端将使用简单的 Sequential 模型:

def main():
    def model():
        net = nn.Sequential()
        net.add(nn.Dense(256, activation="relu"))
        net.add(nn.Dense(64, activation="relu"))
        net.add(nn.Dense(10))
        net.collect_params().initialize()
        return net

    train_data, val_data = load_data()

    model = model()
    init = nd.random.uniform(shape=(2, 784))
    model(init)

使用 load_data() 加载数据集后,我们会执行一次前向传播,使用 model(init) 初始化模型和模型参数。接下来,我们实现一个 Flower 客户端。

Flower 服务器通过一个名为 Client 的接口与客户端交互。当服务器选择一个特定的客户端进行训练时,它会通过网络发送训练指令。客户端接收到这些指令后,会调用 Client 方法之一来运行您的代码(即训练我们之前定义的神经网络)。

Flower 提供了一个名为 NumPyClient 的便捷类,当您的工作负载使用 MXNet 时,它可以让您更轻松地实现 Client 接口。实现 NumPyClient 通常意味着定义以下方法(set_parameters 是可选的):

  1. get_parameters
    • 以 NumPy ndarrays 列表形式返回模型参数

  2. set_parameters (可选)
    • 用从服务器接收到的参数更新本地模型参数

  3. fit
    • 设置本地模型参数

    • 训练本地模型

    • 接收更新的本地模型参数

  4. evaluate
    • 测试本地模型

它们可以通过以下方式实现:

class MNISTClient(fl.client.NumPyClient):
    def get_parameters(self, config):
        param = []
        for val in model.collect_params(".*weight").values():
            p = val.data()
            param.append(p.asnumpy())
        return param

    def set_parameters(self, parameters):
        params = zip(model.collect_params(".*weight").keys(), parameters)
        for key, value in params:
            model.collect_params().setattr(key, value)

    def fit(self, parameters, config):
        self.set_parameters(parameters)
        [accuracy, loss], num_examples = train(model, train_data, epoch=2)
        results = {"accuracy": float(accuracy[1]), "loss": float(loss[1])}
        return self.get_parameters(config={}), num_examples, results

    def evaluate(self, parameters, config):
        self.set_parameters(parameters)
        [accuracy, loss], num_examples = test(model, val_data)
        print("Evaluation accuracy & loss", accuracy, loss)
        return float(loss[1]), val_data.batch_size, {"accuracy": float(accuracy[1])}

现在我们可以创建一个 MNISTClient 类的实例,并添加一行来实际运行该客户端:

fl.client.start_numpy_client(server_address="0.0.0.0:8080", client=MNISTClient())

这就是客户端。我们只需实现 ClientNumPyClient 并调用 fl.client.start_client()fl.client.start_numpy_client()。字符串 "0.0.0.0:8080"`会告诉客户端要连接的服务器。在本例中,我们可以在同一台机器上运行服务器和客户端,因此我们使用 :code:"0.0.0.0:8080"。如果我们运行的是真正的联邦工作负载,服务器和客户端运行在不同的机器上,那么需要改变的只是传递给客户端的 :code:`server_address

Flower 服务器#

对于简单的工作负载,我们可以启动 Flower 服务器,并将所有配置选项保留为默认值。在名为 server.py 的文件中,导入 Flower 并启动服务器:

import flwr as fl

fl.server.start_server(config=fl.server.ServerConfig(num_rounds=3))

联邦训练模型!#

客户端和服务器都准备就绪后,我们现在就可以运行一切,看看联邦学习的运行情况。联邦学习系统通常有一个服务器和多个客户端。因此,我们必须先启动服务器:

$ python server.py

服务器运行后,我们就可以在不同终端启动客户端了。打开一个新终端,启动第一个客户端:

$ python client.py

打开另一台终端,启动第二个客户端:

$ python client.py

每个客户端都有自己的数据集。现在你应该看到第一个终端(启动服务器的终端)的训练效果了:

INFO flower 2021-03-11 11:59:04,512 | app.py:76 | Flower server running (insecure, 3 rounds)
INFO flower 2021-03-11 11:59:04,512 | server.py:72 | Getting initial parameters
INFO flower 2021-03-11 11:59:09,089 | server.py:74 | Evaluating initial parameters
INFO flower 2021-03-11 11:59:09,089 | server.py:87 | [TIME] FL starting
DEBUG flower 2021-03-11 11:59:11,997 | server.py:165 | fit_round: strategy sampled 2 clients (out of 2)
DEBUG flower 2021-03-11 11:59:14,652 | server.py:177 | fit_round received 2 results and 0 failures
DEBUG flower 2021-03-11 11:59:14,656 | server.py:139 | evaluate: strategy sampled 2 clients
DEBUG flower 2021-03-11 11:59:14,811 | server.py:149 | evaluate received 2 results and 0 failures
DEBUG flower 2021-03-11 11:59:14,812 | server.py:165 | fit_round: strategy sampled 2 clients (out of 2)
DEBUG flower 2021-03-11 11:59:18,499 | server.py:177 | fit_round received 2 results and 0 failures
DEBUG flower 2021-03-11 11:59:18,503 | server.py:139 | evaluate: strategy sampled 2 clients
DEBUG flower 2021-03-11 11:59:18,784 | server.py:149 | evaluate received 2 results and 0 failures
DEBUG flower 2021-03-11 11:59:18,786 | server.py:165 | fit_round: strategy sampled 2 clients (out of 2)
DEBUG flower 2021-03-11 11:59:22,551 | server.py:177 | fit_round received 2 results and 0 failures
DEBUG flower 2021-03-11 11:59:22,555 | server.py:139 | evaluate: strategy sampled 2 clients
DEBUG flower 2021-03-11 11:59:22,789 | server.py:149 | evaluate received 2 results and 0 failures
INFO flower 2021-03-11 11:59:22,789 | server.py:122 | [TIME] FL finished in 13.700094900001204
INFO flower 2021-03-11 11:59:22,790 | app.py:109 | app_fit: losses_distributed [(1, 1.5717803835868835), (2, 0.6093432009220123), (3, 0.4424773305654526)]
INFO flower 2021-03-11 11:59:22,790 | app.py:110 | app_fit: accuracies_distributed []
INFO flower 2021-03-11 11:59:22,791 | app.py:111 | app_fit: losses_centralized []
INFO flower 2021-03-11 11:59:22,791 | app.py:112 | app_fit: accuracies_centralized []
DEBUG flower 2021-03-11 11:59:22,793 | server.py:139 | evaluate: strategy sampled 2 clients
DEBUG flower 2021-03-11 11:59:23,111 | server.py:149 | evaluate received 2 results and 0 failures
INFO flower 2021-03-11 11:59:23,112 | app.py:121 | app_evaluate: federated loss: 0.4424773305654526
INFO flower 2021-03-11 11:59:23,112 | app.py:125 | app_evaluate: results [('ipv4:127.0.0.1:44344', EvaluateRes(loss=0.443320095539093, num_examples=100, accuracy=0.0, metrics={'accuracy': 0.8752475247524752})), ('ipv4:127.0.0.1:44346', EvaluateRes(loss=0.44163456559181213, num_examples=100, accuracy=0.0, metrics={'accuracy': 0.8761386138613861}))]
INFO flower 2021-03-11 11:59:23,112 | app.py:127 | app_evaluate: failures []

恭喜您!您已经成功构建并运行了第一个联邦学习系统。本示例的`完整源代码 <https://github.com/adap/flower/blob/main/examples/quickstart-mxnet/client.py>`_ 可在 examples/quickstart-mxnet 中找到。