Source code for flwr_datasets.partitioner.vertical_size_partitioner

# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""VerticalSizePartitioner class."""
# flake8: noqa: E501
# pylint: disable=C0301, R0902, R0913
from math import floor
from typing import Literal, Optional, Union, cast

import numpy as np

import datasets
from flwr_datasets.partitioner.partitioner import Partitioner
from flwr_datasets.partitioner.vertical_partitioner_utils import (
    _add_active_party_columns,
    _init_optional_str_or_list_str,
)


class VerticalSizePartitioner(Partitioner):
    """Creates vertical partitions by splitting features (columns) based on sizes.

    The sizes refer to the number of columns after the `drop_columns` are
    dropped. `shared_columns` and `active_party_column` are excluded and
    added only after the size-based division.

    Enables selection of "active party" column(s) and placement into a specific
    partition or creation of a new partition just for it. Also enables dropping
    columns and sharing specified columns across all partitions.

    Parameters
    ----------
    partition_sizes : Union[list[int], list[float]]
        A list where each value represents the size of a partition.
        list[int] -> each value represents an absolute number of columns. Size
        zero is allowed and will result in an empty partition if no shared
        columns are present.
        list[float] -> each value represents a fraction of the total number of
        columns. Note that these values apply to the columns without
        `active_party_columns` and `shared_columns`, which are added to the
        partition(s) afterwards. `drop_columns` are also not counted toward the
        partition sizes.
        In the case of list[int]: sum(partition_sizes) == len(columns) -
        len(drop_columns) - len(shared_columns) - len(active_party_columns)
    active_party_columns : Optional[Union[str, list[str]]]
        Column(s) (typically representing labels) associated with the
        "active party" (which can be the server).
    active_party_columns_mode : Union[Literal["add_to_first", "add_to_last", "create_as_first", "create_as_last", "add_to_all"], int]
        Determines how to assign the active party columns:

        - `"add_to_first"`: Append active party columns to the first partition.
        - `"add_to_last"`: Append active party columns to the last partition.
        - `"create_as_first"`: Create a new partition at the start containing only these columns.
        - `"create_as_last"`: Create a new partition at the end containing only these columns.
        - `"add_to_all"`: Append active party columns to all partitions.
        - int: Append active party columns to the specified partition index.
    drop_columns : Optional[Union[str, list[str]]]
        Columns to remove entirely from the dataset before partitioning.
    shared_columns : Optional[Union[str, list[str]]]
        Columns to duplicate into every partition after initial partitioning.
    shuffle : bool
        Whether to shuffle the order of columns before partitioning.
    seed : Optional[int]
        Random seed for shuffling columns. Has no effect if `shuffle=False`.

    Examples
    --------
    >>> from flwr_datasets import FederatedDataset
    >>> from flwr_datasets.partitioner import VerticalSizePartitioner
    >>>
    >>> partitioner = VerticalSizePartitioner(
    ...     partition_sizes=[8, 4, 2],
    ...     active_party_columns="income",
    ...     active_party_columns_mode="create_as_last"
    ... )
    >>> fds = FederatedDataset(
    ...     dataset="scikit-learn/adult-census-income",
    ...     partitioners={"train": partitioner}
    ... )
    >>> partitions = [
    ...     fds.load_partition(i)
    ...     for i in range(fds.partitioners["train"].num_partitions)
    ... ]
    >>> print([partition.column_names for partition in partitions])
    """

    def __init__(
        self,
        partition_sizes: Union[list[int], list[float]],
        active_party_columns: Optional[Union[str, list[str]]] = None,
        active_party_columns_mode: Union[
            Literal[
                "add_to_first",
                "add_to_last",
                "create_as_first",
                "create_as_last",
                "add_to_all",
            ],
            int,
        ] = "add_to_last",
        drop_columns: Optional[Union[str, list[str]]] = None,
        shared_columns: Optional[Union[str, list[str]]] = None,
        shuffle: bool = True,
        seed: Optional[int] = 42,
    ) -> None:
        super().__init__()
        self._partition_sizes = partition_sizes
        self._active_party_columns = _init_optional_str_or_list_str(
            active_party_columns
        )
        self._active_party_columns_mode = active_party_columns_mode
        self._drop_columns = _init_optional_str_or_list_str(drop_columns)
        self._shared_columns = _init_optional_str_or_list_str(shared_columns)
        self._shuffle = shuffle
        self._seed = seed
        self._rng = np.random.default_rng(seed=self._seed)

        self._partition_columns: Optional[list[list[str]]] = None
        self._partitions_determined = False

        self._validate_parameters_in_init()

    def _determine_partitions_if_needed(self) -> None:
        if self._partitions_determined:
            return

        if self.dataset is None:
            raise ValueError("No dataset is set for this partitioner.")

        all_columns = list(self.dataset.column_names)
        self._validate_parameters_while_partitioning(
            all_columns, self._shared_columns, self._active_party_columns
        )

        columns = [
            column for column in all_columns if column not in self._drop_columns
        ]
        columns = [column for column in columns if column not in self._shared_columns]
        columns = [
            column for column in columns if column not in self._active_party_columns
        ]

        if self._shuffle:
            self._rng.shuffle(columns)

        if all(isinstance(fraction, float) for fraction in self._partition_sizes):
            partition_columns = _fraction_split(
                columns, cast(list[float], self._partition_sizes)
            )
        else:
            partition_columns = _count_split(
                columns, cast(list[int], self._partition_sizes)
            )

        partition_columns = _add_active_party_columns(
            self._active_party_columns,
            self._active_party_columns_mode,
            partition_columns,
        )

        # Add shared columns to all partitions
        for partition in partition_columns:
            for column in self._shared_columns:
                partition.append(column)

        self._partition_columns = partition_columns
        self._partitions_determined = True
    def load_partition(self, partition_id: int) -> datasets.Dataset:
        """Load a partition based on the partition index.

        Parameters
        ----------
        partition_id : int
            The index that corresponds to the requested partition.

        Returns
        -------
        dataset_partition : Dataset
            Single partition of a dataset.
        """
        self._determine_partitions_if_needed()
        assert self._partition_columns is not None
        if partition_id < 0 or partition_id >= len(self._partition_columns):
            raise IndexError(
                f"partition_id: {partition_id} out of range <0, {self.num_partitions - 1}>."
            )
        columns = self._partition_columns[partition_id]
        return self.dataset.select_columns(columns)
    @property
    def num_partitions(self) -> int:
        """Number of partitions."""
        self._determine_partitions_if_needed()
        assert self._partition_columns is not None
        return len(self._partition_columns)

    def _validate_parameters_in_init(self) -> None:
        if not isinstance(self._partition_sizes, list):
            raise ValueError("partition_sizes must be a list.")
        if all(isinstance(fraction, float) for fraction in self._partition_sizes):
            fraction_sum = sum(self._partition_sizes)
            # Tolerance 0.01 for floating point numerical problems: snap
            # near-1.0 sums to exactly 1.0 by adjusting the last fraction.
            if 0.99 < fraction_sum < 1.01:
                self._partition_sizes = self._partition_sizes[:-1] + [
                    1.0 - sum(self._partition_sizes[:-1])
                ]
                fraction_sum = 1.0
            if fraction_sum != 1.0:
                raise ValueError(
                    "Float ratios in `partition_sizes` must sum to 1.0. "
                    f"Instead got: {fraction_sum}."
                )
            if any(
                fraction < 0.0 or fraction > 1.0 for fraction in self._partition_sizes
            ):
                raise ValueError(
                    "All floats in `partition_sizes` must be >= 0.0 and <= 1.0."
                )
        elif all(
            isinstance(column_count, int) for column_count in self._partition_sizes
        ):
            if any(column_count < 0 for column_count in self._partition_sizes):
                raise ValueError("All integers in `partition_sizes` must be >= 0.")
        else:
            raise ValueError("`partition_sizes` list must be all floats or all ints.")

        # Validate columns lists
        for parameter_name, parameter_list in [
            ("drop_columns", self._drop_columns),
            ("shared_columns", self._shared_columns),
            ("active_party_columns", self._active_party_columns),
        ]:
            if not all(isinstance(column, str) for column in parameter_list):
                raise ValueError(f"All entries in {parameter_name} must be strings.")

        valid_modes = {
            "add_to_first",
            "add_to_last",
            "create_as_first",
            "create_as_last",
            "add_to_all",
        }
        if not (
            isinstance(self._active_party_columns_mode, int)
            or self._active_party_columns_mode in valid_modes
        ):
            raise ValueError(
                "active_party_columns_mode must be an int or one of "
                "'add_to_first', 'add_to_last', 'create_as_first', 'create_as_last', "
                "'add_to_all'."
            )

    def _validate_parameters_while_partitioning(
        self,
        all_columns: list[str],
        shared_columns: list[str],
        active_party_columns: list[str],
    ) -> None:
        # Shared columns existence check
        for column in shared_columns:
            if column not in all_columns:
                raise ValueError(f"Shared column '{column}' not found in the dataset.")
        # Active party columns existence check
        for column in active_party_columns:
            if column not in all_columns:
                raise ValueError(
                    f"Active party column '{column}' not found in the dataset."
                )
        num_columns = len(all_columns)
        num_cols_unused_in_core_div = 0
        if self._active_party_columns is not None:
            num_cols_unused_in_core_div += len(self._active_party_columns)
        if self._shared_columns is not None:
            num_cols_unused_in_core_div += len(self._shared_columns)
        if self._drop_columns is not None:
            num_cols_unused_in_core_div += len(self._drop_columns)
        num_core_div_columns = num_columns - num_cols_unused_in_core_div
        if all(isinstance(size, int) for size in self._partition_sizes):
            if sum(self._partition_sizes) != num_core_div_columns:
                raise ValueError(
                    "The sum of integer `partition_sizes` must equal the number of "
                    "columns used in the division. Note that shared_columns, "
                    "drop_columns and active_party_columns are not included in the "
                    "division."
                )
def _count_split(columns: list[str], counts: list[int]) -> list[list[str]]:
    partition_columns = []
    start = 0
    for count in counts:
        end = start + count
        partition_columns.append(columns[start:end])
        start = end
    return partition_columns


def _fraction_split(columns: list[str], fractions: list[float]) -> list[list[str]]:
    num_columns = len(columns)
    partitions = []
    cumulative = 0
    for index, fraction in enumerate(fractions):
        count = int(floor(fraction * num_columns))
        if index == len(fractions) - 1:
            # Last partition takes the remainder
            count = num_columns - cumulative
        partitions.append(columns[cumulative : cumulative + count])
        cumulative += count
    return partitions
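
Below is a minimal, self-contained usage sketch, not part of the module above, that illustrates the size semantics described in the class docstring on a small synthetic dataset. The column names `f0`-`f4` and `income` are invented for the example, and it assumes the standard `Partitioner.dataset` setter for assigning a dataset directly (as `FederatedDataset` does internally).

# Illustrative sketch only: a tiny synthetic dataset with five feature
# columns plus a label column owned by the active party.
if __name__ == "__main__":
    toy_dataset = datasets.Dataset.from_dict(
        {
            "f0": [1, 2],
            "f1": [3, 4],
            "f2": [5, 6],
            "f3": [7, 8],
            "f4": [9, 10],
            "income": [0, 1],
        }
    )

    # Integer sizes: the five feature columns are split 3 + 2 and "income"
    # is appended to the last partition (default mode "add_to_last").
    # The integer sizes must sum to the number of columns left after
    # excluding active party, shared, and dropped columns (here: 5).
    int_partitioner = VerticalSizePartitioner(
        partition_sizes=[3, 2],
        active_party_columns="income",
        shuffle=False,
    )
    int_partitioner.dataset = toy_dataset
    print(
        [
            int_partitioner.load_partition(i).column_names
            for i in range(int_partitioner.num_partitions)
        ]
    )
    # [['f0', 'f1', 'f2'], ['f3', 'f4', 'income']]

    # Fractional sizes: floor() is applied to each fraction and the last
    # partition takes the remainder, so [0.5, 0.5] on 5 columns yields 2 + 3.
    frac_partitioner = VerticalSizePartitioner(
        partition_sizes=[0.5, 0.5],
        active_party_columns="income",
        shuffle=False,
    )
    frac_partitioner.dataset = toy_dataset
    print(
        [
            frac_partitioner.load_partition(i).column_names
            for i in range(frac_partitioner.num_partitions)
        ]
    )
    # [['f0', 'f1'], ['f2', 'f3', 'f4', 'income']]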