{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install deepr[cpu]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Advanced\n", "\n", "In this notebook, we cover more advanced functionality of deepr, namely metrics, hooks and exporters.\n", "\n", "We train the same model (multiply a number by 2) as in the quickstart.\n", "\n", "First, some imports" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import logging\n", "import sys\n", "logging.basicConfig(level=logging.INFO, stream=sys.stdout)\n", "logging.getLogger(\"tensorflow\").setLevel(logging.CRITICAL)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "INFO:faiss:Loading faiss.\n" ] } ], "source": [ "import deepr\n", "import deepr.examples.multiply as multiply" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "if deepr.io.Path(\"model\").is_dir():\n", " deepr.io.Path(\"model\").delete_dir()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's reuse the same `build_job` to create a dataset of random pairs of (x, 2x)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "build_job = multiply.jobs.Build(path_dataset=\"data.tfrecord\", num_examples=1000)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Original TrainerJob\n", "\n", "Before defining our more advanced `Trainer`, let's remind what our original `Trainer` looked like" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "original_trainer_job = deepr.jobs.Trainer(\n", " path_model=\"model\", \n", " pred_fn=multiply.layers.Multiply(inputs=\"x\", outputs=\"y_pred\"), \n", " loss_fn=multiply.layers.SquaredL2(inputs=(\"y\", \"y_pred\"), outputs=\"loss\"),\n", " optimizer_fn=deepr.optimizers.TensorflowOptimizer(optimizer=\"Adam\", learning_rate=0.1),\n", " train_input_fn=deepr.readers.TFRecordReader(path=\"data.tfrecord\"),\n", " eval_input_fn=deepr.readers.TFRecordReader(path=\"data.tfrecord\"),\n", " prepro_fn=multiply.prepros.DefaultPrepro(batch_size=32, repeat_size=10),\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "All it did was train a model given a prediction function, a loss function, a dataset and a preprocessing function.\n", "\n", "In real-life scenarios, training is slightly more complicated.\n", "\n", "## TrainSpec and EvalSpec\n", "\n", "For starters, the `tf.estimator` API uses the concepts of `TrainSpec` and `EvalSpec` to configure how often / how many batches of data the training and the evaluation should use. You can pass this information along as follows" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# Train our model on 1000 batches of data\n", "train_spec = deepr.jobs.TrainSpec(max_steps=1000)\n", "\n", "# Run evaluation (in non-distributed mode), every 10 seconds if a new checkpoint is available.\n", "eval_spec = deepr.jobs.EvalSpec(\n", " throttle_secs=10, \n", " start_delay_secs=10, \n", " steps=None # None means \"use all the validation set\"\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This just defines two dictionaries whose arguments will be given to the actual `TrainSpec` and `EvalSpec` of the resulting `tf.estimator`.\n", "\n", "\n", "## Metrics\n", "\n", "Now, we can also add some metrics to monitor training and evaluation. There are 3 types of metrics \n", "\n", "- Training: during training, on the training set.\n", "- Evaluation: during evaluation, on the evaluation set.\n", "- Final: after the training is complete, re-evaluate on the whole validation set.\n", "\n", "\n", "Let's add some to our model, by computing an exponential moving average of the loss during training, or computing the mean of the loss on the validation set (already done by `tf.estimator` but we do it for the sake of the example)." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "train_metrics = [\n", " deepr.metrics.StepCounter(name=\"num_steps\"),\n", " deepr.metrics.DecayMean(tensors=[\"loss\"], decay=0.98)\n", "]\n", "eval_metrics = [deepr.metrics.Mean(tensors=[\"loss\"])]\n", "final_metrics = [deepr.metrics.Mean(tensors=[\"loss\"])]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In `deepr`, metrics implement the following base class\n", "\n", "```python\n", "class Metric(ABC):\n", " \"\"\"Base class for Metrics\"\"\"\n", "\n", " def __call__(self, tensors: Dict[str, tf.Tensor]) -> Dict[str, Tuple]:\n", " raise NotImplementedError()\n", "```\n", "\n", "where `Tuple` is a tuple of `(last_value, update_op)` that is nothing else than the `tf.metrics` approach. \n", "\n", "In other words, metrics are just a way to build `tf.metrics` objects using the dictionaries produced by the model's layers." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Hooks\n", "\n", "Now we can configure some basic parameters of the training as well as add metrics.\n", "\n", "What about more advanced logic? For example, what if we want to stop the training if some metric on the validation set stops improving after a given number of steps (early stopping)?\n", "\n", "`tf.estimator` uses the concept of hooks, that, as the name suggests, will be injected inside the `tf.estimator` training code and run once in while.\n", "\n", "Similar to the metrics, we can define hooks for all 3 modes : training, evaluation and final evaluation.\n", "\n", "For training, let's add\n", "\n", "- [LoggingTensorHookFactory](https://criteo.github.io/deepr/API/_autosummary/deepr.hooks.LoggingTensorHookFactory.html) : log additional metrics, optionaly send to MLFlow / Graphite\n", "- [SummarySaverHookFactory](https://criteo.github.io/deepr/API/_autosummary/deepr.hooks.SummarySaverHookFactory.html) : save summaries for Tensorboard\n", "- [NumParamsHook](https://criteo.github.io/deepr/API/_autosummary/deepr.hooks.NumParamsHook.html) : log initial number of parameters in the model\n", "- [LogVariablesInitHook](https://criteo.github.io/deepr/API/_autosummary/deepr.hooks.LogVariablesInitHook.html) : log some basic stats about initial parameters (number of zeros, average norm)\n", "- [StepsPerSecHook](https://criteo.github.io/deepr/API/_autosummary/deepr.hooks.StepsPerSecHook.html) : log training speed (number of batches and examples per second)\n", "- [EarlyStoppingHookFactory](https://criteo.github.io/deepr/API/_autosummary/deepr.hooks.EarlyStoppingHookFactory.html) : stop training if the `loss` does not decrease on the validation set of 100 consecutive training steps.\n", "\n", "For evaluation and final evaluation, let's just add a `LoggingTensorHookFactory` to log the metrics values and optionaly send them to MLFlow Graphite (with the `use_mlflow` and `use_graphite` arguments)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "train_hooks = [\n", " deepr.hooks.LoggingTensorHookFactory(\n", " name=\"training\", \n", " functions={\n", " \"memory_gb\": deepr.hooks.ResidentMemory(unit=\"gb\"),\n", " \"max_memory_gb\": deepr.hooks.MaxResidentMemory(unit=\"gb\")\n", " },\n", " every_n_iter=100,\n", " use_graphite=False,\n", " use_mlflow=False\n", " ),\n", " deepr.hooks.SummarySaverHookFactory(save_steps=100),\n", " deepr.hooks.NumParamsHook(use_mlflow=False),\n", " deepr.hooks.LogVariablesInitHook(use_mlflow=False),\n", " deepr.hooks.StepsPerSecHook(\n", " name=\"training\", \n", " batch_size=32, \n", " every_n_steps=100, \n", " skip_after_step=1000, \n", " use_mlflow=False, \n", " use_graphite=False\n", " ),\n", " deepr.hooks.EarlyStoppingHookFactory(\n", " metric=\"loss\", \n", " mode=\"decrease\", \n", " max_steps_without_improvement=100, \n", " min_steps=500,\n", " run_every_steps=100,\n", " final_step=1000\n", " )\n", "]\n", "eval_hooks = [deepr.hooks.LoggingTensorHookFactory(name=\"validation\", at_end=True)]\n", "final_hooks = [deepr.hooks.LoggingTensorHookFactory(name=\"final_validation\", at_end=True)]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A note to more experienced users : most of those hooks are simple wrappers / factories for native `tf.estimator` hooks so that they can be configured here and not directly in the `model_fn` of the estimator (for example the native `LoggingTensorHook` needs to be initialized with actual `Tensors`, that we obviously can't access at this level).\n", "\n", "However, the `EarlyStoppingHook`, though it reuses most of the official code, adds an important tweak: it allows you to set a `final_step`. If given, when early stopping, it will set the global step to that value. Why? Simply because this is currently the easiest way to signal the end of training in distributed settings mode, as other workers all know the maximum number of steps. When the chief broadcasts the final step, all know that it's time to stop." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exporters\n", "\n", "The latest argument that comes in handy is the `exporters` one. Now that the `tf.estimator` is created behind the scenes, we might want to do some things with it at the end of the training.\n", "\n", "Here, we do two things\n", "\n", "- use `BestCheckpoint` to select the best checkpoint based on the validation metrics, and change the `checkpoint` file of the `tf.estimator` to point to that specific checkpoint.\n", "- use `SavedModel` to export the `tf.estimator` as a `SavedModel`. As it runs after the `BestCheckpoint`, it will use the best checkpoint. Note that you need to define the input fields of your model (they are not currently inferred from the other parameters of the trainer, though we might add this in the future)." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "exporters = [\n", " deepr.exporters.BestCheckpoint(metric=\"loss\"),\n", " deepr.exporters.SavedModel(\n", " path_saved_model=\"model/saved_model\", \n", " fields=[\n", " deepr.utils.Field(name=\"x\", shape=(), dtype=\"float32\")\n", " ]\n", " )\n", "]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Full TrainerJob\n", "\n", "Now that we've defined the specs, metrics, hooks and exporters, we update our original `Trainer` job into" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "trainer_job = deepr.jobs.Trainer(\n", " path_model=\"model\", \n", " pred_fn=multiply.layers.Multiply(inputs=\"x\", outputs=\"y_pred\"), \n", " loss_fn=multiply.layers.SquaredL2(inputs=(\"y\", \"y_pred\"), outputs=\"loss\"),\n", " optimizer_fn=deepr.optimizers.TensorflowOptimizer(optimizer=\"Adam\", learning_rate=0.1),\n", " train_input_fn=deepr.readers.TFRecordReader(path=\"data.tfrecord\"),\n", " eval_input_fn=deepr.readers.TFRecordReader(path=\"data.tfrecord\"),\n", " prepro_fn=multiply.prepros.DefaultPrepro(batch_size=32, repeat_size=10),\n", " train_spec=train_spec,\n", " eval_spec=eval_spec,\n", " train_metrics=train_metrics,\n", " eval_metrics=eval_metrics,\n", " final_metrics=final_metrics,\n", " train_hooks=train_hooks,\n", " eval_hooks=eval_hooks,\n", " final_hooks=final_hooks,\n", " exporters=exporters\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cleanup Job\n", "\n", "Now that we have a more powerful `Trainer` job, let's also review some additional jobs that might be added at the end of the training pipeline.\n", "\n", "One of them is the `CleanupCheckpoints` job : it does what it says it does, i.e. deleting the model's checkpoints (which can save you a lot of disk usage if you run thousands of experiments and have no need to reuse the checkpoints). Because we exported our model as a `SavedModel`, we probably don't need those." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "cleanup_checkpoints = deepr.jobs.CleanupCheckpoints(path_model=\"model\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## OptimizeSavedModel\n", "\n", "The `SavedModel` is by itself sufficient, but still has a few drawbacks\n", "\n", "- it's actually comprised of a few different files (the graph protobuffer, some files to store variable values)\n", "- also, it might contain parts of the graph that are not actually useful for inference.\n", "- finally, maybe the actual inputs of our graph will be intermediate nodes. For example, in NLP, if we have fine-tuned some embeddings and they are part of the graph, the `SavedModel` probably will expect the word indices. However, in a deployment scenario, the service producing embeddings might be independent from our model. In other words, during training, the graph inputs were the word indices. During inference, the graph inputs are the actual embeddings.\n", "\n", "We can do some of these optimizations using the [OptimizeSavedModel](https://criteo.github.io/deepr/API/_autosummary/deepr.jobs.OptimizeSavedModel.html#deepr.jobs.OptimizeSavedModel) job.\n", "\n", "It produces one self-contained file (a `.pb` file, like the `SavedModel`), that contains an updated version of the graph (only the part that produces some tensor `fetch` given some other tensors `feeds`), makes it possible to update the tensor's names, and adds the variable values directly inside the graph definition (effectively making them constants).\n", "\n", "Note to more experienced users: the `.pb` format has a limit in size, which means that Tensorflow graphs cannot be bigger than 2GB. If your model has too many parameters, the `OptimizeSavedModel` won't be able to produce one protobuffer file for your graph." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "optimize_saved_model = deepr.jobs.OptimizeSavedModel(\n", " path_saved_model=\"model/saved_model\",\n", " path_optimized_model=\"model/optimized_saved_model\",\n", " graph_name=\"_model.pb\",\n", " feeds=[\"inputs/x\"],\n", " fetch=\"y_pred\",\n", " new_names={\"x\": \"inputs/x\"}\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## ExportXlaModelMetadata" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "An optional step in our pipeline can be to use XLA.\n", "\n", "[XLA](https://www.tensorflow.org/xla) is a technology that can compile a tensorflow graph to machine code and in the process optimize this graph even more.\n", "\n", "[ExportXlaModelMetadata](https://criteo.github.io/deepr/API/_autosummary/deepr.jobs.ExportXlaModelMetadata.html#deepr.jobs.ExportXlaModelMetadata) is a job that can create a metadata file that is optimized for XLA: it will fix all the shapes so XLA can do its best work." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "export_xla_model_metadata = deepr.jobs.ExportXlaModelMetadata(\n", " path_optimized_model= \"model/optimized_saved_model\",\n", " path_metadata=\"model/optimized_saved_model\",\n", " graph_name=\"_model.pb\",\n", " metadata_name=\"_meta.pbtxt\",\n", " feed_shapes = {\n", " 'inputs/x': (1,),\n", " },\n", " fetch_shapes = {\n", " 'y_pred': None,\n", " }\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Predict Job\n", "\n", "Once the `OptimizeSavedModel` has run, how do we use it for inference? Usually, this file will be sent to some production service (probably not using python) in charge of using it to compute predictions. However, we might want to use it in python.\n", "\n", "The `Predict` job in `example` illustrates how to reload the `_model.pb` file and use it to compute predictions. All it does is print the predictions given some `input_fn`." ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "predict_proto = multiply.jobs.PredictProto(\n", " path_model=\"model/optimized_saved_model\",\n", " graph_name=\"_model.pb\",\n", " input_fn=deepr.readers.TFRecordReader(path=\"data.tfrecord\"),\n", " prepro_fn=multiply.prepros.InferencePrepro(batch_size=1, count=2, inputs=\"inputs/x\"),\n", " feeds=\"inputs/x\",\n", " fetches=\"y_pred\",\n", ")" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "predict_saved_model = multiply.jobs.PredictSavedModel(\n", " path_saved_model=\"model/saved_model\",\n", " input_fn=deepr.readers.TFRecordReader(path=\"data.tfrecord\"),\n", " prepro_fn=multiply.prepros.InferencePrepro(batch_size=1, count=2, inputs=\"x\"),\n", " feeds=\"x\",\n", " fetches=\"y_pred\",\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Full Pipeline\n", "\n", "Now that we have a production-ready pipeline, let's run it!" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "pipeline = deepr.jobs.Pipeline([\n", " build_job, \n", " trainer_job, \n", " cleanup_checkpoints, \n", " optimize_saved_model, \n", " export_xla_model_metadata,\n", " predict_proto, \n", " predict_saved_model\n", "])" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "scrolled": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "INFO:deepr.examples.multiply.jobs.build:Wrote dataset to 'data.tfrecord'\n", "INFO:deepr.metrics.mean:DecayMean(decay=0.98, tensors=['loss'], pattern=None) -> loss\n", "INFO:deepr.hooks.num_params:Number of parameters (global) = 7\n", "INFO:deepr.hooks.num_params:Number of parameters (trainable) = 1\n", "INFO:deepr.hooks.log_variables_init:alpha_init_average_norm = 1.5546354055404663\n", "INFO:deepr.hooks.log_variables_init:alpha_init_num_zeros = 0\n", "INFO:deepr.hooks.logging_tensor:global_step = 1, loss = 116.3018036, num_steps = 1, max_memory_gb = 0.3282471, memory_gb = 0.3282471\n", "INFO:deepr.hooks.steps_per_sec:steps_per_sec = 372.45, examples_per_sec = 11918.43\n", "INFO:deepr.hooks.logging_tensor:global_step = 101, loss = 21.0533333, num_steps = 101, max_memory_gb = 0.3284912, memory_gb = 0.3284912\n", "INFO:deepr.hooks.steps_per_sec:steps_per_sec = 329.49, examples_per_sec = 10543.83\n", "INFO:deepr.hooks.logging_tensor:global_step = 201, loss = 2.7921832, num_steps = 201, max_memory_gb = 0.3284912, memory_gb = 0.3284912\n", "INFO:deepr.hooks.steps_per_sec:steps_per_sec = 481.47, examples_per_sec = 15406.98\n", "INFO:deepr.hooks.logging_tensor:global_step = 301, loss = 0.3702988, num_steps = 301, max_memory_gb = 0.3284912, memory_gb = 0.3284912\n", "INFO:deepr.prepros.core:Not applying Repeat(10) (mode=eval)\n", "INFO:deepr.metrics.mean:Mean(tensors=['loss'], pattern=None) -> loss\n", "INFO:deepr.hooks.logging_tensor:global_step = 320, loss = 0.0000000\n", "INFO:deepr.exporters.best_checkpoint:Reloading summaries from model/checkpoints\n", "INFO:deepr.exporters.best_checkpoint:- 320: {'average_loss': 2.422635876631052e-12, 'loss': 2.422635876631052e-12}\n", "INFO:deepr.exporters.best_checkpoint:Best summary at step 320: {'average_loss': 2.422635876631052e-12, 'loss': 2.422635876631052e-12}\n", "INFO:deepr.exporters.best_checkpoint:Selected checkpoint 320\n", "INFO:deepr.jobs.trainer:Running final evaluation, using global_step = 320\n", "INFO:deepr.prepros.core:Not applying Repeat(10) (mode=eval)\n", "INFO:deepr.metrics.mean:Mean(tensors=['loss'], pattern=None) -> loss\n", "INFO:deepr.hooks.logging_tensor:global_step = 320, loss = 0.0000000\n", "INFO:deepr.jobs.trainer:{'average_loss': 2.4226359e-12, 'loss': 2.4226359e-12, 'global_step': 320}\n", "INFO:deepr.jobs.cleanup_checkpoints:Cleanup checkpoints in model/checkpoints\n", "INFO:deepr.jobs.cleanup_checkpoints:- Deleting model/checkpoints/model.ckpt-320.meta\n", "INFO:deepr.jobs.cleanup_checkpoints:- Deleting model/checkpoints/model.ckpt-0.data-00000-of-00001\n", "INFO:deepr.jobs.cleanup_checkpoints:- Deleting model/checkpoints/model.ckpt-0.index\n", "INFO:deepr.jobs.cleanup_checkpoints:- Deleting model/checkpoints/model.ckpt-320.index\n", "INFO:deepr.jobs.cleanup_checkpoints:- Deleting model/checkpoints/model.ckpt-320.data-00000-of-00001\n", "INFO:deepr.jobs.cleanup_checkpoints:- Deleting model/checkpoints/model.ckpt-0.meta\n", "INFO:deepr.jobs.optimize_saved_model:Using SavedModel model/saved_model/1595845092\n", "INFO:deepr.jobs.optimize_saved_model:Node renamed: x -> inputs/x\n", "INFO:deepr.jobs.optimize_saved_model:Optimized Model successfully exported to model/optimized_saved_model/_model.pb\n", "INFO:deepr.jobs.export_xla_model_metadata:Metadata successfully saved to model/optimized_saved_model/_meta.pbtxt\n", "INFO:deepr.predictors.proto:Running init_all_tables\n", "INFO:deepr.examples.multiply.jobs.predict:{'inputs/x': array([0.58949345], dtype=float32), 'y_pred': array([1.1789867], dtype=float32)}\n", "INFO:deepr.examples.multiply.jobs.predict:{'inputs/x': array([0.9973951], dtype=float32), 'y_pred': array([1.9947897], dtype=float32)}\n", "INFO:deepr.examples.multiply.jobs.predict:{'x': array([0.58949345], dtype=float32), 'y_pred': array([1.1789867], dtype=float32)}\n", "INFO:deepr.examples.multiply.jobs.predict:{'x': array([0.9973951], dtype=float32), 'y_pred': array([1.9947897], dtype=float32)}\n" ] } ], "source": [ "pipeline.run()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" }, "pycharm": { "stem_cell": { "cell_type": "raw", "metadata": { "collapsed": false }, "source": [] } } }, "nbformat": 4, "nbformat_minor": 4 }