Source code for deepr.jobs.yarn_trainer

"""Yarn Trainer Config and Job"""

from dataclasses import dataclass
from typing import Dict, Optional
import atexit
import datetime
import logging

from cluster_pack.packaging import get_editable_requirements
import mlflow
import skein
import tf_yarn

from deepr.config.base import from_config
from deepr.jobs import base
from deepr.jobs.yarn_config import YarnConfig


LOGGER = logging.getLogger(__name__)


@dataclass
class YarnTrainerConfig(YarnConfig):
    """Default Yarn Trainer Config"""

    name: str = f"yarn-trainer-{datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}"
    nb_ps: Optional[int] = None
    nb_retries: int = 0
    nb_workers: Optional[int] = None
    pre_script_hook: str = "source /etc/profile.d/cuda.sh && setupCUDA 10.1 && setupCUDNN cuda10.1_v7.6.4.38"
    queue: str = "dev"
    tf_yarn: str = "cpu"
    tf_yarn_chief_cores: int = 48
    tf_yarn_chief_memory: str = "48 GiB"
    tf_yarn_evaluator_cores: int = 48
    tf_yarn_evaluator_memory: str = "48 GiB"
    tf_yarn_tensorboard_memory: str = "48 GiB"

    def get_task_specs(self):
        """Return Task Specs from parameters"""
        label = tf_yarn.NodeLabel.CPU if self.tf_yarn == "cpu" else tf_yarn.NodeLabel.GPU
        specs = {
            "chief": tf_yarn.TaskSpec(memory=self.tf_yarn_chief_memory, vcores=self.tf_yarn_chief_cores, label=label),
            "evaluator": tf_yarn.TaskSpec(memory=self.tf_yarn_evaluator_memory, vcores=self.tf_yarn_evaluator_cores),
            "tensorboard": tf_yarn.TaskSpec(memory=self.tf_yarn_tensorboard_memory, vcores=8),
        }
        if self.nb_workers is not None:
            specs["worker"] = tf_yarn.TaskSpec(memory="32 GiB", vcores=48, instances=self.nb_workers, label=label)
        if self.nb_ps is not None:
            specs["ps"] = tf_yarn.TaskSpec(memory="32 GiB", vcores=48, instances=self.nb_ps)
        return specs
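# Illustrative sketch (not part of the original module): the values below are
# hypothetical and only show how YarnTrainerConfig parameters map onto task specs.
#
#   config = YarnTrainerConfig(tf_yarn="cpu", nb_workers=2, queue="dev")
#   specs = config.get_task_specs()
#   # specs holds "chief", "evaluator" and "tensorboard" TaskSpecs, plus a "worker"
#   # entry with instances=2 because nb_workers was set; "ps" is absent since
#   # nb_ps was left as None.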
@dataclass
class YarnTrainer(base.Job):
    """Run a :class:`~deepr.jobs.TrainerBase` job on yarn."""

    trainer: Dict
    config: YarnTrainerConfig
    train_on_yarn: bool = True

    def run(self):
        if self.train_on_yarn:
            # Upload environment(s) to HDFS (CPU and / or GPU environments)
            pyenv_zip_path = {tf_yarn.NodeLabel.CPU: self.config.upload_pex_cpu()}
            if self.config.tf_yarn == "gpu":
                pyenv_zip_path[tf_yarn.NodeLabel.GPU] = self.config.upload_pex_gpu()

            def _experiment_fn():
                # Remove auto-termination of active MLFlow runs from
                # inside the chief / evaluator
                atexit.unregister(mlflow.end_run)
                return from_config(self.trainer).create_experiment()

            tf_yarn.run_on_yarn(
                acls=skein.model.ACLs(enable=True, ui_users=["*"], view_users=["*"]),
                env=self.config.get_env_vars(),
                experiment_fn=_experiment_fn,
                files=get_editable_requirements(),
                name=self.config.name,
                nb_retries=self.config.nb_retries,
                pre_script_hook=self.config.pre_script_hook,
                pyenv_zip_path=pyenv_zip_path,
                queue=self.config.queue,
                task_specs=self.config.get_task_specs(),
            )

            # Run exporters and final evaluation
            trainer = from_config(self.trainer)
            experiment = trainer.create_experiment()
            for exporter in trainer.exporters:
                exporter(experiment.estimator)
            trainer.run_final_evaluation()
        else:
            LOGGER.info("Not training on yarn.")
            trainer = from_config(self.trainer)
            trainer.run()
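
A minimal usage sketch, not part of the module above: the trainer dictionary and its "type" key are hypothetical placeholders resolved by deepr.config.base.from_config, and the queue name is an assumption.

    trainer_config = {"type": "deepr.jobs.Trainer"}  # hypothetical job spec consumed by from_config
    job = YarnTrainer(trainer=trainer_config, config=YarnTrainerConfig(queue="dev"))
    job.run()  # uploads the pex environments, then submits the experiment via tf_yarn.run_on_yarn
    # With train_on_yarn=False, run() would instead build the trainer locally and call trainer.run().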