# Copyright 2020 Flower Labs GmbH. All Rights Reserved.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.# =============================================================================="""Flower ClientManager."""importrandomimportthreadingfromabcimportABC,abstractmethodfromloggingimportINFOfromtypingimportOptionalfromflwr.common.loggerimportlogfrom.client_proxyimportClientProxyfrom.criterionimportCriterion
class ClientManager(ABC):
    """Abstract interface that every Flower client manager implements.

    Concrete managers keep track of the currently connected
    ``ClientProxy`` objects and hand out subsets of them on request.
    """

    @abstractmethod
    def num_available(self) -> int:
        """Report how many clients are currently available.

        Returns
        -------
        num_available : int
            The number of currently available clients.
        """

    @abstractmethod
    def register(self, client: ClientProxy) -> bool:
        """Add a Flower ClientProxy instance to the manager.

        Parameters
        ----------
        client : flwr.server.client_proxy.ClientProxy
            The ClientProxy of the Client to register.

        Returns
        -------
        success : bool
            ``True`` when the client was registered. ``False`` when the
            ClientProxy is already registered or cannot be registered
            for any other reason.
        """

    @abstractmethod
    def unregister(self, client: ClientProxy) -> None:
        """Remove a Flower ClientProxy instance from the manager.

        This method is idempotent.

        Parameters
        ----------
        client : flwr.server.client_proxy.ClientProxy
            The ClientProxy of the Client to unregister.
        """

    @abstractmethod
    def all(self) -> dict[str, ClientProxy]:
        """Return all available clients.

        Returns
        -------
        clients : dict[str, ClientProxy]
            The available clients, keyed by string.
        """

    @abstractmethod
    def wait_for(self, num_clients: int, timeout: int) -> bool:
        """Wait until at least `num_clients` are available.

        Parameters
        ----------
        num_clients : int
            The number of clients to wait for.
        timeout : int
            Upper bound on the wait.

        Returns
        -------
        success : bool
        """

    @abstractmethod
    def sample(
        self,
        num_clients: int,
        min_num_clients: Optional[int] = None,
        criterion: Optional[Criterion] = None,
    ) -> list[ClientProxy]:
        """Sample a number of Flower ClientProxy instances.

        Parameters
        ----------
        num_clients : int
            How many clients to sample.
        min_num_clients : Optional[int]
            Optional lower bound on the number of clients; ``None`` by
            default.
        criterion : Optional[Criterion]
            Optional selection criterion; ``None`` by default.

        Returns
        -------
        clients : list[ClientProxy]
            The sampled ClientProxy instances.
        """
class SimpleClientManager(ClientManager):
    """Provides a pool of available clients.

    Clients are stored in ``self.clients``, a dict keyed by ``cid``. The
    condition variable ``self._cv`` guards every mutation of that dict
    performed by this class and wakes up threads blocked in ``wait_for``
    whenever the pool changes.
    """

    def __init__(self) -> None:
        self.clients: dict[str, ClientProxy] = {}
        self._cv = threading.Condition()

    def __len__(self) -> int:
        """Return the number of available clients.

        Returns
        -------
        num_available : int
            The number of currently available clients.
        """
        return len(self.clients)

    def num_available(self) -> int:
        """Return the number of available clients.

        Returns
        -------
        num_available : int
            The number of currently available clients.
        """
        return len(self)

    def wait_for(self, num_clients: int, timeout: int = 86400) -> bool:
        """Wait until at least `num_clients` are available.

        Blocks until the requested number of clients is available or until
        a timeout is reached. Current timeout default: 1 day.

        Parameters
        ----------
        num_clients : int
            The number of clients to wait for.
        timeout : int
            The time in seconds to wait for, defaults to 86400 (24h).

        Returns
        -------
        success : bool
            ``True`` if enough clients were available before the timeout
            expired, ``False`` otherwise.
        """
        with self._cv:
            return self._cv.wait_for(
                lambda: len(self.clients) >= num_clients, timeout=timeout
            )

    def register(self, client: ClientProxy) -> bool:
        """Register Flower ClientProxy instance.

        Parameters
        ----------
        client : flwr.server.client_proxy.ClientProxy
            The ClientProxy to add to the pool.

        Returns
        -------
        success : bool
            Indicating if registration was successful. False if ClientProxy is
            already registered or can not be registered for any reason.
        """
        # The membership check and the insert happen under the lock so that
        # two threads registering the same cid cannot both succeed.
        with self._cv:
            if client.cid in self.clients:
                return False
            self.clients[client.cid] = client
            self._cv.notify_all()
        return True

    def unregister(self, client: ClientProxy) -> None:
        """Unregister Flower ClientProxy instance.

        This method is idempotent.

        Parameters
        ----------
        client : flwr.server.client_proxy.ClientProxy
            The ClientProxy to remove from the pool.
        """
        # Check and delete under the lock: concurrent calls for the same cid
        # must not race into a KeyError, otherwise the call is not idempotent.
        with self._cv:
            if client.cid in self.clients:
                del self.clients[client.cid]
                self._cv.notify_all()

    def all(self) -> dict[str, ClientProxy]:
        """Return all available clients.

        Returns
        -------
        clients : dict[str, ClientProxy]
            The manager's internal dict itself (not a copy), keyed by cid.
        """
        return self.clients

    def sample(
        self,
        num_clients: int,
        min_num_clients: Optional[int] = None,
        criterion: Optional[Criterion] = None,
    ) -> list[ClientProxy]:
        """Sample a number of Flower ClientProxy instances.

        Parameters
        ----------
        num_clients : int
            The number of clients to sample.
        min_num_clients : Optional[int]
            The number of clients to wait for before sampling. Defaults to
            `num_clients` when ``None``.
        criterion : Optional[Criterion]
            When given, only clients for which ``criterion.select(client)``
            is truthy are eligible.

        Returns
        -------
        clients : list[ClientProxy]
            `num_clients` randomly chosen eligible clients, or an empty list
            when fewer than `num_clients` clients are eligible.
        """
        # Block until at least min_num_clients are connected. The result of
        # the wait is not checked: after a timeout, sampling proceeds with
        # whatever clients are available.
        if min_num_clients is None:
            min_num_clients = num_clients
        self.wait_for(min_num_clients)

        # Work on a snapshot taken under the lock so that a client which
        # unregisters while we filter/sample cannot cause a KeyError below.
        with self._cv:
            snapshot = dict(self.clients)

        # Sample clients which meet the criterion
        if criterion is None:
            available_cids = list(snapshot)
        else:
            available_cids = [
                cid for cid, proxy in snapshot.items() if criterion.select(proxy)
            ]

        if num_clients > len(available_cids):
            log(
                INFO,
                "Sampling failed: number of available clients"
                " (%s) is less than number of requested clients (%s).",
                len(available_cids),
                num_clients,
            )
            return []

        sampled_cids = random.sample(available_cids, num_clients)
        return [snapshot[cid] for cid in sampled_cids]