Source code for deepr.io.path

"""Path Utilities"""

from contextlib import contextmanager
from typing import Union, Generator, Optional
import os
import pathlib
from urllib import parse
import shutil
import logging

import tensorflow as tf
from pyarrow.filesystem import FileSystem

from deepr.io.hdfs import HDFSFileSystem, HDFSFile
from deepr.utils.datastruct import to_flat_tuple


# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)


class Path:
    """Equivalent of pathlib.Path for local and HDFS FileSystem

    Automatically opens and closes an HDFS connection if the path is
    an HDFS path.

    Allows you to work with local / HDFS files in an agnostic manner.

    Most methods accept an optional ``filesystem`` argument. When it is
    given, the operation is delegated to that object; otherwise the
    path's scheme decides between a temporary ``HDFSFileSystem``
    connection and the local filesystem.

    Example
    -------
    .. code-block:: python

        path = Path("viewfs://foo", "bar") / "baz"
        path.parent.mkdir()
        with path.open("r") as file:
            for line in file:
                print(line)

        for path in path.glob("*"):
            print(path.is_file())
    """

    def __init__(self, *args: Union[str, pathlib.Path, "Path"]):
        """Join all ``args`` (flattened, converted to ``str``) into one path."""
        self.path = os.path.join(*[str(arg) for arg in to_flat_tuple(args)])

    def __str__(self) -> str:
        return self.path

    def __repr__(self) -> str:
        return f"Path({str(self)})"

    def __eq__(self, other) -> bool:
        # Equality is purely string based, so a Path equals the plain
        # string (or pathlib.Path) that has the same textual form.
        return str(self) == str(other)

    def __hash__(self) -> int:
        # Defining __eq__ alone makes the class unhashable. Hash the same
        # string that __eq__ compares so equal objects share a hash.
        return hash(str(self))

    def __truediv__(self, other) -> "Path":
        """Syntactic sugar for path definition."""
        return Path(self, other)

    @property
    def name(self) -> str:
        """Final path component."""
        return os.path.basename(self.path)

    @property
    def parent(self) -> "Path":
        """Path to the parent of the current path

        The parent is everything before the last ``"/"``. A path without
        any ``"/"`` has an empty parent. A path directly under the root
        (``"/foo"``, or ``"/"`` itself) has ``"/"`` as parent.
        """
        head, sep, _ = self.path.rpartition("/")
        if sep and not head:
            # The only separator is the leading one: keep the root instead
            # of collapsing to "", which would mean the working directory.
            return Path("/")
        return Path(head)

    @property
    def is_hdfs(self) -> bool:
        """Return True if the path points to an HDFS location"""
        scheme = parse.urlparse(str(self)).scheme
        return scheme in {"hdfs", "viewfs"}

    @property
    def is_local(self) -> bool:
        """Return True if the path points to a local file or dir."""
        return not self.is_hdfs

    @property
    def suffix(self) -> str:
        """File extension of the file if any."""
        return pathlib.Path(str(self)).suffix

    def exists(self, filesystem: Optional[FileSystem] = None) -> bool:
        """Return True if the path points to an existing file or dir."""
        if filesystem is not None:
            return filesystem.exists(str(self))
        if self.is_hdfs:
            with HDFSFileSystem() as hdfs:
                return hdfs.exists(str(self))
        return pathlib.Path(str(self)).exists()

    def is_dir(self, filesystem: Optional[FileSystem] = None) -> bool:
        """Return True if the path points to a regular directory."""
        if filesystem is not None:
            return filesystem.isdir(str(self))
        if self.is_hdfs:
            with HDFSFileSystem() as hdfs:
                return hdfs.isdir(str(self))
        return pathlib.Path(str(self)).is_dir()

    def is_file(self, filesystem: Optional[FileSystem] = None) -> bool:
        """Return True if the path points to a regular file."""
        if filesystem is not None:
            return filesystem.isfile(str(self))
        if self.is_hdfs:
            with HDFSFileSystem() as hdfs:
                return hdfs.isfile(str(self))
        return pathlib.Path(str(self)).is_file()

    def mkdir(self, parents: bool = False, exist_ok: bool = False, filesystem: Optional[FileSystem] = None):
        """Create directory

        Parameters
        ----------
        parents : bool
            Forwarded to ``pathlib.Path.mkdir`` for local paths only; it
            is not passed to ``filesystem`` / HDFS ``mkdir`` calls.
        exist_ok : bool
            If True, return silently when the directory already exists.
        filesystem : FileSystem, optional
            Filesystem to use instead of the one inferred from the path.

        Raises
        ------
        FileExistsError
            If the directory already exists and ``exist_ok`` is False.
        """
        if self.is_dir(filesystem=filesystem):
            if exist_ok:
                return
            # FileExistsError is still an Exception subclass, so callers
            # that catch Exception keep working.
            raise FileExistsError(f"Directory {self} already exists.")

        if filesystem is not None:
            filesystem.mkdir(str(self))
        elif self.is_hdfs:
            with HDFSFileSystem() as hdfs:
                hdfs.mkdir(str(self))
        else:
            pathlib.Path(str(self)).mkdir(parents=parents, exist_ok=exist_ok)

    def delete_dir(self, filesystem: Optional[FileSystem] = None):
        """Delete dir from filesystem

        Raises
        ------
        FileNotFoundError
            If the path is not an existing directory.
        """
        if not self.is_dir(filesystem=filesystem):
            raise FileNotFoundError(str(self))

        if filesystem is not None:
            filesystem.rm(str(self), recursive=True)
        elif self.is_hdfs:
            with HDFSFileSystem() as hdfs:
                hdfs.rm(str(self), recursive=True)
        else:
            shutil.rmtree(str(self))

    def delete(self, filesystem: Optional[FileSystem] = None):
        """Delete file from filesystem

        Raises
        ------
        FileNotFoundError
            If the path is not an existing file.
        """
        if not self.is_file(filesystem=filesystem):
            raise FileNotFoundError(str(self))

        if filesystem is not None:
            filesystem.delete(str(self))
        elif self.is_hdfs:
            with HDFSFileSystem() as hdfs:
                hdfs.delete(str(self))
        else:
            pathlib.Path(str(self)).unlink()

    def copy_file(self, dest: Union[str, pathlib.Path, "Path"], filesystem: Optional[FileSystem] = None):
        """Copy current file to dest (target directory must exist).

        ``filesystem`` is only used to check that the source is a file.
        The copy itself goes through ``tf.io.gfile.copy`` (overwriting)
        when either end is on HDFS, and through ``shutil.copy`` otherwise.

        Raises
        ------
        FileNotFoundError
            If the current path is not an existing file.
        """
        LOGGER.info("Copying file %s to %s", self, dest)
        if not self.is_file(filesystem=filesystem):
            raise FileNotFoundError(str(self))

        if self.is_hdfs or Path(dest).is_hdfs:
            tf.io.gfile.copy(str(self), str(dest), overwrite=True)
        else:
            shutil.copy(str(self), str(dest))

    def copy_dir(
        self,
        dest: Union[str, pathlib.Path, "Path"],
        recursive: bool = False,
        filesystem: Optional[FileSystem] = None,
    ):
        """Copy current files and directories if recursive to dest.

        ``dest`` is created if needed. Files directly under the current
        directory are always copied; sub-directories are copied only when
        ``recursive`` is True and are skipped otherwise.

        Raises
        ------
        FileNotFoundError
            If the current path is not an existing directory.
        Exception
            If an entry is neither a file nor a directory.
        """
        LOGGER.info("Copying %s to %s", self, dest)
        if not self.is_dir(filesystem=filesystem):
            raise FileNotFoundError(str(self))

        target = Path(dest)
        target.mkdir(parents=True, exist_ok=True, filesystem=filesystem)
        for path in self.glob("*"):
            if path.is_file(filesystem):
                path.copy_file(target / path.name, filesystem=filesystem)
            elif path.is_dir(filesystem):
                if recursive:
                    path.copy_dir(target / path.name, recursive=recursive, filesystem=filesystem)
            else:
                raise Exception(f"Unable to copy {path}")

    def iterdir(self, filesystem: Optional[FileSystem] = None) -> Generator["Path", None, None]:
        """Retrieve directory content."""
        if filesystem is not None:
            return (Path(path) for path in list(filesystem.ls(str(self))))
        if self.is_hdfs:
            with HDFSFileSystem() as hdfs:
                # Materialize the listing while the connection is open;
                # the returned generator is consumed after it is closed.
                entries = list(hdfs.ls(str(self)))
            return (Path(path) for path in entries)
        return (Path(str(path)) for path in pathlib.Path(str(self)).iterdir())

    def glob(self, pattern) -> Generator["Path", None, None]:
        """Retrieve directory content matching pattern

        Local paths delegate to ``pathlib.Path.glob``. HDFS paths use
        ``tf.io.gfile.glob``; a leading ``"**/"`` is handled by walking
        every sub-directory and applying the remainder of the pattern.
        """
        if not self.is_hdfs:
            return (Path(path) for path in pathlib.Path(str(self)).glob(pattern))

        def _glob_rec(path, patt):
            # Matches in `path` itself, followed by matches found in each
            # of its sub-directories (depth first).
            content = [Path(p) for p in tf.io.gfile.glob(str(Path(path, patt)))]
            subdirs = [p for p in Path(path).iterdir() if p.is_dir()]
            for subdir in subdirs:
                content.extend(_glob_rec(subdir, patt))
            return content

        if pattern.startswith("**/"):
            return (p for p in _glob_rec(self, pattern[3:]))
        return (Path(p) for p in tf.io.gfile.glob(str(Path(self, pattern))))

    @contextmanager
    def open(self, mode: str = "r", encoding: Optional[str] = "utf-8", filesystem: Optional[FileSystem] = None):
        """Open file on both HDFS and Local File Systems.

        When ``mode`` contains ``"b"`` the encoding is forced to None.
        If ``filesystem`` is given, the file is opened through ``HDFSFile``
        on that filesystem, whatever the path's scheme.

        Example
        -------
        Use a context manager like so

        .. code-block:: python

            path = Path("viewfs://root/user/path/to/file.txt")
            with path.open("w") as file:
                file.write("Hello world!")
        """
        if "b" in mode:
            # Binary modes do not take an encoding.
            encoding = None

        if filesystem is not None:
            with HDFSFile(filesystem=filesystem, path=str(self), mode=mode, encoding=encoding) as file:
                yield file
        elif self.is_hdfs:
            with HDFSFileSystem() as hdfs:
                with HDFSFile(filesystem=hdfs, path=str(self), mode=mode, encoding=encoding) as file:
                    yield file
        else:
            with pathlib.Path(str(self)).open(mode=mode, encoding=encoding) as file:
                yield file