# Source code for flex.data.dataset

"""
Copyright (C) 2024  Instituto Andaluz Interuniversitario en Ciencia de Datos e Inteligencia Computacional (DaSCI).

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published
    by the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""

import contextlib
import warnings
from dataclasses import dataclass, field
from typing import Optional, Union

import numpy as np
from cardinality import count

from flex.data.lazy_indexable import LazyIndexable


@dataclass(frozen=True)
class Dataset:
    """Class used to represent the dataset from a node in a Federated Learning environment.

    Attributes
    ----------
    X_data: LazyIndexable
        The data (features) held by the node.
    y_data: LazyIndexable
        The labels for the data. Can be None if working on
        an unsupervised learning task. Default None.
    """

    # Annotations are forward references so they are not evaluated while the
    # class body runs.
    X_data: "LazyIndexable" = field(init=True)
    y_data: Optional["LazyIndexable"] = field(default=None, init=True)

    def __len__(self):
        """Return the number of elements reported by X_data."""
        return len(self.X_data)

    def __getitem__(self, index):
        """Index the dataset.

        Args:
        -----
            index (Union[int, np.integer, slice, list]): position(s) to retrieve.

        Returns:
        --------
            tuple: an ``(x, y)`` pair when index is an integer; ``y`` is None
            for unlabelled datasets.
            Dataset: a new Dataset when index is a slice or a list.

        Raises:
        -------
            IndexError: if index is of any other type.
        """
        if isinstance(index, (int, np.integer)):
            return (
                self.X_data[index],
                self.y_data[index] if self.y_data is not None else None,
            )
        if isinstance(index, (slice, list)):
            return Dataset(
                self.X_data[index],
                self.y_data[index] if self.y_data is not None else None,
            )
        raise IndexError(
            f"Indexing with element {index} of type {type(index)} is not supported"
        )

    def __iter__(self):
        """Iterate over ``(x, y)`` pairs; ``y`` is None for unlabelled datasets."""
        return zip(
            self.X_data,
            self.y_data if self.y_data is not None else [None] * len(self),
        )

    def to_torchvision_dataset(self, **kwargs):
        """This function transforms a Dataset into a Torchvision dataset object

        Args:
        -----
            **kwargs: forwarded unchanged to DefaultVision. Note that
            transforms should be passed as arguments.

        Returns:
        --------
            torchvision.datasets.VisionDataset: a torchvision dataset with the contents of this dataset.
        """
        from .dataset_pt_utils import DefaultVision

        return DefaultVision(self, **kwargs)

    def to_tf_dataset(self):
        """This function is a utility to transform a Dataset object to a tensorflow.data.Dataset object

        The output signature is inferred from the first element of the dataset.

        Returns:
        --------
            tensorflow.data.Dataset: tf dataset object instantiated using the contents of a Dataset
        """
        from tensorflow import type_spec_from_value
        from tensorflow.data import Dataset as tf_Dataset

        # Fetch the first element once; both specs are derived from it.
        first_x, first_y = self[0]
        return tf_Dataset.from_generator(
            self.__iter__,
            output_signature=(
                type_spec_from_value(first_x),
                type_spec_from_value(first_y),
            ),
        )

    def to_numpy(self, x_dtype=None, y_dtype=None):
        """Function to return the FlexDataObject as numpy arrays.

        Args:
        -----
            x_dtype: dtype forwarded to ``X_data.to_numpy``. Default None.
            y_dtype: dtype forwarded to ``y_data.to_numpy``. Default None.

        Returns:
        --------
            The converted X_data alone when there are no labels, otherwise a
            tuple ``(X, y)``.
        """
        if self.y_data is None:
            return self.X_data.to_numpy(dtype=x_dtype)
        return (
            self.X_data.to_numpy(dtype=x_dtype),
            self.y_data.to_numpy(dtype=y_dtype),
        )

    def to_list(self):
        """Function to return the FlexDataObject as list.

        Returns:
        --------
            The result of ``X_data.tolist()`` alone when there are no labels,
            otherwise a tuple ``(X, y)`` of both conversions.
        """
        if self.y_data is None:
            return self.X_data.tolist()
        return self.X_data.tolist(), self.y_data.tolist()

    @classmethod
    def from_torchvision_dataset(cls, pytorch_dataset):
        """Function to convert an object from torchvision.datasets.* to a FlexDataObject.

        A RuntimeWarning is emitted when the class name of the dataset is not
        listed in PluggableTorchvision.

        Args:
        -----
            pytorch_dataset (torchvision.datasets.*): a torchvision dataset.

        Returns:
        --------
            Dataset: a FlexDataObject which encapsulates the dataset.
        """
        from flex.data.dataset_pt_utils import FeatureDataset, LabelDataset
        from flex.data.pluggable_datasets import PluggableTorchvision

        if pytorch_dataset.__class__.__name__ not in PluggableTorchvision:
            warnings.warn(
                "The input dataset and arguments are not explicitly supported, therefore they might not work as expected.",
                RuntimeWarning,
            )
        length = count(pytorch_dataset)
        X_data = LazyIndexable(FeatureDataset(pytorch_dataset), length=length)
        y_data = LazyIndexable(LabelDataset(pytorch_dataset), length=length)
        return cls(X_data=X_data, y_data=y_data)

    @classmethod
    def from_tfds_image_dataset(cls, tfds_dataset):
        """Function to convert a dataset from tensorflow_datasets to a FlexDataObject.

        Args:
        -----
            tfds_dataset (tf.data.Datasets): a tf dataset, or a tuple whose
            first two items are the features and the labels.

        Returns:
        --------
            Dataset: a FlexDataObject which encapsulates the dataset.
        """
        if not isinstance(tfds_dataset, tuple):
            # unbatch if required
            with contextlib.suppress(ValueError):
                tfds_dataset = tfds_dataset.unbatch()
            # After unbatching, we can't get the length, so we have to get it.
            # To get the length, we use count.
            length = count(tfds_dataset.as_numpy_iterator())

            X_data = LazyIndexable(
                (x for x, _ in tfds_dataset.as_numpy_iterator()), length=length
            )
            y_data = LazyIndexable(
                (y for _, y in tfds_dataset.as_numpy_iterator()), length=length
            )
        else:
            X_data = LazyIndexable(iter(tfds_dataset[0]), length=len(tfds_dataset[0]))
            y_data = LazyIndexable(iter(tfds_dataset[1]), length=len(tfds_dataset[1]))

        return cls(X_data=X_data, y_data=y_data)

    @classmethod
    def from_tfds_text_dataset(
        cls,
        tfds_dataset,
        X_columns: Optional[list] = None,
        label_columns: Optional[list] = None,
    ):
        """Function to convert a dataset from tensorflow_datasets to a FlexDataObject.

        Args:
        -----
            tfds_dataset (tf.data.Datasets): a tf dataset loaded.
            X_columns (list): List containing the features (input) of the model.
            label_columns (list): List containing the targets of the model.

        Returns:
        --------
            Dataset: a FlexDataObject which encapsulates the dataset.
        """
        from tensorflow.python.data.ops.dataset_ops import PrefetchDataset

        if isinstance(tfds_dataset, PrefetchDataset):
            # First case: Users used load func with batch_size != -1 or without indicating the batch_size
            length = len(tfds_dataset)
            # NOTE(review): the result of unbatch() is discarded, so this call
            # does not change tfds_dataset and `length` stays as computed above.
            # from_tfds_image_dataset assigns the result and recounts instead;
            # confirm which behaviour is intended before changing this one.
            with contextlib.suppress(ValueError):
                tfds_dataset.unbatch()

            def column_values(columns):
                # One value per row for a single column, otherwise one tuple
                # of values per row.
                rows = tfds_dataset.as_numpy_iterator()
                if len(columns) == 1:
                    return (row.get(columns[0]) for row in rows)
                return (tuple(map(row.get, columns)) for row in rows)

            if X_columns is None:
                X_data_generator = iter(tfds_dataset.as_numpy_iterator())
            else:
                X_data_generator = column_values(X_columns)
            X_data = LazyIndexable(X_data_generator, length=length)

            if label_columns is None:
                y_data = None
            else:
                y_data = LazyIndexable(column_values(label_columns), length=length)
        else:
            # User used batch_size=-1 when using the load function
            # NOTE(review): here the generators yield one item per selected
            # column while the length is len(tfds_dataset) -- verify this
            # against LazyIndexable's expectations.
            selected = tfds_dataset.keys() if X_columns is None else X_columns
            X_data = LazyIndexable(
                map(tfds_dataset.get, selected), length=len(tfds_dataset)
            )
            if label_columns is None:
                y_data = None
            else:
                y_data = LazyIndexable(
                    map(tfds_dataset.get, label_columns), length=len(tfds_dataset)
                )

        return cls(X_data=X_data, y_data=y_data)

    @staticmethod
    def _load_huggingface_from_spec(spec: str):
        """Load a HuggingFace dataset described as 'dataset;split' or 'dataset;subset;split'.

        Returns:
        --------
            tuple: the loaded dataset and the name used to check whether it is
            explicitly supported.

        Raises:
        -------
            ValueError: if spec does not have two or three ';'-separated parts.
            RuntimeError: if the dataset could not be loaded.
        """
        parts = spec.split(";")
        if len(parts) == 2:
            name, split = parts
            subset = None
        elif len(parts) == 3:
            name, subset, split = parts
        else:
            raise ValueError(
                "The dataset string must follow the format 'dataset;split' or "
                f"'dataset;subset;split', but {spec!r} was given."
            )

        from datasets.load import load_dataset

        try:
            if subset is None:
                loaded = load_dataset(name, split=split)
            else:
                loaded = load_dataset(
                    name, subset, split=split, ignore_verifications=True
                )
        except Exception as err:
            # Fail here: nothing after this point can work without a dataset.
            raise RuntimeError(
                f"Couldn't download the dataset from the HuggingFace datasets: {err}"
            ) from err

        name_checker = (
            f"{name.upper()}_{subset.upper()}_HF"
            if subset is not None
            else f"{name.upper()}_HF"
        )
        return loaded, name_checker

    @classmethod
    def from_huggingface_dataset(
        cls,
        hf_dataset,
        X_columns: Optional[list] = None,
        label_columns: Optional[list] = None,
    ):
        """Function to convert an arrow dataset from the Datasets package (HuggingFace datasets library) to a FlexDataObject.

        Args:
        -----
            hf_dataset (Union[datasets.arrow_dataset.Dataset, str]): a dataset from the dataset library.
            If a string is received, it will load the dataset from the HuggingFace repository. When a
            string is given, the split has to be specified in the str variable as follows: 'dataset;split'.
            Also, if the string contains a subset, for those datasets that have multiple subsets for
            different tasks, it may be given as follows: 'dataset;subset;split', so we can download the
            dataset and the desired subset and split.
            X_columns (list): List containing the features names for training the model
            label_columns (list): List containing the name or names of the label column

        Returns:
        --------
            Dataset: a FlexDataObject which encapsulates the dataset.

        Raises:
        -------
            ValueError: if hf_dataset is a string that does not follow the expected format.
            RuntimeError: if hf_dataset is a string and the dataset could not be loaded.
        """
        from flex.data.pluggable_datasets import PluggableHuggingFace

        if isinstance(hf_dataset, str):
            hf_dataset, name_checker = cls._load_huggingface_from_spec(hf_dataset)
            is_supported = name_checker in PluggableHuggingFace.__members__
        else:
            try:
                is_supported = (
                    hf_dataset.info.builder_name in PluggableHuggingFace.__members__
                )
            except Exception:
                # Best effort: the support check is skipped, loading continues.
                is_supported = None
                warnings.warn(
                    "The input dataset doesn't have the property dataset.info.builder_name or the str format is not correct, so we can't check if is supported or not. Therefore, it might not work as expected.",
                    RuntimeWarning,
                )
        if is_supported is False:
            warnings.warn(
                "The input dataset and arguments are not explicitly supported, therefore they might not work as expected.",
                RuntimeWarning,
            )

        length = count(hf_dataset)

        def column_values(columns):
            # One value per row for a single column, otherwise one tuple of
            # values per row.
            if len(columns) == 1:
                return (
                    value
                    for column in map(hf_dataset.__getitem__, columns)
                    for value in column
                )
            return zip(*map(hf_dataset.__getitem__, columns))

        if X_columns is None:
            # Every feature is used and rows are always tuples, even when the
            # dataset has a single feature.
            X_data_generator = zip(
                *map(hf_dataset.__getitem__, hf_dataset.features)
            )
        else:
            X_data_generator = column_values(X_columns)
        X_data = LazyIndexable(X_data_generator, length=length)

        if label_columns is None:
            y_data = None
        else:
            y_data = LazyIndexable(column_values(label_columns), length=length)

        return cls(X_data=X_data, y_data=y_data)

    @classmethod
    def from_torchtext_dataset(cls, pytorch_text_dataset):
        """Function to convert an object from torchtext.datasets.* to a FlexDataObject.
        It is mandatory that the dataset contains at least the following transform:
        torchtext.transforms.ToTensor()

        A RuntimeWarning is emitted when the class name of the dataset is not
        listed in PluggableTorchtext.

        Args:
        -----
            pytorch_text_dataset (torchtext.datasets.*): a torchtext dataset
            whose items are ``(label, text)`` pairs.

        Returns:
        --------
            Dataset: a FlexDataObject which encapsulates the dataset.
        """
        from flex.data.pluggable_datasets import PluggableTorchtext

        if pytorch_text_dataset.__class__.__name__ not in PluggableTorchtext:
            warnings.warn(
                "The input dataset and arguments are not explicitly supported, therefore they might not work as expected.",
                RuntimeWarning,
            )
        try:
            length = len(pytorch_text_dataset)
        except TypeError:
            # No usable len(): materialise the labels to learn the size.
            labels = [label for label, _ in pytorch_text_dataset]
            length = len(labels)
        else:
            # The size is known, so the labels can be produced lazily too.
            labels = (label for label, _ in pytorch_text_dataset)

        X_data = LazyIndexable(
            (text for _, text in pytorch_text_dataset), length=length
        )
        y_data = LazyIndexable(labels, length=length)

        return cls(X_data=X_data, y_data=y_data)

    @classmethod
    def from_array(
        cls,
        X_array: Union[list, np.ndarray],
        y_array: Optional[Union[list, np.ndarray]] = None,
    ):
        """Function that creates a Dataset from array-like objects, list and numpy.

        A RuntimeWarning is emitted when an argument is neither a list nor a
        numpy array; the Dataset is still built.

        Args:
        -----
            X_array (Union[list, np.ndarray]): Array-like containing X_data.
            y_array (Optional[Union[list, np.ndarray]]): Array-like containing the y_data. Default None.

        Returns:
        --------
            Dataset: a Dataset which encapsulates X_array and/or y_array.
        """
        if y_array is not None:
            if not isinstance(X_array, (list, np.ndarray)) or not isinstance(
                y_array, (list, np.ndarray)
            ):
                warnings.warn(  # noqa: B028
                    "X_array or y_array are not a list nor a numpy array. The method might not work as expected.",
                    RuntimeWarning,
                )
        elif not isinstance(X_array, (list, np.ndarray)):
            warnings.warn(  # noqa: B028
                "X_array is not a list nor a numpy array. The method might not work as expected.",
                RuntimeWarning,
            )

        X_data = LazyIndexable(X_array, length=len(X_array))
        y_data = (
            None if y_array is None else LazyIndexable(y_array, length=len(y_array))
        )

        return cls(X_data=X_data, y_data=y_data)

    def validate(self):
        """Function that checks whether the object is correct or not.

        A dataset without labels is always valid.

        Raises:
        -------
            ValueError: if y_data is present and its length differs from the
            length of X_data.
        """
        if self.y_data is None:
            # Unsupervised dataset: there is nothing to compare against.
            return
        try:
            y_data_length = len(self.y_data)
        except TypeError:
            # Fall back to the first dimension for objects without len().
            y_data_length = self.y_data.shape[0]
        if len(self) != y_data_length:
            raise ValueError(
                f"X_data and y_data must have equal lenght. X_data has {len(self)} elements and y_data has {y_data_length} elements."
            )