DistributionPartitioner

class DistributionPartitioner(distribution_array: ndarray[Any, dtype[int64]] | ndarray[Any, dtype[float64]], num_partitions: int, num_unique_labels_per_partition: int, partition_by: str, preassigned_num_samples_per_label: int, rescale: bool = True, shuffle: bool = True, seed: int | None = 42)

Bases: Partitioner

Partitioner based on a distribution.

Inspired by the implementation in Li et al., Federated Optimization in Heterogeneous Networks (2020), https://arxiv.org/abs/1812.06127.

Given a 2-dimensional user-specified distribution, the algorithm splits the dataset so that each partition holds a fixed number of unique labels, with each label assigned to partitions in a deterministic, pathological manner. The 1st dimension is the number of unique labels and the 2nd dimension is the number of buckets into which the samples associated with each label are divided. That is, given a distribution array of shape,

(num_unique_labels, num_unique_labels_per_partition * num_partitions / num_unique_labels),

the label_id at the i’th row is assigned to the partition_id based on the following approach.

First, for an i’th row, generate a list of `id`s according to the formula:

id = alpha + beta

where,

alpha = (i - num_unique_labels_per_partition + 1) + (j % num_unique_labels_per_partition),

alpha = alpha + (alpha >= 0 ? 0 : num_unique_labels),

beta = num_unique_labels * (j // num_unique_labels_per_partition)

and j in {0, 1, 2, …, num_columns - 1}, where num_columns is the number of columns of the distribution array. Then, sort the list of `id`s in ascending order. The j’th entry of this sorted list is the partition_id to which the i’th unique label (and the underlying distribution array value) is assigned. So, for a dataset with 10 unique labels and a configuration with 20 partitions and 2 unique labels per partition, the 0’th row of the distribution array (corresponding to class 0) will be assigned to partitions [0, 9, 10, 19], the 1st row (class 1) to [0, 1, 10, 11], the 2nd row (class 2) to [1, 2, 11, 12], the 3rd row (class 3) to [2, 3, 12, 13], and so on. Alternatively, the distribution can be read per partition: partition 0 holds classes 0 and 1, partition 1 holds classes 1 and 2, partition 2 holds classes 2 and 3, and so on. The list representing the unique labels is sorted in ascending order.
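The assignment rule above can be sketched in a few lines of plain Python. This is a minimal illustration of the formula as stated in this section, not the library's internal code; the helper name `partition_ids_for_label` is hypothetical, and the number of columns is assumed to be num_unique_labels_per_partition * num_partitions / num_unique_labels, as given by the array shape.

```python
def partition_ids_for_label(
    i, num_unique_labels, num_unique_labels_per_partition, num_partitions
):
    """Return the sorted partition ids for the label in row i.

    Hypothetical helper that follows the id = alpha + beta formula
    from the text; it is not part of flwr_datasets.
    """
    # Number of columns of the distribution array (assumed from its shape).
    num_columns = (
        num_unique_labels_per_partition * num_partitions // num_unique_labels
    )
    ids = []
    for j in range(num_columns):
        alpha = (i - num_unique_labels_per_partition + 1) + (
            j % num_unique_labels_per_partition
        )
        # Wrap negative values around the label range.
        if alpha < 0:
            alpha += num_unique_labels
        beta = num_unique_labels * (j // num_unique_labels_per_partition)
        ids.append(alpha + beta)
    return sorted(ids)


# 10 unique labels, 2 unique labels per partition, 20 partitions.
for label in range(4):
    print(label, partition_ids_for_label(label, 10, 2, 20))
# 0 [0, 9, 10, 19]
# 1 [0, 1, 10, 11]
# 2 [1, 2, 11, 12]
# 3 [2, 3, 12, 13]
```

The printed lists match the worked example in the text for classes 0 through 3.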

Parameters:
  • distribution_array (Union[NDArrayInt, NDArrayFloat]) – A 2-dimensional numpy array of the probability distribution of samples for all labels in all partitions. The array shape should be (num_unique_labels, num_unique_labels_per_partition*num_partitions/num_unique_labels), such that the first row of the array corresponds to the sample distribution of the first unique label (in ascending order). The values may be scaled per label such that the sum of the label distributions across all partitions is equal to the original unpartitioned label distribution - see the rescale argument.

  • num_partitions (int) – The total number of partitions that the data will be divided into. The number of partitions must be an integer multiple of the number of unique labels in the dataset.

  • num_unique_labels_per_partition (int) – Number of unique labels assigned to a single partition.

  • partition_by (str) – Column name of the labels (targets) based on which sampling works.

  • preassigned_num_samples_per_label (int) – The number of samples that each unique label in each partition will first be assigned before the distribution_array values are assigned. This value has no effect if rescale is set to False.

  • rescale (bool, default=True) – Whether to partition samples according to the values in distribution_array or rescale based on the original unpartitioned class label distribution. float values are rounded to the nearest int. All samples for any label_id are exhausted during the partitioning by randomly assigning any unassigned samples from round-off errors to one of the label_id’s partition_ids.

  • shuffle (bool, default=True) – Whether to randomize the order of samples. Shuffling is applied after the samples are assigned to nodes.

  • seed (int, default=42) – Seed used for dataset shuffling. It has no effect if shuffle is False.
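The shape requirement on distribution_array and the divisibility requirement on num_partitions can be checked before constructing the partitioner. The sketch below is only an illustration of those two documented constraints; `expected_distribution_shape` is a hypothetical helper, not a function of flwr_datasets, and the error message is made up for the example.

```python
def expected_distribution_shape(
    num_unique_labels, num_partitions, num_unique_labels_per_partition
):
    """Return the (rows, columns) shape described for distribution_array.

    Hypothetical helper: it only encodes the constraints stated in the
    parameter descriptions above.
    """
    # num_partitions must be an integer multiple of the number of labels.
    if num_partitions % num_unique_labels != 0:
        raise ValueError(
            "num_partitions must be an integer multiple of num_unique_labels"
        )
    num_columns = (
        num_unique_labels_per_partition * num_partitions // num_unique_labels
    )
    return (num_unique_labels, num_columns)


print(expected_distribution_shape(10, 1_000, 2))  # (10, 200)
print(expected_distribution_shape(10, 20, 2))     # (10, 4)
```

Comparing this tuple with `distribution_array.shape` gives a quick sanity check on a user-built array.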

Examples

To reproduce the power-law distribution of the paper, use the following setup:

>>> from flwr_datasets import FederatedDataset
>>> from flwr_datasets.partitioner import DistributionPartitioner
>>> from pprint import pprint
>>> import numpy as np
>>>
>>> num_partitions = 1_000
>>> num_unique_labels_per_partition = 2
>>> num_unique_labels = 10
>>> preassigned_num_samples_per_label = 5
>>>
>>> # Generate a vector from a log-normal probability distribution
>>> rng = np.random.default_rng(2024)
>>> mu, sigma = 0., 2.
>>> distribution_array = rng.lognormal(
...     mu,
...     sigma,
...     (num_partitions*num_unique_labels_per_partition),
... )
>>> distribution_array = distribution_array.reshape((num_unique_labels, -1))
>>>
>>> partitioner = DistributionPartitioner(
...     distribution_array=distribution_array,
...     num_partitions=num_partitions,
...     num_unique_labels_per_partition=num_unique_labels_per_partition,
...     partition_by="label",  # MNIST dataset has a target column `label`
...     preassigned_num_samples_per_label=preassigned_num_samples_per_label,
... )
>>> fds = FederatedDataset(dataset="mnist", partitioners={"train": partitioner})
>>> partition = fds.load_partition(0)
>>> print(partition[0])  # Print the first example
{'image': <PIL.PngImagePlugin.PngImageFile image mode=L size=28x28 at 0x169DD54D0>,
'label': 0}
>>> distributions = {
...     partition_id: fds.load_partition(partition_id=partition_id)
...     .to_pandas()["label"]
...     .value_counts()
...     .to_dict()
...     for partition_id in range(10)
... }
>>> pprint(distributions)
{0: {0: 40, 1: 5},
 1: {2: 36, 1: 5},
 2: {3: 52, 2: 7},
 3: {3: 14, 4: 6},
 4: {4: 47, 5: 28},
 5: {6: 30, 5: 5},
 6: {6: 19, 7: 11},
 7: {8: 22, 7: 11},
 8: {9: 11, 8: 5},
 9: {0: 124, 9: 13}}
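
As a rough sanity check, the printed counts can be compared with the per-partition reading of the assignment (partition 0 holds classes 0 and 1, partition 1 holds classes 1 and 2, and so on, wrapping around at the last class). The sketch below copies the output above into a literal dictionary; `labels_match_cyclic_pattern` is a hypothetical helper and assumes 2 unique labels per partition, as in this example.

```python
# Label counts copied from the example output above.
distributions = {
    0: {0: 40, 1: 5},
    1: {2: 36, 1: 5},
    2: {3: 52, 2: 7},
    3: {3: 14, 4: 6},
    4: {4: 47, 5: 28},
    5: {6: 30, 5: 5},
    6: {6: 19, 7: 11},
    7: {8: 22, 7: 11},
    8: {9: 11, 8: 5},
    9: {0: 124, 9: 13},
}


def labels_match_cyclic_pattern(distributions, num_unique_labels):
    """Check that partition p holds exactly labels p and p + 1 (mod labels).

    Hypothetical helper; valid only for 2 unique labels per partition.
    """
    for partition_id, counts in distributions.items():
        expected = {
            partition_id % num_unique_labels,
            (partition_id + 1) % num_unique_labels,
        }
        if set(counts) != expected:
            return False
    return True


print(labels_match_cyclic_pattern(distributions, num_unique_labels=10))  # True
# Smallest count in this output; the example preassigns 5 samples per label.
print(min(min(counts.values()) for counts in distributions.values()))  # 5
```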

Methods

is_dataset_assigned()

Check if a dataset has been assigned to the partitioner.

load_partition(partition_id)

Load a partition based on the partition index.

Attributes

dataset

Dataset property.

num_partitions

Total number of partitions.

property dataset: Dataset

Dataset property.

is_dataset_assigned() → bool

Check if a dataset has been assigned to the partitioner.

This method returns True if a dataset is already set for the partitioner, otherwise, it returns False.

Returns:

dataset_assigned – True if a dataset is assigned, otherwise False.

Return type:

bool

load_partition(partition_id: int) → Dataset

Load a partition based on the partition index.

Parameters:

partition_id (int) – the index that corresponds to the requested partition

Returns:

dataset_partition – single partition of a dataset

Return type:

Dataset

property num_partitions: int

Total number of partitions.