Guide de migration OpenFL¶
C’est recently announced qui a déclaré que le projet Open Federated Learning (anciennement connu sous le nom d’OpenFL) n’est plus développé ou maintenu. Ce guide, rédigé en collaboration avec les développeurs OpenFL, vise à créer un chemin facile pour les utilisateurs OpenFL pour apporter leurs workloads dans Flower.
Créer une application Flower pour le code OpenFL¶
Commencez par créer une application Flower où le code OpenFL peut être migré vers.
Installez les dépendances¶
Tout d’abord, nous installons le package Flower flwr :
# In a new Python environment
$ pip install -U "flwr[simulation]"
Ensuite, exécutez la commande suivante :
$ flwr new @flwrlabs/quickstart-pytorch
Après avoir exécuté la commande, un nouveau répertoire appelé quickstart-pytorch sera créé. Voici une comparaison entre lui et les fichiers pertinents dans un dossier typique openfl-example :
openfl-example
├── requirements.txt
├── .workspace
├── plan
│ ├── plan.yaml
│ ├── cols.yaml
│ ├── defaults
│ └── data.yaml
├── logs
├── cert
├── save
├── data
└── src
├── __init__.py
├── taskrunner.py
├── utils.py
└── dataloader.py
quickstart-pytorch
├── pytorchexample
│ ├── __init__.py
│ ├── client_app.py
│ ├── server_app.py
│ └── task.py
├── pyproject.toml
└── README.md
Commencez par un aperçu des zones d’ouverture et de répertoire que vous souhaitez mettre en avant. Nous passerons en revue ces sections plus en détail dans les parties suivantes du guide :
Modèle : Dans OpenFL, le modèle est généralement défini dans
taskrunner.py. Dans Flower, la définition du modèle se trouve généralement danstask.py.Fonctions d’entraînement et d’évaluation : Dans OpenFL, il s’agit de la sous-classe TaskRunner dans
taskrunner.py. Pour Flower, vous trouverez ces fonctions dansclient_app.pyet identifiées sous les décorateurs@app.train()et@app.evaluate.Fonctions d’agrégation : Dans OpenFL, la plupart des exemples utilisent par défaut l’algorithme d’agrégation
WeightedAverage(). Si vous utilisez un autre algorithme d’agrégation, vous le trouverez dansplan.yamlen recherchant aggregation_type. Dans Flower, l’algorithme d’agrégation est défini comme unStrategy.
Migrer votre modèle¶
Le modèle est très simple à porter de OpenFL vers Flower. Si vous travaillez avec un modèle PyTorch, OpenFL a une PyTorchTaskRunner qui hérite de nn.Module (dans taskrunner.py) - et inclut d’autres choses comme les fonctions train et validate. Flower suppose que vous apportez un modèle standard PyTorch, donc c’est aussi simple que de déplacer la définition du modèle vers task.py dans le répertoire quickstart-pytorch, et de changer l’héritage de Net pour revenir à nn.Module. Pour un exemple concret, voir le code snippet suivant OpenFL TaskRunner :
# OpenFL PyTorch TaskRunner
class PyTorchCNN(PyTorchTaskRunner):
"""
Simple CNN for classification.
PyTorchTaskRunner inherits from nn.Module, so you can define your model
in the same way that you would for PyTorch
"""
def __init__(self, device="cpu", **kwargs):
"""Initialize.
Args:
device: The hardware device to use for training (Default = "cpu")
**kwargs: Additional arguments to pass to the function
"""
super().__init__(device=device, **kwargs)
# Define the model
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
self.to(device)
# `self.optimizer` must be set for optimizer weights to be federated
self.optimizer = optim.Adam(self.parameters(), lr=1e-4)
# Set the loss function
self.loss_fn = F.cross_entropy
def forward(self, x):
"""
Forward pass of the model.
Args:
x: Data input to the model for the forward pass
"""
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
return self.fc3(x)
def train_(
self, train_dataloader: Iterator[Tuple[np.ndarray, np.ndarray]]
) -> Metric:
"""TaskRunner train function"""
...
def validate_(
self, valid_dataloader: Iterator[Tuple[np.ndarray, np.ndarray]]
) -> Metric:
"""TaskRunner validation function"""
...
Et le modèle PyTorch correspondant utilisé par Flower :
# Standard PyTorch model definition in Flower (Found in task.py)
class Net(nn.Module):
"""Model (simple CNN adapted from 'PyTorch: A 60 Minute Blitz')"""
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
return self.fc3(x)
Migrer vos fonctions d’entraînement et de test¶
Les versions récentes de OpenFL avaient une façon simple de définir les fonctions d’entraînement et d’évaluation. La mise en place et l’extraction des poids du modèle étaient cachées aux utilisateurs, et une liste de valeurs Metric résultant de l’entraînement ou de la validation pouvait être explicitement renvoyée à partir de la fonction. Pour faciliter la migration, voyez les blocs mis en évidence qui peuvent être directement portés dans le fichier Flower client_app.py :
from openfl.federated import PyTorchTaskRunner
from openfl.utilities import Metric
class PyTorchCNN(PyTorchTaskRunner):
"""
Simple CNN for classification.
"""
def __init__(self, device="cpu", **kwargs):
# Model definition
...
def forward(self, x):
# Forward function definition
...
def train_(
self, train_dataloader: Iterator[Tuple[np.ndarray, np.ndarray]]
) -> Metric:
"""
Train single epoch.
Override this function in order to use custom training.
Args:
train_dataloader: Train dataset batch generator. Yields (samples, targets) tuples of
size = `self.data_loader.batch_size`.
Returns:
Metric: An object containing name and np.ndarray value.
"""
losses = []
for data, target in train_dataloader:
data, target = data.to(self.device), target.to(self.device)
self.optimizer.zero_grad()
output = self(data)
loss = self.loss_fn(output, target)
loss.backward()
self.optimizer.step()
losses.append(loss.detach().cpu().numpy())
loss = np.mean(losses)
return Metric(name=self.loss_fn.__name__, value=np.array(loss))
def validate_(
self, validation_dataloader: Iterator[Tuple[np.ndarray, np.ndarray]]
) -> Metric:
"""
Perform validation on PyTorch Model
Override this function for your own custom validation function
Args:
validation_dataloader: Validation dataset batch generator.
Yields (samples, targets) tuples
Returns:
Metric: An object containing name and np.ndarray value
"""
total_samples = 0
val_score = 0
with torch.no_grad():
for data, target in validation_dataloader:
samples = target.shape[0]
total_samples += samples
data, target = data.to(self.device), target.to(
self.device, dtype=torch.int64
)
output = self(data)
# get the index of the max log-probability
pred = output.argmax(dim=1)
val_score += pred.eq(target).sum().cpu().numpy()
accuracy = val_score / total_samples
return Metric(name="accuracy", value=np.array(accuracy))
Dans Flower, on donne plus de contrôle aux utilisateurs par défaut. Avec l’introduction de l’API Message, les fonctions d’entraînement et de validation sont supposées être sans état, donc il y a une initialisation qui doit être gérée par le code utilisateur. La bonne nouvelle est que ce setup est standard et très réutilisable à travers les exemples. Voyons comment la fonction train_ ouverte d’OpenFL s’intègre dans Flower :
# client_app.py
...
@app.train()
def train(msg: Message, context: Context):
"""Train the model on local data."""
# Load the model and initialize it with the received weights
model = Net()
model.load_state_dict(msg.content["arrays"].to_torch_state_dict())
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)
# Load the data
partition_id = context.node_config["partition-id"]
num_partitions = context.node_config["num-partitions"]
batch_size = context.run_config["batch-size"]
trainloader, _ = load_data(partition_id, num_partitions, batch_size)
# Adapt the OpenFL training function here
##############################################
criterion = torch.nn.CrossEntropyLoss().to(device)
lr = msg.content["config"]["lr"]
optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)
losses = []
for data, target in trainloader:
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
losses.append(loss.detach().cpu().numpy())
train_loss = np.mean(losses)
#############################################
# Construct and return reply Message
model_record = ArrayRecord(model.state_dict())
metrics = {
"train_loss": train_loss,
"num-examples": len(trainloader.dataset),
}
metric_record = MetricRecord(metrics)
content = RecordDict({"arrays": model_record, "metrics": metric_record})
return Message(content=content, reply_to=msg)
Notez que le modèle est reinitialisé, le dataloader est initialisé et configuré, et les hyperparamètres sont chacun définis avant que l’opération de formation principale ne commence. À la conclusion de la formation, les poids du modèle sont extraits et emballés dans un ArrayRecord, et les métriques du modèle sont capturées dans un MetricRecord. Il est nécessaire d’envoyer également le num-examples comme métrique, car c’est nécessaire pour capturer le rapport des poids à donner aux paramètres du modèle pour FedAvg.
Voici la fonction de test correspondante, avec l’aire mise en évidence représentant le code migré d’OpenFL :
@app.evaluate()
def evaluate(msg: Message, context: Context):
"""Evaluate the model on local data."""
# Load the model and initialize it with the received weights
model = Net()
model.load_state_dict(msg.content["arrays"].to_torch_state_dict())
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)
# Load the data
partition_id = context.node_config["partition-id"]
num_partitions = context.node_config["num-partitions"]
batch_size = context.run_config["batch-size"]
_, valloader = load_data(partition_id, num_partitions, batch_size)
# Adapt the OpenFL evaluation function here
########################################################
total_samples = 0
val_score = 0
with torch.no_grad():
for data, target in valloader:
samples = target.shape[0]
total_samples += samples
data, target = data.to(device), target.to(self.device, dtype=torch.int64)
output = model(data)
# get the index of the max log-probability
pred = output.argmax(dim=1)
val_score += pred.eq(target).sum().cpu().numpy()
eval_acc = val_score / total_samples
########################################################
# Construct and return reply Message
metrics = {
"eval_acc": eval_acc,
"num-examples": len(valloader.dataset),
}
metric_record = MetricRecord(metrics)
content = RecordDict({"metrics": metric_record})
return Message(content=content, reply_to=msg)
Le code peut être collé presque sans modification ! Il y a quelques références à nettoyer (c’est-à-dire changer self pour model) pour s’adapter aux variables de Flower, mais la logique reste la même.
Migrer les Chargeurs de Données¶
Contrairement à OpenFL, Flower ne nécessite pas que vous utilisiez leurs propres Dataloaders lors du développement de votre application. Cela signifie que vous pouvez simplement utiliser les DataLoaders comme vous le feriez pour PyTorch, Tensorflow ou tout autre framework. Pour des fins de recherche et d’expérimentation, un seul jeu de données peut être fragmenté en plusieurs partitions. Cette information est transmise à chaque partie via la commande Context.
# In client_app.py
@app.train()
def train(msg: Message, context: Context):
...
# Load the data
partition_id = context.node_config["partition-id"]
num_partitions = context.node_config["num-partitions"]
trainloader, _ = load_data(partition_id, num_partitions)
Flower dispose également de sa propre bibliothèque pour partitionner des ensembles de données uniques dans des distributions représentatives de ce qui peut être attendu dans des environnements réels. Pour plus d’informations, voir la documentation flwr-datasets pour les détails.
Code Client¶
Dans OpenFL, le code côté client était connu sous le nom de Collaborator. Dans Flower, l’application que les propriétaires des données opèrent est désignée comme un ClientApp. Chaque fichier mentionné jusqu’à présent (client_app.py, task.py) sont lancés par les clients à l’aide de la commande flwr run. Au-delà du code défini, Flower a la capacité d’insérer des changements dynamiques via un fichier de configuration appelé pyproject.toml. Cela peut inclure des modifications spécifiques à l’application comme les hyperparamètres, mais aussi d’autres informations comme l’adresse ServerApp, etc. Il est important de noter que ce fichier est partagé entre les parties opérant le ClientApp et le ServerApp. Ce concept correspond directement au concept du Plan d’apprentissage fédéré (FLPlan) dans OpenFL capturé dans le fichier plan.yaml de chaque espace de travail.
# Flower pyproject.toml
...
[tool.flwr.app.config]
num-server-rounds = 3
fraction-evaluate = 0.5
local-epochs = 1
learning-rate = 0.1
batch-size = 32
...
Code Serveur¶
Dans OpenFL, toutes les configurations côté agrégateur sont effectuées via le fichier plan.yaml à travers la spécification de différents arguments. Dans Flower, les tâches exactes effectuées par le serveur sont plus configurables à travers du code. Par exemple, les algorithmes d’agrégation sont ajoutés à travers un Strategy, et la logique pour sauvegarder des modèles est ajoutée explicitement. Voici un ServerApp (semblable à un OpenFL Agrégateur) compatible avec les précédents morceaux de code :
import torch
from flwr.app import ArrayRecord, ConfigRecord, Context, MetricRecord
from flwr.serverapp import Grid, ServerApp
from flwr.serverapp.strategy import FedAvg
from pytorchexample.task import Net, load_centralized_dataset, test
# Create ServerApp
app = ServerApp()
@app.main()
def main(grid: Grid, context: Context) -> None:
"""Main entry point for the ServerApp."""
# Read run config
fraction_evaluate: float = context.run_config["fraction-evaluate"]
num_rounds: int = context.run_config["num-server-rounds"]
lr: float = context.run_config["learning-rate"]
# Load global model
global_model = Net()
arrays = ArrayRecord(global_model.state_dict())
# Initialize FedAvg strategy
strategy = FedAvg(fraction_evaluate=fraction_evaluate)
# Start strategy, run FedAvg for `num_rounds`
result = strategy.start(
grid=grid,
initial_arrays=arrays,
train_config=ConfigRecord({"lr": lr}),
num_rounds=num_rounds,
evaluate_fn=global_evaluate,
)
# Save final model to disk
print("\nSaving final model to disk...")
state_dict = result.arrays.to_torch_state_dict()
torch.save(state_dict, "final_model.pt")
Vous remarquerez que la plupart des exemples ServerApp ont une logique spécifique pour travailler avec un cadre de deep learning donné (dans ce cas PyTorch) en raison du sauvegardage d’un modèle final. Cette fonctionnalité est facultative, mais reflète l’enregistrement automatique d’un modèle à la fin d’une expérience OpenFL. Cette ServerApp modification nécessite seulement quelques lignes de modifications, et Flower a un support pour une gamme étendue de cadres de deep learning dans sa examples (Tensorflow, FastAI, Huggingface, etc.) si vous avez besoin de code de référence.
Aide supplémentaire¶
Pour un exemple PyTorch complet qui entre dans les détails sur diverses composantes Flower, voir le tutoriel Get started with Flower. Même si nous attendons que ce guide aidera la plupart des utilisateurs à se migrer rapidement vers l’écosystème Flower, certaines charges de travail OpenFL complexes peuvent nécessiter plus de clarification ou d’aide. Si vous avez d’autres questions, join the Flower Slack (et utilisez le canal #questions) ou rejoignez notre OpenFL Continuity Program pour nous contacter !
Important
Alors que nous travaillons avec la communauté OpenFL, nous mettrons régulièrement à jour ce guide. N’hésitez pas à partager vos commentaires avec nous !
Joyeux migrateur ! 🚀