Source code for flwr.simulation.run_simulation

# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Flower Simulation."""


import argparse
import asyncio
import json
import logging
import sys
import threading
import traceback
from logging import DEBUG, ERROR, INFO, WARNING
from pathlib import Path
from queue import Empty, Queue
from typing import Any, Optional

from flwr.cli.config_utils import load_and_validate
from flwr.cli.utils import get_sha256_hash
from flwr.client import ClientApp
from flwr.common import Context, EventType, RecordSet, event, log, now
from flwr.common.config import get_fused_config_from_dir, parse_config_args
from flwr.common.constant import RUN_ID_NUM_BYTES, Status
from flwr.common.logger import (
    set_logger_propagation,
    update_console_handler,
    warn_deprecated_feature_with_example,
)
from flwr.common.typing import Run, RunStatus, UserConfig
from flwr.server.driver import Driver, InMemoryDriver
from flwr.server.run_serverapp import run as _run
from flwr.server.server_app import ServerApp
from flwr.server.superlink.fleet import vce
from flwr.server.superlink.fleet.vce.backend.backend import BackendConfig
from flwr.server.superlink.linkstate import LinkStateFactory
from flwr.server.superlink.linkstate.in_memory_linkstate import RunRecord
from flwr.server.superlink.linkstate.utils import generate_rand_int_from_bytes
from flwr.simulation.ray_transport.utils import (
    enable_tf_gpu_growth as enable_gpu_growth,
)


def _replace_keys(d: Any, match: str, target: str) -> Any:
    if isinstance(d, dict):
        return {
            k.replace(match, target): _replace_keys(v, match, target)
            for k, v in d.items()
        }
    if isinstance(d, list):
        return [_replace_keys(i, match, target) for i in d]
    return d


# Entry point from CLI
# pylint: disable=too-many-locals
def run_simulation_from_cli() -> None:
    """Run Simulation Engine from the CLI."""
    args = _parse_args_run_simulation().parse_args()

    event(
        EventType.CLI_FLOWER_SIMULATION_ENTER,
        event_details={"backend": args.backend, "num-supernodes": args.num_supernodes},
    )

    if args.enable_tf_gpu_growth:
        warn_deprecated_feature_with_example(
            "Passing `--enable-tf-gpu-growth` is deprecated.",
            example_message="Instead, set the `TF_FORCE_GPU_ALLOW_GROWTH` environmnet "
            "variable to true.",
            code_example='TF_FORCE_GPU_ALLOW_GROWTH="true" flower-simulation <...>',
        )

    # Load JSON config
    backend_config_dict = json.loads(args.backend_config)

    if backend_config_dict:
        # Backend config internally operates with `_` not with `-`
        backend_config_dict = _replace_keys(backend_config_dict, match="-", target="_")
        log(DEBUG, "backend_config_dict: %s", backend_config_dict)

    run_id = (
        generate_rand_int_from_bytes(RUN_ID_NUM_BYTES)
        if args.run_id is None
        else args.run_id
    )

    app_path = Path(args.app)
    if not app_path.is_dir():
        log(ERROR, "--app is not a directory")
        sys.exit("Simulation Engine cannot start.")

    # Load pyproject.toml
    config, errors, warnings = load_and_validate(
        app_path / "pyproject.toml", check_module=False
    )
    if errors:
        raise ValueError(errors)

    if warnings:
        log(WARNING, warnings)

    if config is None:
        raise ValueError("Config extracted from FAB's pyproject.toml is not valid")

    # Get ClientApp and SeverApp components
    app_components = config["tool"]["flwr"]["app"]["components"]
    client_app_attr = app_components["clientapp"]
    server_app_attr = app_components["serverapp"]

    override_config = parse_config_args(
        [args.run_config] if args.run_config else args.run_config
    )
    fused_config = get_fused_config_from_dir(app_path, override_config)

    # Create run
    run = Run.create_empty(run_id)
    run.override_config = override_config

    _ = _run_simulation(
        server_app_attr=server_app_attr,
        client_app_attr=client_app_attr,
        num_supernodes=args.num_supernodes,
        backend_name=args.backend,
        backend_config=backend_config_dict,
        app_dir=args.app,
        run=run,
        enable_tf_gpu_growth=args.enable_tf_gpu_growth,
        verbose_logging=args.verbose,
        server_app_run_config=fused_config,
        is_app=True,
        exit_event=EventType.CLI_FLOWER_SIMULATION_LEAVE,
    )


# Entry point from Python session (script or notebook)
# pylint: disable=too-many-arguments,too-many-positional-arguments
[docs] def run_simulation( server_app: ServerApp, client_app: ClientApp, num_supernodes: int, backend_name: str = "ray", backend_config: Optional[BackendConfig] = None, enable_tf_gpu_growth: bool = False, verbose_logging: bool = False, ) -> None: r"""Run a Flower App using the Simulation Engine. Parameters ---------- server_app : ServerApp The `ServerApp` to be executed. It will send messages to different `ClientApp` instances running on different (virtual) SuperNodes. client_app : ClientApp The `ClientApp` to be executed by each of the SuperNodes. It will receive messages sent by the `ServerApp`. num_supernodes : int Number of nodes that run a ClientApp. They can be sampled by a Driver in the ServerApp and receive a Message describing what the ClientApp should perform. backend_name : str (default: ray) A simulation backend that runs `ClientApp`s. backend_config : Optional[BackendConfig] 'A dictionary to configure a backend. Separate dictionaries to configure different elements of backend. Supported top-level keys are `init_args` for values parsed to initialisation of backend, `client_resources` to define the resources for clients, and `actor` to define the actor parameters. Values supported in <value> are those included by `flwr.common.typing.ConfigsRecordValues`. enable_tf_gpu_growth : bool (default: False) A boolean to indicate whether to enable GPU growth on the main thread. This is desirable if you make use of a TensorFlow model on your `ServerApp` while having your `ClientApp` running on the same GPU. Without enabling this, you might encounter an out-of-memory error because TensorFlow, by default, allocates all GPU memory. Read more about how `tf.config.experimental.set_memory_growth()` works in the TensorFlow documentation: https://www.tensorflow.org/api/stable. verbose_logging : bool (default: False) When disabled, only INFO, WARNING and ERROR log messages will be shown. If enabled, DEBUG-level logs will be displayed. """ event( EventType.PYTHON_API_RUN_SIMULATION_ENTER, event_details={"backend": backend_name, "num-supernodes": num_supernodes}, ) if enable_tf_gpu_growth: warn_deprecated_feature_with_example( "Passing `enable_tf_gpu_growth=True` is deprecated.", example_message="Instead, set the `TF_FORCE_GPU_ALLOW_GROWTH` environment " "variable to true.", code_example='import os;os.environ["TF_FORCE_GPU_ALLOW_GROWTH"]="true"' "\n\tflwr.simulation.run_simulationt(...)", ) _ = _run_simulation( num_supernodes=num_supernodes, client_app=client_app, server_app=server_app, backend_name=backend_name, backend_config=backend_config, enable_tf_gpu_growth=enable_tf_gpu_growth, verbose_logging=verbose_logging, exit_event=EventType.PYTHON_API_RUN_SIMULATION_LEAVE, )
# pylint: disable=too-many-arguments,too-many-positional-arguments def run_serverapp_th( server_app_attr: Optional[str], server_app: Optional[ServerApp], server_app_run_config: UserConfig, driver: Driver, app_dir: str, f_stop: threading.Event, has_exception: threading.Event, enable_tf_gpu_growth: bool, run_id: int, ctx_queue: "Queue[Context]", ) -> threading.Thread: """Run SeverApp in a thread.""" def server_th_with_start_checks( tf_gpu_growth: bool, stop_event: threading.Event, exception_event: threading.Event, _driver: Driver, _server_app_dir: str, _server_app_run_config: UserConfig, _server_app_attr: Optional[str], _server_app: Optional[ServerApp], _ctx_queue: "Queue[Context]", ) -> None: """Run SeverApp, after check if GPU memory growth has to be set. Upon exception, trigger stop event for Simulation Engine. """ try: if tf_gpu_growth: log(INFO, "Enabling GPU growth for Tensorflow on the server thread.") enable_gpu_growth() # Initialize Context context = Context( run_id=run_id, node_id=0, node_config={}, state=RecordSet(), run_config=_server_app_run_config, ) # Run ServerApp updated_context = _run( driver=_driver, context=context, server_app_dir=_server_app_dir, server_app_attr=_server_app_attr, loaded_server_app=_server_app, ) _ctx_queue.put(updated_context) except Exception as ex: # pylint: disable=broad-exception-caught log(ERROR, "ServerApp thread raised an exception: %s", ex) log(ERROR, traceback.format_exc()) exception_event.set() raise finally: log(DEBUG, "ServerApp finished running.") # Upon completion, trigger stop event if one was passed if stop_event is not None: stop_event.set() log(DEBUG, "Triggered stop event for Simulation Engine.") serverapp_th = threading.Thread( target=server_th_with_start_checks, args=( enable_tf_gpu_growth, f_stop, has_exception, driver, app_dir, server_app_run_config, server_app_attr, server_app, ctx_queue, ), ) serverapp_th.start() return serverapp_th # pylint: disable=too-many-locals,too-many-positional-arguments def _main_loop( num_supernodes: int, backend_name: str, backend_config_stream: str, app_dir: str, is_app: bool, enable_tf_gpu_growth: bool, run: Run, exit_event: EventType, flwr_dir: Optional[str] = None, client_app: Optional[ClientApp] = None, client_app_attr: Optional[str] = None, server_app: Optional[ServerApp] = None, server_app_attr: Optional[str] = None, server_app_run_config: Optional[UserConfig] = None, ) -> Context: """Start ServerApp on a separate thread, then launch Simulation Engine.""" # Initialize StateFactory state_factory = LinkStateFactory(":flwr-in-memory-state:") f_stop = threading.Event() # A Threading event to indicate if an exception was raised in the ServerApp thread server_app_thread_has_exception = threading.Event() serverapp_th = None success = True updated_context = Context( run_id=run.run_id, node_id=0, node_config=UserConfig(), state=RecordSet(), run_config=UserConfig(), ) try: # Register run log(DEBUG, "Pre-registering run with id %s", run.run_id) run.status = RunStatus(Status.RUNNING, "", "") run.starting_at = now().isoformat() run.running_at = run.starting_at state_factory.state().run_ids[run.run_id] = RunRecord(run=run) # type: ignore if server_app_run_config is None: server_app_run_config = {} # Initialize Driver driver = InMemoryDriver(state_factory=state_factory) driver.set_run(run_id=run.run_id) output_context_queue: Queue[Context] = Queue() # Get and run ServerApp thread serverapp_th = run_serverapp_th( server_app_attr=server_app_attr, server_app=server_app, server_app_run_config=server_app_run_config, driver=driver, app_dir=app_dir, f_stop=f_stop, has_exception=server_app_thread_has_exception, enable_tf_gpu_growth=enable_tf_gpu_growth, run_id=run.run_id, ctx_queue=output_context_queue, ) # Start Simulation Engine vce.start_vce( num_supernodes=num_supernodes, client_app_attr=client_app_attr, client_app=client_app, backend_name=backend_name, backend_config_json_stream=backend_config_stream, app_dir=app_dir, is_app=is_app, state_factory=state_factory, f_stop=f_stop, run=run, flwr_dir=flwr_dir, ) updated_context = output_context_queue.get(timeout=3) except Empty: log(DEBUG, "Queue timeout. No context received.") except Exception as ex: log(ERROR, "An exception occurred !! %s", ex) log(ERROR, traceback.format_exc()) success = False raise RuntimeError("An error was encountered. Ending simulation.") from ex finally: # Trigger stop event f_stop.set() event( exit_event, event_details={ "run-id-hash": get_sha256_hash(run.run_id), "success": success, }, ) if serverapp_th: serverapp_th.join() if server_app_thread_has_exception.is_set(): raise RuntimeError("Exception in ServerApp thread") log(DEBUG, "Stopping Simulation Engine now.") return updated_context # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments def _run_simulation( num_supernodes: int, exit_event: EventType, client_app: Optional[ClientApp] = None, server_app: Optional[ServerApp] = None, backend_name: str = "ray", backend_config: Optional[BackendConfig] = None, client_app_attr: Optional[str] = None, server_app_attr: Optional[str] = None, server_app_run_config: Optional[UserConfig] = None, app_dir: str = "", flwr_dir: Optional[str] = None, run: Optional[Run] = None, enable_tf_gpu_growth: bool = False, verbose_logging: bool = False, is_app: bool = False, ) -> Context: """Launch the Simulation Engine.""" if backend_config is None: backend_config = {} if "init_args" not in backend_config: backend_config["init_args"] = {} # Set default client_resources if not passed if "client_resources" not in backend_config: backend_config["client_resources"] = {"num_cpus": 2, "num_gpus": 0} # Initialization of backend config to enable GPU growth globally when set if "actor" not in backend_config: backend_config["actor"] = {"tensorflow": 0} # Set logging level logger = logging.getLogger("flwr") if verbose_logging: update_console_handler(level=DEBUG, timestamps=True, colored=True) else: backend_config["init_args"]["logging_level"] = backend_config["init_args"].get( "logging_level", WARNING ) backend_config["init_args"]["log_to_driver"] = backend_config["init_args"].get( "log_to_driver", True ) if enable_tf_gpu_growth: # Check that Backend config has also enabled using GPU growth use_tf = backend_config.get("actor", {}).get("tensorflow", False) if not use_tf: log(WARNING, "Enabling GPU growth for your backend.") backend_config["actor"]["tensorflow"] = True # Convert config to original JSON-stream format backend_config_stream = json.dumps(backend_config) # If no `Run` object is set, create one if run is None: run_id = generate_rand_int_from_bytes(RUN_ID_NUM_BYTES) run = Run.create_empty(run_id=run_id) args = ( num_supernodes, backend_name, backend_config_stream, app_dir, is_app, enable_tf_gpu_growth, run, exit_event, flwr_dir, client_app, client_app_attr, server_app, server_app_attr, server_app_run_config, ) # Detect if there is an Asyncio event loop already running. # If yes, disable logger propagation. In environmnets # like Jupyter/Colab notebooks, it's often better to do this. asyncio_loop_running = False try: _ = ( asyncio.get_running_loop() ) # Raises RuntimeError if no event loop is present log(DEBUG, "Asyncio event loop already running.") asyncio_loop_running = True except RuntimeError: pass finally: if asyncio_loop_running: # Set logger propagation to False to prevent duplicated log output in Colab. logger = set_logger_propagation(logger, False) updated_context = _main_loop(*args) return updated_context def _parse_args_run_simulation() -> argparse.ArgumentParser: """Parse flower-simulation command line arguments.""" parser = argparse.ArgumentParser( description="Start a Flower simulation", ) parser.add_argument( "--app", type=str, required=True, help="Path to a directory containing a FAB-like structure with a " "pyproject.toml.", ) parser.add_argument( "--num-supernodes", type=int, required=True, help="Number of simulated SuperNodes.", ) parser.add_argument( "--run-config", default=None, help="Override configuration key-value pairs.", ) parser.add_argument( "--backend", default="ray", type=str, help="Simulation backend that executes the ClientApp.", ) parser.add_argument( "--backend-config", type=str, default="{}", help='A JSON formatted stream, e.g \'{"<keyA>":<value>, "<keyB>":<value>}\' to ' "configure a backend. Values supported in <value> are those included by " "`flwr.common.typing.ConfigsRecordValues`. ", ) parser.add_argument( "--enable-tf-gpu-growth", action="store_true", help="Enables GPU growth on the main thread. This is desirable if you make " "use of a TensorFlow model on your `ServerApp` while having your `ClientApp` " "running on the same GPU. Without enabling this, you might encounter an " "out-of-memory error because TensorFlow by default allocates all GPU memory." "Read more about how `tf.config.experimental.set_memory_growth()` works in " "the TensorFlow documentation: https://www.tensorflow.org/api/stable.", ) parser.add_argument( "--verbose", action="store_true", help="When unset, only INFO, WARNING and ERROR log messages will be shown. " "If set, DEBUG-level logs will be displayed. ", ) parser.add_argument( "--flwr-dir", default=None, help="""The path containing installed Flower Apps. By default, this value is equal to: - `$FLWR_HOME/` if `$FLWR_HOME` is defined - `$XDG_DATA_HOME/.flwr/` if `$XDG_DATA_HOME` is defined - `$HOME/.flwr/` in all other cases """, ) parser.add_argument( "--run-id", type=int, help="Sets the ID of the run started by the Simulation Engine.", ) return parser