# Source code for deepr.jobs.yarn_config

"""Basic Yarn Config in charge of uploading environment."""

from dataclasses import dataclass
from typing import Optional, Dict, List, Tuple
import logging
import os

from tf_yarn import packaging
import cluster_pack

from deepr.io.path import Path
from deepr.utils.graphite import HOST_ENV_VAR, PORT_ENV_VAR, PREFIX_ENV_VAR, INTERVAL_ENV_VAR
from deepr.utils.mlflow import RUN_ID_ENV_VAR, TRACKING_URI_ENV_VAR


LOGGER = logging.getLogger(__name__)


@dataclass
class YarnConfig:
    """Basic Yarn Config in charge of uploading environment.

    Attributes
    ----------
    name : str
        Name of the environment, used to build the PEX filename.
    gpu_additional_packages : Tuple[str, ...]
        Pinned ``pkg==version`` requirements added to the GPU environment.
    gpu_ignored_packages : Tuple[str, ...]
        Packages excluded from the GPU environment.
    cpu_ignored_packages : Tuple[str, ...]
        Packages excluded from the CPU environment.
    gpu_to_use : Optional[int]
        If set, exported as ``CUDA_VISIBLE_DEVICES``.
    jvm_memory_in_gb : int
        Heap size (min and max) for ``LIBHDFS_OPTS``.
    path_pex_cpu : Optional[str]
        Pre-built CPU PEX (local or HDFS); skips re-packaging when given.
    path_pex_gpu : Optional[str]
        Pre-built GPU PEX (local or HDFS); skips re-packaging when given.
    path_pex_prefix : str
        Directory on HDFS under which PEX files are uploaded.
    """

    name: str
    gpu_additional_packages: Tuple[str, ...] = ("tensorflow-gpu==1.15.2", "tf-yarn-gpu==0.4.19")
    gpu_ignored_packages: Tuple[str, ...] = ("tensorflow", "tf-yarn")
    cpu_ignored_packages: Tuple[str, ...] = ()
    gpu_to_use: Optional[int] = None
    jvm_memory_in_gb: int = 8
    path_pex_cpu: Optional[str] = None
    path_pex_gpu: Optional[str] = None
    path_pex_prefix: str = f"viewfs://root/user/{os.environ.get('USER', 'deepr')}/envs"

    def get_env_vars(self) -> Dict[str, str]:
        """Return default environment variables.

        Forwards MLFlow / Graphite settings from the current process
        environment. Only variables that are actually set are copied:
        the previous behavior could insert ``None`` values (e.g. when
        the MLFlow run id was set but the tracking URI was not), and
        environment variable values must be strings.
        """
        # JVM environment variables
        env_vars = {
            "LIBHDFS_OPTS": f"-Xms{self.jvm_memory_in_gb}g -Xmx{self.jvm_memory_in_gb}g",
            "MALLOC_ARENA_MAX": "0",
        }

        # GPU environment variables
        if self.gpu_to_use is not None:
            env_vars["CUDA_VISIBLE_DEVICES"] = str(self.gpu_to_use)

        # MLFlow environment variables (forwarded only if a run is active)
        if os.environ.get(RUN_ID_ENV_VAR) is not None:
            for key in (RUN_ID_ENV_VAR, TRACKING_URI_ENV_VAR):
                value = os.environ.get(key)
                if value is not None:
                    env_vars[key] = value

        # Graphite environment variables (forwarded only if a prefix is set)
        if os.environ.get(PREFIX_ENV_VAR) is not None:
            for key in (HOST_ENV_VAR, PORT_ENV_VAR, PREFIX_ENV_VAR, INTERVAL_ENV_VAR):
                value = os.environ.get(key)
                if value is not None:
                    env_vars[key] = value

        return env_vars

    def upload_pex_cpu(self) -> str:
        """Upload Current Environment as PEX for CPU.

        Returns
        -------
        str
            HDFS path of the uploaded (or pre-existing) CPU PEX.
        """
        path_pex = f"{self.path_pex_prefix}/cpu/{self.name}.pex"
        return upload_pex(
            path_pex=path_pex,
            path_pex_existing=self.path_pex_cpu,
            ignored_packages=list(self.cpu_ignored_packages) if self.cpu_ignored_packages else None,
        )

    def upload_pex_gpu(self) -> str:
        """Upload Current Environment as PEX for GPU.

        Returns
        -------
        str
            HDFS path of the uploaded (or pre-existing) GPU PEX.
        """
        path_pex = f"{self.path_pex_prefix}/gpu/{self.name}.pex"
        # Parse "pkg==version" requirements into {pkg: version}; a requirement
        # without "==" raises, same as the original split-and-index behavior.
        additional_packages = dict(req.split("==", 1) for req in self.gpu_additional_packages)
        return upload_pex(
            path_pex=path_pex,
            path_pex_existing=self.path_pex_gpu,
            additional_packages=additional_packages,
            ignored_packages=list(self.gpu_ignored_packages) if self.gpu_ignored_packages else None,
        )
def upload_pex(
    path_pex: str,
    path_pex_existing: Optional[str] = None,
    additional_packages: Optional[Dict] = None,
    ignored_packages: Optional[List] = None,
) -> str:
    """Upload Current Environment and return path to PEX on HDFS.

    Parameters
    ----------
    path_pex : str
        Target HDFS path for the PEX archive.
    path_pex_existing : Optional[str]
        Pre-built PEX. If ``None``, the current environment is packaged
        and uploaded. If a local path, that archive is uploaded to
        ``path_pex``. If already on HDFS, the upload is skipped.
    additional_packages : Optional[Dict]
        Mapping of package name to version to add to the environment.
    ignored_packages : Optional[List]
        Package names to exclude from the environment.

    Returns
    -------
    str
        HDFS path of the PEX actually in use.
    """
    if path_pex_existing is None:
        # No pre-built PEX: package the current environment and ship it.
        LOGGER.info("Uploading env to %s", path_pex)
        packaging.upload_env_to_hdfs(
            archive_on_hdfs=path_pex,
            additional_packages=additional_packages if additional_packages else {},
            ignored_packages=ignored_packages if ignored_packages else [],
            packer=cluster_pack.packaging.PEX_PACKER,
        )
    elif not Path(path_pex_existing).is_hdfs:
        # Pre-built PEX on local disk: upload as-is.
        LOGGER.info("Uploading env to %s", path_pex)
        packaging.upload_zip_to_hdfs(path_pex_existing, archive_on_hdfs=path_pex)
    else:
        # Already on HDFS: reuse it directly.
        LOGGER.info("Skipping upload, PEX %s already exists", path_pex_existing)
        path_pex = path_pex_existing
    return path_pex