[ ]:
!pip install deepr[cpu]
Pipeline
This notebook builds upon the model defined in the quickstart.
The goal of this notebook is to define a full pipeline that not only trains the model but also builds the dataset, and to run this pipeline on a yarn cluster.
We’ll see how to
Define a custom job to build the dataset.
Define a pipeline that builds and trains the model.
Use configs to run the pipeline on yarn.
First, some imports
[1]:
import logging
import sys
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logging.getLogger("tensorflow").setLevel(logging.CRITICAL)
logging.getLogger("cluster_pack").setLevel(logging.CRITICAL)
[2]:
from dataclasses import dataclass
import tensorflow as tf
import numpy as np
import deepr
[3]:
if deepr.io.Path("model").is_dir():
    deepr.io.Path("model").delete_dir()
1. Custom Build Dataset Job
The quickstart briefly introduced the concept of a job with the Trainer job.
Real-life pipelines consist of multiple jobs. In our example, we want to define a special job that creates the dataset.
Let’s see how to define a custom job that writes the dataset content to a tfrecord file.
Build Job
[4]:
@dataclass
class Build(deepr.jobs.Job):
    """Build a dummy dataset of random (x, 2*x) as a tfrecord file"""

    path_dataset: str
    num_examples: int = 1000

    def run(self):
        def _generator_fn():
            for _ in range(self.num_examples):
                x = np.random.random()
                yield {"x": x, "y": 2 * x}

        def _dict_to_example(data):
            features = {
                "x": deepr.readers.float_feature([data["x"]]),
                "y": deepr.readers.float_feature([data["y"]]),
            }
            example = tf.train.Example(features=tf.train.Features(feature=features))
            return example

        with tf.python_io.TFRecordWriter(self.path_dataset) as writer:
            for data in _generator_fn():
                example = _dict_to_example(data)
                writer.write(example.SerializeToString())

        print(f"Wrote dataset to '{self.path_dataset}'")
[5]:
build_job = Build(path_dataset="data.tfrecord", num_examples=1000)
Let’s run the job
[6]:
build_job.run()
Wrote dataset to 'data.tfrecord'
Prepro
Because the data is now stored in tfrecord files, the prepro_fn
needs to deserialize the file’s content.
Let’s define a preprocessor and check that everything works correctly with the dataset created by the Build job.
[7]:
@deepr.prepros.prepro
def DefaultPrepro(batch_size, repeat_size):
    return deepr.prepros.Serial(
        deepr.prepros.TFRecordSequenceExample(fields=[
            deepr.Field(name="x", shape=(), dtype=tf.float32),
            deepr.Field(name="y", shape=(), dtype=tf.float32),
        ]),
        deepr.prepros.Batch(batch_size=batch_size),
        deepr.prepros.Repeat(repeat_size, modes=[tf.estimator.ModeKeys.TRAIN]),
    )
The @prepro
decorator creates a class from the function that would be equivalent to
class DefaultPrepro(deepr.prepros.Prepro):

    def __init__(self, batch_size, repeat_size):
        super().__init__()
        self.batch_size = batch_size
        self.repeat_size = repeat_size

    def apply(self, dataset: tf.data.Dataset, mode: str = None) -> tf.data.Dataset:
        prepro_fn = deepr.prepros.Serial(
            deepr.prepros.TFRecordSequenceExample(fields=[
                deepr.Field(name="x", shape=(), dtype=tf.float32),
                deepr.Field(name="y", shape=(), dtype=tf.float32),
            ]),
            deepr.prepros.Batch(batch_size=self.batch_size),
            deepr.prepros.Repeat(self.repeat_size, modes=[tf.estimator.ModeKeys.TRAIN]),
        )
        return prepro_fn(dataset, mode)
One of the advantages of the decorator is that the body of the function DefaultPrepro
does not get executed until the preprocessor is actually applied to the dataset.
This lazy behavior is convenient when the function creates resources (like lookup tables) that should only be defined at runtime.
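For instance, here is a minimal sketch of this lazy behavior (LazyPrepro and its print statement are purely illustrative, and we assume the decorator is exposed as deepr.prepros.prepro):
@deepr.prepros.prepro
def LazyPrepro(batch_size):
    # This body only runs when the preprocessor is applied to a dataset
    print("building preprocessing pipeline")
    return deepr.prepros.Batch(batch_size=batch_size)

lazy_prepro = LazyPrepro(batch_size=32)  # nothing printed yet
# dataset = lazy_prepro(reader())        # the body would run here, at apply time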
Let’s create an instance of DefaultPrepro
[8]:
prepro_fn = DefaultPrepro(batch_size=32, repeat_size=10)
Reader
In the quickstart we used a GeneratorReader. With tfrecords, let’s use a TFRecordReader.
[9]:
reader = deepr.readers.TFRecordReader("data.tfrecord")
[10]:
for batch in deepr.readers.base.from_dataset(prepro_fn(reader())):
    print(batch)
    break
{'x': array([0.12249236, 0.19407886, 0.4287153 , 0.11553267, 0.00252286,
0.37583396, 0.9285378 , 0.11992821, 0.11691252, 0.54549456,
0.20779829, 0.22857946, 0.82357705, 0.6685327 , 0.3074787 ,
0.18468134, 0.77053934, 0.90410686, 0.00688817, 0.48377946,
0.01943498, 0.5244569 , 0.18175201, 0.25505018, 0.9191886 ,
0.33966148, 0.15110607, 0.10617658, 0.10038193, 0.87724835,
0.64753866, 0.6283632 ], dtype=float32), 'y': array([0.24498472, 0.38815773, 0.8574306 , 0.23106533, 0.00504571,
0.7516679 , 1.8570756 , 0.23985642, 0.23382504, 1.0909891 ,
0.41559657, 0.45715892, 1.6471541 , 1.3370655 , 0.6149574 ,
0.36936268, 1.5410787 , 1.8082137 , 0.01377634, 0.9675589 ,
0.03886997, 1.0489138 , 0.36350402, 0.51010036, 1.8383772 ,
0.67932296, 0.30221215, 0.21235317, 0.20076387, 1.7544967 ,
1.2950773 , 1.2567264 ], dtype=float32)}
2. Define a Pipeline
So far, we have defined
A custom Build job
Custom layers Multiply and SquaredL2 (in the quickstart)
A custom preprocessor DefaultPrepro
We will need to make these classes available on the pex that will be shipped to yarn, so let’s add them to a module living alongside the core library.
For example,
deepr
├── __init__.py
├── core
├── example
│   ├── __init__.py
│   ├── jobs
│   │   ├── __init__.py
│   │   └── build_dataset.py    # Build
│   ├── layers
│   │   ├── __init__.py
│   │   ├── loss.py             # SquaredL2
│   │   └── model.py            # Multiply
│   └── prepros
│       ├── __init__.py
│       └── default.py          # DefaultPrepro
Now, these classes can easily be imported from anywhere.
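For instance, a hypothetical sketch of those imports following the layout above could look like this (in the rest of this notebook we use the bundled deepr.examples.multiply module instead):
# Hypothetical imports following the example layout sketched above
from deepr.example.jobs import Build
from deepr.example.layers import Multiply, SquaredL2
from deepr.example.prepros import DefaultPrepro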
Let’s replicate the quickstart by defining and running a full pipeline that builds the dataset and then trains a model.
[11]:
import deepr.examples.multiply as multiply
INFO:faiss:Loading faiss.
[12]:
build_job = multiply.jobs.Build(path_dataset="data.tfrecord", num_examples=1000)
[13]:
trainer_job = deepr.jobs.Trainer(
    path_model="model",
    pred_fn=multiply.layers.Multiply(),
    loss_fn=multiply.layers.SquaredL2(),
    optimizer_fn=deepr.optimizers.TensorflowOptimizer("Adam", 0.1),
    train_input_fn=deepr.readers.TFRecordReader("data.tfrecord"),
    eval_input_fn=deepr.readers.TFRecordReader("data.tfrecord"),
    prepro_fn=multiply.prepros.DefaultPrepro(batch_size=32, repeat_size=10)
)
[14]:
pipeline = deepr.jobs.Pipeline([build_job, trainer_job])
The pipeline is made of 2 jobs:
The Build job that creates the tfrecord file
The Trainer job that trains the model
We can simply run it with
[15]:
pipeline.run()
INFO:deepr.examples.multiply.jobs.build:Wrote dataset to 'data.tfrecord'
INFO:deepr.prepros.core:Not applying Repeat(10) (mode=eval)
INFO:deepr.jobs.trainer:Running final evaluation, using global_step = 320
INFO:deepr.prepros.core:Not applying Repeat(10) (mode=eval)
INFO:deepr.jobs.trainer:{'loss': 0.0, 'global_step': 320}
3. Run on Yarn
We can’t just submit python objects to yarn: we need to parametrize the execution. Though this could be done in an ad-hoc manner using custom entry points, we can use the config capabilities.
To read more about the config system, see the config introduction.
In short, you can define arbitrary trees of objects using dictionaries. The special key “type” contains the full import string of the object’s class. Other keys will be given as keyword arguments at instantiation time.
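For example, here is a minimal sketch of the mechanism (Multiply takes no extra arguments here):
config = {"type": "deepr.examples.multiply.layers.Multiply"}
layer = deepr.from_config(config)  # roughly equivalent to multiply.layers.Multiply()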
Build Job
[16]:
build_job_config = {
    "type": "deepr.examples.multiply.jobs.Build",
    "path_dataset": "viewfs://root/user/deepr/dev/example/data.tfrecord",
    "num_examples": 1000
}
Reader
[17]:
reader_config = {
    "type": "deepr.readers.TFRecordReader",
    "path": "viewfs://root/user/deepr/dev/example/data.tfrecord",
    "num_parallel_reads": 8,
    "num_parallel_calls": 8,
    "shuffle": True
}
Prepro
[18]:
prepro_fn_config = {
    "type": "deepr.examples.multiply.prepros.DefaultPrepro",
    "batch_size": 32,
    "repeat_size": 10
}
Prediction Function
[19]:
pred_fn_config = {"type": "deepr.examples.multiply.layers.Multiply"}
Loss Function
[20]:
loss_fn_config = {"type": "deepr.examples.multiply.layers.SquaredL2"}
Trainer Job
This is a good example of what a nested config looks like
[21]:
trainer_job_config = {
    "type": "deepr.jobs.Trainer",
    "path_model": "viewfs://root/user/deepr/dev/example/model",
    "pred_fn": pred_fn_config,
    "loss_fn": loss_fn_config,
    "optimizer_fn": {
        "type": "deepr.optimizers.TensorflowOptimizer",
        "optimizer": "Adam",
        "learning_rate": 0.1
    },
    "prepro_fn": prepro_fn_config,
    "train_input_fn": reader_config,
    "eval_input_fn": reader_config
}
Train Locally
We can use these configs to re-instantiate the objects using the from_config
function, which supports arbitrary nesting of configs.
For example, we can re-create the build and trainer jobs with
[22]:
build_job = deepr.from_config(build_job_config)
[23]:
trainer_job = deepr.from_config(trainer_job_config)
and define a new pipeline, which we could then run as above
[24]:
pipeline = deepr.jobs.Pipeline([build_job, trainer_job])
Instead of training locally (something we’ve already done twice), let’s see how we can leverage the configs to execute the pipeline on yarn.
Train on Yarn
Let’s not run any code on the local machine, but instead submit the pipeline to a yarn
machine.
Also, instead of running the trainer job on the same machine as the build job, let’s use tf_yarn
distributed training capabilities and launch the trainer job on other yarn machines.
To submit jobs on yarn, it’s actually as simple as wrapping job configs into special jobs:
YarnLauncher: submits a job to yarn
YarnTrainer: uses tf_yarn to run a Trainer job on multiple machines
Let’s do it
[25]:
yarn_launcher_config = deepr.jobs.YarnLauncherConfig(
path_pex_prefix="viewfs://root/user/deepr/dev/example/envs"
)
job_config = {
"type": "deepr.jobs.Pipeline",
"jobs": [
build_job_config,
{
"type": "deepr.jobs.YarnTrainer",
"trainer": {
**trainer_job_config,
"eval": None # from_config will not instantiate the trainer argument
},
"config": {
"type": "deepr.jobs.YarnTrainerConfig"
}
}
]
}
pipeline_yarn = deepr.jobs.YarnLauncher(config=yarn_launcher_config, job=job_config)
Once the YarnLauncher job is defined, we can run it.
It uploads the current environment as a pex to HDFS using the settings provided by the YarnLauncherConfig, and then executes the job from its config by simply doing something equivalent to what we did above, i.e. from_config(job).run().
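Conceptually, the remote side boils down to something like this rough sketch (remote_entry_point is a hypothetical name, not the actual launcher implementation):
def remote_entry_point(job_config: dict):
    # Rebuild the python objects from the config shipped alongside the pex
    job = deepr.from_config(job_config)
    # Then run the pipeline exactly as we did locally
    job.run()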
[26]:
HAS_HADOOP = False
[27]:
if HAS_HADOOP:
    pipeline_yarn.run()
When the YarnLauncher job completes, it only means that the pipeline was successfully submitted to yarn; we still need to wait for the jobs to finish on the cluster.
After a few minutes, we can check that the build and training jobs ran successfully by looking at the files on HDFS!
[28]:
if HAS_HADOOP:
    list(deepr.io.Path("viewfs://root/user/deepr/dev/example").glob("*"))
Using config files
Because it is sometimes convenient to commit config files for reproducibility and production, it is possible (and recommended) to store configs as .json
files.
A convenient way to compose configs (similar to what we did by defining different dictionaries before putting them together) is to use jsonnet.
For example, we can define a file build.jsonnet
like so
{
    "type": "deepr.examples.multiply.jobs.Build",
    "path_dataset": "viewfs://root/user/deepr/dev/example/data.tfrecord",
    "num_examples": 1000
}
and import it into our pipeline config file config.jsonnet
with
local build = import 'build.jsonnet';

{
    "type": "deepr.jobs.YarnLauncher",
    "config": {
        "type": "deepr.jobs.YarnLauncherConfig",
    },
    "job": build
}
You can run config files defining jobs with
deepr run config.jsonnet