Source code for deepr.hooks.logging_tensor

"""MLFlow Metrics Hook"""

from typing import Any, Dict, List, Callable
import logging
import psutil

import tensorflow as tf

from deepr.hooks.base import TensorHookFactory
from deepr.utils import mlflow
from deepr.utils import graphite
from deepr.metrics import sanitize_metric_name


LOGGER = logging.getLogger(__name__)


def _default_formatter(tag, value):
    """Convert to float / int if relevant."""
    try:
        value = float(value)
        if value == int(value):
            value = int(value)
    except ValueError:
        pass
    except Exception as e:
        LOGGER.info(f"Exception on {tag}={value}")
        raise e
    return f"{tag} = {value:.7f}" if isinstance(value, float) else f"{tag} = {value}"


class LoggingTensorHookFactory(TensorHookFactory):
    """Parametrize the creation of a LoggingTensorHook.

    Arguments for instantiation should be provided as keyword arguments.

    Attributes
    ----------
    tensors : List[str], Optional
        Name of the tensors to use at runtime. If None (default), log all scalars.
    functions : Dict[str, Callable[[], float]], Optional
        Additional "python" metrics. Each function should return a float.
    name : str, Optional
        Name used as prefix of tags when sending to MLFlow / Graphite.
    use_mlflow : bool, Optional
        If True, send metrics to MLFlow. Default is False.
    use_graphite : bool, Optional
        If True, send metrics to Graphite. Default is False.
    skip_after_step : int, Optional
        If not None, do not run the hooks after this step. Prevents outliers
        when used in conjunction with an early stopping hook that overrides
        the global_step.
    every_n_iter : int, Optional
        Log once every N local steps (forwarded to tf.train.LoggingTensorHook).
    every_n_secs : int, Optional
        Log once every N seconds (forwarded to tf.train.LoggingTensorHook).
    at_end : bool, Optional
        If True, also log at the end of the session. Default is False.
    formatter : Callable[[str, Any], str], Optional
        Formatter for logging, default uses 7 digits precision.
    """

    def __init__(
        self,
        tensors: List[str] = None,
        functions: Dict[str, Callable[[], float]] = None,
        name: str = None,
        use_mlflow: bool = False,
        use_graphite: bool = False,
        skip_after_step: int = None,
        every_n_iter: int = None,
        every_n_secs: int = None,
        at_end: bool = False,
        formatter: Callable[[str, Any], str] = _default_formatter,
    ):
        self.tensors = tensors
        self.functions = functions
        self.name = name
        self.use_mlflow = use_mlflow
        self.use_graphite = use_graphite
        self.skip_after_step = skip_after_step
        self.every_n_iter = every_n_iter
        self.every_n_secs = every_n_secs
        self.at_end = at_end
        self.formatter = formatter

    def __call__(self, tensors: Dict[str, tf.Tensor]) -> tf.estimator.LoggingTensorHook:
        if self.tensors is None:
            tensors = {key: tensor for key, tensor in tensors.items() if len(tensor.shape) == 0}
        else:
            tensors = {name: tensors[name] for name in self.tensors}
        return LoggingTensorHook(
            tensors=tensors,
            functions=self.functions,
            name=self.name,
            use_mlflow=self.use_mlflow,
            use_graphite=self.use_graphite,
            skip_after_step=self.skip_after_step,
            every_n_iter=self.every_n_iter,
            every_n_secs=self.every_n_secs,
            at_end=self.at_end,
            formatter=self.formatter,
        )
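

# A minimal usage sketch (hypothetical, not part of the module): build a
# factory that logs a "loss" tensor every 100 steps and forwards metrics to
# MLFlow under the "train" prefix. The tensor name is a placeholder.
#
#   hook_factory = LoggingTensorHookFactory(
#       tensors=["loss"],
#       name="train",
#       use_mlflow=True,
#       every_n_iter=100,
#   )
#   hook = hook_factory({"loss": loss_tensor})  # called at graph-building time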


class LoggingTensorHook(tf.train.LoggingTensorHook):
    """Logging Hook (tensors and custom metrics as functions)"""

    def __init__(
        self,
        tensors: Dict[str, tf.Tensor],
        functions: Dict[str, Callable[[], float]] = None,
        name: str = None,
        use_mlflow: bool = False,
        use_graphite: bool = False,
        skip_after_step: int = None,
        every_n_iter: int = None,
        every_n_secs: int = None,
        at_end: bool = False,
        formatter: Callable[[str, Any], str] = _default_formatter,
    ):
        if "global_step" not in tensors:
            tensors["global_step"] = tf.train.get_global_step()
        super().__init__(tensors=tensors, every_n_iter=every_n_iter, every_n_secs=every_n_secs, at_end=at_end)
        self.functions = functions
        self.name = name
        self.use_mlflow = use_mlflow
        self.use_graphite = use_graphite
        self.skip_after_step = skip_after_step
        self.formatter = formatter
        self._fn_order = sorted(self.functions.keys()) if self.functions else []

    def _log_tensors(self, tensor_values):
        """Update timer, log tensors, send to MLFlow and Graphite"""
        self._timer.update_last_triggered_step(self._iter_count)
        global_step = tensor_values["global_step"]
        if self.skip_after_step is not None and global_step >= self.skip_after_step:
            return

        # Log tensor and function values
        ord_tensor_values = [(tag, tensor_values[tag]) for tag in self._tag_order]
        ord_function_values = [(tag, self.functions[tag]()) for tag in self._fn_order] if self.functions else []
        LOGGER.info(", ".join(self.formatter(tag, value) for tag, value in ord_tensor_values + ord_function_values))

        # Send to MLFlow and Graphite
        for tag, value in ord_tensor_values + ord_function_values:
            if self.use_graphite:
                graphite.log_metric(tag, value, postfix=self.name)
            if self.use_mlflow:
                tag = tag if self.name is None else f"{self.name}_{tag}"
                mlflow.log_metric(sanitize_metric_name(tag), value, step=global_step)
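

# Hedged sketch of how the hook could be attached inside a tf.estimator
# model_fn (the loss / train_op names are placeholders, not from the module):
#
#   hook = LoggingTensorHook(tensors={"loss": loss}, every_n_iter=50)
#   spec = tf.estimator.EstimatorSpec(
#       mode, loss=loss, train_op=train_op, training_hooks=[hook]
#   )

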
_UNIT_TO_FACTOR = {"kb": 1024, "mb": 1024 ** 2, "gb": 1024 ** 3}


class ResidentMemory:
    """Measure resident memory of the current process"""

    def __init__(self, unit: str = "gb"):
        self.unit = unit
        self._factor = _UNIT_TO_FACTOR[self.unit]
        self._process = psutil.Process()

    def __call__(self):
        return self._process.memory_info().rss / self._factor
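

# Illustrative usage (added example, not part of the module): an instance is
# a zero-argument callable, matching the signature expected by the
# `functions` argument of LoggingTensorHook.
#
#   memory_mb = ResidentMemory(unit="mb")
#   memory_mb()  # current RSS in MB, e.g. 512.3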


class MaxResidentMemory(ResidentMemory):
    """Measure maximum resident memory of the current process"""

    def __init__(self, unit: str = "gb"):
        super().__init__(unit=unit)
        self._max = None

    def __call__(self):
        memory = super().__call__()
        self._max = memory if self._max is None else max(self._max, memory)
        return self._max
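

# Hypothetical sketch tying the pieces together: the memory probes plug into
# the hook factory through the `functions` argument and get logged alongside
# the graph tensors.
#
#   hook_factory = LoggingTensorHookFactory(
#       functions={
#           "memory_gb": ResidentMemory(unit="gb"),
#           "max_memory_gb": MaxResidentMemory(unit="gb"),
#       },
#       every_n_iter=100,
#   )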