[ ]:
!pip install deepr[cpu]

Pipeline

This notebook builds upon the model defined in the quickstart.

The goal of this notebook is to define a full pipeline that not only trains the model, but also builds the dataset, and run this pipeline on a yarn cluster.

We’ll see how to

  1. Define a custom job to build the dataset.

  2. Define a pipeline that builds and trains the model.

  3. Use configs to run the pipeline on yarn.

First, some imports

[1]:
import logging
import sys
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logging.getLogger("tensorflow").setLevel(logging.CRITICAL)
logging.getLogger("cluster_pack").setLevel(logging.CRITICAL)
[2]:
from dataclasses import dataclass

import tensorflow as tf
import numpy as np
import deepr
[3]:
if deepr.io.Path("model").is_dir():
    deepr.io.Path("model").delete_dir()

1. Custom Build Dataset Job

The quickstart shortly introduced the concept of a job with the Trainer job.

Real-life pipelines consist of multiple jobs. In our example, we want to define a special job that creates the dataset.

Let’s see how we would define a custom job that writes the dataset content in a tfrecord file.

Build Job

[4]:
@dataclass
class Build(deepr.jobs.Job):
    """Build a dummy dataset of random (x, 2*x) as a tfrecord file"""

    path_dataset: str
    num_examples: int = 1000

    def run(self):

        def _generator_fn():
            for _ in range(self.num_examples):
                x = np.random.random()
                yield {"x": x, "y": 2 * x}

        def _dict_to_example(data):
            features = {
                "x": deepr.readers.float_feature([data["x"]]),
                "y": deepr.readers.float_feature([data["y"]])
            }
            example = tf.train.Example(features=tf.train.Features(feature=features))
            return example

        with tf.python_io.TFRecordWriter(self.path_dataset) as writer:
            for data in _generator_fn():
                example = _dict_to_example(data)
                writer.write(example.SerializeToString())

        print(f"Wrote dataset to '{self.path_dataset}'")
[5]:
build_job = Build(path_dataset="data.tfrecord", num_examples=1000)

Let’s run the job

[6]:
build_job.run()
Wrote dataset to 'data.tfrecord'

Prepro

Because the data is now stored in tfrecord files, the prepro_fn needs to deserialize the file’s content.

Let’s define a preprocessor and check that everything works correctly with the dataset created by the BuildDataset job.

[7]:
def DefaultPrepro(batch_size, repeat_size):
    return deepr.prepros.Serial(
        deepr.prepros.TFRecordSequenceExample(fields=[
            deepr.Field(name="x", shape=(), dtype=tf.float32),
            deepr.Field(name="y", shape=(), dtype=tf.float32)
        ]),
        deepr.prepros.Batch(batch_size=batch_size),
        deepr.prepros.Repeat(repeat_size, modes=[tf.estimator.ModeKeys.TRAIN]),
    )

The @prepro decorator creates a class from the function that would be equivalent to

class DefaultPrepro(deepr.prepros.Prepro):

    def __init__(self, batch_size, repeat_size):
        super().__init__()
        self.batch_size = batch_size
        self.repeat_size = repeat_size

    def apply(self, dataset: tf.data.Dataset, mode: str = None) -> tf.data.Dataset:
        prepro_fn = deepr.prepros.Serial(
            deepr.prepros.TFRecordSequenceExample(fields=[
                deepr.Field(name="x", shape=(), dtype=tf.float32),
                deepr.Field(name="y", shape=(), dtype=tf.float32)
            ]),
            deepr.prepros.Batch(batch_size=batch_size),
            deepr.prepros.Repeat(repeat_size, modes=[tf.estimator.ModeKeys.TRAIN]),
        )
        return prepro_fn(dataset, mode)

One of the advantages of the decorator is that the body of the function DefaultPrepro does not get executed until the preprocessor is actually applied to the dataset.

This lazy behavior is convenient when resources are created in the function (like tables), resources that should only be defined at runtime.

Let’s create an instance of DefaultPrepro

[8]:
prepro_fn = DefaultPrepro(batch_size=32, repeat_size=10)

Reader

In the quickstart we used a GeneratorReader. With tfrecords, let’s use a TFRecordReader.

[9]:
reader = deepr.readers.TFRecordReader("data.tfrecord")
[10]:
for batch in deepr.readers.base.from_dataset(prepro_fn(reader())):
    print(batch)
    break
{'x': array([0.12249236, 0.19407886, 0.4287153 , 0.11553267, 0.00252286,
       0.37583396, 0.9285378 , 0.11992821, 0.11691252, 0.54549456,
       0.20779829, 0.22857946, 0.82357705, 0.6685327 , 0.3074787 ,
       0.18468134, 0.77053934, 0.90410686, 0.00688817, 0.48377946,
       0.01943498, 0.5244569 , 0.18175201, 0.25505018, 0.9191886 ,
       0.33966148, 0.15110607, 0.10617658, 0.10038193, 0.87724835,
       0.64753866, 0.6283632 ], dtype=float32), 'y': array([0.24498472, 0.38815773, 0.8574306 , 0.23106533, 0.00504571,
       0.7516679 , 1.8570756 , 0.23985642, 0.23382504, 1.0909891 ,
       0.41559657, 0.45715892, 1.6471541 , 1.3370655 , 0.6149574 ,
       0.36936268, 1.5410787 , 1.8082137 , 0.01377634, 0.9675589 ,
       0.03886997, 1.0489138 , 0.36350402, 0.51010036, 1.8383772 ,
       0.67932296, 0.30221215, 0.21235317, 0.20076387, 1.7544967 ,
       1.2950773 , 1.2567264 ], dtype=float32)}

2. Define a Pipeline

So far, we have defined

  1. A custom BuildDataset job

  2. Custom layers Multiply and SquaredL2 (in the quickstart)

  3. A custom preprocessor DefaultPrepro

We will need to make these classes available on the pex that will be shipped to yarn, so let’s add them to a module living alongside the core library.

For example,

deepr
├── __init__.py
├── core
├── example
│   ├── __init__.py
│   ├── jobs
│   │   ├── __init__.py
│   │   └── build_dataset.py  # BuildDataset
│   ├── layers
│   │   ├── __init__.py
│   │   ├── loss.py           # SquaredL2
│   │   └── model.py          # Multiply
│   └── prepros
│       ├── __init__.py
│       └── default.py        # DefaultPrepro

Now, these classes can easily be imported from anywhere.

Let’s replicate the quickstart by defining and running a full pipeline that builds the dataset and then trains a model.

[11]:
import deepr.examples.multiply as multiply
INFO:faiss:Loading faiss.
[12]:
build_job = multiply.jobs.Build(path_dataset="data.tfrecord", num_examples=1000)
[13]:
trainer_job = deepr.jobs.Trainer(
    path_model="model",
    pred_fn=multiply.layers.Multiply(),
    loss_fn=multiply.layers.SquaredL2(),
    optimizer_fn=deepr.optimizers.TensorflowOptimizer("Adam", 0.1),
    train_input_fn=deepr.readers.TFRecordReader("data.tfrecord"),
    eval_input_fn=deepr.readers.TFRecordReader("data.tfrecord"),
    prepro_fn=multiply.prepros.DefaultPrepro(batch_size=32, repeat_size=10)
)
[14]:
pipeline = deepr.jobs.Pipeline([build_job, trainer_job])

The pipeline is made of 2 jobs

  1. The BuildDataset that creates the tfrecord file

  2. The Trainer that trains the model

We can simply run it with

[15]:
pipeline.run()
INFO:deepr.examples.multiply.jobs.build:Wrote dataset to 'data.tfrecord'
INFO:deepr.prepros.core:Not applying Repeat(10) (mode=eval)
INFO:deepr.jobs.trainer:Running final evaluation, using global_step = 320
INFO:deepr.prepros.core:Not applying Repeat(10) (mode=eval)
INFO:deepr.jobs.trainer:{'loss': 0.0, 'global_step': 320}

3. Run on Yarn

We can’t just submit python objects on yarn.

We need to parametrize the execution. Though this could be done in a ad-hoc manner using custom entry points, you can use the config capabilities.

To read more about the config system, see the config introduction.

In short, you can define arbitrary trees of objects using dictionaries. The special key “type” contains the full import string of the object’s class. Other keys will be given as keyword arguments at instantiation time.

Build Job

[16]:
build_job_config = {
    "type": "deepr.examples.multiply.jobs.Build",
    "path_dataset": "viewfs://root/user/deepr/dev/example/data.tfrecord",
    "num_examples": 1000
}

Reader

[17]:
reader_config = {
    "type": "deepr.readers.TFRecordReader",
    "path": "viewfs://root/user/deepr/dev/example/data.tfrecord",
    "num_parallel_reads": 8,
    "num_parallel_calls": 8,
    "shuffle": True
}

Prepro

[18]:
prepro_fn_config = {
    "type": "deepr.examples.multiply.prepros.DefaultPrepro",
    "batch_size": 32,
    "repeat_size": 10
}

Prediction Function

[19]:
pred_fn_config = {"type": "deepr.examples.multiply.layers.Multiply"}

Loss Function

[20]:
loss_fn_config = {"type": "deepr.examples.multiply.layers.SquaredL2"}

Trainer Job

This is a good example of what a nested config looks like

[21]:
trainer_job_config = {
    "type": "deepr.jobs.Trainer",
    "path_model": "viewfs://root/user/deepr/dev/example/model",
    "pred_fn": pred_fn_config,
    "loss_fn": loss_fn_config,
    "optimizer_fn": {
        "type": "deepr.optimizers.TensorflowOptimizer",
        "optimizer": "Adam",
        "learning_rate": 0.1
    },
    "prepro_fn": prepro_fn_config,
    "train_input_fn": reader_config,
    "eval_input_fn": reader_config
}

Train Locally

We can use these configs to re-instantiate the objects using the from_config function, which supports arbitrary nesting of configs.

For example, we can re-create the build and trainer jobs with

[22]:
build_job = deepr.from_config(build_job_config)
[23]:
trainer_job = deepr.from_config(trainer_job_config)

and define a new pipeline, that we could then run like above

[24]:
pipeline = deepr.jobs.Pipeline([build_job, trainer_job])

Instead of training locally (something we’ve already done twice), let’s see how we can leverage the configs to execute the pipeline on yarn.

Train on Yarn

Let’s not run any code on the local machine, but instead submit the pipeline to a yarn machine.

Also, instead of running the trainer job on the same machine as the build job, let’s use tf_yarn distributed training capabilities and launch the trainer job on other yarn machines.

To submit jobs on yarn, it’s actually as simple as wrapping job configs into special jobs.

  • YarnLauncher: submits a job to yarn

  • YarnTrainer: uses tf_yarn to run a Trainer job on multiple machines

Let’s do it

[25]:
yarn_launcher_config = deepr.jobs.YarnLauncherConfig(
    path_pex_prefix="viewfs://root/user/deepr/dev/example/envs"
)
job_config = {
    "type": "deepr.jobs.Pipeline",
    "jobs": [
        build_job_config,
        {
            "type": "deepr.jobs.YarnTrainer",
            "trainer": {
                **trainer_job_config,
                "eval": None  # from_config will not instantiate the trainer argument
            },
            "config": {
                "type": "deepr.jobs.YarnTrainerConfig"
            }
        }
    ]
}
pipeline_yarn = deepr.jobs.YarnLauncher(config=yarn_launcher_config, job=job_config)

Once the YarnLauncher job is defined, we can run it.

It uploads the current environment as a pex to HDFS using the settings provided by the DefaultYarnLauncherConfig, and then executes the job from its config by simply doing something equivalent to what we did above, i.e. from_config(job).run().

[26]:
HAS_HADOOP = False
[27]:
if HAS_HADOOP:
    pipeline_yarn.run()

When the job completes, it only means that the job was successfully submitted to yarn. We need to wait for the job to finish.

After a few minutes, we can check that the build and training jobs ran successfully by looking at the files on the HDFS!

[28]:
if HAS_HADOOP:
    list(deepr.io.Path("viewfs://root/user/deepr/dev/example").glob("*"))

Using config files

Because it is sometimes convenient to commit config files for reproducibility and production, it is possible (and recommended) to store configs as .json files.

A convenient way to compose configs (similar to what we did by defining different dictionaries before putting them together) is to use jsonnet.

For example, we can define a file build.jsonnet like so

{
    "type": "deepr.examples.multiply.jobs.BuildDataset",
    "path_dataset": "viewfs://root/user/deepr/dev/example/data.tfrecord",
    "num_examples": 1000
}

and import it into our pipline config file config.jsonnet with

local build = import 'build.jsonnet';
{
    "type": "deepr.jobs.YarnLauncher",
    "config": {
        "type": "deepr.jobs.YarnLauncherConfig",
    },
    "job": build
}

You can run config files defining jobs with

deepr run config.jsonnet