Source code for flwr_datasets.partitioner.continuous_partitioner

# Copyright 2025 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Continuous partitioner class that works with Hugging Face Datasets."""


# pylint: disable=R0913, R0917
from typing import Optional

import numpy as np

from datasets import Dataset
from flwr_datasets.partitioner.partitioner import Partitioner


[docs] class ContinuousPartitioner( Partitioner ): # pylint: disable=too-many-instance-attributes r"""Partitioner based on a real-valued dataset property with adjustable strictness. This partitioner enables non-IID partitioning by sorting the dataset according to a continuous (i.e., real-valued, not categorical) property and introducing controlled noise to adjust the level of heterogeneity. To interpolate between IID and non-IID partitioning, a `strictness` parameter (𝜎 ∈ [0, 1]) blends a standardized property vector (z ∈ ℝⁿ) with Gaussian noise (ε ~ 𝒩(0, I)), producing blended scores: .. math:: b = \sigma \cdot z + (1 - \sigma) \cdot ε Samples are then sorted by `b` to assign them to partitions. When `strictness` is 0, partitioning is purely random (IID), while a value of 1 strictly follows the property ranking (strongly non-IID). Parameters ---------- num_partitions : int Number of partitions to create. partition_by : str Name of the continuous feature to partition the dataset on. strictness : float Controls how strongly the feature influences partitioning (0 = iid, 1 = non-iid). shuffle : bool Whether to shuffle the indices within each partition (default: True). seed : Optional[int] Random seed for reproducibility. Examples -------- >>> from datasets import Dataset >>> import numpy as np >>> import pandas as pd >>> from flwr_datasets.partitioner import ContinuousPartitioner >>> import matplotlib.pyplot as plt >>> >>> # Create synthetic data >>> df = pd.DataFrame({ >>> "continuous": np.linspace(0, 10, 10_000), >>> "category": np.random.choice([0, 1, 2, 3], size=10_000) >>> }) >>> hf_dataset = Dataset.from_pandas(df) >>> >>> # Partition dataset >>> partitioner = ContinuousPartitioner( >>> num_partitions=5, >>> partition_by="continuous", >>> strictness=0.7, >>> shuffle=True >>> ) >>> partitioner.dataset = hf_dataset >>> >>> # Plot partitions >>> plt.figure(figsize=(10, 6)) >>> for i in range(5): >>> plt.hist( >>> partitioner.load_partition(i)["continuous"], >>> bins=64, >>> alpha=0.5, >>> label=f"Partition {i}" >>> ) >>> plt.legend() >>> plt.xlabel("Continuous Value") >>> plt.ylabel("Frequency") >>> plt.title("Partition distributions") >>> plt.grid(True) >>> plt.show() """ def __init__( self, num_partitions: int, partition_by: str, strictness: float, shuffle: bool = True, seed: Optional[int] = 42, ) -> None: super().__init__() if not 0 <= strictness <= 1: raise ValueError("`strictness` must be between 0 and 1") if num_partitions <= 0: raise ValueError("`num_partitions` must be greater than 0") self._num_partitions = num_partitions self._partition_by = partition_by self._strictness = strictness self._shuffle = shuffle self._seed = seed self._rng = np.random.default_rng(seed) # Lazy initialization self._partition_id_to_indices: dict[int, list[int]] = {} self._partition_id_to_indices_determined = False
[docs] def load_partition(self, partition_id: int) -> Dataset: """Load a single partition based on the partition index. Parameters ---------- partition_id : int The index that corresponds to the requested partition. Returns ------- dataset_partition : Dataset A single dataset partition. """ self._check_and_generate_partitions_if_needed() return self.dataset.select(self._partition_id_to_indices[partition_id])
@property def num_partitions(self) -> int: """Total number of partitions.""" self._check_and_generate_partitions_if_needed() return self._num_partitions @property def partition_id_to_indices(self) -> dict[int, list[int]]: """Mapping from partition ID to dataset indices.""" self._check_and_generate_partitions_if_needed() return self._partition_id_to_indices def _check_and_generate_partitions_if_needed(self) -> None: """Lazy evaluation of the partitioning logic.""" if self._partition_id_to_indices_determined: return if self._num_partitions > self.dataset.num_rows: raise ValueError( "Number of partitions must be less than or equal to number of dataset samples." ) # Extract property values property_values = np.array(self.dataset[self._partition_by], dtype=np.float32) # Check for missing values (None or NaN) if np.any(property_values is None) or np.isnan(property_values).any(): raise ValueError( f"The column '{self._partition_by}' contains None or NaN values, " f"which are not supported by {self.__class__.__qualname__}. " "Please clean or filter your dataset before partitioning." ) # Standardize std = np.std(property_values) if std < 1e-6 and self._strictness > 0: raise ValueError( f"Cannot standardize column '{self._partition_by}' " f"because it has near-zero std (std={std}). " "All values are nearly identical, which prevents meaningful non-IID partitioning. " "To resolve this, choose a different partition property " "or set strictness to 0 to enable IID partitioning." ) standardized_values = (property_values - np.mean(property_values)) / std # Blend noise noise = self._rng.normal(loc=0, scale=1, size=len(standardized_values)) blended_values = ( self._strictness * standardized_values + (1 - self._strictness) * noise ) # Sort and partition sorted_indices = np.argsort(blended_values) partition_indices = np.array_split(sorted_indices, self._num_partitions) for pid, indices in enumerate(partition_indices): indices_list = indices.tolist() if self._shuffle: self._rng.shuffle(indices_list) self._partition_id_to_indices[pid] = indices_list self._partition_id_to_indices_determined = True