Quickstart scikit-learn

Dans ce tutoriel d’apprentissage fédéré, nous allons apprendre à entraîner une régression logistique sur le dataset Iris en utilisant Flower et scikit-learn. Il est recommandé de créer un environnement virtuel et d’exécuter tout cela dans un virtualenv.

Utilisez flwr new pour créer un projet complet Flower+scikit-learn. Il générera tous les fichiers nécessaires pour exécuter une fédération de 10 nœuds en utilisant FedAvg. Par défaut, l’application générée utilise un profil de simulation local qui flwr run soumet à un SuperLink géré local, qui exécute ensuite l’exécution avec Flower Simulation Runtime. Le dataset sera partitionné en utilisant le Flower Datasets de IidPartitioner.

Maintenant que nous avons une idée approximative de ce que cet exemple est sur, allons-y. Tout d’abord, installez Flower dans votre nouvel environnement :

# In a new Python environment
$ pip install flwr[simulation]

Ensuite, exécutez la commande suivante :

$ flwr new @flwrlabs/quickstart-sklearn

Après avoir exécuté cela, vous remarquerez que un nouveau répertoire nommé quickstart-sklearn a été créé. Il devrait avoir la structure suivante :

quickstart-sklearn
├── sklearnexample
│   ├── __init__.py
│   ├── client_app.py   # Defines your ClientApp   ├── server_app.py   # Defines your ServerApp   └── task.py         # Defines your model, training and data loading
├── pyproject.toml      # Project metadata like dependencies and configs
└── README.md

Si vous n’avez pas encore installé le projet et ses dépendances, vous pouvez le faire avec :

# From the directory where your pyproject.toml is
$ pip install -e .

Pour lancer le projet, faites:

# Run with default arguments and stream logs
$ flwr run . --stream

Le processus flwr run . soumet l’exécution, imprime l’ID d’exécution et retourne sans diffuser les journaux. Pour le workflow local complet, voir Exécuter Flower Localement avec un SuperLink Géré.

Avec les arguments par défaut, vous verrez une sortie en flux continu comme celle-ci :

Starting local SuperLink on 127.0.0.1:39093...
Successfully started run 1859953118041441032
INFO :      Starting FedAvg strategy:
INFO :          ├── Number of rounds: 3
INFO :      [ROUND 1/3]
INFO :      configure_train: Sampled 10 nodes (out of 10)
INFO :      aggregate_train: Received 10 results and 0 failures
INFO :          └──> Aggregated MetricRecord: {'train_logloss': 1.3937176081476854}
INFO :      configure_evaluate: Sampled 10 nodes (out of 10)
INFO :      aggregate_evaluate: Received 10 results and 0 failures
INFO :          └──> Aggregated MetricRecord: {'test_logloss': 1.23306, 'accuracy': 0.69154, 'precision': 0.68659, 'recall': 0.68046, 'f1': 0.65752}
INFO :      [ROUND 2/3]
INFO :      ...
INFO :      [ROUND 3/3]
INFO :      ...
INFO :      Strategy execution finished in 17.87s
INFO :      Final results:
INFO :          ServerApp-side Evaluate Metrics:
INFO :          {}

Vous pouvez également surcharger les paramètres définis dans la section [tool.flwr.app.config] de pyproject.toml comme suit:

# Override some arguments
$ flwr run . --run-config "num-server-rounds=5 local-epochs=2"

Voici une explication de chaque composant dans le projet que vous venez de créer : partitionnement du jeu de données, modèle, définir la ClientApp et définir la ServerApp.

Les données

Ce tutoriel utilise Flower Datasets pour télécharger et partitionner facilement le jeu de données Iris. Dans cet exemple, vous utiliserez la méthode IidPartitioner pour générer des partitions num_partitions. Vous pouvez choisir parmi les other partitioners disponibles dans Flower Datasets. Chaque partie ClientApp appellera cette fonction pour créer des chargeurs de données avec les données correspondant à leur partition de données. Notez que, dans cet exemple, seuls un sous-ensemble des colonnes seront utilisés.

FEATURES = ["petal_length", "petal_width", "sepal_length", "sepal_width"]

partitioner = IidPartitioner(num_partitions=num_partitions)
fds = FederatedDataset(dataset="hitorilabs/iris", partitioners={"train": partitioner})
dataset = fds.load_partition(partition_id, "train").with_format("pandas")[:]
X = dataset[FEATURES]
y = dataset["species"]
# Split the on-edge data: 80% train, 20% test
X_train, X_test = X[: int(0.8 * len(X))], X[int(0.8 * len(X)) :]
y_train, y_test = y[: int(0.8 * len(y))], y[int(0.8 * len(y)) :]
return X_train.values, y_train.values, X_test.values, y_test.values

Le Modèle

Nous définissons le modèle LogisticRegression à partir de scikit-learn dans la fonction create_log_reg_and_instantiate_parameters(). Cette fonction d’aide initialise également les paramètres du modèle en utilisant la fonction utilitaire set_initial_params() dans le même fichier.

def create_log_reg_and_instantiate_parameters(penalty):
    model = LogisticRegression(
        penalty=penalty,
        max_iter=1,  # local epoch
        warm_start=True,  # prevent refreshing weights when fitting,
        solver="saga",
    )
    # Setting initial parameters, akin to model.compile for keras models
    set_initial_params(model, n_features=len(FEATURES), n_classes=len(UNIQUE_LABELS))
    return model

L’Application Client

Les principales modifications que nous devons apporter pour utiliser Scikit-learn avec Flower ont affaire à la conversion du ArrayRecord reçu dans la partie Message en des tableaux numpy et les utiliser pour définir les paramètres du modèle. Après avoir entraîné, une autre fonction d’aide peut être utilisée pour extraire puis packer les tableaux numpy mis à jour dans un dictionnaire de l’état `Message` à partir du ClientApp. Nous pouvons faire usage des méthodes intégrées dans le ArrayRecord pour effectuer ces conversions:

@app.train()
def train(msg: Message, context: Context):

    # Create LogisticRegression Model
    penalty = context.run_config["penalty"]
    # Create LogisticRegression Model
    model = create_log_reg_and_instantiate_parameters(penalty)

    # Apply received parameters
    ndarrays = msg.content["arrays"].to_numpy_ndarrays()
    set_model_params(model, ndarrays)

    # Train the model
    ...

    # Extract the updated model parameters with auxhiliary function
    ndarrays = get_model_params(model)
    # Pack the updated parameters into an ArrayRecord
    model_record = ArrayRecord(ndarrays)

Le reste de la fonctionnalité est directement inspiré du cas centralisé. Le ClientApp comporte trois méthodes de base (train, evaluate, et query) que nous pouvons implémenter à des fins différentes. Par exemple : train pour entraîner le modèle reçu en utilisant les données locales ; evaluate pour évaluer la performance du modèle reçu sur un jeu de validation ; et query pour récupérer des informations sur le nœud exécutant le ClientApp. Dans ce tutoriel, nous ne ferons que faire usage de train et evaluate.

Voyons comment la méthode train peut être implémentée. Elle reçoit en arguments d’entrée un Message depuis le ServerApp. Par défaut, elle porte :

  • un ArrayRecord avec les tableaux du modèle à fédérer. Par défaut, ils peuvent être récupérés avec la clé "arrays" lors de l’accès au contenu du message.

  • une ConfigRecord avec la configuration transmise depuis le ServerApp. Par défaut, elle peut être récupérée avec la clé "config" lors de l’accès au contenu du message.

La méthode train reçoit également le Context, donnant accès aux configurations pour votre exécution et nœud. Les hyperparamètres de la configuration d’exécution sont définis dans la section pyproject.toml de votre application Flower. La configuration du nœud ne peut être configurée que lors de l’exécution de Flower avec le Deployment Runtime et ce, directement configurable pendant les simulations.

app = ClientApp()


@app.train()
def train(msg: Message, context: Context):
    """Train the model on local data."""

    # Create LogisticRegression Model
    penalty = context.run_config["penalty"]
    # Create LogisticRegression Model
    model = create_log_reg_and_instantiate_parameters(penalty)

    # Apply received parameters
    ndarrays = msg.content["arrays"].to_numpy_ndarrays()
    set_model_params(model, ndarrays)

    # Load the data
    partition_id = context.node_config["partition-id"]
    num_partitions = context.node_config["num-partitions"]
    X_train, y_train, _, _ = load_data(partition_id, num_partitions)

    # Ignore convergence failure due to low local epochs
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        # Train the model on local data
        model.fit(X_train, y_train)

    # Let's compute train loss
    y_train_pred_proba = model.predict_proba(X_train)
    train_logloss = log_loss(y_train, y_train_pred_proba, labels=UNIQUE_LABELS)
    accuracy = model.score(X_train, y_train)

    # Construct and return reply Message
    ndarrays = get_model_params(model)
    model_record = ArrayRecord(ndarrays)
    metrics = {
        "num-examples": len(X_train),
        "train_logloss": train_logloss,
        "train_accuracy": accuracy,
    }
    metric_record = MetricRecord(metrics)
    content = RecordDict({"arrays": model_record, "metrics": metric_record})
    return Message(content=content, reply_to=msg)

La méthode @app.evaluate de mirroir train, mais elle n’évalue que le modèle reçu sur l’ensemble de validation local. Elle retourne un objet contenant la perte et l’exactitude d’évaluation, sans inclure les poids du modèle, puisqu’ils ne sont pas modifiés pendant l’évaluation.

L’Application Serveur

Pour construire un ServerApp, nous définissons sa méthode @app.main(). Cette méthode reçoit en arguments d’entrée :

  • Un objet Grid qui sera utilisé pour interagir avec les nœuds s’exécutant la ClientApp afin d’impliquer les clients dans une ronde d’entraînement/évaluation/requête ou autre.

  • Un objet Context qui fournit accès à la configuration de l’exécution.

Dans cet exemple, nous utilisons FedAvg et la configurons avec une valeur spécifique de fraction_train qui est lue à partir de la configuration d’exécution. Vous pouvez trouver la valeur par défaut définie dans la partie pyproject.toml. Ensuite, l’exécution de la stratégie est lancée lorsqu’on invoque sa méthode start. À cela on passe:

  • l’objet Grid.

  • un ArrayRecord portant un modèle initialisé aléatoirement qui servira de modèle global pour fédérer.

  • Un objet ConfigRecord avec les hyperparamètres d’entraînement à envoyer aux clients. La stratégie insérera également le numéro actuel de ronde dans cette configuration avant de l’envoyer aux nœuds participants.

  • Le paramètre num_rounds spécifiant combien de rondes de FedAvg effectuer.

app = ServerApp()


@app.main()
def main(grid: Grid, context: Context) -> None:
    """Main entry point for the ServerApp."""

    # Read run config
    num_rounds: int = context.run_config["num-server-rounds"]

    # Create LogisticRegression Model
    penalty = context.run_config["penalty"]
    model = create_log_reg_and_instantiate_parameters(penalty)
    # Construct ArrayRecord representation
    arrays = ArrayRecord(get_model_params(model))

    # Initialize FedAvg strategy
    strategy = FedAvg(fraction_train=1.0, fraction_evaluate=1.0)

    # Start strategy, run FedAvg for `num_rounds`
    result = strategy.start(
        grid=grid,
        initial_arrays=arrays,
        num_rounds=num_rounds,
    )

    if context.run_config["save-model"]:
        # Save final model parameters
        print("\nSaving final model to disk...")
        ndarrays = result.arrays.to_numpy_ndarrays()
        set_model_params(model, ndarrays)
        joblib.dump(model, "logreg_model.pkl")

Félicitations ! Vous avez réussi à construire et exécuter votre premier système d’apprentissage fédéré dans scikit-learn sur le jeu de données Iris en utilisant la nouvelle API Message.

Astuce

Vérifiez la documentation de Run simulations pour en savoir plus sur la façon de configurer et d’exécuter les simulations Flower.

Note

Vérifiez le code source de une autre application Flower utilisant scikit-learn dans le dépôt GitHub Flower GitHub repository.