Source code for flwr_datasets.partitioner.continuous_partitioner
# Copyright 2025 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Continuous partitioner class that works with Hugging Face Datasets."""


# pylint: disable=R0913, R0917
from typing import Optional

import numpy as np

from datasets import Dataset
from flwr_datasets.partitioner.partitioner import Partitioner

class ContinuousPartitioner(Partitioner):

    def load_partition(self, partition_id: int) -> Dataset:
        """Load a single partition based on the partition index.

        Parameters
        ----------
        partition_id : int
            The index that corresponds to the requested partition.

        Returns
        -------
        dataset_partition : Dataset
            A single dataset partition.
        """
        self._check_and_generate_partitions_if_needed()
        return self.dataset.select(self._partition_id_to_indices[partition_id])
    @property
    def num_partitions(self) -> int:
        """Total number of partitions."""
        self._check_and_generate_partitions_if_needed()
        return self._num_partitions

    @property
    def partition_id_to_indices(self) -> dict[int, list[int]]:
        """Mapping from partition ID to dataset indices."""
        self._check_and_generate_partitions_if_needed()
        return self._partition_id_to_indices

    def _check_and_generate_partitions_if_needed(self) -> None:
        """Lazy evaluation of the partitioning logic."""
        if self._partition_id_to_indices_determined:
            return

        if self._num_partitions > self.dataset.num_rows:
            raise ValueError(
                "Number of partitions must be less than or equal to "
                "number of dataset samples."
            )

        # Extract property values
        property_values = np.array(self.dataset[self._partition_by], dtype=np.float32)

        # Check for missing values (None or NaN)
        if np.any(property_values is None) or np.isnan(property_values).any():
            raise ValueError(
                f"The column '{self._partition_by}' contains None or NaN values, "
                f"which are not supported by {self.__class__.__qualname__}. "
                "Please clean or filter your dataset before partitioning."
            )

        # Standardize
        std = np.std(property_values)
        if std < 1e-6 and self._strictness > 0:
            raise ValueError(
                f"Cannot standardize column '{self._partition_by}' "
                f"because it has near-zero std (std={std}). "
                "All values are nearly identical, which prevents meaningful "
                "non-IID partitioning. To resolve this, choose a different "
                "partition property or set strictness to 0 to enable IID partitioning."
            )
        standardized_values = (property_values - np.mean(property_values)) / std

        # Blend noise
        noise = self._rng.normal(loc=0, scale=1, size=len(standardized_values))
        blended_values = (
            self._strictness * standardized_values + (1 - self._strictness) * noise
        )

        # Sort and partition
        sorted_indices = np.argsort(blended_values)
        partition_indices = np.array_split(sorted_indices, self._num_partitions)

        for pid, indices in enumerate(partition_indices):
            indices_list = indices.tolist()
            if self._shuffle:
                self._rng.shuffle(indices_list)
            self._partition_id_to_indices[pid] = indices_list

        self._partition_id_to_indices_determined = True
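
The listing above does not show the constructor, so the exact parameter names and defaults are not visible here. Below is a minimal usage sketch, assuming a constructor that accepts num_partitions, partition_by, strictness, shuffle, and seed (inferred from the attributes referenced in the methods) and that the base Partitioner exposes a dataset setter, as suggested by the use of self.dataset:

# Usage sketch (assumptions noted above; not part of the module source).
import numpy as np
from datasets import Dataset

from flwr_datasets.partitioner.continuous_partitioner import ContinuousPartitioner

# Toy dataset with a continuous column to partition by.
rng = np.random.default_rng(0)
dataset = Dataset.from_dict({"feature": rng.normal(size=1000).tolist()})

partitioner = ContinuousPartitioner(
    num_partitions=10,         # assumed constructor parameter
    partition_by="feature",    # continuous column used for sorting
    strictness=0.8,            # closer to 1.0 -> more strongly sorted by "feature"
    shuffle=True,
    seed=42,
)
partitioner.dataset = dataset  # assumed dataset setter on the base Partitioner

print(partitioner.num_partitions)                # 10
partition = partitioner.load_partition(partition_id=0)
print(len(partition))                            # roughly 100 rows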
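
The core of _check_and_generate_partitions_if_needed is a blend-and-sort scheme: standardize the continuous column, mix it with Gaussian noise weighted by strictness, sort the blended values, and split the sorted indices into contiguous chunks. A self-contained numpy sketch of that idea, on toy data and independent of the class:

# Standalone illustration of the sort-and-split idea used above.
import numpy as np

rng = np.random.default_rng(42)
values = rng.exponential(scale=2.0, size=12)      # continuous property column
strictness = 0.7                                  # 1.0 = fully sorted, 0.0 = IID

standardized = (values - values.mean()) / values.std()
noise = rng.normal(size=values.shape)
blended = strictness * standardized + (1 - strictness) * noise

sorted_indices = np.argsort(blended)              # low -> high blended value
partitions = np.array_split(sorted_indices, 3)    # 3 partitions of ~4 rows each

for pid, idx in enumerate(partitions):
    print(pid, values[idx].round(2))              # partition means tend to rise with pid

With strictness near 1 the partition means increase almost monotonically with the partition ID (strongly non-IID); with strictness 0 only the noise determines the order, so the split is effectively random (IID).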