# Source code for flwr_datasets.utils

# Copyright 2023 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Utils for FederatedDataset."""


import warnings
from typing import Optional, Union, cast

from datasets import Dataset, DatasetDict, concatenate_datasets
from flwr_datasets.partitioner import IidPartitioner, Partitioner
from flwr_datasets.preprocessor import Preprocessor
from flwr_datasets.preprocessor.merger import Merger

# Narrowed-down list of Hugging Face dataset identifiers that this library has
# been tested with; `_check_if_dataset_tested` emits a warning for any dataset
# not on this list. Several datasets appear twice: once under the legacy short
# name (e.g. "mnist") and once under the namespaced id (e.g. "ylecun/mnist").
tested_datasets = [
    "mnist",
    "ylecun/mnist",
    "cifar10",
    "uoft-cs/cifar10",
    "fashion_mnist",
    "zalando-datasets/fashion_mnist",
    "sasha/dog-food",
    "zh-plus/tiny-imagenet",
    "scikit-learn/adult-census-income",
    "cifar100",
    "uoft-cs/cifar100",
    "svhn",
    "ufldl-stanford/svhn",
    "sentiment140",
    "stanfordnlp/sentiment140",
    "speech_commands",
    "LIUM/tedlium",
    "flwrlabs/femnist",
    "flwrlabs/ucf101",
    "flwrlabs/ambient-acoustic-context",
    "jlh/uci-mushrooms",
    "Mike0307/MNIST-M",
    "flwrlabs/usps",
    "scikit-learn/iris",
    "flwrlabs/pacs",
    "flwrlabs/cinic10",
    "flwrlabs/caltech101",
    "flwrlabs/office-home",
    "flwrlabs/fed-isic2019",
]


def _instantiate_partitioners(
    partitioners: dict[str, Union[Partitioner, int]]
) -> dict[str, Partitioner]:
    """Transform the partitioners from the initial format to instantiated objects.

    Parameters
    ----------
    partitioners : Dict[str, Union[Partitioner, int]]
        Dataset split to the Partitioner or a number of IID partitions.

    Returns
    -------
    partitioners : Dict[str, Partitioner]
        Partitioners specified as split to Partitioner object.
    """
    # Reject anything that is not a dict up front (guard clause).
    if not isinstance(partitioners, dict):
        raise ValueError(
            f"Incorrect type of the 'partitioners' encountered. "
            f"Expected Dict[str, Union[int, Partitioner]]. "
            f"Given {type(partitioners)}."
        )
    instantiated: dict[str, Partitioner] = {}
    for split_name, partitioner_spec in partitioners.items():
        if isinstance(partitioner_spec, Partitioner):
            # Already a Partitioner instance - keep it as-is.
            instantiated[split_name] = partitioner_spec
        elif isinstance(partitioner_spec, int):
            # An int is shorthand for "this many IID partitions".
            instantiated[split_name] = IidPartitioner(
                num_partitions=partitioner_spec
            )
        else:
            raise ValueError(
                f"Incorrect type of the 'partitioners' value encountered. "
                f"Expected Partitioner or int. Given {type(partitioner_spec)}"
            )
    return instantiated


def _instantiate_merger_if_needed(
    merger: Optional[Union[Preprocessor, dict[str, tuple[str, ...]]]]
) -> Optional[Preprocessor]:
    """Instantiate `Merger` if the given preprocessor is a merge config dict."""
    # A non-empty dict is treated as a merge configuration; `None`, an empty
    # dict, or a ready-made Preprocessor are passed through unchanged.
    if isinstance(merger, dict) and merger:
        return Merger(merge_config=merger)
    return cast(Optional[Preprocessor], merger)


def _check_if_dataset_tested(dataset: str) -> None:
    """Check if the dataset is in the narrowed down list of the tested datasets.

    Parameters
    ----------
    dataset : str
        Name/identifier of the dataset to look up in `tested_datasets`.
    """
    if dataset not in tested_datasets:
        # Fixed grammar of the warning message ("dataset are" -> "datasets are").
        warnings.warn(
            f"The currently tested datasets are {tested_datasets}. "
            f"Given: {dataset}.",
            stacklevel=1,
        )


def divide_dataset(
    dataset: Dataset,
    division: Union[list[float], tuple[float, ...], dict[str, float]],
) -> Union[list[Dataset], DatasetDict]:
    """Divide the dataset according to the `division`.

    The division supports a varying number of splits, which you can name. The
    splits are created from the beginning of the dataset.

    Parameters
    ----------
    dataset : Dataset
        Dataset to be divided.
    division: Union[List[float], Tuple[float, ...], Dict[str, float]]
        Configuration specifying how the dataset is divided. Each fraction has
        to be >0 and <=1. They have to sum up to at most 1 (smaller sum is
        possible).

    Returns
    -------
    divided_dataset : Union[List[Dataset], DatasetDict]
        If `division` is `List` or `Tuple` then `List[Dataset]` is returned else
        if `division` is `Dict` then `DatasetDict` is returned.

    Examples
    --------
    Use `divide_dataset` with division specified as a list.

    >>> from flwr_datasets import FederatedDataset
    >>> from flwr_datasets.utils import divide_dataset
    >>>
    >>> fds = FederatedDataset(dataset="mnist", partitioners={"train": 100})
    >>> partition = fds.load_partition(0)
    >>> division = [0.8, 0.2]
    >>> train, test = divide_dataset(dataset=partition, division=division)

    Use `divide_dataset` with division specified as a dict (this accomplishes
    the same goal as the example with a list above).

    >>> division = {"train": 0.8, "test": 0.2}
    >>> train_test = divide_dataset(dataset=partition, division=division)
    >>> train, test = train_test["train"], train_test["test"]
    """
    _check_division_config_correctness(division)
    index_ranges = _create_division_indices_ranges(len(dataset), division)
    if isinstance(division, (list, tuple)):
        # Positional division -> plain list of datasets, in division order.
        return [dataset.select(index_range) for index_range in index_ranges]
    if isinstance(division, dict):
        # Named division -> DatasetDict keyed by the division names.
        return DatasetDict(
            {
                split_name: dataset.select(index_range)
                for split_name, index_range in zip(division, index_ranges)
            }
        )
    raise TypeError(
        f"The type of the `division` should be dict, "
        f"tuple or list but is {type(division)} instead."
    )
def _create_division_indices_ranges( dataset_length: int, division: Union[list[float], tuple[float, ...], dict[str, float]], ) -> list[range]: ranges = [] if isinstance(division, (list, tuple)): start_idx = 0 end_idx = 0 for fraction in division: end_idx += int(dataset_length * fraction) ranges.append(range(start_idx, end_idx)) start_idx = end_idx elif isinstance(division, dict): ranges = [] start_idx = 0 end_idx = 0 for fraction in division.values(): end_idx += int(dataset_length * fraction) ranges.append(range(start_idx, end_idx)) start_idx = end_idx else: raise TypeError( f"The type of the `division` should be dict, " f"tuple or list but is {type(division)} instead. " ) return ranges def _check_division_config_types_correctness( division: Union[list[float], tuple[float, ...], dict[str, float]] ) -> None: if isinstance(division, (list, tuple)): if not all(isinstance(x, float) for x in division): raise TypeError( "List or tuple values of `division` must contain only floats, " "other types are not allowed." ) elif isinstance(division, dict): if not all(isinstance(x, float) for x in division.values()): raise TypeError( "Dict values of `division` must be only floats, " "other types are not allowed." ) else: raise TypeError("`division` must be a list, tuple, or dict.") def _check_division_config_values_correctness( division: Union[list[float], tuple[float, ...], dict[str, float]] ) -> None: if isinstance(division, (list, tuple)): if not all(0 < x <= 1 for x in division): raise ValueError( "All fractions for the division must be greater than 0 and smaller or " "equal to 1." ) fraction_sum_from_list_tuple = sum(division) if fraction_sum_from_list_tuple > 1: raise ValueError("Sum of fractions for division must not exceed 1.") if fraction_sum_from_list_tuple < 1: warnings.warn( f"Sum of fractions for division is {sum(division)}, which is below 1. " f"Make sure that's the desired behavior. 
Some data will not be used " f"in the current specification.", stacklevel=1, ) elif isinstance(division, dict): values = list(division.values()) if not all(0 < x <= 1 for x in values): raise ValueError( "All fractions must be greater than 0 and smaller or equal to 1." ) if sum(values) > 1: raise ValueError("Sum of fractions must not exceed 1.") if sum(values) < 1: warnings.warn( f"Sum of fractions in `division` is {values}, which is below 1. " f"Make sure that's the desired behavior. Some data will not be used " f"in the current specification.", stacklevel=1, ) else: raise TypeError("`division` must be a list, tuple, or dict.") def _check_division_config_correctness( division: Union[list[float], tuple[float, ...], dict[str, float]] ) -> None: _check_division_config_types_correctness(division) _check_division_config_values_correctness(division)
def concatenate_divisions(
    partitioner: Partitioner,
    partition_division: Union[list[float], tuple[float, ...], dict[str, float]],
    division_id: Union[int, str],
) -> Dataset:
    """Create a dataset by concatenation of divisions from all partitions.

    The divisions are created based on the `partition_division` and accessed based
    on the `division_id`. This function can be used to create e.g. centralized
    dataset from federated on-edge test sets.

    Parameters
    ----------
    partitioner : Partitioner
        Partitioner object with assigned dataset.
    partition_division : Union[List[float], Tuple[float, ...], Dict[str, float]]
        Fractions specifying the division of the partitions of a `partitioner`. You
        can think of this as on-edge division of the data into multiple divisions
        (e.g. into train and validation). E.g. [0.8, 0.2] or
        {"partition_train": 0.8, "partition_test": 0.2}.
    division_id : Union[int, str]
        The way to access the division (from a List or DatasetDict). If your
        `partition_division` is specified as a list, then `division_id` represents
        an index to an element in that list. If `partition_division` is passed as
        a `Dict`, then `division_id` is a key of such dictionary.

    Returns
    -------
    concatenated_divisions : Dataset
        A dataset created as concatenation of the divisions from all partitions.

    Examples
    --------
    Use `concatenate_divisions` with division specified as a list.

    >>> from flwr_datasets import FederatedDataset
    >>> from flwr_datasets.utils import concatenate_divisions
    >>>
    >>> fds = FederatedDataset(dataset="mnist", partitioners={"train": 100})
    >>> concatenated_divisions = concatenate_divisions(
    ...     partitioner=fds.partitioners["train"],
    ...     partition_division=[0.8, 0.2],
    ...     division_id=1
    ... )
    >>> print(concatenated_divisions)

    Use `concatenate_divisions` with division specified as a dict.
    This accomplishes the same goal as the example with a list above.

    >>> fds = FederatedDataset(dataset="mnist", partitioners={"train": 100})
    >>> concatenated_divisions = concatenate_divisions(
    ...     partitioner=fds.partitioners["train"],
    ...     partition_division={"train": 0.8, "test": 0.2},
    ...     division_id="test"
    ... )
    >>> print(concatenated_divisions)
    """
    _check_division_config_correctness(partition_division)
    divisions = []
    zero_len_divisions = 0
    for partition_id in range(partitioner.num_partitions):
        partition = partitioner.load_partition(partition_id)
        if isinstance(partition_division, (list, tuple)):
            if not isinstance(division_id, int):
                raise TypeError(
                    "The `division_id` needs to be an int in case of "
                    "`partition_division` specification as List."
                )
            partition = divide_dataset(partition, partition_division)
            division = partition[division_id]
        elif isinstance(partition_division, dict):
            partition = divide_dataset(partition, partition_division)
            division = partition[division_id]
        else:
            # Unreachable in practice: the config check above already raises a
            # TypeError for anything other than list, tuple, or dict.
            # (Message typo fixed: "List of" -> "List or".)
            raise TypeError(
                "The type of partition needs to be List or DatasetDict in this "
                "context."
            )
        # Track empty divisions so we can warn (or fail if all are empty).
        if len(division) == 0:
            zero_len_divisions += 1
        divisions.append(division)

    if zero_len_divisions == partitioner.num_partitions:
        raise ValueError(
            "The concatenated dataset is of length 0. Please change the "
            "`partition_division` parameter to change this behavior."
        )
    if zero_len_divisions != 0:
        warnings.warn(
            f"{zero_len_divisions} division(s) have length zero.", stacklevel=1
        )
    return concatenate_datasets(divisions)