Source code for flwr.simulation.simulationio_connection
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.# =============================================================================="""Flower SimulationIo connection."""fromloggingimportDEBUG,WARNINGfromtypingimportOptional,castimportgrpcfromflwr.common.constantimportSIMULATIONIO_API_DEFAULT_ADDRESSfromflwr.common.grpcimportcreate_channelfromflwr.common.loggerimportlogfromflwr.proto.simulationio_pb2_grpcimportSimulationIoStub# pylint: disable=E0611
[docs]classSimulationIoConnection:"""`SimulationIoConnection` provides an interface to the SimulationIo API. Parameters ---------- simulationio_service_address : str (default: "[::]:9094") The address (URL, IPv6, IPv4) of the SuperLink SimulationIo API service. root_certificates : Optional[bytes] (default: None) The PEM-encoded root certificates as a byte string. If provided, a secure connection using the certificates will be established to an SSL-enabled Flower server. """def__init__(# pylint: disable=too-many-argumentsself,simulationio_service_address:str=SIMULATIONIO_API_DEFAULT_ADDRESS,root_certificates:Optional[bytes]=None,)->None:self._addr=simulationio_service_addressself._cert=root_certificatesself._grpc_stub:Optional[SimulationIoStub]=Noneself._channel:Optional[grpc.Channel]=None@propertydef_is_connected(self)->bool:"""Check if connected to the SimulationIo API server."""returnself._channelisnotNone@propertydef_stub(self)->SimulationIoStub:"""SimulationIo stub."""ifnotself._is_connected:self._connect()returncast(SimulationIoStub,self._grpc_stub)def_connect(self)->None:"""Connect to the SimulationIo API."""ifself._is_connected:log(WARNING,"Already connected")returnself._channel=create_channel(server_address=self._addr,insecure=(self._certisNone),root_certificates=self._cert,)self._grpc_stub=SimulationIoStub(self._channel)log(DEBUG,"[SimulationIO] Connected to %s",self._addr)def_disconnect(self)->None:"""Disconnect from the SimulationIo API."""ifnotself._is_connected:log(DEBUG,"Already disconnected")returnchannel:grpc.Channel=self._channelself._channel=Noneself._grpc_stub=Nonechannel.close()log(DEBUG,"[SimulationIO] Disconnected")