# Copyright 2020 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Flower server."""
import concurrent.futures
import io
import timeit
from logging import INFO, WARN
from typing import Optional, Union
from flwr.common import (
Code,
DisconnectRes,
EvaluateIns,
EvaluateRes,
FitIns,
FitRes,
Parameters,
ReconnectIns,
Scalar,
)
from flwr.common.logger import log
from flwr.common.typing import GetParametersIns
from flwr.server.client_manager import ClientManager, SimpleClientManager
from flwr.server.client_proxy import ClientProxy
from flwr.server.history import History
from flwr.server.strategy import FedAvg, Strategy
from .server_config import ServerConfig
FitResultsAndFailures = tuple[
list[tuple[ClientProxy, FitRes]],
list[Union[tuple[ClientProxy, FitRes], BaseException]],
]
EvaluateResultsAndFailures = tuple[
list[tuple[ClientProxy, EvaluateRes]],
list[Union[tuple[ClientProxy, EvaluateRes], BaseException]],
]
ReconnectResultsAndFailures = tuple[
list[tuple[ClientProxy, DisconnectRes]],
list[Union[tuple[ClientProxy, DisconnectRes], BaseException]],
]
[docs]
class Server:
"""Flower server."""
def __init__(
self,
*,
client_manager: ClientManager,
strategy: Optional[Strategy] = None,
) -> None:
self._client_manager: ClientManager = client_manager
self.parameters: Parameters = Parameters(
tensors=[], tensor_type="numpy.ndarray"
)
self.strategy: Strategy = strategy if strategy is not None else FedAvg()
self.max_workers: Optional[int] = None
[docs]
def set_max_workers(self, max_workers: Optional[int]) -> None:
"""Set the max_workers used by ThreadPoolExecutor."""
self.max_workers = max_workers
[docs]
def set_strategy(self, strategy: Strategy) -> None:
"""Replace server strategy."""
self.strategy = strategy
[docs]
def client_manager(self) -> ClientManager:
"""Return ClientManager."""
return self._client_manager
# pylint: disable=too-many-locals
[docs]
def fit(self, num_rounds: int, timeout: Optional[float]) -> tuple[History, float]:
"""Run federated averaging for a number of rounds."""
history = History()
# Initialize parameters
log(INFO, "[INIT]")
self.parameters = self._get_initial_parameters(server_round=0, timeout=timeout)
log(INFO, "Starting evaluation of initial global parameters")
res = self.strategy.evaluate(0, parameters=self.parameters)
if res is not None:
log(
INFO,
"initial parameters (loss, other metrics): %s, %s",
res[0],
res[1],
)
history.add_loss_centralized(server_round=0, loss=res[0])
history.add_metrics_centralized(server_round=0, metrics=res[1])
else:
log(INFO, "Evaluation returned no results (`None`)")
# Run federated learning for num_rounds
start_time = timeit.default_timer()
for current_round in range(1, num_rounds + 1):
log(INFO, "")
log(INFO, "[ROUND %s]", current_round)
# Train model and replace previous global model
res_fit = self.fit_round(
server_round=current_round,
timeout=timeout,
)
if res_fit is not None:
parameters_prime, fit_metrics, _ = res_fit # fit_metrics_aggregated
if parameters_prime:
self.parameters = parameters_prime
history.add_metrics_distributed_fit(
server_round=current_round, metrics=fit_metrics
)
# Evaluate model using strategy implementation
res_cen = self.strategy.evaluate(current_round, parameters=self.parameters)
if res_cen is not None:
loss_cen, metrics_cen = res_cen
log(
INFO,
"fit progress: (%s, %s, %s, %s)",
current_round,
loss_cen,
metrics_cen,
timeit.default_timer() - start_time,
)
history.add_loss_centralized(server_round=current_round, loss=loss_cen)
history.add_metrics_centralized(
server_round=current_round, metrics=metrics_cen
)
# Evaluate model on a sample of available clients
res_fed = self.evaluate_round(server_round=current_round, timeout=timeout)
if res_fed is not None:
loss_fed, evaluate_metrics_fed, _ = res_fed
if loss_fed is not None:
history.add_loss_distributed(
server_round=current_round, loss=loss_fed
)
history.add_metrics_distributed(
server_round=current_round, metrics=evaluate_metrics_fed
)
# Bookkeeping
end_time = timeit.default_timer()
elapsed = end_time - start_time
return history, elapsed
[docs]
def evaluate_round(
self,
server_round: int,
timeout: Optional[float],
) -> Optional[
tuple[Optional[float], dict[str, Scalar], EvaluateResultsAndFailures]
]:
"""Validate current global model on a number of clients."""
# Get clients and their respective instructions from strategy
client_instructions = self.strategy.configure_evaluate(
server_round=server_round,
parameters=self.parameters,
client_manager=self._client_manager,
)
if not client_instructions:
log(INFO, "configure_evaluate: no clients selected, skipping evaluation")
return None
log(
INFO,
"configure_evaluate: strategy sampled %s clients (out of %s)",
len(client_instructions),
self._client_manager.num_available(),
)
# Collect `evaluate` results from all clients participating in this round
results, failures = evaluate_clients(
client_instructions,
max_workers=self.max_workers,
timeout=timeout,
group_id=server_round,
)
log(
INFO,
"aggregate_evaluate: received %s results and %s failures",
len(results),
len(failures),
)
# Aggregate the evaluation results
aggregated_result: tuple[
Optional[float],
dict[str, Scalar],
] = self.strategy.aggregate_evaluate(server_round, results, failures)
loss_aggregated, metrics_aggregated = aggregated_result
return loss_aggregated, metrics_aggregated, (results, failures)
[docs]
def fit_round(
self,
server_round: int,
timeout: Optional[float],
) -> Optional[
tuple[Optional[Parameters], dict[str, Scalar], FitResultsAndFailures]
]:
"""Perform a single round of federated averaging."""
# Get clients and their respective instructions from strategy
client_instructions = self.strategy.configure_fit(
server_round=server_round,
parameters=self.parameters,
client_manager=self._client_manager,
)
if not client_instructions:
log(INFO, "configure_fit: no clients selected, cancel")
return None
log(
INFO,
"configure_fit: strategy sampled %s clients (out of %s)",
len(client_instructions),
self._client_manager.num_available(),
)
# Collect `fit` results from all clients participating in this round
results, failures = fit_clients(
client_instructions=client_instructions,
max_workers=self.max_workers,
timeout=timeout,
group_id=server_round,
)
log(
INFO,
"aggregate_fit: received %s results and %s failures",
len(results),
len(failures),
)
# Aggregate training results
aggregated_result: tuple[
Optional[Parameters],
dict[str, Scalar],
] = self.strategy.aggregate_fit(server_round, results, failures)
parameters_aggregated, metrics_aggregated = aggregated_result
return parameters_aggregated, metrics_aggregated, (results, failures)
[docs]
def disconnect_all_clients(self, timeout: Optional[float]) -> None:
"""Send shutdown signal to all clients."""
all_clients = self._client_manager.all()
clients = [all_clients[k] for k in all_clients.keys()]
instruction = ReconnectIns(seconds=None)
client_instructions = [(client_proxy, instruction) for client_proxy in clients]
_ = reconnect_clients(
client_instructions=client_instructions,
max_workers=self.max_workers,
timeout=timeout,
)
def _get_initial_parameters(
self, server_round: int, timeout: Optional[float]
) -> Parameters:
"""Get initial parameters from one of the available clients."""
# Server-side parameter initialization
parameters: Optional[Parameters] = self.strategy.initialize_parameters(
client_manager=self._client_manager
)
if parameters is not None:
log(INFO, "Using initial global parameters provided by strategy")
return parameters
# Get initial parameters from one of the clients
log(INFO, "Requesting initial parameters from one random client")
random_client = self._client_manager.sample(1)[0]
ins = GetParametersIns(config={})
get_parameters_res = random_client.get_parameters(
ins=ins, timeout=timeout, group_id=server_round
)
if get_parameters_res.status.code == Code.OK:
log(INFO, "Received initial parameters from one random client")
else:
log(
WARN,
"Failed to receive initial parameters from the client."
" Empty initial parameters will be used.",
)
return get_parameters_res.parameters
def reconnect_clients(
client_instructions: list[tuple[ClientProxy, ReconnectIns]],
max_workers: Optional[int],
timeout: Optional[float],
) -> ReconnectResultsAndFailures:
"""Instruct clients to disconnect and never reconnect."""
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
submitted_fs = {
executor.submit(reconnect_client, client_proxy, ins, timeout)
for client_proxy, ins in client_instructions
}
finished_fs, _ = concurrent.futures.wait(
fs=submitted_fs,
timeout=None, # Handled in the respective communication stack
)
# Gather results
results: list[tuple[ClientProxy, DisconnectRes]] = []
failures: list[Union[tuple[ClientProxy, DisconnectRes], BaseException]] = []
for future in finished_fs:
failure = future.exception()
if failure is not None:
failures.append(failure)
else:
result = future.result()
results.append(result)
return results, failures
def reconnect_client(
client: ClientProxy,
reconnect: ReconnectIns,
timeout: Optional[float],
) -> tuple[ClientProxy, DisconnectRes]:
"""Instruct client to disconnect and (optionally) reconnect later."""
disconnect = client.reconnect(
reconnect,
timeout=timeout,
group_id=None,
)
return client, disconnect
def fit_clients(
client_instructions: list[tuple[ClientProxy, FitIns]],
max_workers: Optional[int],
timeout: Optional[float],
group_id: int,
) -> FitResultsAndFailures:
"""Refine parameters concurrently on all selected clients."""
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
submitted_fs = {
executor.submit(fit_client, client_proxy, ins, timeout, group_id)
for client_proxy, ins in client_instructions
}
finished_fs, _ = concurrent.futures.wait(
fs=submitted_fs,
timeout=None, # Handled in the respective communication stack
)
# Gather results
results: list[tuple[ClientProxy, FitRes]] = []
failures: list[Union[tuple[ClientProxy, FitRes], BaseException]] = []
for future in finished_fs:
_handle_finished_future_after_fit(
future=future, results=results, failures=failures
)
return results, failures
def fit_client(
client: ClientProxy, ins: FitIns, timeout: Optional[float], group_id: int
) -> tuple[ClientProxy, FitRes]:
"""Refine parameters on a single client."""
fit_res = client.fit(ins, timeout=timeout, group_id=group_id)
return client, fit_res
def _handle_finished_future_after_fit(
future: concurrent.futures.Future, # type: ignore
results: list[tuple[ClientProxy, FitRes]],
failures: list[Union[tuple[ClientProxy, FitRes], BaseException]],
) -> None:
"""Convert finished future into either a result or a failure."""
# Check if there was an exception
failure = future.exception()
if failure is not None:
failures.append(failure)
return
# Successfully received a result from a client
result: tuple[ClientProxy, FitRes] = future.result()
_, res = result
# Check result status code
if res.status.code == Code.OK:
results.append(result)
return
# Not successful, client returned a result where the status code is not OK
failures.append(result)
def evaluate_clients(
client_instructions: list[tuple[ClientProxy, EvaluateIns]],
max_workers: Optional[int],
timeout: Optional[float],
group_id: int,
) -> EvaluateResultsAndFailures:
"""Evaluate parameters concurrently on all selected clients."""
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
submitted_fs = {
executor.submit(evaluate_client, client_proxy, ins, timeout, group_id)
for client_proxy, ins in client_instructions
}
finished_fs, _ = concurrent.futures.wait(
fs=submitted_fs,
timeout=None, # Handled in the respective communication stack
)
# Gather results
results: list[tuple[ClientProxy, EvaluateRes]] = []
failures: list[Union[tuple[ClientProxy, EvaluateRes], BaseException]] = []
for future in finished_fs:
_handle_finished_future_after_evaluate(
future=future, results=results, failures=failures
)
return results, failures
def evaluate_client(
client: ClientProxy,
ins: EvaluateIns,
timeout: Optional[float],
group_id: int,
) -> tuple[ClientProxy, EvaluateRes]:
"""Evaluate parameters on a single client."""
evaluate_res = client.evaluate(ins, timeout=timeout, group_id=group_id)
return client, evaluate_res
def _handle_finished_future_after_evaluate(
future: concurrent.futures.Future, # type: ignore
results: list[tuple[ClientProxy, EvaluateRes]],
failures: list[Union[tuple[ClientProxy, EvaluateRes], BaseException]],
) -> None:
"""Convert finished future into either a result or a failure."""
# Check if there was an exception
failure = future.exception()
if failure is not None:
failures.append(failure)
return
# Successfully received a result from a client
result: tuple[ClientProxy, EvaluateRes] = future.result()
_, res = result
# Check result status code
if res.status.code == Code.OK:
results.append(result)
return
# Not successful, client returned a result where the status code is not OK
failures.append(result)
def init_defaults(
server: Optional[Server],
config: Optional[ServerConfig],
strategy: Optional[Strategy],
client_manager: Optional[ClientManager],
) -> tuple[Server, ServerConfig]:
"""Create server instance if none was given."""
if server is None:
if client_manager is None:
client_manager = SimpleClientManager()
if strategy is None:
strategy = FedAvg()
server = Server(client_manager=client_manager, strategy=strategy)
elif strategy is not None:
log(WARN, "Both server and strategy were provided, ignoring strategy")
# Set default config values
if config is None:
config = ServerConfig()
return server, config
def run_fl(
server: Server,
config: ServerConfig,
) -> History:
"""Train a model on the given server and return the History object."""
hist, elapsed_time = server.fit(
num_rounds=config.num_rounds, timeout=config.round_timeout
)
log(INFO, "")
log(INFO, "[SUMMARY]")
log(INFO, "Run finished %s round(s) in %.2fs", config.num_rounds, elapsed_time)
for line in io.StringIO(str(hist)):
log(INFO, "\t%s", line.strip("\n"))
log(INFO, "")
# Graceful shutdown
server.disconnect_all_clients(timeout=config.round_timeout)
return hist