Source code for flwr_datasets.partitioner.distribution_partitioner

# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Distribution partitioner class that works with Hugging Face Datasets."""


from collections import Counter
from typing import Optional, Union

import numpy as np

import datasets
from flwr_datasets.common.typing import NDArray, NDArrayFloat, NDArrayInt
from flwr_datasets.partitioner.partitioner import Partitioner


class DistributionPartitioner(Partitioner):  # pylint: disable=R0902
    """Partitioner based on a distribution.

    Inspired by implementations of Li et al. Federated Optimization in
    Heterogeneous Networks (2020) https://arxiv.org/abs/1812.06127.

    Given a 2-dimensional user-specified distribution, the algorithm splits the dataset
    for each unique label per partition where each label is assigned to the partitions
    in a deterministic pathological manner. The 1st dimension is the number of unique
    labels and the 2nd dimension is the number of buckets into which the samples
    associated with each label will be divided. That is, given a distribution array of
    shape,

                `num_unique_labels_per_partition` x `num_partitions`
    ( `num_unique_labels`, ---------------------------------------------------- ),
                                     `num_unique_labels`

    the label_id at the i'th row is assigned to the partition_id based on the
    following approach.

    First, for an i'th row, generate a list of `id`s according to the formula:
        id = alpha + beta
    where,
        alpha = (i - num_unique_labels_per_partition + 1) \
                + (j % num_unique_labels_per_partition),
        alpha = alpha + (alpha >= 0 ? 0 : num_unique_labels),
        beta = num_unique_labels * (j // num_unique_labels_per_partition)
    and j in {0, 1, 2, ..., `num_columns`}. Then, sort the list of `id`s in ascending
    order. The j'th index in this sorted list corresponds to the partition_id that the
    i'th unique label (and the underlying distribution array value) will be assigned
    to.

    So, for a dataset with 10 unique labels and a configuration with 20 partitions and
    2 unique labels per partition, the 0'th row of the distribution array
    (corresponding to class 0) will be assigned to partitions [0, 9, 10, 19], the 1st
    row (class 1) to [0, 1, 10, 11], the 2nd row (class 2) to [1, 2, 11, 12], the 3rd
    row (class 3) to [2, 3, 12, 13], etc. Alternatively, the distribution can be
    interpreted as partition 0 having classes 0 and 1, partition 1 having classes 1
    and 2, partition 2 having classes 2 and 3, etc. The list representing the unique
    labels is sorted in ascending order.

    Parameters
    ----------
    distribution_array : Union[NDArrayInt, NDArrayFloat]
        A 2-dimensional numpy array of the probability distribution of samples
        for all labels in all partitions. The array shape should be
        (`num_unique_labels`,
        `num_unique_labels_per_partition*num_partitions/num_unique_labels`),
        such that the first row of the array corresponds to the sample distribution
        of the first unique label (in ascending order). The values may be scaled per
        label such that the sum of the label distributions across all partitions is
        equal to the original unpartitioned label distribution - see the `rescale`
        argument.
    num_partitions : int
        The total number of partitions that the data will be divided into. The number
        of partitions must be an integer multiple of the number of unique labels in
        the dataset.
    num_unique_labels_per_partition : int
        Number of unique labels assigned to a single partition.
    partition_by : str
        Column name of the labels (targets) based on which sampling works.
    preassigned_num_samples_per_label : int
        The number of samples that each unique label in each partition will first
        be assigned before the `distribution_array` values are assigned. This
        value has no effect if `rescale` is set to False.
    rescale : bool, default=True
        Whether to partition samples according to the values in `distribution_array`
        or rescale based on the original unpartitioned class label distribution.
        `float` values are rounded to the nearest `int`. All samples for any label_id
        are exhausted during the partitioning by randomly assigning any unassigned
        samples from round-off errors to one of the label_id's partition_ids.
    shuffle : bool, default=True
        Whether to randomize the order of samples. Shuffling is applied after the
        samples are assigned to nodes.
    seed : int, default=42
        Seed used for dataset shuffling. It has no effect if `shuffle` is False.

    Examples
    --------
    In order to reproduce the power-law distribution of the paper, follow this setup:

    >>> from flwr_datasets import FederatedDataset
    >>> from flwr_datasets.partitioner import DistributionPartitioner
    >>> from pprint import pprint
    >>> import numpy as np
    >>>
    >>> num_partitions = 1_000
    >>> num_unique_labels_per_partition = 2
    >>> num_unique_labels = 10
    >>> preassigned_num_samples_per_label = 5
    >>>
    >>> # Generate a vector from a log-normal probability distribution
    >>> rng = np.random.default_rng(2024)
    >>> mu, sigma = 0., 2.
    >>> distribution_array = rng.lognormal(
    >>>     mu,
    >>>     sigma,
    >>>     (num_partitions*num_unique_labels_per_partition),
    >>> )
    >>> distribution_array = distribution_array.reshape((num_unique_labels, -1))
    >>>
    >>> partitioner = DistributionPartitioner(
    >>>     distribution_array=distribution_array,
    >>>     num_partitions=num_partitions,
    >>>     num_unique_labels_per_partition=num_unique_labels_per_partition,
    >>>     partition_by="label",  # MNIST dataset has a target column `label`
    >>>     preassigned_num_samples_per_label=preassigned_num_samples_per_label,
    >>> )
    >>> fds = FederatedDataset(dataset="mnist", partitioners={"train": partitioner})
    >>> partition = fds.load_partition(0)
    >>> print(partition[0])  # Print the first example
    {'image': <PIL.PngImagePlugin.PngImageFile image mode=L size=28x28 at 0x169DD54D0>,
    'label': 0}
    >>> distributions = {
    >>>     partition_id: fds.load_partition(partition_id=partition_id)
    >>>     .to_pandas()["label"]
    >>>     .value_counts()
    >>>     .to_dict()
    >>>     for partition_id in range(10)
    >>> }
    >>> pprint(distributions)
    {0: {0: 40, 1: 5},
     1: {2: 36, 1: 5},
     2: {3: 52, 2: 7},
     3: {3: 14, 4: 6},
     4: {4: 47, 5: 28},
     5: {6: 30, 5: 5},
     6: {6: 19, 7: 11},
     7: {8: 22, 7: 11},
     8: {9: 11, 8: 5},
     9: {0: 124, 9: 13}}
    """

    def __init__(  # pylint: disable=R0913
        self,
        distribution_array: Union[NDArrayInt, NDArrayFloat],
        num_partitions: int,
        num_unique_labels_per_partition: int,
        partition_by: str,
        preassigned_num_samples_per_label: int,
        rescale: bool = True,
        shuffle: bool = True,
        seed: Optional[int] = 42,
    ) -> None:
        super().__init__()
        # Attributes based on the constructor
        self._distribution_array = distribution_array
        self._num_partitions = num_partitions
        self._num_unique_labels_per_partition = num_unique_labels_per_partition
        self._partition_by = partition_by
        self._preassigned_num_samples_per_label = preassigned_num_samples_per_label
        self._rescale = rescale
        self._shuffle = shuffle
        self._seed = seed
        self._rng = np.random.default_rng(seed=self._seed)  # NumPy random generator

        # Utility attributes
        # The attributes below are determined during the first call to load_partition
        self._num_unique_labels: int = 0
        self._num_columns: int = 0
        self._partition_id_to_indices_determined = False
        self._partition_id_to_indices: dict[int, list[int]] = {}

    def load_partition(self, partition_id: int) -> datasets.Dataset:
        """Load a partition based on the partition index.

        Parameters
        ----------
        partition_id : int
            the index that corresponds to the requested partition

        Returns
        -------
        dataset_partition : Dataset
            single partition of a dataset
        """
        # The partitioning is done lazily - only when the first partition is
        # requested. Only the first call creates the indices assignments for all the
        # partition indices.
        self._check_distribution_array_shape_if_needed()
        self._check_num_unique_labels_per_partition_if_needed()
        self._check_distribution_array_sum_if_needed()
        self._check_num_partitions_correctness_if_needed()
        self._determine_partition_id_to_indices_if_needed()
        return self.dataset.select(self._partition_id_to_indices[partition_id])

    @property
    def num_partitions(self) -> int:
        """Total number of partitions."""
        return self._num_partitions

    def _determine_partition_id_to_indices_if_needed(  # pylint: disable=R0914
        self,
    ) -> None:
        """Create an assignment of indices to the partition indices."""
        if self._partition_id_to_indices_determined:
            return

        # Compute the label distribution from the dataset
        unique_labels = sorted(self.dataset.unique(self._partition_by))
        labels = np.asarray(self.dataset[self._partition_by])
        unique_label_to_indices = {}
        unique_label_distribution = {}

        for unique_label in unique_labels:
            unique_label_to_indices[unique_label] = np.where(labels == unique_label)[0]
            unique_label_distribution[unique_label] = len(
                unique_label_to_indices[unique_label]
            )

        if self._rescale:
            # Compute the normalized distribution for each class label
            self._distribution_array = self._distribution_array / np.sum(
                self._distribution_array, axis=-1, keepdims=True
            )

            # Compute the total preassigned number of samples per label for all labels
            # and partitions. This sum will be subtracted from the label distribution
            # of the original dataset, and added back later. It ensures that
            # (1) each partition will have at least
            # `self._preassigned_num_samples_per_label`, and
            # (2) there are sufficient indices to sample from the dataset.
            total_preassigned_samples = int(
                self._preassigned_num_samples_per_label * self._num_columns
            )

            label_distribution = np.fromiter(
                unique_label_distribution.values(),
                dtype=float,
            )
            self._check_total_preassigned_samples_within_limit(
                label_distribution, total_preassigned_samples
            )

            # Subtract the preassigned total amount from the label distribution;
            # we'll add these back later.
            label_distribution -= total_preassigned_samples

            # Rescale the normalized distribution with the actual label distribution.
            # Each row represents the number of samples to be taken for that class
            # label and the sum of each row equals the total of that class label.
            label_sampling_matrix = np.floor(
                self._distribution_array * label_distribution[:, np.newaxis]
            ).astype(int)

            # Add back the preassigned total amount
            label_sampling_matrix += self._preassigned_num_samples_per_label
        else:
            label_sampling_matrix = self._distribution_array.astype(int)

        # Create the label sampling dictionary
        label_samples = dict(
            zip(unique_label_distribution.keys(), label_sampling_matrix)
        )

        # Create indices split from dataset
        split_indices_per_label = {}
        for unique_label in unique_labels:
            # Compute cumulative sum of samples to identify splitting points
            cumsum_division_numbers = np.cumsum(label_samples[unique_label])
            split_indices = np.split(
                unique_label_to_indices[unique_label], cumsum_division_numbers
            )
            if self._rescale:
                # Randomly append unassigned samples (which are in the last split that
                # exceeds `self._num_columns`) to one of the `self._num_columns`
                # partitions. Unassigned samples originate from float-to-int rounding
                # errors of the normalizing algorithm.
                if len(split_indices) > self._num_columns:
                    last_split = split_indices.pop()
                    random_index = self._rng.integers(0, self._num_columns)
                    split_indices[random_index] = np.append(
                        split_indices[random_index], last_split
                    )
                assert len(split_indices) == self._num_columns
            split_indices_per_label[unique_label] = split_indices

        # Initialize the sampling tracker. Keys are the unique class labels.
        # Values are the smallest indices of each array in `label_samples`
        # which will be sampled next. Once a sample is taken from a label/key,
        # increment the value (index) by 1.
        index_tracker = {k: 0 for k in unique_labels}

        # Prepare data structure to store indices assigned to partition ids
        self._partition_id_to_indices = {
            partition_id: [] for partition_id in range(self._num_partitions)
        }

        for partition_id in range(self._num_partitions):
            # Get the `num_unique_labels_per_partition` labels for each partition. Use
            # `numpy.roll` to get indices of adjacent sorted labels for pathological
            # label distributions.
            labels_per_client = np.roll(unique_labels, -partition_id)[
                : self._num_unique_labels_per_partition
            ]
            for label in labels_per_client:
                index_to_sample = index_tracker[label]
                self._partition_id_to_indices[partition_id].extend(
                    split_indices_per_label[label][index_to_sample]
                )
                index_tracker[label] += 1

        # Shuffle the indices to avoid datasets with targets in sequences like
        # [00000, 11111, ...] if shuffle is True
        if self._shuffle:
            for indices in self._partition_id_to_indices.values():
                # In-place shuffling
                self._rng.shuffle(indices)

        self._partition_id_to_indices_determined = True

    def _check_distribution_array_shape_if_needed(self) -> None:
        """Test distribution array shape correctness."""
        if not self._partition_id_to_indices_determined:
            if not isinstance(self._distribution_array, np.ndarray):
                raise TypeError("Input must be a NumPy array.")
            if self._distribution_array.ndim != 2:
                raise ValueError("The distribution array is not 2-dimensional.")

            self._num_unique_labels = len(self.dataset.unique(self._partition_by))
            self._num_columns = int(
                self._num_unique_labels_per_partition
                * self._num_partitions
                / self._num_unique_labels
            )

            if self._distribution_array.shape[0] != self._num_unique_labels:
                raise ValueError(
                    "The expected number of rows in `distribution_array` must equal "
                    "the number of unique labels in the dataset, which is "
                    f"{self._num_unique_labels}, but the number of rows in "
                    f"`distribution_array` is {self._distribution_array.shape[0]}."
                )
            if self._distribution_array.shape[1] != self._num_columns:
                raise ValueError(
                    "The expected number of columns in `distribution_array` is "
                    f"{self._num_columns} (refer to the documentation for the "
                    "expression), but the number of columns in `distribution_array` "
                    f"is {self._distribution_array.shape[1]}."
                )

    def _check_num_unique_labels_per_partition_if_needed(self) -> None:
        """Test that the unique labels per partition do not exceed the dataset's."""
        if self._num_unique_labels_per_partition > self._num_unique_labels:
            raise ValueError(
                "The specified `num_unique_labels_per_partition`"
                f"={self._num_unique_labels_per_partition} is greater than the number "
                f"of unique labels in the given dataset={self._num_unique_labels} "
                f"as specified by the label column `{self._partition_by}`. "
                "Reduce the `num_unique_labels_per_partition` or make use of a "
                "different dataset to apply this partitioning."
            )

    def _check_distribution_array_sum_if_needed(self) -> None:
        """Test correctness of distribution array sum."""
        if not self._partition_id_to_indices_determined and not self._rescale:
            labels = self.dataset[self._partition_by]
            unique_labels_counter = sorted(Counter(labels).items())
            unique_labels_counter_vals = [v for _, v in unique_labels_counter]
            if any(self._distribution_array.sum(1) > unique_labels_counter_vals):
                raise ValueError(
                    "The sum of at least one unique label distribution array "
                    "exceeds that of the unique labels counter in the given dataset= "
                    f"{dict(unique_labels_counter)}."
                )

    def _check_num_partitions_correctness_if_needed(self) -> None:
        """Test num_partitions when the dataset is given."""
        if not self._partition_id_to_indices_determined:
            if self._num_partitions > self.dataset.num_rows:
                raise ValueError(
                    f"The number of partitions ({self._num_partitions}) needs to be "
                    "smaller than the number of samples in the dataset "
                    f"({self.dataset.num_rows})."
                )
            if self._num_partitions % self._num_unique_labels != 0:
                raise ValueError(
                    f"The number of partitions ({self._num_partitions}) is not "
                    "divisible by the number of unique labels "
                    f"({self._num_unique_labels})."
                )
            if not self._num_partitions > 0:
                raise ValueError(
                    "The number of partitions needs to be greater than zero."
                )

    def _check_total_preassigned_samples_within_limit(
        self, label_distribution: NDArray, total_preassigned_samples: int
    ) -> None:
        """Test that the total preassigned samples are within the allowed limit."""
        if any(label_distribution - total_preassigned_samples < self._num_columns):
            raise ValueError(
                "There are insufficient samples to partition by applying the "
                "specified `preassigned_num_samples_per_label`"
                f"={self._preassigned_num_samples_per_label}. Reduce the "
                "`preassigned_num_samples_per_label` or use a different dataset with "
                "more samples to apply this partitioning."
            )
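
To make the docstring's `id = alpha + beta` assignment rule concrete, here is a minimal standalone sketch. It is not part of the Flower Datasets source above; the helper name `label_to_partition_ids` is made up for illustration, and it simply re-derives which partition ids a given label row lands in for the documented case of 10 unique labels, 20 partitions, and 2 unique labels per partition.

def label_to_partition_ids(
    label_row: int,
    num_unique_labels: int,
    num_unique_labels_per_partition: int,
    num_partitions: int,
) -> list[int]:
    """Return the sorted partition ids assigned to the given label row."""
    num_columns = (
        num_unique_labels_per_partition * num_partitions // num_unique_labels
    )
    ids = []
    for j in range(num_columns):
        # alpha walks over adjacent labels (wrapping around at 0); beta jumps ahead
        # by num_unique_labels once every num_unique_labels_per_partition columns.
        alpha = (label_row - num_unique_labels_per_partition + 1) + (
            j % num_unique_labels_per_partition
        )
        if alpha < 0:
            alpha += num_unique_labels
        beta = num_unique_labels * (j // num_unique_labels_per_partition)
        ids.append(alpha + beta)
    return sorted(ids)


if __name__ == "__main__":
    print(label_to_partition_ids(0, 10, 2, 20))  # [0, 9, 10, 19]
    print(label_to_partition_ids(1, 10, 2, 20))  # [0, 1, 10, 11]

Running the sketch reproduces the assignments listed in the class docstring: class 0 is placed in partitions [0, 9, 10, 19] and class 1 in partitions [0, 1, 10, 11].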