Source code for deepr.io.parquet

"""Utilities for parquet"""

from contextlib import contextmanager
import logging
from typing import Union, List

import pandas as pd
import numpy as np
import pyarrow
import pyarrow.parquet as pq
from pyarrow.filesystem import FileSystem

from deepr.io.path import Path
from deepr.io.hdfs import HDFSFileSystem


LOGGER = logging.getLogger(__name__)


class ParquetDataset:
    """Context aware ParquetDataset with support for chunk writing.

    Makes it easier to read / write Parquet datasets. For example

    >>> from deepr.io import ParquetDataset
    >>> df = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})
    >>> with ParquetDataset("viewfs://root/foo.parquet.snappy").open() as ds:  # doctest: +SKIP
    ...     ds.write_pandas(df, chunk_size=100)  # doctest: +SKIP

    The use of context managers automatically opens / closes the dataset
    as well as the connection to the FileSystem.

    Attributes
    ----------
    path_or_paths : Union[str, Path, List[Union[str, Path]]]
        Path to parquet dataset (directory or file), or list of files.
    filesystem : FileSystem, Optional
        FileSystem, if None, will be inferred automatically later.

    The remaining constructor arguments are stored as-is and forwarded to
    ``pyarrow.parquet.ParquetDataset``.
    """
    def __init__(
        self,
        path_or_paths: Union[str, Path, List[Union[str, Path]]],
        filesystem: FileSystem = None,
        metadata: pq.FileMetaData = None,
        schema: pyarrow.Schema = None,
        split_row_groups: bool = False,
        validate_schema: bool = True,
        filters: List = None,
        metadata_nthreads: int = 1,
        memory_map: bool = False,
    ):
        self.path_or_paths = list(map(str, path_or_paths)) if isinstance(path_or_paths, list) else str(path_or_paths)
        self.filesystem = filesystem
        self.schema = schema
        self.metadata = metadata
        self.split_row_groups = split_row_groups
        self.validate_schema = validate_schema
        self.filters = filters
        self.metadata_nthreads = metadata_nthreads
        self.memory_map = memory_map
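    # A dataset can point at a single file, a directory of part files, or an
    # explicit list of files. Hypothetical paths, shown as skipped doctests to
    # match the class docstring convention:
    #
    # >>> ParquetDataset("viewfs://root/foo.parquet.snappy")  # doctest: +SKIP
    # >>> ParquetDataset([
    # ...     "viewfs://root/foo/part-00000.parquet.snappy",
    # ...     "viewfs://root/foo/part-00001.parquet.snappy",
    # ... ])  # doctest: +SKIP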
    @property
    def pq_dataset(self):
        """Instantiate the underlying pyarrow.parquet.ParquetDataset"""
        return pq.ParquetDataset(
            path_or_paths=self.path_or_paths,
            filesystem=self.filesystem,
            metadata=self.metadata,
            schema=self.schema,
            split_row_groups=self.split_row_groups,
            validate_schema=self.validate_schema,
            filters=self.filters,
            metadata_nthreads=self.metadata_nthreads,
            memory_map=self.memory_map,
        )

    @property
    def is_hdfs(self) -> bool:
        """True if the dataset (first path if given a list) is on HDFS"""
        if isinstance(self.path_or_paths, str):
            return Path(self.path_or_paths).is_hdfs  # type: ignore
        else:
            return Path(self.path_or_paths[0]).is_hdfs

    @property
    def is_local(self) -> bool:
        """True if the dataset is not on HDFS"""
        return not self.is_hdfs
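    # For example (hypothetical paths), `is_hdfs` delegates to
    # deepr.io.path.Path, which inspects the scheme of the first path, so an
    # HDFS-style prefix such as "viewfs://" flags the dataset as remote while
    # a plain path is local:
    #
    # >>> ParquetDataset("viewfs://root/foo.parquet.snappy").is_hdfs  # doctest: +SKIP
    # True
    # >>> ParquetDataset("/tmp/foo.parquet.snappy").is_local  # doctest: +SKIP
    # True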
    @contextmanager
    def open(self):
        """Open a connection to the HDFS FileSystem if the dataset is on HDFS"""
        if self.filesystem is None and self.is_hdfs:
            with HDFSFileSystem() as hdfs:
                yield ParquetDataset(
                    path_or_paths=self.path_or_paths,
                    filesystem=hdfs,
                    metadata=self.metadata,
                    schema=self.schema,
                    split_row_groups=self.split_row_groups,
                    validate_schema=self.validate_schema,
                    filters=self.filters,
                    metadata_nthreads=self.metadata_nthreads,
                    memory_map=self.memory_map,
                )
        else:
            yield self
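    # Typical read usage (hypothetical path; a minimal sketch): `open()` yields
    # a dataset bound to a live HDFS connection only when one is needed, so the
    # same code works for local and HDFS paths:
    #
    # >>> with ParquetDataset("viewfs://root/foo.parquet.snappy").open() as ds:  # doctest: +SKIP
    # ...     table = ds.read()  # doctest: +SKIP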
    def read(self, columns: List[str] = None, use_threads: bool = True, use_pandas_metadata: bool = False):
        """Read the dataset as a pyarrow.Table"""
        return self.pq_dataset.read(columns=columns, use_threads=use_threads, use_pandas_metadata=use_pandas_metadata)
    def read_pandas(self, columns: List[str] = None, use_threads: bool = True):
        """Read the dataset as a pyarrow.Table, including pandas metadata"""
        return self.pq_dataset.read_pandas(columns=columns, use_threads=use_threads)
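    # For example (hypothetical path and column name), restricting the read to
    # a subset of columns and converting the resulting pyarrow.Table to a
    # DataFrame:
    #
    # >>> with ParquetDataset("/tmp/foo.parquet.snappy").open() as ds:  # doctest: +SKIP
    # ...     df = ds.read_pandas(columns=["col1"]).to_pandas()  # doctest: +SKIP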
    def write(self, table: pyarrow.Table, compression="snappy"):
        """Write a pyarrow.Table to a single Parquet file"""
        if not isinstance(self.path_or_paths, str):
            msg = f"Cannot write table to {self.path_or_paths} (expected string)"
            raise TypeError(msg)
        LOGGER.info(f"Writing table to {self.path_or_paths}")
        with Path(self.path_or_paths).open("wb", filesystem=self.filesystem) as file:
            pq.write_table(table, file, compression=compression)
    def write_pandas(
        self,
        df: pd.DataFrame,
        compression="snappy",
        num_chunks: int = None,
        chunk_size: int = None,
        schema: pyarrow.Schema = None,
    ):
        """Write DataFrame as Parquet Dataset"""
        # Check arguments
        if not isinstance(self.path_or_paths, str):
            msg = f"Cannot write table to {self.path_or_paths} (expected string)"
            raise TypeError(msg)
        if num_chunks is not None and chunk_size is not None:
            msg = "Both num_chunks and chunk_size are given, not allowed"
            raise ValueError(msg)
        if chunk_size is not None:
            num_chunks = max(len(df) // chunk_size, 1)

        # Write DataFrame to parquet
        if num_chunks is None:
            table = pyarrow.Table.from_pandas(df, schema=schema, preserve_index=False)
            self.write(table, compression=compression)
        else:
            Path(self.path_or_paths).mkdir(parents=True, exist_ok=True, filesystem=self.filesystem)
            chunks = np.array_split(df, num_chunks)
            for idx, chunk in enumerate(chunks):
                filename = f"part-{idx:05d}.parquet.{compression}"
                chunk_path = Path(self.path_or_paths, filename)
                LOGGER.info(f"Writing chunk:{idx} to {chunk_path}")
                with chunk_path.open("wb", filesystem=self.filesystem) as file:
                    table = pyarrow.Table.from_pandas(chunk, schema=schema, preserve_index=False)
                    pq.write_table(table, file, compression=compression)
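# Example round trip, a minimal sketch assuming a writable (hypothetical) local
# path. With chunk_size=1 and two rows, num_chunks resolves to 2, so
# write_pandas creates the dataset directory and writes two part files,
# part-00000.parquet.snappy and part-00001.parquet.snappy:
#
# >>> df = pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]})  # doctest: +SKIP
# >>> with ParquetDataset("/tmp/foo.parquet.snappy").open() as ds:  # doctest: +SKIP
# ...     ds.write_pandas(df, chunk_size=1)  # doctest: +SKIP
# >>> with ParquetDataset("/tmp/foo.parquet.snappy").open() as ds:  # doctest: +SKIP
# ...     df_back = ds.read_pandas().to_pandas()  # doctest: +SKIP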