Source code for deepr.examples.movielens.jobs.build_records

"""Build MovieLens dataset as TFRecords."""

import logging
import random
from typing import List, Dict, Tuple, Callable
from functools import partial
from dataclasses import dataclass

import tensorflow as tf

import deepr
from deepr.examples.movielens.utils import fields

try:
    import pandas as pd
except ImportError as e:
    print(f"Pandas needs to be installed for MovieLens {e}")


LOGGER = logging.getLogger(__name__)


FIELDS = [fields.UID, fields.INPUT_POSITIVES, fields.TARGET_POSITIVES, fields.TARGET_NEGATIVES]


[docs]@dataclass class BuildRecords(deepr.jobs.Job): """Build MovieLens dataset as TFRecords. It aggregates movie ratings by user and build timelines of movies. The users are split into train / validation / test sets. Each timeline is split in two sub-timelines: one input, one target. For each item in the target, n negatives are sampled. The resulting tfrecords have the following fields - "uid": () - "inputPositives": [size_input] - "targetPositives": [size_target] - "targetNegatives": [size_target, num_negatives] """ path_ratings: str path_mapping: str path_train: str path_eval: str path_test: str min_rating: int = 4 min_length: int = 5 num_negatives: int = 8 target_ratio: float = 0.2 size_test: int = 10_000 size_eval: int = 10_000 shuffle_timelines: bool = True seed: int = 2020
[docs] def run(self): # Read timelines (sorted by timestamp) random.seed(self.seed) timelines = get_timelines( path_ratings=self.path_ratings, min_rating=self.min_rating, min_length=self.min_length ) # Split into train, eval and test random.shuffle(timelines) timelines_test = timelines[: self.size_test] timelines_eval = timelines[self.size_test : self.size_test + self.size_eval] timelines_train = timelines[self.size_test + self.size_eval :] # Build vocabulary LOGGER.info("Building vocabulary of movieId") movies = set() for _, ids in timelines_train: movies.update(ids) vocab = sorted(movies) mapping = {movie: idx for idx, movie in enumerate(vocab)} deepr.io.Path(self.path_mapping).parent.mkdir(parents=True, exist_ok=True) deepr.vocab.write(self.path_mapping, [str(movie) for movie in vocab]) LOGGER.info(f"Number of movies after filtration is: {len(vocab)}") # Write datasets for timelines, path in zip( [timelines_train, timelines_test, timelines_eval], [self.path_train, self.path_test, self.path_eval] ): deepr.io.Path(path).parent.mkdir(parents=True, exist_ok=True) LOGGER.info(f"Writing {len(timelines)} timelines to {path}") LOGGER.info(f"shuffle_timelines = {self.shuffle_timelines}, num_negatives = {self.num_negatives}") write_records( partial( records_generator, timelines=timelines, target_ratio=self.target_ratio, num_negatives=self.num_negatives, shuffle_timelines=self.shuffle_timelines, mapping=mapping, ), path, )
[docs]def get_timelines(path_ratings: str, min_rating: float, min_length: int) -> List[Tuple[str, List[int]]]: """Build timelines from MovieLens Dataset. Apply the following filters keep movies with ratings > min_rating keep users with number of movies > min_length """ # Open path_ratings from HDFS / Local FileSystem LOGGER.info(f"Reading ratings from {path_ratings}") with deepr.io.Path(path_ratings).open() as file: ratings_data = pd.read_csv(file) LOGGER.info(f"Number of timelines before filtration is {len(set(ratings_data.userId))}") LOGGER.info(f"Number of movies before filtration is {len(set(ratings_data.movieId))}") # Group and aggregate ratings per user LOGGER.info("Grouping ratings by user") ratings_data = ratings_data[ratings_data.rating >= min_rating] grouped_data = ratings_data.groupby("userId").agg(list).reset_index() grouped_data = grouped_data[grouped_data.rating.map(len) >= min_length] LOGGER.info(f"Number of timelines after filtration is {len(grouped_data)}") # Sort ratings by timestamp LOGGER.info("Building timelines (sort by timestamp).") timelines = [] for _, row in deepr.utils.progress(grouped_data.iterrows(), secs=10): uid = str(row.userId) movies = [movie for _, movie in sorted(zip(row.timestamp, row.movieId))] timelines.append((uid, movies)) return timelines
[docs]def write_records(gen: Callable, path: str): """Write records to path.""" dataset = tf.data.Dataset.from_generator( gen, output_types={field.name: field.dtype for field in FIELDS}, output_shapes={field.name: field.shape for field in FIELDS}, ) to_example = deepr.prepros.ToExample(fields=FIELDS) writer = deepr.writers.TFRecordWriter(path=path) writer.write(to_example(dataset))
[docs]def records_generator( timelines: List[Tuple[str, List[int]]], target_ratio: float, num_negatives: int, shuffle_timelines: bool, mapping: Dict[int, int], ): """Convert Timelines to list of Records with negative samples.""" for uid, movies in deepr.utils.progress(timelines, secs=10): # Remap movies to index and shuffle indices = [mapping[movie] for movie in movies if movie in mapping] if shuffle_timelines: random.shuffle(indices) # Split into input and target split = int(len(indices) * (1 - target_ratio)) input_positives = indices[:split] target_positives = indices[split:] # Sample negatives target_negatives = [random.sample(range(len(mapping)), num_negatives) for _ in range(len(target_positives))] yield { fields.UID.name: uid, fields.INPUT_POSITIVES.name: input_positives, fields.TARGET_POSITIVES.name: target_positives, fields.TARGET_NEGATIVES.name: target_negatives, }