deepr.jobs package

Submodules

deepr.jobs.base module

Interface for jobs

class deepr.jobs.base.Job[source]

Bases: ABC

Interface for jobs

abstract run()[source]

Run Job
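
For illustration, a minimal custom job; PrintJob is a made-up name for this sketch:

>>> from deepr.jobs.base import Job
>>> class PrintJob(Job):
...     """Toy job that prints a message when run."""
...     def __init__(self, message):
...         self.message = message
...     def run(self):
...         print(self.message)
>>> PrintJob("hello").run()
hello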

deepr.jobs.cleanup_checkpoints module

Cleanup Checkpoints in path_model

class deepr.jobs.cleanup_checkpoints.CleanupCheckpoints(path_model, path_checkpoints='checkpoints')[source]

Bases: Job

Cleanup Checkpoints in path_model

path_checkpoints: str = 'checkpoints'
path_model: str
run()[source]

Run Job
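
A usage sketch, not a runnable doctest (the model path is hypothetical); running the job removes the checkpoint files under path_model:

>>> from deepr.jobs import CleanupCheckpoints
>>> job = CleanupCheckpoints(path_model="models/my_model")
>>> job.run()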

deepr.jobs.combinators module

Pipeline

class deepr.jobs.combinators.Pipeline(jobs)[source]

Bases: Job

Pipeline that executes a list of jobs in order

jobs: List[Union[Job, Callable]]
run()[source]

Run Job
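
A sketch chaining two jobs; LogMetric (documented below) stands in for real jobs, and any Job instance or callable can appear in the list:

>>> from deepr.jobs import LogMetric, Pipeline
>>> pipeline = Pipeline(jobs=[
...     LogMetric(key="learning_rate", value=0.01),
...     LogMetric(key="epochs", value=10),
... ])
>>> pipeline.run()  # runs each job in order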

deepr.jobs.copy_dir module

Copy Directory

class deepr.jobs.copy_dir.CopyDir(source, target, skip_copy=False, overwrite=True)[source]

Bases: Job

Copy a directory, overwriting the destination by default

overwrite: bool = True
run()[source]

Run Job

skip_copy: bool = False
source: str
target: str
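
A minimal sketch with hypothetical local and remote paths:

>>> from deepr.jobs import CopyDir
>>> job = CopyDir(source="models/run_0", target="viewfs://root/user/me/run_0")
>>> job.run()  # copies the directory, overwriting the target by default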

deepr.jobs.evaluate module

Evaluate Job.

class deepr.jobs.evaluate.Evaluate(path_model, pred_fn, loss_fn, input_fn, prepro_fn=<function Evaluate.<lambda>>, metrics=<factory>, hooks=<factory>, spec=<factory>)[source]

Bases: Job

Evaluate Job.

hooks: List
input_fn: Callable[[], DatasetV1]
loss_fn: Callable[[Dict[str, Tensor], str], Dict[str, Tensor]]
metrics: List[Callable]
path_model: str
pred_fn: Callable[[Dict[str, Tensor], str], Dict[str, Tensor]]
prepro_fn(_)
run()[source]

Run Job

spec: Dict
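
A construction sketch; the toy lambdas only illustrate the expected signatures, and my_input_fn is a placeholder for a callable returning a tf.data.Dataset:

>>> from deepr.jobs import Evaluate
>>> job = Evaluate(
...     path_model="models/my_model",  # hypothetical path
...     pred_fn=lambda features, mode: {"pred": features["x"]},
...     loss_fn=lambda inputs, mode: {"loss": inputs["pred"]},
...     input_fn=my_input_fn,  # () -> tf.data.Dataset, assumed defined elsewhere
... )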

deepr.jobs.export_xla_model_metadata module

Export XLA-compatible model metadata from a SavedModel

class deepr.jobs.export_xla_model_metadata.ExportXlaModelMetadata(path_optimized_model, path_metadata, graph_name, metadata_name, feed_shapes, fetch_shapes)[source]

Bases: Job

Export XLA-compatible model metadata from a SavedModel

path_optimized_model

Path to directory containing optimized saved model exports to convert

Type:

str

path_metadata

Path to directory that will contain the metadata

Type:

str

graph_name

Name of the saved model graph (name of the protobuf file)

Type:

str

metadata_name

Name of the metadata file

Type:

str

feed_shapes

Shapes of feeds to expose

Type:

Dict[str, List[int]]

fetch_shapes

Shapes of fetches to expose

Type:

Dict[str, List[int]]

feed_shapes: Dict[str, List[int]]
fetch_shapes: Dict[str, List[int]]
graph_name: str
metadata_name: str
path_metadata: str
path_optimized_model: str
run()[source]

Run Job
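
A construction sketch; every path, file name, and shape below is hypothetical:

>>> from deepr.jobs import ExportXlaModelMetadata
>>> job = ExportXlaModelMetadata(
...     path_optimized_model="models/my_model/optimized",
...     path_metadata="models/my_model/metadata",
...     graph_name="model.pb",
...     metadata_name="metadata.pbtxt",
...     feed_shapes={"inputs": [1, 128]},
...     fetch_shapes={"logits": [1, 10]},
... )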

deepr.jobs.export_xla_model_metadata.add_metadata_item(item, node, target_shape=None)[source]
deepr.jobs.export_xla_model_metadata.get_nodes(graph_def, names)[source]

deepr.jobs.log_metric module

Log Metric Job

class deepr.jobs.log_metric.LogMetric(key, value, use_mlflow=False)[source]

Bases: Job

Log Metric job

key: str
run()[source]

Run Job

use_mlflow: bool = False
value: Any
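
A minimal sketch; with use_mlflow=True the metric would also be sent to MLFlow:

>>> from deepr.jobs import LogMetric
>>> LogMetric(key="auc", value=0.82).run()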

deepr.jobs.mlflow_save_configs module

Upload Configs to MLFlow

class deepr.jobs.mlflow_save_configs.MLFlowFormatter(include_keys=None, skip_keys=(), skip_values=())[source]

Bases: object

Flattens dictionaries and extracts sub-keys

Example

>>> from deepr.jobs import MLFlowFormatter
>>> params = {
...     "foo": {
...         "type": "foo.Foo",
...         "bar": {
...             "x": 1,
...             "y": 2,
...         }
...     }
... }
>>> formatter = MLFlowFormatter(include_keys=("bar", "x"), skip_values=(2,))
>>> formatter(params)
{'bar.x': 1, 'x': 1}
include_keys

If not None, keep only dictionaries nested under such keys

Type:

Tuple[str, …], Optional

skip_keys

Do not include dictionaries nested under such keys

Type:

Tuple[str, …]

skip_values

Do not include such values

Type:

Tuple[str, …]

class deepr.jobs.mlflow_save_configs.MLFlowSaveConfigs(use_mlflow=False, config=None, macros=None, macros_eval=None, formatter=None)[source]

Bases: Job

Upload Configs to MLFlow

config: Optional[Dict] = None
formatter: Optional[Callable[[Dict], Dict]] = None
macros: Optional[Dict] = None
macros_eval: Optional[Dict] = None
run()[source]

Run Job

use_mlflow: Optional[bool] = False

deepr.jobs.mlflow_save_info module

Save MLFlow info to path

class deepr.jobs.mlflow_save_info.MLFlowSaveInfo(use_mlflow=False, path_mlflow=None, run_id=None, run_uuid=None, experiment_id=None)[source]

Bases: Job

Save MLFlow info to path

experiment_id: Optional[str] = None
path_mlflow: Optional[str] = None
run()[source]

Run Job

run_id: Optional[str] = None
run_uuid: Optional[str] = None
use_mlflow: Optional[bool] = False

deepr.jobs.optimize_saved_model module

Converts SavedModel into an optimized protobuf for inference

class deepr.jobs.optimize_saved_model.OptimizeSavedModel(path_saved_model, path_optimized_model, graph_name, feeds, fetch, new_names=<factory>, blacklisted_variables=<factory>)[source]

Bases: Job

Converts SavedModel into an optimized protobuf for inference

This job reads the input SavedModel, renames some nodes using the new_names argument (raising an error if a node to rename cannot be found), creates placeholders for the nodes given by feeds (removing all other placeholders not in this list), and finally freezes the subgraph that produces the output tensor fetch.

When creating the original SavedModel, it is recommended to use tf.identity operators to mark some tensors as future feeds or fetches.

WARNING: successful completion of this job is no guarantee that the exported Graph is correct. It is recommended to test the export in a separate job.
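
For instance, a TF1-style sketch of marking a feed and a fetch with stable names, then configuring the job (all names and paths are illustrative):

>>> import tensorflow as tf
>>> x = tf.placeholder(tf.float32, [None, 3], name="x")  # future feed
>>> y = tf.identity(tf.reduce_sum(x, axis=-1), name="predictions")  # future fetch
>>> from deepr.jobs import OptimizeSavedModel
>>> job = OptimizeSavedModel(
...     path_saved_model="models/my_model/saved_model",
...     path_optimized_model="models/my_model/optimized",
...     graph_name="model.pb",
...     feeds="x",
...     fetch="predictions",
... )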

path_saved_model

Path to directory containing SavedModel exports to convert

Type:

str

path_optimized_model

Path to directory that will contain the export

Type:

str

graph_name

Name of the output graph (name of the protobuf file)

Type:

str

new_names

Mapping old names (SavedModel nodes) -> new names (export)

Type:

Dict[str, str]

blacklisted_variables

List of variable names not to include in the export

Type:

Union[str, List[str]]

feeds

List of nodes to use as inputs, or a comma-separated string.

Type:

Union[str, List[str]]

fetch

List of nodes to use as outputs, or a comma-separated string.

Type:

Union[str, List[str]]

blacklisted_variables: List[str]
feeds: Union[str, List[str]]
fetch: Union[str, List[str]]
graph_name: str
new_names: Dict[str, str]
path_optimized_model: str
path_saved_model: str
run()[source]

Run Job

exception deepr.jobs.optimize_saved_model.TensorsNotFoundError(tensors)[source]

Bases: ValueError

No tensors found for those names.

Most TensorFlow operators take a name argument that can be used to give a tensor a custom name; otherwise a default name is used.

If accessing the operator that creates a Tensor is not possible, you can use tf.identity to name the Tensor. Note, however, that this adds a new op to the graph that creates a copy of the input Tensor, so its use should be limited to avoid overhead.
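
For example, both lines below produce a tensor with a predictable name (TF1-style sketch):

>>> import tensorflow as tf
>>> x = tf.constant([[1.0, 2.0]])
>>> w = tf.constant([[0.5], [0.5]])
>>> logits = tf.matmul(x, w, name="logits")      # preferred: name the op directly
>>> output = tf.identity(logits, name="output")  # fallback: adds a copy op to the graph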

deepr.jobs.optimize_saved_model.make_placeholders(graph_def, names)[source]

Create placeholders for names and remove other placeholders

Parameters:
  • graph_def (tf.GraphDef) – Graph definition

  • names (List[str]) – Names of placeholders to keep / create for this graph

Returns:

A copy of the input GraphDef with new placeholders

Return type:

tf.GraphDef

Raises:

TensorsNotFoundError – If names refers to a node that is not present

deepr.jobs.optimize_saved_model.rename_nodes(graph_def, new_names)[source]

Rename items in the graph to new ones defined in new_names

Parameters:
  • graph_def (tf.GraphDef) – Graph Definition

  • new_names (Dict[str, str]) – Mapping old name -> new name

Returns:

A copy of the input GraphDef with renamed nodes

Return type:

tf.GraphDef

Raises:

TensorsNotFoundError – If new_names refers to a node not found in graph_def

deepr.jobs.params_tuner module

Hyperparameter tuning

class deepr.jobs.params_tuner.GridSampler(param_grid, repeat=1)[source]

Bases: Sampler

Grid Sampler wrapping ParameterGrid from sklearn
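
A sketch of a grid with hypothetical parameter names; with repeat=1 this grid yields the 4 combinations of its values:

>>> from deepr.jobs import GridSampler
>>> sampler = GridSampler(param_grid={
...     "learning_rate": [0.001, 0.01],
...     "batch_size": [32, 64],
... })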

class deepr.jobs.params_tuner.ParamsSampler(param_grid, n_iter=10, repeat=1, seed=None)[source]

Bases: Sampler

Parameter Sampler

class deepr.jobs.params_tuner.ParamsTuner(job, macros, sampler)[source]

Bases: Job

Params tuner

job: Dict
macros: Dict
run()[source]

Run Job

sampler: Sampler
class deepr.jobs.params_tuner.Sampler[source]

Bases: ABC

Parameters Sampler

deepr.jobs.save_dataset module

Save Dataset Job.

class deepr.jobs.save_dataset.SaveDataset(input_fn, path, fields, prepro_fn=<function SaveDataset.<lambda>>, num_parallel_calls=None, chunk_size=None, compression_type='GZIP', secs=60, mode=None)[source]

Bases: Job

Save Dataset Job.

chunk_size: Optional[int] = None
compression_type: str = 'GZIP'
fields: List[Field]
input_fn: Callable[[], DatasetV1]
mode: Optional[str] = None
num_parallel_calls: Optional[int] = None
path: str
prepro_fn(_)
run()[source]

Run Job

secs: Optional[int] = 60

deepr.jobs.trainer module

Train Job

class deepr.jobs.trainer.ConfigProto(inter_op_parallelism_threads=16, intra_op_parallelism_threads=16, log_device_placement=False, gpu_device_count=0, cpu_device_count=16, **kwargs)[source]

Bases: dict

Named Dict for ConfigProto arguments with reasonable defaults.

class deepr.jobs.trainer.EvalSpec(steps=None, name=None, start_delay_secs=120, throttle_secs=100)[source]

Bases: dict

Named Dict for EvalSpec arguments with reasonable defaults.

class deepr.jobs.trainer.FinalSpec[source]

Bases: dict

Named Dict for final evaluation with reasonable defaults.

class deepr.jobs.trainer.RunConfig[source]

Bases: dict

Named Dict for RunConfig arguments

class deepr.jobs.trainer.TrainSpec(max_steps=None)[source]

Bases: dict

Named Dict for TrainSpec arguments with reasonable defaults.

class deepr.jobs.trainer.Trainer(path_model, pred_fn, loss_fn, optimizer_fn, train_input_fn, eval_input_fn, prepro_fn=<function Trainer.<lambda>>, initializer_fn=<function Trainer.<lambda>>, exporters=<factory>, train_metrics=<factory>, eval_metrics=<factory>, final_metrics=<factory>, train_hooks=<factory>, eval_hooks=<factory>, final_hooks=<factory>, train_spec=<factory>, eval_spec=<factory>, final_spec=<factory>, run_config=<factory>, config_proto=<factory>, random_seed=42, preds=<factory>)[source]

Bases: TrainerBase

Train and evaluate a tf.Estimator on the current machine.

path_model

Path to the model directory. Can be either local or HDFS.

Type:

str

pred_fn

Typically a Layer instance, but in general, any callable.

Its signature is the following:
  • features : Dict

    Features, yielded by the dataset

  • predictions : Dict

    Predictions

Type:

Callable[[Dict[str, tf.Tensor], str], Dict[str, tf.Tensor]]

loss_fn

Typically a Layer instance, but in general, any callable.

Its signature is the following:
  • features_and_predictions : Dict

    Features and predictions combined

  • losses : Dict

    Losses and metrics

The value for key “loss” from the output dictionary is then fed to the optimizer_fn.

Type:

Callable[[Dict[str, tf.Tensor], str], Dict[str, tf.Tensor]]

optimizer_fn

Typically an Optimizer instance, but in general, any callable.

Its signature is the following:
  • inputs : Dict[str, tf.Tensor]

    Typically has key “loss”

  • outputs : Dict[str, tf.Tensor]

    Needs key “train_op”

Type:

Callable[[Dict[str, tf.Tensor]], Dict[str, tf.Tensor]]

train_input_fn

Typically a Reader instance, but in general, any callable.

Used for training.

Its signature is the following:
  • outputs : tf.data.Dataset

    A newly created dataset. Each call to the input_fn should create a new dataset and a new graph.

Type:

Callable[[], tf.data.Dataset]

eval_input_fn

Typically a Reader instance, but in general, any callable.

Used for evaluation.

Its signature is the following:
  • outputs : tf.data.Dataset

    A newly created dataset. Each call to the input_fn should create a new dataset and a new graph.

Type:

Callable[[], tf.data.Dataset]

prepro_fn

Typically a Prepro instance, but in general, any callable.

Its signature is the following:
  • inputs :

    dataset : tf.data.Dataset

    Created by train_input_fn or eval_input_fn.

    mode : str

    One of tf.estimator.ModeKeys.TRAIN, PREDICT or EVAL

  • outputs : tf.data.Dataset

    The preprocessed dataset

Type:

Callable[[tf.data.Dataset, str], tf.data.Dataset], Optional

initializer_fn

Any Callable that sets up initialization by adding an op to the default Graph.

Type:

Callable[[], None], Optional

train_metrics

Typically, Metric instances, but in general, any callables.

Used for training.

Each callable must have the following signature:
  • inputs : Dict

    Features, Predictions and Losses dictionary

  • outputs : Dict[str, Tuple]

    Dictionary of tuples of (tensor_value, update_op).

Type:

List[Callable], Optional

eval_metrics

Typically, Metric instances, but in general, any callables.

Used for evaluation.

Each callable must have the following signature:
  • inputs : Dict

    Features, Predictions and Losses dictionary

  • outputs : Dict[str, Tuple]

    Dictionary of tuples of (tensor_value, update_op).

Type:

List[Callable], Optional

exporters

Typically, Exporter instances, but in general, any callables.

Used at the end of training on the trained Estimator.

Each callable must have the following signature:
  • inputs : tf.estimator.Estimator

    A trained Estimator.

Type:

List[Callable], Optional

train_hooks

List of Hooks or HookFactories.

Used for training.

Some hooks can be fully defined when the Trainer is instantiated, for example a StepsPerSecHook. Other hooks, however, require objects that are only created once the Trainer runs.

The hooks module defines factories for more complicated hooks.

Type:

List, Optional

eval_hooks

List of Hooks or HookFactories.

Used for evaluation.

Some hooks can be fully defined when the Trainer is instantiated, for example a StepsPerSecHook. Other hooks, however, require objects that are only created once the Trainer runs.

The hooks module defines factories for more complicated hooks.

Type:

List, Optional

eval_spec

Optional parameters for EvalSpec.

Type:

Dict, Optional

train_spec

Optional parameters for TrainSpec.

Type:

Dict, Optional

run_config

Optional parameters for RunConfig.

Type:

Dict, Optional

config_proto

Optional parameters for ConfigProto.

Type:

Dict, Optional

config_proto: Dict
create_experiment()[source]

Create an Experiment object packaging Estimator and Specs.

Returns:

estimator : tf.estimator.Estimator

train_spec : tf.estimator.TrainSpec

eval_spec : tf.estimator.EvalSpec

Return type:

Experiment (NamedTuple)

eval_hooks: List
eval_input_fn: Callable[[], DatasetV1]
eval_metrics: List[Callable]
eval_spec: Dict
exporters: List[Callable]
final_hooks: List
final_metrics: List[Callable]
final_spec: Dict
initializer_fn()
loss_fn: Callable[[Dict[str, Tensor], str], Dict[str, Tensor]]
optimizer_fn: Callable[[Dict[str, Tensor]], Dict[str, Tensor]]
path_model: str
pred_fn: Callable[[Dict[str, Tensor], str], Dict[str, Tensor]]
preds: List[str]
prepro_fn(_)
random_seed: int = 42
run_config: Dict
run_final_evaluation()[source]

Final evaluation on eval_input_fn with final_hooks

train_hooks: List
train_input_fn: Callable[[], DatasetV1]
train_metrics: List[Callable]
train_spec: Dict
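
A construction sketch tying the pieces together; every my_*/make_* name is a placeholder for a user-defined callable following the signatures above:

>>> from deepr.jobs import Trainer
>>> trainer = Trainer(
...     path_model="models/my_model",       # hypothetical, local or HDFS
...     pred_fn=my_layer,                   # (features, mode) -> predictions
...     loss_fn=my_loss,                    # (features_and_predictions, mode) -> {"loss": ...}
...     optimizer_fn=my_optimizer,          # ({"loss": ...}) -> {"train_op": ...}
...     train_input_fn=make_train_dataset,  # () -> tf.data.Dataset
...     eval_input_fn=make_eval_dataset,    # () -> tf.data.Dataset
... )
>>> trainer.run()
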
deepr.jobs.trainer.model_fn(features, mode, pred_fn, loss_fn, optimizer_fn, initializer_fn, train_metrics, eval_metrics, train_hooks, eval_hooks, preds)[source]

Model Function

deepr.jobs.trainer_base module

Trainer Base.

class deepr.jobs.trainer_base.TrainerBase[source]

Bases: Job

Trainer Base.

abstract create_experiment()[source]
run()[source]

Run Job

abstract run_final_evaluation()[source]

deepr.jobs.trainer_keras module

Keras trainer.

class deepr.jobs.trainer_keras.TrainerKeras(path_model, model, train_input_fn, eval_input_fn, prepro_fn=<function TrainerKeras.<lambda>>, exporters=<factory>, train_hooks=<factory>, eval_hooks=<factory>, final_hooks=<factory>, train_spec=<factory>, eval_spec=<factory>, final_spec=<factory>, run_config=<factory>, config_proto=<factory>, random_seed=42)[source]

Bases: TrainerBase

Keras trainer.

config_proto: Dict
create_experiment()[source]

Create an Experiment object packaging Estimator and Specs.

Returns:

estimator : tf.estimator.Estimator

train_spec : tf.estimator.TrainSpec

eval_spec : tf.estimator.EvalSpec

Return type:

Experiment (NamedTuple)

eval_hooks: List
eval_input_fn: Callable[[], DatasetV1]
eval_spec: Dict
exporters: List[Callable]
final_hooks: List
final_spec: Dict
model: Model
path_model: str
prepro_fn(_)
random_seed: int = 42
run_config: Dict
run_final_evaluation()[source]

Final evaluation on eval_input_fn with final_hooks

train_hooks: List
train_input_fn: Callable[[], DatasetV1]
train_spec: Dict

deepr.jobs.yarn_config module

Basic Yarn Config in charge of uploading environment.

class deepr.jobs.yarn_config.YarnConfig(name, gpu_additional_packages=('tensorflow-gpu==1.15.2', 'tf-yarn-gpu==0.4.19'), gpu_ignored_packages=('tensorflow', 'tf-yarn'), cpu_ignored_packages=(), gpu_to_use=None, jvm_memory_in_gb=8, path_pex_cpu=None, path_pex_gpu=None, path_pex_prefix='viewfs://root/user/runner/envs')[source]

Bases: object

Basic Yarn Config in charge of uploading environment.

cpu_ignored_packages: Tuple[str, ...] = ()
get_env_vars()[source]

Return Default Environment Variables

gpu_additional_packages: Tuple[str, ...] = ('tensorflow-gpu==1.15.2', 'tf-yarn-gpu==0.4.19')
gpu_ignored_packages: Tuple[str, ...] = ('tensorflow', 'tf-yarn')
gpu_to_use: Optional[int] = None
jvm_memory_in_gb: int = 8
name: str
path_pex_cpu: Optional[str] = None
path_pex_gpu: Optional[str] = None
path_pex_prefix: str = 'viewfs://root/user/runner/envs'
upload_pex_cpu()[source]

Upload Current Environment as PEX for CPU.

Return type:

str

upload_pex_gpu()[source]

Upload Current Environment as PEX for GPU.

Return type:

str

deepr.jobs.yarn_config.upload_pex(path_pex, path_pex_existing=None, additional_packages=None, ignored_packages=None)[source]

Upload Current Environment and return path to PEX on HDFS

Return type:

str

deepr.jobs.yarn_launcher module

Yarn Launcher Config Interface and Job

class deepr.jobs.yarn_launcher.YarnLauncher(job, config, run_on_yarn=True, use_mlflow=False)[source]

Bases: Job

Packages the current environment, uploads the .pex, and runs the job on YARN.

config: YarnLauncherConfig
job: Dict
run()[source]

Run Job

run_on_yarn: bool = True
use_mlflow: bool = False
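
A sketch, assuming the job is passed as a config dictionary in deepr's config format; the "type" key and the LogMetric job are illustrative:

>>> from deepr.jobs import YarnLauncher, YarnLauncherConfig
>>> launcher = YarnLauncher(
...     job={"type": "deepr.jobs.LogMetric", "key": "foo", "value": 1},
...     config=YarnLauncherConfig(),
... )
>>> launcher.run()  # packages the env, uploads the pex, submits to YARN
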
class deepr.jobs.yarn_launcher.YarnLauncherConfig(name='yarn-launcher-2023-03-07-14-05-28', gpu_additional_packages=('tensorflow-gpu==1.15.2', 'tf-yarn-gpu==0.4.19'), gpu_ignored_packages=('tensorflow', 'tf-yarn'), cpu_ignored_packages=(), gpu_to_use=None, jvm_memory_in_gb=8, path_pex_cpu=None, path_pex_gpu=None, path_pex_prefix='viewfs://root/user/runner/envs', hadoop_file_systems=(), memory='48 GiB', num_cores=48)[source]

Bases: YarnConfig

Yarn Launcher Config.

hadoop_file_systems: Tuple[str, ...] = ()
memory: str = '48 GiB'
name: str = 'yarn-launcher-2023-03-07-14-05-28'
num_cores: int = 48

deepr.jobs.yarn_trainer module

Yarn Trainer Config and Job

class deepr.jobs.yarn_trainer.YarnTrainer(trainer, config, train_on_yarn=True)[source]

Bases: Job

Run a TrainerBase job on yarn.

config: YarnTrainerConfig
run()[source]

Run Job

train_on_yarn: bool = True
trainer: Dict
class deepr.jobs.yarn_trainer.YarnTrainerConfig(name='yarn-trainer-2023-03-07-14-05-28', gpu_additional_packages=('tensorflow-gpu==1.15.2', 'tf-yarn-gpu==0.4.19'), gpu_ignored_packages=('tensorflow', 'tf-yarn'), cpu_ignored_packages=(), gpu_to_use=None, jvm_memory_in_gb=8, path_pex_cpu=None, path_pex_gpu=None, path_pex_prefix='viewfs://root/user/runner/envs', nb_ps=None, nb_retries=0, nb_workers=None, pre_script_hook='source /etc/profile.d/cuda.sh && setupCUDA 10.1 && setupCUDNN cuda10.1_v7.6.4.38', queue='dev', tf_yarn='cpu', tf_yarn_chief_cores=48, tf_yarn_chief_memory='48 GiB', tf_yarn_evaluator_cores=48, tf_yarn_evaluator_memory='48 GiB', tf_yarn_tensorboard_memory='48 GiB')[source]

Bases: YarnConfig

Default Yarn Trainer Config

get_task_specs()[source]

Return Task Specs from parameters

name: str = 'yarn-trainer-2023-03-07-14-05-28'
nb_ps: Optional[int] = None
nb_retries: int = 0
nb_workers: Optional[int] = None
pre_script_hook: str = 'source /etc/profile.d/cuda.sh && setupCUDA 10.1 && setupCUDNN cuda10.1_v7.6.4.38'
queue: str = 'dev'
tf_yarn: str = 'cpu'
tf_yarn_chief_cores: int = 48
tf_yarn_chief_memory: str = '48 GiB'
tf_yarn_evaluator_cores: int = 48
tf_yarn_evaluator_memory: str = '48 GiB'
tf_yarn_tensorboard_memory: str = '48 GiB'

Module contents