Source code for deepr.examples.movielens.readers.csv

# pylint: disable=invalid-name
"""CSV Reader for MovieLens."""

import logging
import collections

import tensorflow as tf
import numpy as np
import deepr

from deepr.examples.movielens.utils import fields


try:
    import pandas as pd
except ImportError as e:
    print(f"Pandas needs to be installed for MovieLens {e}")


try:
    from scipy import sparse
except ImportError as e:
    print(f"Scipy needs to be installed for MovieLens {e}")


LOGGER = logging.getLogger(__name__)


class TrainCSVReader(deepr.readers.Reader):
    """Reader of MovieLens CSV files of the Multi-VAE paper.

    See https://github.com/dawenl/vae_cf
    """

    def __init__(
        self,
        path_csv: str,
        vocab_size: int,
        target_ratio: float = None,
        bucket_size: int = 16 * 512,
        shuffle: bool = True,
        seed: int = 42,
        take_ratio: float = None,
    ):
        self.path_csv = path_csv
        self.vocab_size = vocab_size
        self.target_ratio = target_ratio
        self.bucket_size = bucket_size
        self.shuffle = shuffle
        self.seed = seed
        self.take_ratio = take_ratio
        self.fields = [
            fields.UID,
            fields.INPUT_POSITIVES,
            fields.TARGET_POSITIVES,
            fields.INPUT_POSITIVES_ONE_HOT(vocab_size),
            fields.TARGET_POSITIVES_ONE_HOT(vocab_size),
        ]

    def as_dataset(self):
        # Build the sparse user-item interaction matrix from (uid, sid) pairs
        with deepr.io.Path(self.path_csv).open() as file:
            tp = pd.read_csv(file)
        rows, cols = tp["uid"], tp["sid"]
        n_users = rows.max() + 1
        data = sparse.csr_matrix((np.ones_like(rows), (rows, cols)), dtype="int64", shape=(n_users, self.vocab_size))
        LOGGER.info(f"Reloaded user-item matrix of shape {data.shape} with num_events={len(rows)}")
        if self.take_ratio is not None:
            data = data[: int(self.take_ratio * n_users)]
            LOGGER.info(f"Sliced user-item matrix, new shape = {data.shape}, num_events={data.sum()}")
        np.random.seed(self.seed)
        counts = np.array(data.sum(axis=1)).flatten()
        if self.bucket_size:
            # Sort users by interaction count and group them into buckets of
            # `bucket_size` users with similar counts
            buckets = collections.defaultdict(list)
            for bucket, (_, idx) in enumerate(sorted(zip(counts, range(data.shape[0])))):
                buckets[bucket // self.bucket_size].append(idx)

        def _gen():
            # Resolve idxlist (shuffle + buckets)
            if self.shuffle:
                if self.bucket_size:
                    idxlist = []
                    # Shuffle buckets
                    bucket_idx = list(range(len(buckets)))
                    np.random.shuffle(bucket_idx)
                    for idx in bucket_idx:
                        # Shuffle bucket indices
                        bucket_idxlist = buckets[idx]
                        np.random.shuffle(bucket_idxlist)
                        idxlist.extend(bucket_idxlist)
                else:
                    idxlist = list(range(data.shape[0]))
                    np.random.shuffle(idxlist)
            else:
                if self.bucket_size:
                    idxlist = [idx for _, idx in sorted(zip(counts, range(data.shape[0])))]
                else:
                    idxlist = list(range(data.shape[0]))

            # Iterate over items in the sparse matrix
            for idx in idxlist:
                X = data[idx]
                if sparse.isspmatrix(X):
                    X = X.toarray()
                X = X.astype("int64")
                indices = np.nonzero(X[0])[0]

                # Split into input / target
                if self.target_ratio is not None:
                    np.random.shuffle(indices)
                    size = int(len(indices) * (1 - self.target_ratio))
                    input_positives = indices[:size]
                    target_positives = indices[size:]
                    input_one_hot = np.zeros((self.vocab_size,), dtype=np.int64)
                    input_one_hot[input_positives] = 1
                    target_one_hot = np.zeros((self.vocab_size,), dtype=np.int64)
                    target_one_hot[target_positives] = 1
                # Same input / target
                else:
                    input_positives = indices
                    target_positives = indices
                    input_one_hot = X[0]
                    target_one_hot = X[0]

                yield {
                    "uid": idx,
                    "inputPositives": input_positives,
                    "inputPositivesOneHot": input_one_hot,
                    "targetPositives": target_positives,
                    "targetPositivesOneHot": target_one_hot,
                }

        return tf.data.Dataset.from_generator(
            _gen,
            output_types={field.name: field.dtype for field in self.fields},
            output_shapes={field.name: field.shape for field in self.fields},
        )
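
# A minimal usage sketch, assuming eager execution and a CSV at "train.csv"
# with integer "uid" and "sid" columns; the path, vocab_size, and batch-free
# iteration below are placeholders, not part of the original module.
if __name__ == "__main__":
    train_reader = TrainCSVReader(path_csv="train.csv", vocab_size=20_000, target_ratio=0.2)
    train_dataset = train_reader.as_dataset()
    for example in train_dataset.take(2):
        # Each element is a dict with the five fields declared in `self.fields`
        print(example["uid"], example["inputPositives"].shape, example["inputPositivesOneHot"].shape)
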
class TestCSVReader(deepr.readers.Reader):
    """Reader of MovieLens test CSV files of the Multi-VAE paper.

    See https://github.com/dawenl/vae_cf
    """

    def __init__(self, path_csv_tr: str, path_csv_te: str, vocab_size: int):
        self.path_csv_tr = path_csv_tr
        self.path_csv_te = path_csv_te
        self.vocab_size = vocab_size
        self.fields = [
            fields.UID,
            fields.INPUT_POSITIVES,
            fields.TARGET_POSITIVES,
            fields.INPUT_POSITIVES_ONE_HOT(vocab_size),
            fields.TARGET_POSITIVES_ONE_HOT(vocab_size),
        ]

    def as_dataset(self):
        with deepr.io.Path(self.path_csv_tr).open() as file:
            tp_tr = pd.read_csv(file)
        with deepr.io.Path(self.path_csv_te).open() as file:
            tp_te = pd.read_csv(file)
        # Re-index uids to start at 0 so that both matrices share the same rows
        start_idx = min(tp_tr["uid"].min(), tp_te["uid"].min())
        end_idx = max(tp_tr["uid"].max(), tp_te["uid"].max())
        rows_tr, cols_tr = tp_tr["uid"] - start_idx, tp_tr["sid"]
        rows_te, cols_te = tp_te["uid"] - start_idx, tp_te["sid"]
        data_tr = sparse.csr_matrix(
            (np.ones_like(rows_tr), (rows_tr, cols_tr)), dtype="int64", shape=(end_idx - start_idx + 1, self.vocab_size)
        )
        data_te = sparse.csr_matrix(
            (np.ones_like(rows_te), (rows_te, cols_te)), dtype="int64", shape=(end_idx - start_idx + 1, self.vocab_size)
        )

        def _gen():
            # One row per user: inputs from the "tr" split, targets from the "te" split
            for idx in range(data_tr.shape[0]):
                X = data_tr[idx]
                y = data_te[idx]
                if sparse.isspmatrix(X):
                    X = X.toarray()
                if sparse.isspmatrix(y):
                    y = y.toarray()
                X = X.astype("int64")
                y = y.astype("int64")
                yield {
                    "uid": idx,
                    "inputPositives": np.nonzero(X[0])[0],
                    "inputPositivesOneHot": X[0],
                    "targetPositives": np.nonzero(y[0])[0],
                    "targetPositivesOneHot": y[0],
                }

        return tf.data.Dataset.from_generator(
            _gen,
            output_types={field.name: field.dtype for field in self.fields},
            output_shapes={field.name: field.shape for field in self.fields},
        )
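
# A minimal usage sketch, assuming eager execution and the Multi-VAE evaluation
# layout where each held-out user's history is split into "test_tr.csv" (fed as
# input) and "test_te.csv" (used as target); the file names and vocab_size are
# placeholders, not part of the original module.
if __name__ == "__main__":
    test_reader = TestCSVReader(path_csv_tr="test_tr.csv", path_csv_te="test_te.csv", vocab_size=20_000)
    test_dataset = test_reader.as_dataset()
    for example in test_dataset.take(1):
        # Inputs come from the "tr" file, targets from the "te" file
        print(example["inputPositivesOneHot"].shape, example["targetPositivesOneHot"].shape)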