# Source code for flwr.simulation.app

# Copyright 2021 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Flower Simulation process."""


import argparse
from logging import DEBUG, ERROR, INFO
from queue import Queue
from time import sleep
from typing import Optional

from flwr.cli.config_utils import get_fab_metadata
from flwr.cli.install import install_from_fab
from flwr.common import EventType
from flwr.common.args import try_obtain_root_certificates
from flwr.common.config import (
    get_flwr_dir,
    get_fused_config_from_dir,
    get_project_config,
    get_project_dir,
    unflatten_dict,
)
from flwr.common.constant import Status, SubStatus
from flwr.common.logger import (
    log,
    mirror_output_to_queue,
    restore_output,
    start_log_uploader,
    stop_log_uploader,
)
from flwr.common.serde import (
    configs_record_from_proto,
    context_from_proto,
    fab_from_proto,
    run_from_proto,
    run_status_to_proto,
)
from flwr.common.typing import RunStatus
from flwr.proto.run_pb2 import (  # pylint: disable=E0611
    GetFederationOptionsRequest,
    GetFederationOptionsResponse,
    UpdateRunStatusRequest,
)
from flwr.proto.simulationio_pb2 import (  # pylint: disable=E0611
    PullSimulationInputsRequest,
    PullSimulationInputsResponse,
    PushSimulationOutputsRequest,
)
from flwr.server.superlink.fleet.vce.backend.backend import BackendConfig
from flwr.simulation.run_simulation import _run_simulation
from flwr.simulation.simulationio_connection import SimulationIoConnection


def flwr_simulation() -> None:
    """Run process-isolated Flower Simulation.

    Mirrors stdout/stderr into a queue (which `run_simulation_process` hands
    to the per-run log uploader), parses the command-line arguments, and runs
    the simulation process. The original stdout/stderr are restored on exit,
    including when argument parsing exits (e.g. `--help`, invalid arguments)
    or the simulation process raises.
    """
    # Capture stdout/stderr
    log_queue: Queue[Optional[str]] = Queue()
    mirror_output_to_queue(log_queue)

    parser = argparse.ArgumentParser(
        description="Run a Flower Simulation",
    )
    parser.add_argument(
        "--superlink",
        type=str,
        help="Address of SuperLink's SimulationIO API",
    )
    parser.add_argument(
        "--run-once",
        action="store_true",
        help="When set, this process will start a single simulation "
        "for a pending Run. If no pending run the process will exit. ",
    )
    parser.add_argument(
        "--flwr-dir",
        default=None,
        help="""The path containing installed Flower Apps.
    By default, this value is equal to:

        - `$FLWR_HOME/` if `$FLWR_HOME` is defined
        - `$XDG_DATA_HOME/.flwr/` if `$XDG_DATA_HOME` is defined
        - `$HOME/.flwr/` in all other cases
    """,
    )
    parser.add_argument(
        "--insecure",
        action="store_true",
        help="Run the server without HTTPS, regardless of whether certificate "
        "paths are provided. By default, the server runs with HTTPS enabled. "
        "Use this flag only if you understand the risks.",
    )
    parser.add_argument(
        "--root-certificates",
        metavar="ROOT_CERT",
        type=str,
        help="Specifies the path to the PEM-encoded root certificate file for "
        "establishing secure HTTPS connections.",
    )

    try:
        # `parse_args` may raise SystemExit (e.g. `--help`); still restore output
        args = parser.parse_args()

        log(INFO, "Starting Flower Simulation")
        certificates = try_obtain_root_certificates(args, args.superlink)

        log(
            DEBUG,
            "Starting isolated `Simulation` connected to SuperLink "
            "SimulationIO API at %s",
            args.superlink,
        )
        run_simulation_process(
            simulationio_api_address=args.superlink,
            log_queue=log_queue,
            run_once=args.run_once,
            flwr_dir_=args.flwr_dir,
            certificates=certificates,
        )
    finally:
        # Restore stdout/stderr
        restore_output()


def run_simulation_process(  # pylint: disable=R0914, disable=W0212, disable=R0915
    simulationio_api_address: str,
    log_queue: Queue[Optional[str]],
    run_once: bool,
    flwr_dir_: Optional[str] = None,
    certificates: Optional[bytes] = None,
) -> None:
    """Run Flower Simulation process.

    Each loop iteration does the following:

    - pull the simulation inputs (Run, Context, FAB) through the SimulationIO
      API;
    - install the FAB and resolve the ServerApp/ClientApp components;
    - mark the Run as RUNNING, fetch its federation options and launch the
      simulation;
    - push the outputs and report the final status (COMPLETED or FAILED).

    Parameters
    ----------
    simulationio_api_address : str
        Address of the SuperLink's SimulationIO API.
    log_queue : Queue[Optional[str]]
        Queue with the mirrored stdout/stderr output. A log uploader started
        per Run consumes it.
    run_once : bool
        If True, stop after the first iteration that obtained a Run or that
        failed. While no Run is pending, this function keeps polling every
        3 seconds.
    flwr_dir_ : Optional[str] (default: None)
        Directory containing installed Flower Apps. It is resolved via
        `get_flwr_dir`.
    certificates : Optional[bytes] (default: None)
        Root certificates forwarded to `SimulationIoConnection`.
    """
    conn = SimulationIoConnection(
        simulationio_service_address=simulationio_api_address,
        root_certificates=certificates,
    )

    # Resolve directory where FABs are installed
    flwr_dir = get_flwr_dir(flwr_dir_)
    log_uploader = None

    while True:
        # Per-iteration state. Only the Run pulled in *this* iteration may have
        # its status updated. Reading a leftover `run` variable would raise
        # UnboundLocalError on the first iteration, or re-report a previous,
        # already finished Run.
        run_id: Optional[int] = None
        run_status: Optional[RunStatus] = None

        try:
            # Pull SimulationInputs from LinkState
            req = PullSimulationInputsRequest()
            res: PullSimulationInputsResponse = conn._stub.PullSimulationInputs(req)
            if not res.HasField("run"):
                # No pending Run yet: wait, then poll again (`continue` skips
                # the `run_once` check below)
                sleep(3)
                continue

            # Deserialize the Run first so that any later failure (including
            # while deserializing the Context or the FAB) is reported against it
            run = run_from_proto(res.run)
            run_id = run.run_id
            context = context_from_proto(res.context)
            fab = fab_from_proto(res.fab)

            # Start log uploader for this run
            log_uploader = start_log_uploader(
                log_queue=log_queue,
                node_id=context.node_id,
                run_id=run.run_id,
                stub=conn._stub,
            )

            log(DEBUG, "Simulation process starts FAB installation.")
            install_from_fab(fab.content, flwr_dir=flwr_dir, skip_prompt=True)

            fab_id, fab_version = get_fab_metadata(fab.content)

            app_path = get_project_dir(fab_id, fab_version, fab.hash_str, flwr_dir)
            config = get_project_config(app_path)

            # Get ClientApp and ServerApp components
            app_components = config["tool"]["flwr"]["app"]["components"]
            client_app_attr = app_components["clientapp"]
            server_app_attr = app_components["serverapp"]
            fused_config = get_fused_config_from_dir(app_path, run.override_config)

            # Update run_config in context
            context.run_config = fused_config

            log(
                DEBUG,
                "Flower will load ServerApp `%s` in %s",
                server_app_attr,
                app_path,
            )
            log(
                DEBUG,
                "Flower will load ClientApp `%s` in %s",
                client_app_attr,
                app_path,
            )

            # Change status to Running
            run_status_proto = run_status_to_proto(RunStatus(Status.RUNNING, "", ""))
            conn._stub.UpdateRunStatus(
                UpdateRunStatusRequest(run_id=run.run_id, run_status=run_status_proto)
            )

            # Pull Federation Options
            fed_opt_res: GetFederationOptionsResponse = conn._stub.GetFederationOptions(
                GetFederationOptionsRequest(run_id=run.run_id)
            )
            federation_options = configs_record_from_proto(
                fed_opt_res.federation_options
            )

            # Unflatten underlying dict
            fed_opt = unflatten_dict({**federation_options})

            # Extract configs values of interest
            num_supernodes = fed_opt.get("num-supernodes")
            if num_supernodes is None:
                raise ValueError(
                    "Federation options expects `num-supernodes` to be set."
                )
            backend_config: BackendConfig = fed_opt.get("backend", {})
            verbose: bool = fed_opt.get("verbose", False)
            enable_tf_gpu_growth: bool = fed_opt.get("enable_tf_gpu_growth", True)

            # Launch the simulation
            _run_simulation(
                server_app_attr=server_app_attr,
                client_app_attr=client_app_attr,
                num_supernodes=num_supernodes,
                backend_config=backend_config,
                app_dir=str(app_path),
                run=run,
                enable_tf_gpu_growth=enable_tf_gpu_growth,
                verbose_logging=verbose,
                server_app_run_config=fused_config,
                is_app=True,
                exit_event=EventType.CLI_FLOWER_SIMULATION_LEAVE,
            )

            # Send resulting context.
            # TODO(review): the resulting Context is not serialized yet, so
            # `None` is pushed (intended: context_to_proto(updated_context)).
            context_proto = None
            out_req = PushSimulationOutputsRequest(
                run_id=run.run_id, context=context_proto
            )
            _ = conn._stub.PushSimulationOutputs(out_req)

            run_status = RunStatus(Status.FINISHED, SubStatus.COMPLETED, "")

        except Exception as ex:  # pylint: disable=broad-exception-caught
            exc_entity = "Simulation"
            log(ERROR, "%s raised an exception", exc_entity, exc_info=ex)
            if run_id is not None:
                # Attribute the failure to the Run pulled in this iteration
                run_status = RunStatus(Status.FINISHED, SubStatus.FAILED, str(ex))
            elif not run_once:
                # Failed before any Run was obtained (e.g. the pull RPC itself
                # failed): there is no Run to mark as failed; back off instead
                # of immediately retrying in a tight loop.
                sleep(3)

        finally:
            # Stop log uploader for this run and upload final logs
            if log_uploader:
                stop_log_uploader(log_queue, log_uploader)
                log_uploader = None

            # Update run status (only for a Run pulled in this iteration)
            if run_id is not None and run_status is not None:
                run_status_proto = run_status_to_proto(run_status)
                conn._stub.UpdateRunStatus(
                    UpdateRunStatusRequest(run_id=run_id, run_status=run_status_proto)
                )

        # Stop the loop if `flwr-simulation` is expected to process a single run
        if run_once:
            break