Source code for flwr_datasets.partitioner.continuous_partitioner
# Copyright 2025 Flower Labs GmbH. All Rights Reserved.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.# =============================================================================="""Continuous partitioner class that works with Hugging Face Datasets."""# pylint: disable=R0913, R0917fromtypingimportOptionalimportnumpyasnpfromdatasetsimportDatasetfromflwr_datasets.partitioner.partitionerimportPartitioner
class ContinuousPartitioner(Partitioner):  # pylint: disable=too-many-instance-attributes
    r"""Partitioner based on a real-valued dataset property with adjustable strictness.

    This partitioner enables non-IID partitioning by sorting the dataset according to a
    continuous (i.e., real-valued, not categorical) property and introducing controlled
    noise to adjust the level of heterogeneity.

    To interpolate between IID and non-IID partitioning, a `strictness` parameter
    (:math:`\sigma \in [0, 1]`) blends a standardized property vector
    (:math:`z \in \mathbb{R}^n`) with Gaussian noise
    (:math:`\varepsilon \sim \mathcal{N}(0, I)`), producing blended scores:

    .. math::

        b = \sigma \cdot z + (1 - \sigma) \cdot \varepsilon

    Samples are then sorted by :math:`b` to assign them to partitions. When
    `strictness` is 0, partitioning is purely random (IID), while a value of 1
    strictly follows the property ranking (strongly non-IID).

    Parameters
    ----------
    num_partitions : int
        Number of partitions to create.
    partition_by : str
        Name of the continuous feature to partition the dataset on.
    strictness : float
        Controls how strongly the feature influences partitioning
        (0 = iid, 1 = non-iid).
    shuffle : bool
        Whether to shuffle the indices within each partition (default: True).
    seed : Optional[int]
        Random seed for reproducibility.

    Examples
    --------
    >>> from datasets import Dataset
    >>> import numpy as np
    >>> import pandas as pd
    >>> from flwr_datasets.partitioner import ContinuousPartitioner
    >>> import matplotlib.pyplot as plt
    >>>
    >>> # Create synthetic data
    >>> df = pd.DataFrame({
    ...     "continuous": np.linspace(0, 10, 10_000),
    ...     "category": np.random.choice([0, 1, 2, 3], size=10_000),
    ... })
    >>> hf_dataset = Dataset.from_pandas(df)
    >>>
    >>> # Partition dataset
    >>> partitioner = ContinuousPartitioner(
    ...     num_partitions=5,
    ...     partition_by="continuous",
    ...     strictness=0.7,
    ...     shuffle=True,
    ... )
    >>> partitioner.dataset = hf_dataset
    >>>
    >>> # Plot partitions
    >>> plt.figure(figsize=(10, 6))
    >>> for i in range(5):
    ...     plt.hist(
    ...         partitioner.load_partition(i)["continuous"],
    ...         bins=64,
    ...         alpha=0.5,
    ...         label=f"Partition {i}",
    ...     )
    >>> plt.legend()
    >>> plt.xlabel("Continuous Value")
    >>> plt.ylabel("Frequency")
    >>> plt.title("Partition distributions")
    >>> plt.grid(True)
    >>> plt.show()
    """

    def __init__(
        self,
        num_partitions: int,
        partition_by: str,
        strictness: float,
        shuffle: bool = True,
        seed: Optional[int] = 42,
    ) -> None:
        super().__init__()
        # Validate eagerly so misconfiguration fails at construction time,
        # not on the first (lazy) partitioning call.
        if not 0 <= strictness <= 1:
            raise ValueError("`strictness` must be between 0 and 1")
        if num_partitions <= 0:
            raise ValueError("`num_partitions` must be greater than 0")
        self._num_partitions = num_partitions
        self._partition_by = partition_by
        self._strictness = strictness
        self._shuffle = shuffle
        self._seed = seed
        self._rng = np.random.default_rng(seed)

        # Lazy initialization: indices are computed on first access because
        # `self.dataset` may not be assigned yet at construction time.
        self._partition_id_to_indices: dict[int, list[int]] = {}
        self._partition_id_to_indices_determined = False

    def load_partition(self, partition_id: int) -> Dataset:
        """Load a single partition based on the partition index.

        Parameters
        ----------
        partition_id : int
            The index that corresponds to the requested partition.

        Returns
        -------
        dataset_partition : Dataset
            A single dataset partition.
        """
        self._check_and_generate_partitions_if_needed()
        return self.dataset.select(self._partition_id_to_indices[partition_id])

    @property
    def num_partitions(self) -> int:
        """Total number of partitions."""
        self._check_and_generate_partitions_if_needed()
        return self._num_partitions

    @property
    def partition_id_to_indices(self) -> dict[int, list[int]]:
        """Mapping from partition ID to dataset indices."""
        self._check_and_generate_partitions_if_needed()
        return self._partition_id_to_indices

    def _check_and_generate_partitions_if_needed(self) -> None:
        """Lazy evaluation of the partitioning logic.

        Sorts samples by a noisy version of the standardized partition property
        and splits the resulting order into `num_partitions` contiguous chunks.

        Raises
        ------
        ValueError
            If there are more partitions than samples, if the property column
            contains missing values, or if it cannot be standardized while
            `strictness` > 0.
        """
        if self._partition_id_to_indices_determined:
            return

        if self._num_partitions > self.dataset.num_rows:
            raise ValueError(
                "Number of partitions must be less than or equal to number of dataset samples."
            )

        # Extract property values. The float32 conversion maps None entries to
        # NaN, so a single NaN scan below covers both None and NaN inputs.
        property_values = np.array(self.dataset[self._partition_by], dtype=np.float32)

        # BUG FIX: the original code also tested `np.any(property_values is None)`,
        # which compares the whole ndarray object to None (always False) and never
        # detected anything. The NaN check alone is the correct test here.
        if np.isnan(property_values).any():
            raise ValueError(
                f"The column '{self._partition_by}' contains None or NaN values, "
                f"which are not supported by {self.__class__.__qualname__}. "
                "Please clean or filter your dataset before partitioning."
            )

        # Standardize. A (near-)constant column carries no ranking signal, so it
        # cannot drive non-IID partitioning and would divide by ~zero below.
        std = np.std(property_values)
        if std < 1e-6 and self._strictness > 0:
            raise ValueError(
                f"Cannot standardize column '{self._partition_by}' "
                f"because it has near-zero std (std={std}). "
                "All values are nearly identical, which prevents meaningful non-IID partitioning. "
                "To resolve this, choose a different partition property "
                "or set strictness to 0 to enable IID partitioning."
            )
        standardized_values = (property_values - np.mean(property_values)) / std

        # Blend the standardized property with Gaussian noise:
        # b = strictness * z + (1 - strictness) * noise.
        noise = self._rng.normal(loc=0, scale=1, size=len(standardized_values))
        blended_values = (
            self._strictness * standardized_values + (1 - self._strictness) * noise
        )

        # Sort by the blended score and split the order into contiguous,
        # (nearly) equally-sized partitions.
        sorted_indices = np.argsort(blended_values)
        partition_indices = np.array_split(sorted_indices, self._num_partitions)

        for pid, indices in enumerate(partition_indices):
            indices_list = indices.tolist()
            if self._shuffle:
                # Shuffle within the partition so sample order does not leak
                # the property ranking to consumers.
                self._rng.shuffle(indices_list)
            self._partition_id_to_indices[pid] = indices_list

        self._partition_id_to_indices_determined = True