Source code for deepr.prepros.combinators

"""Combine Preprocessors"""

import itertools
import logging
from typing import Generator, List, Optional, Tuple, Union

import tensorflow as tf

from deepr.prepros.base import Prepro
from deepr.prepros import core
from deepr.utils.datastruct import to_flat_tuple


LOGGER = logging.getLogger(__name__)


class Serial(Prepro):
    """Chain preprocessors to define complex preprocessing pipelines.

    Applies each preprocessing step one after the other on each element.
    For performance reasons, it fuses :class:`~Map` and :class:`~Filter`
    operations into single `tf.data` calls.

    For an example, see the following snippet::

        import deepr

        def gen():
            yield {"a": [0], "b": [0, 1]}
            yield {"a": [0, 1], "b": [0]}
            yield {"a": [0, 1], "b": [0, 1]}

        prepro_fn = deepr.prepros.Serial(
            deepr.prepros.Map(deepr.layers.Sum(inputs=("a", "b"), outputs="c")),
            deepr.prepros.Filter(deepr.layers.IsMinSize(inputs="a", outputs="a_size", size=2)),
            deepr.prepros.Filter(deepr.layers.IsMinSize(inputs="b", outputs="b_size", size=2)),
        )

        dataset = tf.data.Dataset.from_generator(
            gen, {"a": tf.int32, "b": tf.int32}, {"a": (None,), "b": (None,)}
        )
        reader = deepr.readers.from_dataset(prepro_fn(dataset))
        expected = [{"a": [0, 1], "b": [0, 1], "c": [0, 2]}]

    Attributes
    ----------
    fuse : bool, optional
        If True (default), fuse :class:`~Map` and :class:`~Filter` operations.
    preprocessors : Union[Prepro, Tuple[Prepro], List[Prepro], Generator[Prepro, None, None]]
        Positional arguments: :class:`~Prepro` instances, or Tuple / List /
        Generator of :class:`~Prepro` instances.
    """
    def __init__(
        self,
        *preprocessors: Union[Prepro, Tuple[Prepro], List[Prepro], Generator[Prepro, None, None]],
        fuse: bool = True,
        num_parallel_calls: Optional[int] = None,
    ):
        super().__init__()
        self.preprocessors = to_flat_tuple(preprocessors)
        self.fuse = fuse
        self.num_parallel_calls = num_parallel_calls
        # Iterable of preprocessors used by the apply method
        self._preprocessors = (
            _fuse(*self.preprocessors, num_parallel_calls=num_parallel_calls) if fuse else self.preprocessors
        )
    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.preprocessors})"
    def apply(self, dataset: tf.data.Dataset, mode: Optional[str] = None) -> tf.data.Dataset:
        """Pre-process a dataset"""
        for prepro in self._preprocessors:
            dataset = prepro.apply(dataset, mode=mode)
        return dataset
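
# Note on fusing (an illustrative sketch, not part of the module): with the
# default ``fuse=True``, consecutive Map steps collapse into a single
# `_FusedMap` and consecutive Filter steps into a single `_FusedFilter`.
# For instance (``map_a``, ``map_b``, ``filter_c`` are hypothetical prepros)::
#
#     serial = Serial(map_a, map_b, filter_c)
#     serial._preprocessors  # -> (_FusedMap(map_a, map_b), _FusedFilter(filter_c))
#
# so applying `serial` issues one `dataset.map` and one `dataset.filter` call
# instead of three separate `tf.data` calls.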
def _fuse(*preprocessors: Prepro, num_parallel_calls: Optional[int] = None) -> Tuple[Prepro, ...]:
    """Group consecutive Map and Filter preprocessors into _FusedMap and _FusedFilter"""

    def _flatten(prepros):
        # Recursively unnest Serial instances so their steps can be fused too
        for prepro in prepros:
            if isinstance(prepro, Serial):
                yield from _flatten(prepro.preprocessors)
            else:
                yield prepro

    def _prepro_type(prepro: Prepro) -> str:
        if isinstance(prepro, core.Map):
            return "map"
        elif isinstance(prepro, core.Filter):
            return "filter"
        else:
            return "other"

    def _gen():
        # Group consecutive preprocessors of the same type and fuse each group
        for prepro_type, prepros in itertools.groupby(_flatten(preprocessors), _prepro_type):
            if prepro_type == "map":
                yield _FusedMap(*list(prepros), num_parallel_calls=num_parallel_calls)
            elif prepro_type == "filter":
                yield _FusedFilter(*list(prepros))
            else:
                yield list(prepros)

    return to_flat_tuple(_gen())


class _FusedMap(Prepro):
    """Fused Map"""

    def __init__(self, *preprocessors: core.Map, num_parallel_calls: Optional[int] = None):
        super().__init__()
        self.preprocessors = preprocessors
        self.num_parallel_calls = num_parallel_calls
        if not all(isinstance(prepro, core.Map) for prepro in self.preprocessors):
            msg = f"All preprocessors must be `deepr.Map` but got {self.preprocessors}"
            raise TypeError(msg)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.preprocessors})"

    def apply(self, dataset: tf.data.Dataset, mode: Optional[str] = None):
        """Apply preprocessors as one map operation"""
        # Keep only the preprocessors active for this mode
        active_prepros: List[core.Map] = []
        for prepro in self.preprocessors:
            if mode is not None and prepro.modes is not None and mode not in prepro.modes:
                LOGGER.info(f"Not applying {prepro} (mode={mode})")
                continue
            active_prepros.append(prepro)

        # Apply the active preprocessors as a single map call
        if not active_prepros:
            return dataset

        def _fused_tf_map_func(element):
            for prepro in active_prepros:
                element = prepro.tf_map_func(element)
            return element

        return dataset.map(_fused_tf_map_func, num_parallel_calls=self.num_parallel_calls)


class _FusedFilter(Prepro):
    """Fused Filter"""

    def __init__(self, *preprocessors: core.Filter):
        super().__init__()
        self.preprocessors = preprocessors
        if not all(isinstance(prepro, core.Filter) for prepro in self.preprocessors):
            msg = f"All preprocessors must be `deepr.Filter` but got {self.preprocessors}"
            raise TypeError(msg)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.preprocessors})"

    def apply(self, dataset: tf.data.Dataset, mode: Optional[str] = None):
        """Apply preprocessors as one filter operation"""
        # Keep only the preprocessors active for this mode
        active_prepros: List[core.Filter] = []
        for prepro in self.preprocessors:
            if mode is not None and prepro.modes is not None and mode not in prepro.modes:
                LOGGER.info(f"Not applying {prepro} (mode={mode})")
                continue
            active_prepros.append(prepro)

        # Apply the active preprocessors as a single filter call
        if not active_prepros:
            return dataset

        def _fused_tf_predicate(element):
            # AND all predicates together so one filter call suffices
            pred = None
            for prepro in active_prepros:
                new_pred = prepro.tf_predicate(element)
                pred = new_pred if pred is None else tf.logical_and(pred, new_pred)
            return pred

        return dataset.filter(_fused_tf_predicate)
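
if __name__ == "__main__":
    # Minimal smoke test mirroring the example in the `Serial` docstring.
    # This is a sketch, not part of the library: it assumes `deepr.layers.Sum`,
    # `deepr.layers.IsMinSize` and `deepr.readers.from_dataset` behave as used
    # in the docstring above.
    import deepr

    def gen():
        yield {"a": [0], "b": [0, 1]}
        yield {"a": [0, 1], "b": [0]}
        yield {"a": [0, 1], "b": [0, 1]}

    prepro_fn = Serial(
        core.Map(deepr.layers.Sum(inputs=("a", "b"), outputs="c")),
        core.Filter(deepr.layers.IsMinSize(inputs="a", outputs="a_size", size=2)),
        core.Filter(deepr.layers.IsMinSize(inputs="b", outputs="b_size", size=2)),
    )
    dataset = tf.data.Dataset.from_generator(
        gen, {"a": tf.int32, "b": tf.int32}, {"a": (None,), "b": (None,)}
    )
    # The two Filter steps are fused into one `_FusedFilter`, so the pipeline
    # below issues a single `map` and a single `filter` call on the dataset.
    for element in deepr.readers.from_dataset(prepro_fn(dataset)):
        print(element)  # expected: {"a": [0, 1], "b": [0, 1], "c": [0, 2]}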