{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install deepr[cpu]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Pipeline\n", "\n", "This notebook builds upon the model defined in the [quickstart](quickstart.ipynb).\n", "\n", "The goal of this notebook is to define a full pipeline that not only trains the model, but also builds the dataset, and run this pipeline on a yarn cluster.\n", "\n", "We'll see how to\n", "\n", "1. Define a custom job to build the dataset.\n", "2. Define a pipeline that builds and trains the model.\n", "3. Use configs to run the pipeline on yarn.\n", "\n", "First, some imports" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import logging\n", "import sys\n", "logging.basicConfig(level=logging.INFO, stream=sys.stdout)\n", "logging.getLogger(\"tensorflow\").setLevel(logging.CRITICAL)\n", "logging.getLogger(\"cluster_pack\").setLevel(logging.CRITICAL)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from dataclasses import dataclass\n", "\n", "import tensorflow as tf\n", "import numpy as np\n", "import deepr" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "if deepr.io.Path(\"model\").is_dir():\n", " deepr.io.Path(\"model\").delete_dir()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1. Custom Build Dataset Job\n", "\n", "The quickstart shortly introduced the concept of a job with the `Trainer` job.\n", "\n", "Real-life pipelines consist of multiple jobs. In our example, we want to define a special job that creates the dataset.\n", "\n", "Let's see how we would define a custom job that writes the dataset content in a tfrecord file." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Build Job" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "@dataclass\n", "class Build(deepr.jobs.Job):\n", " \"\"\"Build a dummy dataset of random (x, 2*x) as a tfrecord file\"\"\"\n", " \n", " path_dataset: str\n", " num_examples: int = 1000\n", " \n", " def run(self):\n", " \n", " def _generator_fn():\n", " for _ in range(self.num_examples):\n", " x = np.random.random()\n", " yield {\"x\": x, \"y\": 2 * x}\n", " \n", " def _dict_to_example(data):\n", " features = {\n", " \"x\": deepr.readers.float_feature([data[\"x\"]]),\n", " \"y\": deepr.readers.float_feature([data[\"y\"]])\n", " }\n", " example = tf.train.Example(features=tf.train.Features(feature=features))\n", " return example\n", "\n", " with tf.python_io.TFRecordWriter(self.path_dataset) as writer:\n", " for data in _generator_fn():\n", " example = _dict_to_example(data)\n", " writer.write(example.SerializeToString())\n", " \n", " print(f\"Wrote dataset to '{self.path_dataset}'\")" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "build_job = Build(path_dataset=\"data.tfrecord\", num_examples=1000)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's run the job" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Wrote dataset to 'data.tfrecord'\n" ] } ], "source": [ "build_job.run()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prepro\n", "\n", "Because the data is now stored in tfrecord files, the `prepro_fn` needs to deserialize the file's content.\n", "\n", "Let's define a preprocessor and check that everything works correctly with the dataset created by the `BuildDataset` job." 
] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "def DefaultPrepro(batch_size, repeat_size):\n", " return deepr.prepros.Serial(\n", " deepr.prepros.TFRecordSequenceExample(fields=[\n", " deepr.Field(name=\"x\", shape=(), dtype=tf.float32),\n", " deepr.Field(name=\"y\", shape=(), dtype=tf.float32)\n", " ]),\n", " deepr.prepros.Batch(batch_size=batch_size),\n", " deepr.prepros.Repeat(repeat_size, modes=[tf.estimator.ModeKeys.TRAIN]),\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `@prepro` decorator creates a class from the function that would be equivalent to \n", "\n", "```python\n", "class DefaultPrepro(deepr.prepros.Prepro):\n", "\n", " def __init__(self, batch_size, repeat_size):\n", " super().__init__()\n", " self.batch_size = batch_size\n", " self.repeat_size = repeat_size\n", " \n", " def apply(self, dataset: tf.data.Dataset, mode: str = None) -> tf.data.Dataset:\n", " prepro_fn = deepr.prepros.Serial(\n", " deepr.prepros.TFRecordSequenceExample(fields=[\n", " deepr.Field(name=\"x\", shape=(), dtype=tf.float32),\n", " deepr.Field(name=\"y\", shape=(), dtype=tf.float32)\n", " ]),\n", " deepr.prepros.Batch(batch_size=batch_size),\n", " deepr.prepros.Repeat(repeat_size, modes=[tf.estimator.ModeKeys.TRAIN]),\n", " )\n", " return prepro_fn(dataset, mode)\n", "```\n", "\n", "One of the advantages of the decorator is that the body of the function `DefaultPrepro` does not get executed until the preprocessor is actually applied to the dataset.\n", "\n", "This lazy behavior is convenient when resources are created in the function (like tables), resources that should only be defined at runtime.\n", "\n", "\n", "Let's create an instance of `DefaultPrepro`" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "prepro_fn = DefaultPrepro(batch_size=32, repeat_size=10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Reader\n", "\n", "In the quickstart we used a `GeneratorReader`. With tfrecords, let's use a `TFRecordReader`." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "reader = deepr.readers.TFRecordReader(\"data.tfrecord\")" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'x': array([0.12249236, 0.19407886, 0.4287153 , 0.11553267, 0.00252286,\n", " 0.37583396, 0.9285378 , 0.11992821, 0.11691252, 0.54549456,\n", " 0.20779829, 0.22857946, 0.82357705, 0.6685327 , 0.3074787 ,\n", " 0.18468134, 0.77053934, 0.90410686, 0.00688817, 0.48377946,\n", " 0.01943498, 0.5244569 , 0.18175201, 0.25505018, 0.9191886 ,\n", " 0.33966148, 0.15110607, 0.10617658, 0.10038193, 0.87724835,\n", " 0.64753866, 0.6283632 ], dtype=float32), 'y': array([0.24498472, 0.38815773, 0.8574306 , 0.23106533, 0.00504571,\n", " 0.7516679 , 1.8570756 , 0.23985642, 0.23382504, 1.0909891 ,\n", " 0.41559657, 0.45715892, 1.6471541 , 1.3370655 , 0.6149574 ,\n", " 0.36936268, 1.5410787 , 1.8082137 , 0.01377634, 0.9675589 ,\n", " 0.03886997, 1.0489138 , 0.36350402, 0.51010036, 1.8383772 ,\n", " 0.67932296, 0.30221215, 0.21235317, 0.20076387, 1.7544967 ,\n", " 1.2950773 , 1.2567264 ], dtype=float32)}\n" ] } ], "source": [ "for batch in deepr.readers.base.from_dataset(prepro_fn(reader())):\n", " print(batch)\n", " break" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2. Define a Pipeline\n", "\n", "So far, we have defined\n", "\n", "1. 
"1. A custom `Build` job\n", "2. Custom layers `Multiply` and `SquaredL2` (in the [quickstart](quickstart.ipynb))\n", "3. A custom preprocessor `DefaultPrepro`\n", "\n", "\n", "We will need to make these classes available in the `pex` that will be shipped to yarn, so let's add them to a module living alongside the core library.\n", "\n", "For example,\n", "\n", "```\n", "deepr\n", "├── __init__.py\n", "├── core\n", "├── examples\n", "│   └── multiply\n", "│       ├── __init__.py\n", "│       ├── jobs\n", "│       │   ├── __init__.py\n", "│       │   └── build.py # Build\n", "│       ├── layers\n", "│       │   ├── __init__.py\n", "│       │   ├── loss.py # SquaredL2\n", "│       │   └── model.py # Multiply\n", "│       └── prepros\n", "│           ├── __init__.py\n", "│           └── default.py # DefaultPrepro\n", "```\n", "\n", "Now, these classes can easily be imported from anywhere.\n", "\n", "Let's replicate the quickstart by defining and running a full pipeline that builds the dataset and then trains a model." ] },
{ "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "INFO:faiss:Loading faiss.\n" ] } ], "source": [ "import deepr.examples.multiply as multiply" ] },
{ "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "build_job = multiply.jobs.Build(path_dataset=\"data.tfrecord\", num_examples=1000)" ] },
{ "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "trainer_job = deepr.jobs.Trainer(\n", "    path_model=\"model\",\n", "    pred_fn=multiply.layers.Multiply(),\n", "    loss_fn=multiply.layers.SquaredL2(),\n", "    optimizer_fn=deepr.optimizers.TensorflowOptimizer(\"Adam\", 0.1),\n", "    train_input_fn=deepr.readers.TFRecordReader(\"data.tfrecord\"),\n", "    eval_input_fn=deepr.readers.TFRecordReader(\"data.tfrecord\"),\n", "    prepro_fn=multiply.prepros.DefaultPrepro(batch_size=32, repeat_size=10)\n", ")" ] },
{ "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "pipeline = deepr.jobs.Pipeline([build_job, trainer_job])" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "The pipeline is made of 2 jobs\n", "\n", "1. The `Build` job that creates the `tfrecord` file\n", "2. The `Trainer` job that trains the model\n", "\n", "We can simply run it with" ] },
{ "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "INFO:deepr.examples.multiply.jobs.build:Wrote dataset to 'data.tfrecord'\n", "INFO:deepr.prepros.core:Not applying Repeat(10) (mode=eval)\n", "INFO:deepr.jobs.trainer:Running final evaluation, using global_step = 320\n", "INFO:deepr.prepros.core:Not applying Repeat(10) (mode=eval)\n", "INFO:deepr.jobs.trainer:{'loss': 0.0, 'global_step': 320}\n" ] } ], "source": [ "pipeline.run()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## 3. Run on Yarn\n", "\n", "We can't just submit Python objects to yarn.\n", "\n", "We need to parametrize the execution. Though this could be done in an ad-hoc manner using custom entry points, we can instead use the `config` capabilities.\n", "\n", "To read more about the config system, see the [config introduction](config.ipynb).\n", "\n", "\n", "In short, you can define arbitrary trees of objects using dictionaries. The special key \"type\" contains the full import string of the object's class. The other keys are passed as keyword arguments at instantiation time."
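, "\n", "\n", "For instance, the optimizer we passed to the `Trainer` above could equivalently be described by a config dictionary and instantiated with `deepr.from_config`. This is only a small sketch of the mechanism; the actual configs for our pipeline are written out in the next cells:\n", "\n", "```python\n", "optimizer_config = {\n", "    \"type\": \"deepr.optimizers.TensorflowOptimizer\",\n", "    \"optimizer\": \"Adam\",\n", "    \"learning_rate\": 0.1\n", "}\n", "\n", "# Equivalent to deepr.optimizers.TensorflowOptimizer(\"Adam\", 0.1)\n", "optimizer_fn = deepr.from_config(optimizer_config)\n", "```"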
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Build Job" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "build_job_config = {\n", " \"type\": \"deepr.examples.multiply.jobs.Build\",\n", " \"path_dataset\": \"viewfs://root/user/deepr/dev/example/data.tfrecord\",\n", " \"num_examples\": 1000\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Reader" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "reader_config = {\n", " \"type\": \"deepr.readers.TFRecordReader\",\n", " \"path\": \"viewfs://root/user/deepr/dev/example/data.tfrecord\",\n", " \"num_parallel_reads\": 8,\n", " \"num_parallel_calls\": 8,\n", " \"shuffle\": True\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prepro" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "prepro_fn_config = {\n", " \"type\": \"deepr.examples.multiply.prepros.DefaultPrepro\",\n", " \"batch_size\": 32,\n", " \"repeat_size\": 10\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prediction Function" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "pred_fn_config = {\"type\": \"deepr.examples.multiply.layers.Multiply\"}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Loss Function" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "loss_fn_config = {\"type\": \"deepr.examples.multiply.layers.SquaredL2\"}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Trainer Job\n", "\n", "This is a good example of what a nested config looks like" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "trainer_job_config = {\n", " \"type\": \"deepr.jobs.Trainer\",\n", " \"path_model\": \"viewfs://root/user/deepr/dev/example/model\",\n", " \"pred_fn\": pred_fn_config,\n", " \"loss_fn\": loss_fn_config,\n", " \"optimizer_fn\": {\n", " \"type\": \"deepr.optimizers.TensorflowOptimizer\",\n", " \"optimizer\": \"Adam\",\n", " \"learning_rate\": 0.1\n", " },\n", " \"prepro_fn\": prepro_fn_config,\n", " \"train_input_fn\": reader_config,\n", " \"eval_input_fn\": reader_config\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train Locally\n", "\n", "We can use these configs to re-instantiate the objects using the `from_config` function, which supports arbitrary nesting of configs.\n", "\n", "For example, we can re-create the build and trainer jobs with " ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [], "source": [ "build_job = deepr.from_config(build_job_config)" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [], "source": [ "trainer_job = deepr.from_config(trainer_job_config)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "and define a new pipeline, that we could then run like above" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "pipeline = deepr.jobs.Pipeline([build_job, trainer_job])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Instead of training locally (something we've already done twice), let's see how we can leverage the configs to execute the pipeline on yarn." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train on Yarn\n", "\n", "Let's not run any code on the local machine, but instead submit the pipeline to a `yarn` machine.\n", "\n", "Also, instead of running the trainer job on the same machine as the build job, let's use `tf_yarn` distributed training capabilities and launch the trainer job on other yarn machines.\n", "\n", "To submit jobs on yarn, it's actually as simple as wrapping job configs into special jobs.\n", "\n", "- `YarnLauncher`: submits a job to yarn\n", "- `YarnTrainer`: uses `tf_yarn` to run a `Trainer` job on multiple machines\n", "\n", "\n", "Let's do it" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "yarn_launcher_config = deepr.jobs.YarnLauncherConfig(\n", " path_pex_prefix=\"viewfs://root/user/deepr/dev/example/envs\"\n", ")\n", "job_config = {\n", " \"type\": \"deepr.jobs.Pipeline\",\n", " \"jobs\": [\n", " build_job_config,\n", " {\n", " \"type\": \"deepr.jobs.YarnTrainer\",\n", " \"trainer\": {\n", " **trainer_job_config, \n", " \"eval\": None # from_config will not instantiate the trainer argument\n", " },\n", " \"config\": {\n", " \"type\": \"deepr.jobs.YarnTrainerConfig\"\n", " }\n", " }\n", " ]\n", "}\n", "pipeline_yarn = deepr.jobs.YarnLauncher(config=yarn_launcher_config, job=job_config)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once the `YarnLauncher` job is defined, we can run it. \n", "\n", "It uploads the current environment as a `pex` to HDFS using the settings provided by the `DefaultYarnLauncherConfig`, and then executes the job from its config by simply doing something equivalent to what we did above, i.e. `from_config(job).run()`." ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [], "source": [ "HAS_HADOOP = False" ] }, { "cell_type": "code", "execution_count": 27, "metadata": { "scrolled": true }, "outputs": [], "source": [ "if HAS_HADOOP:\n", " pipeline_yarn.run()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When the job completes, it only means that the job was successfully submitted to yarn. We need to wait for the job to finish.\n", "\n", "After a few minutes, we can check that the build and training jobs ran successfully by looking at the files on the HDFS!" 
] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [], "source": [ "if HAS_HADOOP:\n", " list(deepr.io.Path(\"viewfs://root/user/deepr/dev/example\").glob(\"*\"))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Using config files\n", "\n", "Because it is sometimes convenient to commit config files for reproducibility and production, it is possible (and recommended) to store configs as `.json` files.\n", "\n", "\n", "A convenient way to compose configs (similar to what we did by defining different dictionaries before putting them together) is to use [jsonnet](https://jsonnet.org/).\n", "\n", "For example, we can define a file `build.jsonnet` like so\n", "\n", "```json\n", "{\n", " \"type\": \"deepr.examples.multiply.jobs.BuildDataset\",\n", " \"path_dataset\": \"viewfs://root/user/deepr/dev/example/data.tfrecord\",\n", " \"num_examples\": 1000\n", "}\n", "```\n", "\n", "and import it into our pipline config file `config.jsonnet` with\n", "\n", "```json\n", "local build = import 'build.jsonnet';\n", "{\n", " \"type\": \"deepr.jobs.YarnLauncher\",\n", " \"config\": {\n", " \"type\": \"deepr.jobs.YarnLauncherConfig\",\n", " },\n", " \"job\": build\n", "}\n", "```\n", "\n", "You can run config files defining jobs with\n", "\n", "```bash\n", "deepr run config.jsonnet\n", "```" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 4 }