예제: 파이토치 - 중앙 집중식에서 연합식으로#

이 튜토리얼에서는 Flower를 사용해 기존 머신 러닝 워크로드의 연합 버전을 구축하는 방법을 보여드립니다. 여기서는 PyTorch를 사용해 CIFAR-10 데이터 세트에서 컨볼루션 신경망을 훈련합니다. 먼저, ‘PyTorch로 딥 러닝 <https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html>`_ 튜토리얼을 기반으로 centralized 학습 접근 방식을 사용하여 이 머신 러닝 작업을 소개합니다. 그런 다음 centralized 훈련 코드를 기반으로 연합 방식 훈련을 실행합니다.

중앙 집중식 훈련#

중앙 집중식 CNN 트레이닝 코드에 대한 간략한 설명부터 시작하겠습니다. 무슨 일이 일어나고 있는지 더 자세히 설명하려면 공식 `PyTorch 튜토리얼 <https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html>`_을 참조하세요.

CIFAR-10에 대한 기존 (중앙 집중식) 교육에 필요한 모든 구성 요소가 포함된 cifar.py`라는 파일을 생성해 보겠습니다. 먼저, 필요한 모든 패키지(예: :code:`torchtorchvision)를 가져와야 합니다. 연합 학습을 위한 패키지를 가져오지 않는 것을 확인 할 수 있습니. 나중에 연합 학습 구성 요소를 추가할 때에도 이러한 모든 가져오기를 그대로 유지할 수 있습니다.

from typing import Tuple, Dict

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch import Tensor
from torchvision.datasets import CIFAR10

이미 언급했듯이 이 머신 러닝 워크로드에는 CIFAR-10 데이터 세트를 사용합니다. 모델 아키텍처(매우 간단한 컨볼루션 신경망)는 :code:`class Net()`에 정의되어 있습니다.

class Net(nn.Module):

    def __init__(self) -> None:
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x: Tensor) -> Tensor:
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

load_data() 함수는 CIFAR-10 훈련 및 테스트 세트를 로드합니다. :code:`transform`은 로드 후 데이터를 정규화합니다.

DATA_ROOT = "~/data/cifar-10"

def load_data() -> Tuple[torch.utils.data.DataLoader, torch.utils.data.DataLoader, Dict]:
    """Load CIFAR-10 (training and test set)."""
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )
    trainset = CIFAR10(DATA_ROOT, train=True, download=True, transform=transform)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=32, shuffle=True)
    testset = CIFAR10(DATA_ROOT, train=False, download=True, transform=transform)
    testloader = torch.utils.data.DataLoader(testset, batch_size=32, shuffle=False)
    num_examples = {"trainset" : len(trainset), "testset" : len(testset)}
    return trainloader, testloader, num_examples

이제 학습 집합을 반복하고, 손실을 측정하고, 이를 역전파한 다음 각 학습 예제 배치에 대해 하나의 최적화 단계를 수행하는 학습(함수 train())을 정의해야 합니다.

모델 평가는 test() 함수에 정의되어 있습니다. 이 함수는 모든 테스트 샘플을 반복하고 테스트 데이터 세트에 따라 모델의 손실을 측정합니다.

def train(
    net: Net,
    trainloader: torch.utils.data.DataLoader,
    epochs: int,
    device: torch.device,
) -> None:
    """Train the network."""
    # Define loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

    print(f"Training {epochs} epoch(s) w/ {len(trainloader)} batches each")

    # Train the network
    for epoch in range(epochs):  # loop over the dataset multiple times
        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            images, labels = data[0].to(device), data[1].to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if i % 100 == 99:  # print every 100 mini-batches
                print("[%d, %5d] loss: %.3f" % (epoch + 1, i + 1, running_loss / 2000))
                running_loss = 0.0


def test(
    net: Net,
    testloader: torch.utils.data.DataLoader,
    device: torch.device,
) -> Tuple[float, float]:
    """Validate the network on the entire test set."""
    criterion = nn.CrossEntropyLoss()
    correct = 0
    total = 0
    loss = 0.0
    with torch.no_grad():
        for data in testloader:
            images, labels = data[0].to(device), data[1].to(device)
            outputs = net(images)
            loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = correct / total
    return loss, accuracy

데이터 로딩, 모델 아키텍처, 훈련 및 평가를 정의했으면 모든 것을 종합하여 CIFAR-10에서 CNN을 훈련할 수 있습니다.

def main():
    DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("Centralized PyTorch training")
    print("Load data")
    trainloader, testloader, _ = load_data()
    print("Start training")
    net=Net().to(DEVICE)
    train(net=net, trainloader=trainloader, epochs=2, device=DEVICE)
    print("Evaluate model")
    loss, accuracy = test(net=net, testloader=testloader, device=DEVICE)
    print("Loss: ", loss)
    print("Accuracy: ", accuracy)


if __name__ == "__main__":
    main()

이제 머신 러닝 워크로드를 실행할 수 있습니다:

python3 cifar.py

지금까지는 파이토치를 사용해 본 적이 있다면 상당히 익숙하게 보일 것입니다. 다음 단계로 넘어가서 구축한 것을 사용하여 하나의 서버와 두 개의 클라이언트로 구성된 간단한 연합 학습 시스템을 만들어 보겠습니다.

연합 훈련#

이전 섹션에서 설명한 간단한 머신 러닝 프로젝트는 단일 데이터 세트(CIFAR-10)로 모델을 학습시키는데, 이를 중앙 집중식 학습이라고 부릅니다. 이전 섹션에서 설명한 중앙 집중식 학습의 개념은 대부분 알고 계실 것이며, 많은 분들이 이전에 사용해 보셨을 것입니다. 일반적으로 머신 러닝 워크로드를 연합 방식으로 실행하려면 대부분의 코드를 변경하고 모든 것을 처음부터 다시 설정해야 합니다. 이는 상당한 노력이 필요할 수 있습니다.

하지만 Flower를 사용하면 대대적인 재작성 없이도 기존 코드를 연합 학습 설정으로 발전시킬 수 있습니다.

개념은 이해하기 쉽습니다. *서버*를 시작한 다음 *서버*에 연결된 *클라이언트*에 대해 :code:`cifar.py`의 코드를 사용해야 합니다. *서버*는 모델 파라미터를 클라이언트로 전송합니다. *클라이언트*는 학습을 실행하고 파라미터를 업데이트합니다. 업데이트된 파라미터는 *서버*로 다시 전송되며, *서버*는 수신된 모든 파라미터 업데이트의 평균을 구합니다. 이것은 연합 학습 프로세스의 한 라운드를 설명하며 여러 라운드에 걸쳐 이 과정을 반복합니다.

이 예제는 하나의 *서버*와 두 개의 *클라이언트*로 구성됩니다. 먼저 server.py`를 설정해 보겠습니다. *server*는 Flower 패키지 :code:`flwr`를 가져와야 합니다. 다음으로, :code:`start_server 함수를 사용하여 서버를 시작하고 세 차례의 연합 학습을 수행하도록 지시합니다.

import flwr as fl

if __name__ == "__main__":
    fl.server.start_server(server_address="0.0.0.0:8080", config=fl.server.ServerConfig(num_rounds=3))

이미 *서버*를 시작할 수 있습니다:

python3 server.py

마지막으로, :code:`client.py`에서 client 로직을 정의하고 :code:`cifar.py`에서 이전에 정의한 중앙 집중식 학습을 기반으로 구축합니다. *클라이언트*는 :code:`flwr`을 가져와야 하며, PyTorch 모델의 파라미터를 업데이트하기 위해 :code:`torch`도 가져와야 합니다:

from collections import OrderedDict
from typing import Dict, List, Tuple

import numpy as np
import torch

import cifar
import flwr as fl

DEVICE: str = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

Flower *클라이언트*를 구현한다는 것은 기본적으로 flwr.client.Client 또는 :code:`flwr.client.NumPyClient`의 서브클래스를 구현하는 것을 의미합니다. 우리의 구현은 :code:`flwr.client.NumPyClient`를 기반으로 하며, 이를 :code:`CifarClient`라고 부를 것입니다. :code:`NumPyClient`는 파이토치나 텐서플로우/Keras처럼 NumPy 상호운용성이 좋은 프레임워크를 사용하는 경우 필요한 일부 보일러플레이트를 피하기 때문에 :code:`Client`보다 구현하기가 조금 더 쉽습니다. code:`CifarClient`는 모델 파라미터를 가져오거나 설정하는 메서드 2개, 모델 학습을 위한 메서드 1개, 모델 테스트를 위한 메서드 1개 등 네 가지 메서드를 구현해야 합니다:

  1. set_parameters
    • 서버에서 수신한 로컬 모델의 모델 파라미터를 설정합니다

    • (신경망 레이어 목록으로 생각하면 됩니다) NumPy :code:`ndarray`로 받은 모델 파라미터 목록에 대해 반복합니다

  2. get_parameters
    • 모델 매개변수를 가져와서 NumPy :code:`ndarray`의 목록으로 반환합니다(이는 :code:`flwr.client.NumPyClient`가 기대하는 바와 같습니다)

  3. fit
    • 서버에서 받은 파라미터로 로컬 모델의 파라미터를 업데이트합니다

    • 로컬 훈련 세트에서 모델을 훈련합니다

    • 업데이트된 로컬 모델 가중치를 가져와 서버로 반환합니다

  4. evaluate
    • 서버에서 받은 파라미터로 로컬 모델의 파라미터를 업데이트합니다

    • 로컬 테스트 세트에서 업데이트된 모델을 평가합니다

    • 로컬 손실 및 정확도를 서버에 반환합니다

두 개의 NumPyClient 메서드인 fit`과 :code:`evaluate`는 이전에 :code:`cifar.py`에 정의된 함수인 :code:`train()`과 :code:`test()`를 활용합니다. 따라서 여기서 실제로 하는 일은 :code:`NumPyClient 서브클래스를 통해 이미 정의된 함수 중 훈련과 평가를 위해 호출할 함수를 Flower에 알려주는 것입니다. 전달되는 데이터 유형을 더 잘 이해할 수 있도록 type annotations을 포함했습니다.

class CifarClient(fl.client.NumPyClient):
    """Flower client implementing CIFAR-10 image classification using
    PyTorch."""

    def __init__(
        self,
        model: cifar.Net,
        trainloader: torch.utils.data.DataLoader,
        testloader: torch.utils.data.DataLoader,
        num_examples: Dict,
    ) -> None:
        self.model = model
        self.trainloader = trainloader
        self.testloader = testloader
        self.num_examples = num_examples

    def get_parameters(self, config) -> List[np.ndarray]:
        # Return model parameters as a list of NumPy ndarrays
        return [val.cpu().numpy() for _, val in self.model.state_dict().items()]

    def set_parameters(self, parameters: List[np.ndarray]) -> None:
        # Set model parameters from a list of NumPy ndarrays
        params_dict = zip(self.model.state_dict().keys(), parameters)
        state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
        self.model.load_state_dict(state_dict, strict=True)

    def fit(
        self, parameters: List[np.ndarray], config: Dict[str, str]
    ) -> Tuple[List[np.ndarray], int, Dict]:
        # Set model parameters, train model, return updated model parameters
        self.set_parameters(parameters)
        cifar.train(self.model, self.trainloader, epochs=1, device=DEVICE)
        return self.get_parameters(config={}), self.num_examples["trainset"], {}

    def evaluate(
        self, parameters: List[np.ndarray], config: Dict[str, str]
    ) -> Tuple[float, int, Dict]:
        # Set model parameters, evaluate model on local test dataset, return result
        self.set_parameters(parameters)
        loss, accuracy = cifar.test(self.model, self.testloader, device=DEVICE)
        return float(loss), self.num_examples["testset"], {"accuracy": float(accuracy)}

이제 모델과 데이터를 모두 로드하는 함수를 정의하고, CifarClient`를 생성하고, 클라이언트를 시작하는 작업만 남았습니다. 코드:`cifar.py`를 사용하여 데이터와 모델을 로드합니다. :code:`server.py`에서 사용한 것과 동일한 IP 주소를 지정하여 :code:`fl.client.start_client() 함수로 :code:`CifarClient`를 시작합니다:

def main() -> None:
    """Load data, start CifarClient."""

    # Load model and data
    model = cifar.Net()
    model.to(DEVICE)
    trainloader, testloader, num_examples = cifar.load_data()

    # Start client
    client = CifarClient(model, trainloader, testloader, num_examples)
    fl.client.start_client(server_address="0.0.0.0:8080", client.to_client())


if __name__ == "__main__":
    main()

여기까지입니다. 이제 두 개의 터미널 창을 추가로 열고 다음을 실행할 수 있습니다

python3 client.py

를 입력하고(그 전에 서버가 실행 중인지 확인하세요) (이전에는 중앙 집중식) PyTorch 프로젝트가 두 클라이언트에서 연합 학습을 실행하는 것을 확인합니다. 축하합니다!

다음 단계#

이 예제의 전체 소스 코드: 파이토치: 중앙 Centralized에서 Federated으로 (코드). 물론 이 예제는 두 클라이언트가 완전히 동일한 데이터 세트를 로드하기 때문에 다소 지나치게 단순화되어 있으며, 이는 현실적이지 않습니다. 이제 이 주제를 더 자세히 살펴볼 준비가 되셨습니다. 각 클라이언트에서 서로 다른 CIFAR-10의 하위 집합을 사용해 보는 것은 어떨까요? 클라이언트를 더 추가하는 것은 어떨까요?