[ ]:
!pip install deepr[cpu]

Advanced

In this notebook, we cover more advanced functionality of deepr, namely metrics, hooks and exporters.

We train the same model (multiply a number by 2) as in the quickstart.

First, some imports

[1]:
import logging
import sys
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logging.getLogger("tensorflow").setLevel(logging.CRITICAL)
[2]:
import deepr
import deepr.examples.multiply as multiply
INFO:faiss:Loading faiss.
[3]:
if deepr.io.Path("model").is_dir():
    deepr.io.Path("model").delete_dir()

Let’s reuse the same build_job to create a dataset of random pairs of (x, 2x)

[4]:
build_job = multiply.jobs.Build(path_dataset="data.tfrecord", num_examples=1000)

Original TrainerJob

Before defining our more advanced Trainer, let’s recall what our original Trainer looked like

[5]:
original_trainer_job = deepr.jobs.Trainer(
    path_model="model",
    pred_fn=multiply.layers.Multiply(inputs="x", outputs="y_pred"),
    loss_fn=multiply.layers.SquaredL2(inputs=("y", "y_pred"), outputs="loss"),
    optimizer_fn=deepr.optimizers.TensorflowOptimizer(optimizer="Adam", learning_rate=0.1),
    train_input_fn=deepr.readers.TFRecordReader(path="data.tfrecord"),
    eval_input_fn=deepr.readers.TFRecordReader(path="data.tfrecord"),
    prepro_fn=multiply.prepros.DefaultPrepro(batch_size=32, repeat_size=10),
)

All it did was train a model given a prediction function, a loss function, a dataset and a preprocessing function.

In real-life scenarios, training is slightly more complicated.

TrainSpec and EvalSpec

For starters, the tf.estimator API uses the concepts of TrainSpec and EvalSpec to configure how long the training should run and how often / on how many batches the evaluation should be performed. You can pass this information along as follows

[6]:
# Train our model on 1000 batches of data
train_spec = deepr.jobs.TrainSpec(max_steps=1000)

# Run evaluation (in non-distributed mode), every 10 seconds if a new checkpoint is available.
eval_spec = deepr.jobs.EvalSpec(
    throttle_secs=10,
    start_delay_secs=10,
    steps=None  # None means "use all the validation set"
)

This just defines two dictionaries whose arguments will be given to the actual TrainSpec and EvalSpec of the resulting tf.estimator.
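
For reference, here is a sketch of the native specs the Trainer roughly builds behind the scenes (illustrative only; toy_input_fn is a stand-in to keep the snippet self-contained, not the actual input_fn used by the Trainer):

import tensorflow as tf

def toy_input_fn():
    # Stand-in input_fn so the snippet runs on its own
    return tf.data.Dataset.from_tensor_slices({"x": [1.0], "y": [2.0]}).batch(1)

# Roughly the native tf.estimator specs built from the dictionaries above
native_train_spec = tf.estimator.TrainSpec(input_fn=toy_input_fn, max_steps=1000)
native_eval_spec = tf.estimator.EvalSpec(
    input_fn=toy_input_fn,
    steps=None,  # None means "use all the validation set"
    start_delay_secs=10,
    throttle_secs=10,
)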

Metrics

Now, we can also add some metrics to monitor training and evaluation. There are 3 types of metrics

  • Training: during training, on the training set.

  • Evaluation: during evaluation, on the evaluation set.

  • Final: after the training is complete, re-evaluate on the whole validation set.

Let’s add some to our model: an exponential moving average of the loss during training, and the mean of the loss on the validation set (the latter is already computed by tf.estimator, but we do it for the sake of the example).

[7]:
train_metrics = [
    deepr.metrics.StepCounter(name="num_steps"),
    deepr.metrics.DecayMean(tensors=["loss"], decay=0.98)
]
eval_metrics = [deepr.metrics.Mean(tensors=["loss"])]
final_metrics = [deepr.metrics.Mean(tensors=["loss"])]

In deepr, metrics implement the following base class

from abc import ABC
from typing import Dict, Tuple

import tensorflow as tf


class Metric(ABC):
    """Base class for Metrics"""

    def __call__(self, tensors: Dict[str, tf.Tensor]) -> Dict[str, Tuple]:
        raise NotImplementedError()

where each Tuple is a pair (last_value, update_op), which is exactly the tf.metrics convention.

In other words, metrics are just a way to build tf.metrics objects using the dictionaries produced by the model’s layers.
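
As an illustration, a Mean-like metric could be re-implemented as follows (a minimal sketch assuming the TF1 graph mode used by tf.estimator and that the base class is importable as deepr.metrics.Metric; MyMean is a hypothetical name, not part of the library):

from typing import Dict, List, Tuple

import tensorflow as tf

import deepr


class MyMean(deepr.metrics.Metric):
    """Hypothetical re-implementation of a Mean metric on top of tf.compat.v1.metrics.mean"""

    def __init__(self, tensors: List[str]):
        self.tensors = tensors

    def __call__(self, tensors: Dict[str, tf.Tensor]) -> Dict[str, Tuple]:
        # Each entry is a (last_value, update_op) pair, which is exactly what
        # tf.estimator expects in eval_metric_ops
        return {name: tf.compat.v1.metrics.mean(tensors[name]) for name in self.tensors}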

Hooks

Now we can configure some basic parameters of the training as well as add metrics.

What about more advanced logic? For example, what if we want to stop the training if some metric on the validation set stops improving after a given number of steps (early stopping)?

tf.estimator uses the concept of hooks which, as the name suggests, are injected inside the tf.estimator training code and run once in a while.
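
To make the idea concrete, here is what a bare native hook looks like (a minimal, hypothetical tf.estimator.SessionRunHook, not one of the deepr hook factories used below):

import tensorflow as tf


class PrintEveryNStepsHook(tf.estimator.SessionRunHook):
    """Hypothetical native hook: prints a message every `every_n_steps` session runs"""

    def __init__(self, every_n_steps: int = 100):
        self._every_n_steps = every_n_steps
        self._count = 0

    def after_run(self, run_context, run_values):
        # Called by the estimator after each session.run during training
        self._count += 1
        if self._count % self._every_n_steps == 0:
            print(f"Ran {self._count} steps")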

Similar to the metrics, we can define hooks for all 3 modes: training, evaluation and final evaluation.

For training, let’s add a LoggingTensorHookFactory (also logging resident memory usage), a SummarySaverHookFactory, a NumParamsHook, a LogVariablesInitHook, a StepsPerSecHook and an EarlyStoppingHookFactory.

For evaluation and final evaluation, let’s just add a LoggingTensorHookFactory to log the metric values and optionally send them to MLflow and Graphite (with the use_mlflow and use_graphite arguments).

[8]:
train_hooks = [
    deepr.hooks.LoggingTensorHookFactory(
        name="training",
        functions={
            "memory_gb": deepr.hooks.ResidentMemory(unit="gb"),
            "max_memory_gb": deepr.hooks.MaxResidentMemory(unit="gb")
        },
        every_n_iter=100,
        use_graphite=False,
        use_mlflow=False
    ),
    deepr.hooks.SummarySaverHookFactory(save_steps=100),
    deepr.hooks.NumParamsHook(use_mlflow=False),
    deepr.hooks.LogVariablesInitHook(use_mlflow=False),
    deepr.hooks.StepsPerSecHook(
        name="training",
        batch_size=32,
        every_n_steps=100,
        skip_after_step=1000,
        use_mlflow=False,
        use_graphite=False
    ),
    deepr.hooks.EarlyStoppingHookFactory(
        metric="loss",
        mode="decrease",
        max_steps_without_improvement=100,
        min_steps=500,
        run_every_steps=100,
        final_step=1000
    )
]
eval_hooks = [deepr.hooks.LoggingTensorHookFactory(name="validation", at_end=True)]
final_hooks = [deepr.hooks.LoggingTensorHookFactory(name="final_validation", at_end=True)]

A note to more experienced users: most of those hooks are simple wrappers / factories around native tf.estimator hooks, so that they can be configured here rather than directly in the estimator’s model_fn (for example, the native LoggingTensorHook needs to be initialized with actual Tensors, which we obviously can’t access at this level).

However, the EarlyStoppingHook, though it reuses most of the official code, adds an important tweak: it allows you to set a final_step. If given, the hook sets the global step to that value when early stopping triggers. Why? Simply because this is currently the easiest way to signal the end of training in distributed settings, as all workers know the maximum number of steps: when the chief broadcasts the final step, everyone knows it’s time to stop.

Exporters

The last argument that comes in handy is exporters. Since the tf.estimator is created behind the scenes, we might want to do a few things with it at the end of training.

Here, we do two things

  • use BestCheckpoint to select the best checkpoint based on the validation metrics, and change the checkpoint file of the tf.estimator to point to that specific checkpoint.

  • use SavedModel to export the tf.estimator as a SavedModel. As it runs after the BestCheckpoint, it will use the best checkpoint. Note that you need to define the input fields of your model (they are not currently inferred from the other parameters of the trainer, though we might add this in the future).

[9]:
exporters = [
    deepr.exporters.BestCheckpoint(metric="loss"),
    deepr.exporters.SavedModel(
        path_saved_model="model/saved_model",
        fields=[
            deepr.utils.Field(name="x", shape=(), dtype="float32")
        ]
    )
]

Full TrainerJob

Now that we’ve defined the specs, metrics, hooks and exporters, we update our original Trainer job into

[10]:
trainer_job = deepr.jobs.Trainer(
    path_model="model",
    pred_fn=multiply.layers.Multiply(inputs="x", outputs="y_pred"),
    loss_fn=multiply.layers.SquaredL2(inputs=("y", "y_pred"), outputs="loss"),
    optimizer_fn=deepr.optimizers.TensorflowOptimizer(optimizer="Adam", learning_rate=0.1),
    train_input_fn=deepr.readers.TFRecordReader(path="data.tfrecord"),
    eval_input_fn=deepr.readers.TFRecordReader(path="data.tfrecord"),
    prepro_fn=multiply.prepros.DefaultPrepro(batch_size=32, repeat_size=10),
    train_spec=train_spec,
    eval_spec=eval_spec,
    train_metrics=train_metrics,
    eval_metrics=eval_metrics,
    final_metrics=final_metrics,
    train_hooks=train_hooks,
    eval_hooks=eval_hooks,
    final_hooks=final_hooks,
    exporters=exporters
)

Cleanup Job

Now that we have a more powerful Trainer job, let’s also review some additional jobs that might be added at the end of the training pipeline.

One of them is the CleanupCheckpoints job: it does what it says, i.e. it deletes the model’s checkpoints (which can save a lot of disk space if you run thousands of experiments and have no need to reuse the checkpoints). Because we exported our model as a SavedModel, we probably don’t need them anymore.

[11]:
cleanup_checkpoints = deepr.jobs.CleanupCheckpoints(path_model="model")

OptimizeSavedModel

The SavedModel is by itself sufficient, but still has a few drawbacks

  • it actually consists of several files (the graph protobuffer and some files storing the variable values).

  • also, it might contain parts of the graph that are not actually useful for inference.

  • finally, the actual inputs of our graph at inference time may be intermediate nodes. For example, in NLP, if we have fine-tuned some embeddings and they are part of the graph, the SavedModel will probably expect word indices as inputs. However, in a deployment scenario, the service producing the embeddings might be independent of our model. In other words, during training the graph inputs were the word indices, while during inference the graph inputs are the actual embeddings.

We can do some of these optimizations using the OptimizeSavedModel job.

It produces one self-contained file (a .pb file, like the SavedModel) that contains an updated version of the graph (only the part that produces the fetch tensors given the feed tensors), makes it possible to rename tensors, and embeds the variable values directly inside the graph definition (effectively turning them into constants).
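
Under the hood, this kind of export relies on TensorFlow’s graph-freezing utilities; the snippet below is a rough sketch of the mechanism (for illustration only, not deepr’s actual implementation):

import tensorflow as tf


def freeze_saved_model(path_saved_model: str, fetch: str) -> tf.compat.v1.GraphDef:
    """Load a SavedModel and fold its variables into constants in a single GraphDef"""
    with tf.compat.v1.Session(graph=tf.Graph()) as sess:
        tf.compat.v1.saved_model.loader.load(sess, ["serve"], path_saved_model)
        # Replace every variable by a constant holding its current value, keeping
        # only the subgraph needed to compute the fetch node
        return tf.compat.v1.graph_util.convert_variables_to_constants(
            sess, sess.graph.as_graph_def(), output_node_names=[fetch]
        )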

Note to more experienced users: the .pb format has a 2GB size limit, which means that a serialized TensorFlow graph cannot be bigger than 2GB. If your model has too many parameters, the OptimizeSavedModel job won’t be able to produce a single protobuffer file for your graph.

[12]:
optimize_saved_model = deepr.jobs.OptimizeSavedModel(
    path_saved_model="model/saved_model",
    path_optimized_model="model/optimized_saved_model",
    graph_name="_model.pb",
    feeds=["inputs/x"],
    fetch="y_pred",
    new_names={"x": "inputs/x"}
)

ExportXlaModelMetadata

An optional step in our pipeline can be to use XLA.

XLA is a compiler technology that can compile a TensorFlow graph to machine code and, in the process, optimize the graph even further.

ExportXlaModelMetadata is a job that creates a metadata file optimized for XLA: it fixes all the tensor shapes so that XLA can do its best work.

[13]:
export_xla_model_metadata = deepr.jobs.ExportXlaModelMetadata(
    path_optimized_model="model/optimized_saved_model",
    path_metadata="model/optimized_saved_model",
    graph_name="_model.pb",
    metadata_name="_meta.pbtxt",
    feed_shapes={"inputs/x": (1,)},
    fetch_shapes={"y_pred": None},
)

Predict Job

Once the OptimizeSavedModel has run, how do we use it for inference? Usually, this file will be sent to some production service (probably not written in Python) in charge of using it to compute predictions. However, we might also want to use it in Python.

The PredictProto and PredictSavedModel jobs in the multiply example illustrate how to reload the _model.pb file or the SavedModel and use them to compute predictions. All they do is print the predictions given some input_fn.

[14]:
predict_proto = multiply.jobs.PredictProto(
    path_model="model/optimized_saved_model",
    graph_name="_model.pb",
    input_fn=deepr.readers.TFRecordReader(path="data.tfrecord"),
    prepro_fn=multiply.prepros.InferencePrepro(batch_size=1, count=2, inputs="inputs/x"),
    feeds="inputs/x",
    fetches="y_pred",
)
[15]:
predict_saved_model = multiply.jobs.PredictSavedModel(
    path_saved_model="model/saved_model",
    input_fn=deepr.readers.TFRecordReader(path="data.tfrecord"),
    prepro_fn=multiply.prepros.InferencePrepro(batch_size=1, count=2, inputs="x"),
    feeds="x",
    fetches="y_pred",
)

Full Pipeline

Now that we have a production-ready pipeline, let’s run it!

[16]:
pipeline = deepr.jobs.Pipeline([
    build_job,
    trainer_job,
    cleanup_checkpoints,
    optimize_saved_model,
    export_xla_model_metadata,
    predict_proto,
    predict_saved_model
])
[17]:
pipeline.run()
INFO:deepr.examples.multiply.jobs.build:Wrote dataset to 'data.tfrecord'
INFO:deepr.metrics.mean:DecayMean(decay=0.98, tensors=['loss'], pattern=None) -> loss
INFO:deepr.hooks.num_params:Number of parameters (global) = 7
INFO:deepr.hooks.num_params:Number of parameters (trainable) = 1
INFO:deepr.hooks.log_variables_init:alpha_init_average_norm = 1.5546354055404663
INFO:deepr.hooks.log_variables_init:alpha_init_num_zeros = 0
INFO:deepr.hooks.logging_tensor:global_step = 1, loss = 116.3018036, num_steps = 1, max_memory_gb = 0.3282471, memory_gb = 0.3282471
INFO:deepr.hooks.steps_per_sec:steps_per_sec = 372.45, examples_per_sec = 11918.43
INFO:deepr.hooks.logging_tensor:global_step = 101, loss = 21.0533333, num_steps = 101, max_memory_gb = 0.3284912, memory_gb = 0.3284912
INFO:deepr.hooks.steps_per_sec:steps_per_sec = 329.49, examples_per_sec = 10543.83
INFO:deepr.hooks.logging_tensor:global_step = 201, loss = 2.7921832, num_steps = 201, max_memory_gb = 0.3284912, memory_gb = 0.3284912
INFO:deepr.hooks.steps_per_sec:steps_per_sec = 481.47, examples_per_sec = 15406.98
INFO:deepr.hooks.logging_tensor:global_step = 301, loss = 0.3702988, num_steps = 301, max_memory_gb = 0.3284912, memory_gb = 0.3284912
INFO:deepr.prepros.core:Not applying Repeat(10) (mode=eval)
INFO:deepr.metrics.mean:Mean(tensors=['loss'], pattern=None) -> loss
INFO:deepr.hooks.logging_tensor:global_step = 320, loss = 0.0000000
INFO:deepr.exporters.best_checkpoint:Reloading summaries from model/checkpoints
INFO:deepr.exporters.best_checkpoint:- 320: {'average_loss': 2.422635876631052e-12, 'loss': 2.422635876631052e-12}
INFO:deepr.exporters.best_checkpoint:Best summary at step 320: {'average_loss': 2.422635876631052e-12, 'loss': 2.422635876631052e-12}
INFO:deepr.exporters.best_checkpoint:Selected checkpoint 320
INFO:deepr.jobs.trainer:Running final evaluation, using global_step = 320
INFO:deepr.prepros.core:Not applying Repeat(10) (mode=eval)
INFO:deepr.metrics.mean:Mean(tensors=['loss'], pattern=None) -> loss
INFO:deepr.hooks.logging_tensor:global_step = 320, loss = 0.0000000
INFO:deepr.jobs.trainer:{'average_loss': 2.4226359e-12, 'loss': 2.4226359e-12, 'global_step': 320}
INFO:deepr.jobs.cleanup_checkpoints:Cleanup checkpoints in model/checkpoints
INFO:deepr.jobs.cleanup_checkpoints:- Deleting model/checkpoints/model.ckpt-320.meta
INFO:deepr.jobs.cleanup_checkpoints:- Deleting model/checkpoints/model.ckpt-0.data-00000-of-00001
INFO:deepr.jobs.cleanup_checkpoints:- Deleting model/checkpoints/model.ckpt-0.index
INFO:deepr.jobs.cleanup_checkpoints:- Deleting model/checkpoints/model.ckpt-320.index
INFO:deepr.jobs.cleanup_checkpoints:- Deleting model/checkpoints/model.ckpt-320.data-00000-of-00001
INFO:deepr.jobs.cleanup_checkpoints:- Deleting model/checkpoints/model.ckpt-0.meta
INFO:deepr.jobs.optimize_saved_model:Using SavedModel model/saved_model/1595845092
INFO:deepr.jobs.optimize_saved_model:Node renamed: x -> inputs/x
INFO:deepr.jobs.optimize_saved_model:Optimized Model successfully exported to model/optimized_saved_model/_model.pb
INFO:deepr.jobs.export_xla_model_metadata:Metadata successfully saved to model/optimized_saved_model/_meta.pbtxt
INFO:deepr.predictors.proto:Running init_all_tables
INFO:deepr.examples.multiply.jobs.predict:{'inputs/x': array([0.58949345], dtype=float32), 'y_pred': array([1.1789867], dtype=float32)}
INFO:deepr.examples.multiply.jobs.predict:{'inputs/x': array([0.9973951], dtype=float32), 'y_pred': array([1.9947897], dtype=float32)}
INFO:deepr.examples.multiply.jobs.predict:{'x': array([0.58949345], dtype=float32), 'y_pred': array([1.1789867], dtype=float32)}
INFO:deepr.examples.multiply.jobs.predict:{'x': array([0.9973951], dtype=float32), 'y_pred': array([1.9947897], dtype=float32)}