Quickstart XGBoost¶
Dans cet exemple d’apprentissage fédéré, nous allons apprendre à entraîner un classifieur XGBoost simple sur le jeu de données Higgs en utilisant Flower et XGBoost. Il est recommandé de créer un environnement virtuel et d’exécuter tout dans un virtualenv.
Utilisons flwr new pour créer un projet complet Flower+XGBoost. Il générera tous les fichiers nécessaires pour exécuter une fédération de 10 nœuds en utilisant la stratégie FedXgbBagging. Par défaut, l’application générée utilise un profil de simulation local qui soumet à un SuperLink géré local, qui exécute ensuite le run avec le runtime d’exécution Simulation Flower. Le jeu de données sera partitionné en utilisant les partitions de Flower Dataset’s IidPartitioner.
FedXgbBagging (aggrégation de bootstrap) est une méthode d’ensemble qui améliore la stabilité et l’exactitude dans l’apprentissage automatique, ici appliquée à XGBoost en FL. Chaque client génère un échantillon de bootstrap en sous-échantillonnant ses données et entraîne une arborescence par tour, qui est ensuite agrégée par le serveur et ajoutée au modèle global.
Maintenant que nous avons une idée approximative de ce que cet exemple est sur, allons-y. Tout d’abord, installez Flower dans votre nouvel environnement :
# In a new Python environment
$ pip install flwr[simulation]
Ensuite, exécutez la commande suivante :
$ flwr new @flwrlabs/quickstart-xgboost
Après l’exécution, vous remarquerez un nouveau répertoire nommé quickstart-xgboost créé. Il devrait avoir la structure suivante :
quickstart-xgboost
├── quickstart_xgboost
│ ├── __init__.py
│ ├── client_app.py # Defines your ClientApp
│ ├── server_app.py # Defines your ServerApp
│ └── task.py # Defines your data loading and utility functions
├── pyproject.toml # Project metadata like dependencies and configs
└── README.md
Si vous n’avez pas encore installé le projet et ses dépendances, vous pouvez le faire avec :
# From the directory where your pyproject.toml is
$ pip install -e .
Pour exécuter le projet, faites :
# Run with default arguments and stream logs
$ flwr run . --stream
Le processus flwr run . soumet l’exécution, imprime l’ID d’exécution et retourne sans diffuser les journaux. Pour le workflow local complet, voir Exécuter Flower Localement avec un SuperLink Géré.
Avec les arguments par défaut, vous verrez une sortie en flux comme celle-ci :
Starting local SuperLink on 127.0.0.1:39093...
Successfully started run 1859953118041441032
INFO : Starting FedXgbBagging strategy:
INFO : ├── Number of rounds: 3
INFO : [ROUND 1/3]
INFO : configure_train: Sampled 2 nodes (out of 10)
INFO : aggregate_train: Received 2 results and 0 failures
INFO : └──> Aggregated MetricRecord: {}
INFO : configure_evaluate: Sampled 2 nodes (out of 10)
INFO : aggregate_evaluate: Received 2 results and 0 failures
INFO : └──> Aggregated MetricRecord: {'auc': 0.7677505289821278}
INFO : [ROUND 2/3]
INFO : ...
INFO : [ROUND 3/3]
INFO : ...
INFO : Strategy execution finished in 132.88s
INFO : Final results:
INFO : ServerApp-side Evaluate Metrics:
INFO : {}
Vous pouvez également surcharger les paramètres définis dans la section [tool.flwr.app.config] du fichier pyproject.toml comme suit :
# Override some arguments
$ flwr run . --run-config "num-server-rounds=5 params.eta=0.2"
Voici une explication de chaque composant du projet que vous venez de créer : configurations, partitionnement des données, définition du ClientApp, et définition du ServerApp.
Les Configurations¶
Nous définissons toutes les configurations / hyper-paramètres requises à l’intérieur du fichier pyproject.toml:
[tool.flwr.app.config]
num-server-rounds = 3
fraction-train = 0.1
fraction-evaluate = 0.1
local-epochs = 1
save-model = false
# XGBoost parameters
params.objective = "binary:logistic"
params.eta = 0.1 # Learning rate
params.max-depth = 8
params.eval-metric = "auc"
params.nthread = 16
params.num-parallel-tree = 1
params.subsample = 1
params.tree-method = "hist"
Le local-epochs représente le nombre d’itérations pour la boost local. Nous utilisons le CPU par défaut pour l’entraînement. On peut affecter cela à un GPU en définissant tree-method sur gpu_hist. Nous utilisons AUC comme métrique d’évaluation.
Les données¶
Nous utiliserons Flower Datasets pour télécharger facilement et partitionner le jeu de données Higgs. Dans cet exemple, vous ferez usage du IidPartitioner pour générer des partitions num_partitions. Vous pouvez choisir parmi d’autres partitioners disponibles dans les Datasets Flower :
partitioner = IidPartitioner(num_partitions=num_clients)
fds = FederatedDataset(
dataset="jxie/higgs",
partitioners={"train": partitioner},
)
partition = fds.load_partition(partition_id, split="train")
partition.set_format("numpy")
# Train/test splitting
train_data, valid_data, num_train, num_val = train_test_split(
partition, test_fraction=0.2, seed=42
)
# Reformat data to DMatrix for xgboost
train_dmatrix = transform_dataset_to_dmatrix(train_data)
valid_dmatrix = transform_dataset_to_dmatrix(valid_data)
Nous effectuons une séparation entraînement-évaluation en utilisant la partition donnée (données locales du client), et reformattons les données pour le package DMatrix de xgboost. Les fonctions de train_test_split et transform_dataset_to_dmatrix sont définies comme suit :
def train_test_split(partition, test_fraction, seed):
"""Split the data into train and validation set given split rate."""
train_test = partition.train_test_split(test_size=test_fraction, seed=seed)
partition_train = train_test["train"]
partition_test = train_test["test"]
num_train = len(partition_train)
num_test = len(partition_test)
return partition_train, partition_test, num_train, num_test
def transform_dataset_to_dmatrix(data):
"""Transform dataset to DMatrix format for xgboost."""
x = data["inputs"]
y = data["label"]
new_data = xgb.DMatrix(x, label=y)
return new_data
L’Application Client¶
Les principales modifications que nous devons apporter pour utiliser XGBoost avec Flower ont trait à la conversion du ArrayRecord reçu dans le Message en objet binaire chargable, et vice versa lors de la génération de la réponse Message du ClientApp. Nous pouvons faire usage des conversions suivantes:
@app.train()
def train(msg: Message, context: Context):
# Instantiate a XGBoost model
bst = xgb.Booster(params=params)
global_model = bytearray(msg.content["arrays"]["0"].numpy().tobytes())
# Load global model into booster
bst.load_model(global_model)
# ...
# Convert XGB object back into an ArrayRecord
# Note: we store the model as the first item in a list into ArrayRecord,
# which can be accessed using index ["0"].
local_model = bst.save_raw("json")
model_np = np.frombuffer(local_model, dtype=np.uint8)
model_record = ArrayRecord([model_np])
Le reste de la fonctionnalité est directement inspiré du cas centralisé. Le ClientApp comporte trois méthodes de base (train, evaluate, et query) que nous pouvons implémenter à des fins différentes. Par exemple : train pour entraîner le modèle reçu en utilisant les données locales ; evaluate pour évaluer la performance du modèle reçu sur un jeu de validation ; et query pour récupérer des informations sur le nœud exécutant le ClientApp. Dans ce tutoriel, nous ne ferons que faire usage de train et evaluate.
Voyons comment la méthode train peut être implémentée. Elle reçoit en arguments d’entrée un Message depuis le ServerApp. Par défaut, elle porte :
un
ArrayRecordavec les tableaux du modèle à fédérer. Par défaut, ils peuvent être récupérés avec la clé"arrays"lors de l’accès au contenu du message.une
ConfigRecordavec la configuration transmise depuis leServerApp. Par défaut, elle peut être récupérée avec la clé"config"lors de l’accès au contenu du message.
La méthode train reçoit également le Context, ce qui donne accès aux configurations pour votre exécution et votre nœud. Les hyperparamètres de la configuration d’exécution sont définis dans la partie pyproject.toml de votre application Flower. La configuration du nœud ne peut être définie qu’en exécutant Flower avec le Deployment Runtime et ce n’est pas directement configurable pendant les simulations.
# Flower ClientApp
app = ClientApp()
@app.train()
def train(msg: Message, context: Context) -> Message:
# Load model and data
partition_id = context.node_config["partition-id"]
num_partitions = context.node_config["num-partitions"]
train_dmatrix, _, num_train, _ = load_data(partition_id, num_partitions)
# Read from run config
num_local_round = context.run_config["local-epochs"]
# Flatted config dict and replace "-" with "_"
cfg = replace_keys(unflatten_dict(context.run_config))
params = cfg["params"]
global_round = msg.content["config"]["server-round"]
if global_round == 1:
# First round local training
bst = xgb.train(
params,
train_dmatrix,
num_boost_round=num_local_round,
)
else:
bst = xgb.Booster(params=params)
global_model = bytearray(msg.content["arrays"]["0"].numpy().tobytes())
# Load global model into booster
bst.load_model(global_model)
# Local training
bst = _local_boost(bst, num_local_round, train_dmatrix)
# Save model
local_model = bst.save_raw("json")
model_np = np.frombuffer(local_model, dtype=np.uint8)
# Construct reply message
# Note: we store the model as the first item in a list into ArrayRecord,
# which can be accessed using index ["0"].
model_record = ArrayRecord([model_np])
metrics = {
"num-examples": num_train,
}
metric_record = MetricRecord(metrics)
content = RecordDict({"arrays": model_record, "metrics": metric_record})
return Message(content=content, reply_to=msg)
À la première ronde, nous appelons xgb.train() pour construire le premier ensemble d’arbres. À partir de la deuxième ronde, nous chargeons le modèle global envoyé par le serveur dans un nouveau objet Booster, et mettons à jour les poids du modèle sur les données de formation locales avec la fonction _local_boost comme suit:
def _local_boost(bst_input, num_local_round, train_dmatrix):
# Update trees based on local training data.
for i in range(num_local_round):
bst_input.update(train_dmatrix, bst_input.num_boosted_rounds())
# Bagging: extract the last N=num_local_round trees for sever aggregation
bst = bst_input[
bst_input.num_boosted_rounds()
- num_local_round : bst_input.num_boosted_rounds()
]
return bst
Étant donné num_local_round, nous mettons à jour les arbres en appelant la méthode bst_input.update. Après entraînement, les derniers N=num_local_round arbres seront extraits pour être envoyés au serveur.
La méthode @app.evaluate() serait presque identique avec deux exceptions : (1) le modèle n’est pas entraîné localement, mais il est utilisé pour évaluer sa performance sur le jeu de validation local ; (2) inclure le modèle dans la réponse Message n’est plus nécessaire car il ne subit aucune modification locale.
L’Application Serveur¶
Pour construire un ServerApp, nous définissons sa méthode @app.main(). Cette méthode reçoit en entrée les arguments suivants :
Un objet
Gridqui sera utilisé pour interagir avec les nœuds s’exécutant laClientAppafin d’impliquer les clients dans une ronde d’entraînement/évaluation/requête ou autre.Un objet
Contextqui fournit accès à la configuration de l’exécution.
Dans cet exemple, nous utilisons la stratégie FedXgbBagging. Ensuite, nous initialisons un modèle global vide qui servira de modèle XGBoost sur le côté client à la première ronde. Après cela, l’exécution de la stratégie est lancée lorsqu’on invoque sa méthode start. À cette fin, on passe:
l’objet
Grid.- un
ArrayRecordportant un modèle initialisé aléatoirement qui servira de modèle global modèle à fédérer.
- un
le paramètre
num_roundsspécifiant combien de rondes effectuer.
# Create ServerApp
app = ServerApp()
@app.main()
def main(grid: Grid, context: Context) -> None:
# Read run config
num_rounds = context.run_config["num-server-rounds"]
fraction_train = context.run_config["fraction-train"]
fraction_evaluate = context.run_config["fraction-evaluate"]
# Flatted config dict and replace "-" with "_"
cfg = replace_keys(unflatten_dict(context.run_config))
params = cfg["params"]
# Init global model
# Init with an empty object; the XGBooster will be created
# and trained on the client side.
global_model = b""
# Note: we store the model as the first item in a list into ArrayRecord,
# which can be accessed using index ["0"].
arrays = ArrayRecord([np.frombuffer(global_model, dtype=np.uint8)])
# Initialize FedXgbBagging strategy
strategy = FedXgbBagging(
fraction_train=fraction_train,
fraction_evaluate=fraction_evaluate,
)
# Start strategy, run FedXgbBagging for `num_rounds`
result = strategy.start(
grid=grid,
initial_arrays=arrays,
num_rounds=num_rounds,
)
if context.run_config["save-model"]:
# Save final model to disk
bst = xgb.Booster(params=params)
global_model = bytearray(result.arrays["0"].numpy().tobytes())
# Load global model into booster
bst.load_model(global_model)
# Save model
print("\nSaving final model to disk...")
bst.save_model("final_model.json")
Notez que la méthode start du stratégie retourne un objet Result. Cet objet contient toutes les informations pertinentes sur le processus FL, y compris les poids du modèle final sous forme de ArrayRecord, et des métriques d’entraînement et d’évaluation fédérées sous forme de MetricRecords.
Félicitations ! Vous avez réussi à créer et exécuter votre premier système d’apprentissage fédéré.
Astuce
Vérifiez la documentation de Run simulations pour en savoir plus sur la façon de configurer et d’exécuter les simulations Flower.
Note
Vérifiez le chapitre source code de la version étendue de ce tutoriel dans examples/xgboost-quickstart du dépôt GitHub Flower.