Federated Evaluation

There are two main approaches to evaluating models in federated learning systems: centralized (or server-side) evaluation and federated (or client-side) evaluation.

Centralized Evaluation

Built-in Strategies

All built-in strategies support centralized evaluation by providing an evaluation function during initialization. An evaluation function is any function that can take the current global model parameters as input and return evaluation results:

from typing import Dict, Optional, Tuple

import tensorflow as tf

from flwr.common import Context, NDArrays, Scalar
from flwr.server import ServerApp, ServerAppComponents, ServerConfig
from flwr.server.strategy import FedAvg


def get_evaluate_fn(model):
    """Return an evaluation function for server-side evaluation."""

    # Load data and model here to avoid the overhead of doing it in `evaluate` itself
    (x_train, y_train), _ = tf.keras.datasets.cifar10.load_data()

    # Use the last 5k training examples as a validation set
    x_val, y_val = x_train[45000:50000], y_train[45000:50000]

    # The `evaluate` function will be called after every round
    def evaluate(
        server_round: int, parameters: NDArrays, config: Dict[str, Scalar]
    ) -> Optional[Tuple[float, Dict[str, Scalar]]]:
        model.set_weights(parameters)  # Update model with the latest parameters
        loss, accuracy = model.evaluate(x_val, y_val)
        return loss, {"accuracy": accuracy}

    return evaluate


def server_fn(context: Context):
    # Read from config
    num_rounds = context.run_config["num-server-rounds"]
    config = ServerConfig(num_rounds=num_rounds)

    # Load and compile model for server-side parameter evaluation
    model = tf.keras.applications.EfficientNetB0(
        input_shape=(32, 32, 3), weights=None, classes=10
    )
    model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"])

    # Create strategy
    strategy = FedAvg(
        # ... other FedAvg arguments
        evaluate_fn=get_evaluate_fn(model),
    )

    return ServerAppComponents(strategy=strategy, config=config)


# Create ServerApp
app = ServerApp(server_fn=server_fn)

Custom Strategies

The Strategy abstraction provides a method called evaluate that can be used directly to evaluate the current global model parameters. The current server implementation calls evaluate after parameter aggregation and before federated evaluation (see next section).
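For illustration, here is a minimal sketch of a custom strategy that overrides evaluate (the subclass name and the placeholder evaluation logic are hypothetical, not part of the Flower API):

from typing import Dict, Optional, Tuple

from flwr.common import Parameters, Scalar, parameters_to_ndarrays
from flwr.server.strategy import FedAvg


class CustomEvalStrategy(FedAvg):
    """FedAvg variant that performs its own server-side evaluation."""

    def evaluate(
        self, server_round: int, parameters: Parameters
    ) -> Optional[Tuple[float, Dict[str, Scalar]]]:
        # Convert `Parameters` into a list of NumPy ndarrays
        ndarrays = parameters_to_ndarrays(parameters)

        # Placeholder: evaluate `ndarrays` against a server-side dataset here
        loss, accuracy = 0.0, 0.0

        return loss, {"accuracy": accuracy}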

Federated Evaluation

Implementing Federated Evaluation

Client-side evaluation happens in the Client.evaluate method and can be configured from the server side.

from flwr.client import NumPyClient


class FlowerClient(NumPyClient):
    def __init__(self, model, x_train, y_train, x_test, y_test):
        self.model = model
        self.x_train, self.y_train = x_train, y_train
        self.x_test, self.y_test = x_test, y_test

    def fit(self, parameters, config):
        # ...
        pass

    def evaluate(self, parameters, config):
        """Evaluate parameters on the locally held test set."""

        # Update local model with global parameters
        self.model.set_weights(parameters)

        # Get config values
        steps: int = config["val_steps"]

        # Evaluate global model parameters on the local test data and return results
        loss, accuracy = self.model.evaluate(
            self.x_test, self.y_test, batch_size=32, steps=steps
        )
        num_examples_test = len(self.x_test)
        return loss, num_examples_test, {"accuracy": accuracy}
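To run this client, it must be wrapped in a ClientApp, mirroring the ServerApp pattern shown above. A minimal sketch, assuming hypothetical load_model and load_data helpers that provide the compiled model and this client's local data partitions:

from flwr.client import ClientApp
from flwr.common import Context


def client_fn(context: Context):
    # `load_model` and `load_data` are hypothetical helpers, not Flower APIs
    model = load_model()
    x_train, y_train, x_test, y_test = load_data(context)
    return FlowerClient(model, x_train, y_train, x_test, y_test).to_client()


# Create ClientApp
app = ClientApp(client_fn=client_fn)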

Configuring Federated Evaluation

Federated evaluation can be configured from the server side. Built-in strategies support the following arguments:

  • fraction_evaluate: a float defining the fraction of clients that will be selected for evaluation. If fraction_evaluate is set to 0.1 and 100 clients are connected to the server, then 10 will be randomly selected for evaluation. If fraction_evaluate is set to 0.0, federated evaluation will be disabled.

  • min_evaluate_clients: an int that defines the minimum number of clients to be selected for evaluation. If fraction_evaluate is set to 0.1, min_evaluate_clients is set to 20, and 100 clients are connected to the server, then 20 clients will be selected for evaluation.

  • min_available_clients: an int that defines the minimum number of clients which need to be connected to the server before a round of federated evaluation can start. If fewer than min_available_clients are connected to the server, the server will wait until more clients are connected before it continues to sample clients for evaluation.

  • on_evaluate_config_fn: a function that returns a configuration dictionary which will be sent to the selected clients. The function will be called during each round and provides a convenient way to customize client-side evaluation from the server side, for example, to configure the number of validation steps performed.

from flwr.common import Context
from flwr.server import ServerApp, ServerAppComponents, ServerConfig
from flwr.server.strategy import FedAvg


def evaluate_config(server_round: int):
    """Return evaluation configuration dict for each round.
    Perform five local evaluation steps on each client (i.e., use five
    batches) during rounds one to three, then increase to ten local
    evaluation steps.
    """
    val_steps = 5 if server_round < 4 else 10
    return {"val_steps": val_steps}


# Create strategy
strategy = FedAvg(
    # ... other FedAvg arguments
    fraction_evaluate=0.2,
    min_evaluate_clients=2,
    min_available_clients=10,
    on_evaluate_config_fn=evaluate_config,
)


def server_fn(context: Context):
    num_rounds = context.run_config["num-server-rounds"]
    config = ServerConfig(num_rounds=num_rounds)
    return ServerAppComponents(strategy=strategy, config=config)


# Create ServerApp
app = ServerApp(server_fn=server_fn)
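The losses returned by the selected clients are aggregated automatically, but custom metrics such as "accuracy" are only aggregated if the strategy is given an evaluate_metrics_aggregation_fn. A minimal weighted-average sketch (the helper name weighted_average is ours):

from typing import List, Tuple

from flwr.common import Metrics
from flwr.server.strategy import FedAvg


def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
    """Aggregate client accuracies, weighted by each client's number of examples."""
    accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
    examples = [num_examples for num_examples, _ in metrics]
    return {"accuracy": sum(accuracies) / sum(examples)}


strategy = FedAvg(
    # ... other FedAvg arguments
    evaluate_metrics_aggregation_fn=weighted_average,
)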

Evaluating Local Model Updates During Training

Model parameters can also be evaluated during training. Client.fit can return arbitrary evaluation results as a dictionary:

from flwr.client import NumPyClient


class FlowerClient(NumPyClient):
    def __init__(self, model, x_train, y_train, x_test, y_test):
        self.model = model
        self.x_train, self.y_train = x_train, y_train
        self.x_test, self.y_test = x_test, y_test

    def fit(self, parameters, config):
        """Train parameters on the locally held training set."""

        # Update local model parameters
        self.model.set_weights(parameters)

        # Train the model using hyperparameters from config
        history = self.model.fit(
            self.x_train, self.y_train, batch_size=32, epochs=2, validation_split=0.1
        )

        # Return updated model parameters and validation results
        parameters_prime = self.model.get_weights()
        num_examples_train = len(self.x_train)
        results = {
            "loss": history.history["loss"][0],
            "accuracy": history.history["accuracy"][0],
            "val_loss": history.history["val_loss"][0],
            "val_accuracy": history.history["val_accuracy"][0],
        }
        return parameters_prime, num_examples_train, results

    def evaluate(self, parameters, config):
        # ...
        pass
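On the server side, these training metrics can be aggregated in the same way as evaluation metrics by passing a fit_metrics_aggregation_fn to the strategy, for example reusing the weighted_average helper sketched above (which in this case would aggregate only the "accuracy" key):

from flwr.server.strategy import FedAvg

strategy = FedAvg(
    # ... other FedAvg arguments
    fit_metrics_aggregation_fn=weighted_average,
)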

Full Code Example

For a full code example that uses both centralized and federated evaluation, see the Advanced TensorFlow Example (the same approach can be applied to workloads implemented in any other framework).