# Source code for deepr.jobs.yarn_launcher

"""Yarn Launcher Config Interface and Job"""

from dataclasses import dataclass, field
from typing import Dict, Tuple
import datetime
import json
import logging

from cluster_pack.skein import skein_launcher
import skein

from deepr.config.base import from_config
from deepr.jobs import base
from deepr.jobs.yarn_config import YarnConfig
from deepr.utils import mlflow


# Module-level logger named after this module (standard logging convention).
LOGGER = logging.getLogger(__name__)


# Key naming an index of editable (pip install -e) packages.
# NOTE(review): unused in this chunk — presumably consumed elsewhere in the
# package (e.g. by the packaging/upload helpers); verify before removing.
EDITABLE_PACKAGES_INDEX = "editable_packages_index"


@dataclass
class YarnLauncherConfig(YarnConfig):
    """Yarn Launcher Config.

    Attributes
    ----------
    name : str
        Skein application name. Defaults to a timestamped name computed
        when the instance is created (``default_factory``), not once at
        import time as a plain class-level default would be.
    hadoop_file_systems : Tuple[str, ...]
        Hadoop file systems the submitted job needs access to.
    memory : str
        Memory request for the yarn container.
    num_cores : int
        Number of cores requested for the yarn container.
    """

    # default_factory: compute a fresh timestamp per instance; a plain
    # default would be frozen at module import and shared by all configs.
    name: str = field(
        default_factory=lambda: f"yarn-launcher-{datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}"
    )
    hadoop_file_systems: Tuple[str, ...] = ()
    memory: str = "48 GiB"
    num_cores: int = 48
# MLFlow param key under which the submitted yarn application id is logged
# (see YarnLauncher.run when use_mlflow is True).
YARN_LAUNCHER_APP_ID_MLFLOW_KEY = "yarn_launcher_app_id"
@dataclass
class YarnLauncher(base.Job):
    """Packages current environment, upload `.pex` and run yarn job.

    Attributes
    ----------
    job : Dict
        Config dictionary of the job to run; rebuilt with ``from_config``
        (locally, or remotely via ``deepr.cli.main from_config``).
    config : YarnLauncherConfig
        Yarn submission settings (application name, memory, cores, ...).
    run_on_yarn : bool
        If True (default), submit the job to yarn; otherwise run it locally.
    use_mlflow : bool
        If True, log the yarn application id to MLFlow and clear the run.
    """

    job: Dict
    config: YarnLauncherConfig
    run_on_yarn: bool = True
    use_mlflow: bool = False

    def run(self):
        """Run the job, either submitted to yarn or executed locally."""
        if not self.run_on_yarn:
            # Local fallback: rebuild the job from its config and run in-process.
            LOGGER.info("Not running on yarn.")
            job = from_config(self.job)
            job.run()
            return

        # Dump the job config as a local json file: it is shipped alongside
        # the pex so the remote process can rebuild the job with from_config.
        job_name = f"job-{self.config.name}.json"
        with open(job_name, "w", encoding="utf-8") as file:
            json.dump(self.job, file, indent=4)

        # Package the current environment as a .pex and upload it, then
        # submit the application through skein.
        pex_path = self.config.upload_pex_cpu()
        with skein.Client() as skein_client:
            # Lazy %-style args: formatting only happens if the record is emitted.
            LOGGER.info("Submitting job %s", self.config.name)
            app_id = skein_launcher.submit(
                skein_client=skein_client,
                module_name="deepr.cli.main",
                additional_files=[job_name],
                package_path=pex_path,
                args=["from_config", job_name, "-", "run"],
                env_vars=self.config.get_env_vars(),
                hadoop_file_systems=list(self.config.hadoop_file_systems),
                memory=self.config.memory,
                name=self.config.name,
                num_cores=self.config.num_cores,
            )
            report = skein_client.application_report(app_id)
            LOGGER.info("TRACKING_URL: %s", report.tracking_url)
            if self.use_mlflow:
                mlflow.log_param(YARN_LAUNCHER_APP_ID_MLFLOW_KEY, app_id)
                mlflow.clear_run()