MovieLens Example
In this notebook, we train an AverageModel on the MovieLens dataset with a BPRLoss.
The pipeline is made of 4 steps:
step 1: given the MovieLens ratings.csv file, create tfrecords for the training, evaluation and test sets.
step 2: train an AverageModel (optionally, use tf-yarn to distribute training and evaluation on a cluster) and export the embeddings as well as the graph.
step 3: write predictions, i.e. for all the test timelines, compute the user representations.
step 4: evaluate predictions, i.e. check how similar the recommendations provided by the model are to the actual movies seen by the user in the “future”.
Install
[ ]:
!pip install deepr[cpu] faiss_cpu
Download the dataset
First, download the MovieLens dataset.
[ ]:
!wget http://files.grouplens.org/datasets/movielens/ml-20m.zip
!unzip ml-20m.zip
[ ]:
import os
import logging
import tensorflow as tf
import deepr
from deepr.examples import movielens
[3]:
logging.basicConfig(level=logging.INFO)
[4]:
dataset_path = os.getcwd() + "/ml-20m"
path_ratings = f"{dataset_path}/ratings.csv"
path_root = "average"
path_model = path_root + "/model"
path_data = path_root + "/data"
path_variables = path_root + "/variables"
path_predictions = path_root + "/predictions.parquet.snappy"
path_saved_model = path_root + "/saved_model"
path_mapping = path_data + "/mapping.txt"
path_train = path_data + "/train.tfrecord.gz"
path_eval = path_data + "/eval.tfrecord.gz"
path_test = path_data + "/test.tfrecord.gz"
deepr.io.Path(path_root).mkdir(exist_ok=True)
deepr.io.Path(path_model).mkdir(exist_ok=True)
deepr.io.Path(path_data).mkdir(exist_ok=True)
max_steps = 100_000
Step 1 - Build TF Records
The job takes the ratings csv file as input and creates 3 tfrecord files (train, validation, test).
Each file contains timelines of user ratings, split into an input part and a target part.
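To make the record format more concrete, here is a minimal, stand-alone sketch of the input/target split and of the negative sampling idea. It is not the actual BuildRecords implementation: the function name, the uniform negative sampling and the absence of rating filtering, id remapping and serialization are simplifications for illustration only.
[ ]:
import random

def split_timeline(movie_ids, target_ratio=0.2, num_negatives=8, vocab_size=10_000, seed=2020):
    """Toy version of the split: the last target_ratio of the timeline becomes the target,
    and each target positive gets num_negatives randomly sampled negatives."""
    rng = random.Random(seed)
    cut = int(len(movie_ids) * (1 - target_ratio))
    input_positives = movie_ids[:cut]   # movies used to build the user representation
    target_positives = movie_ids[cut:]  # movies the model should rank highly
    target_negatives = [
        [rng.randrange(vocab_size) for _ in range(num_negatives)]  # uniformly sampled negatives
        for _ in target_positives
    ]
    return input_positives, target_positives, target_negatives

# Example: a timeline of 20 movie indices
# split_timeline(list(range(20)))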
[5]:
build = movielens.jobs.BuildRecords(
path_ratings=path_ratings,
path_mapping=path_mapping,
path_train=path_train,
path_eval=path_eval,
path_test=path_test,
min_rating=4,
min_length=5,
num_negatives=8,
target_ratio=0.2,
size_test=10_000,
size_eval=10_000,
shuffle_timelines=True,
seed=2020,
)
Now that the job is defined, let’s run it.
We run it before defining the other jobs because we need to know the vocabulary size (number of movies in the dataset) to build the embedding matrix.
[ ]:
build.run()
Step 2 - Training
This defines a training job that builds a model transforming a timeline of movie embeddings into a user embedding. It takes the tfrecords as input and upon completion writes the following artifacts:
* a dataframe of the biases and the embeddings
* a saved model: a protobuf containing the model definition and weights
In this specific instance we train an average model with a BPR loss and compute a triplet precision on the validation set.
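As a reminder of what these two quantities mean, here is a simplified, stand-alone version of the standard BPR loss and of the triplet precision on (positive, negative) score pairs. The actual computation is done by movielens.layers.Loss on the model logits; the helpers below are only illustrative.
[ ]:
import numpy as np

def bpr_loss(positive_scores, negative_scores):
    """BPR loss on score pairs: -log(sigmoid(s_pos - s_neg)), averaged over the pairs."""
    diff = np.asarray(positive_scores) - np.asarray(negative_scores)
    return float(np.mean(np.logaddexp(0.0, -diff)))  # numerically stable -log(sigmoid(diff))

def triplet_precision(positive_scores, negative_scores):
    """Fraction of pairs where the positive item is scored higher than the negative one."""
    return float(np.mean(np.asarray(positive_scores) > np.asarray(negative_scores)))

# Example with dot-product scores
# bpr_loss([2.0, 0.5], [1.0, 1.5]), triplet_precision([2.0, 0.5], [1.0, 1.5])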
[9]:
train = deepr.jobs.Trainer(
path_model=path_model,
pred_fn=movielens.layers.AverageModel(vocab_size=deepr.vocab.size(path_mapping), dim=100, keep_prob=0.5),
loss_fn=movielens.layers.Loss(loss="bpr", vocab_size=deepr.vocab.size(path_mapping)),
optimizer_fn=deepr.optimizers.TensorflowOptimizer("LazyAdam", 0.001),
train_input_fn=deepr.readers.TFRecordReader(path_train),
eval_input_fn=deepr.readers.TFRecordReader(path_eval, shuffle=False),
prepro_fn=movielens.prepros.RecordPrepro(
min_input_size=3,
min_target_size=3,
max_input_size=50,
max_target_size=50,
buffer_size=1024,
batch_size=128,
repeat_size=None,
prefetch_size=1,
num_parallel_calls=8,
),
train_spec=deepr.jobs.TrainSpec(max_steps=max_steps),
eval_spec=deepr.jobs.EvalSpec(steps=None, start_delay_secs=30, throttle_secs=30),
final_spec=deepr.jobs.FinalSpec(steps=None),
exporters=[
# The training will keep the model with the best triplet precision
deepr.exporters.BestCheckpoint(metric="triplet_precision", mode="increase"),
# Export biases and embeddings as a dataframe
deepr.exporters.SaveVariables(path_variables=path_variables, variable_names=["biases", "embeddings"]),
# Export a saved model using specified fields as input
deepr.exporters.SavedModel(
path_saved_model=path_saved_model,
fields=[
deepr.Field(name="inputPositives", shape=(None,), dtype=tf.int64),
deepr.Field(name="inputMask", shape=(None,), dtype=tf.bool),
],
),
],
train_hooks=[
# Log metrics, hyperparams, initial values to the console, and optionally mlflow and graphite
deepr.hooks.LoggingTensorHookFactory(
name="training",
functions={
"memory_gb": deepr.hooks.ResidentMemory(unit="gb"),
"max_memory_gb": deepr.hooks.MaxResidentMemory(unit="gb"),
},
every_n_iter=300,
use_graphite=False,
use_mlflow=False,
),
deepr.hooks.SummarySaverHookFactory(save_steps=300),
deepr.hooks.NumParamsHook(use_mlflow=False),
deepr.hooks.LogVariablesInitHook(use_mlflow=False),
deepr.hooks.StepsPerSecHook(
name="training",
batch_size=128,
every_n_steps=300,
skip_after_step=max_steps,
use_mlflow=False,
use_graphite=False,
),
# Stop the training if triplet precision does not improve
deepr.hooks.EarlyStoppingHookFactory(
metric="triplet_precision",
mode="increase",
max_steps_without_improvement=1000,
min_steps=5_000,
run_every_steps=300,
final_step=max_steps,
),
],
eval_hooks=[deepr.hooks.LoggingTensorHookFactory(name="validation", at_end=True)],
final_hooks=[deepr.hooks.LoggingTensorHookFactory(name="final_validation", at_end=True)],
train_metrics=[deepr.metrics.StepCounter(name="num_steps"), deepr.metrics.DecayMean(tensors=["loss"], decay=0.98)],
eval_metrics=[deepr.metrics.Mean(tensors=["loss", "triplet_precision"])],
final_metrics=[deepr.metrics.Mean(tensors=["loss", "triplet_precision"])],
run_config=deepr.jobs.RunConfig(
save_checkpoints_steps=300, save_summary_steps=300, keep_checkpoint_max=None, log_step_count_steps=300
),
config_proto=deepr.jobs.ConfigProto(
inter_op_parallelism_threads=8, intra_op_parallelism_threads=8, gpu_device_count=0, cpu_device_count=48,
),
)
Step 3 - Predict job
The predict job reloads the test dataset and the saved model, performs inference, and writes the user embeddings to a dataframe that is reused later by the evaluation job.
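For intuition, here is a rough sketch of what the inference amounts to for an AverageModel: the user embedding is essentially an average of the embeddings of the movies in the input timeline. The function below is illustrative only, not the code run by the Predict job, which uses the exported saved model.
[ ]:
import numpy as np

def average_user_embedding(input_movie_indices, embeddings):
    """Average the embeddings of the movies in the input timeline to get a user embedding.
    The exported saved model is the authoritative implementation (it also handles the
    input mask, and dropout is disabled at inference)."""
    return embeddings[np.asarray(input_movie_indices)].mean(axis=0)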
[10]:
predict = movielens.jobs.Predict(
path_saved_model=path_saved_model,
path_predictions=path_predictions,
input_fn=deepr.readers.TFRecordReader(path_test, shuffle=False),
prepro_fn=movielens.prepros.RecordPrepro(),
)
Step 4 - Evaluation job
This job takes the user embeddings, uses faiss to retrieve the k nearest neighbors in the movie embedding space, and computes metrics against the target timelines.
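As an illustration of what these metrics are, here is a simplified precision/recall@k computation using a faiss inner-product index. It assumes the target movies are already given as row indices in the embedding matrix, and it ignores the biases as well as the NDCG and f1 computation that the real Evaluate job also performs.
[ ]:
import faiss
import numpy as np

def precision_recall_at_k(user_embeddings, target_indices, movie_embeddings, k=10):
    """Simplified evaluation: retrieve the k nearest movies for each user with an
    inner-product index and compare them to the target timeline (row indices)."""
    index = faiss.IndexFlatIP(movie_embeddings.shape[-1])
    index.add(np.ascontiguousarray(movie_embeddings.astype(np.float32)))
    _, retrieved = index.search(np.ascontiguousarray(user_embeddings.astype(np.float32)), k)
    precisions, recalls = [], []
    for recos, targets in zip(retrieved, target_indices):
        hits = len(set(recos.tolist()) & set(targets))
        precisions.append(hits / k)
        recalls.append(hits / max(len(targets), 1))
    return float(np.mean(precisions)), float(np.mean(recalls))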
[11]:
evaluate = [
movielens.jobs.Evaluate(
path_predictions=path_predictions,
path_embeddings=path_variables + "/embeddings",
path_biases=path_variables + "/biases",
k=k,
)
for k in [10, 20, 50]
]
Run the pipeline
All the job definitions are lazy, and so is the pipeline. Calling run on it actually performs all of these steps.
[12]:
pipeline = deepr.jobs.Pipeline([train, predict] + evaluate)
[ ]:
pipeline.run()
You should expect the following metrics:
INFO:deepr.examples.movielens.jobs.evaluate:precision@10 = 0.15774000000000002
recall@10 = 0.22966825396825397
f1@10 = 0.17782589040302665
NDCG@10 = 0.218460915855854
INFO:deepr.examples.movielens.jobs.evaluate:precision@20 = 0.12564500000000003
recall@20 = 0.29865131767226577
f1@20 = 0.16083200785947185
NDCG@20 = 0.24568841120719748
INFO:deepr.examples.movielens.jobs.evaluate:precision@50 = 0.08614800000000002
recall@50 = 0.44487653142758526
f1@50 = 0.1303801216635994
NDCG@50 = 0.3022301157999229
You can find configs on GitHub that achieve better performance.
Visualize embeddings with a KNN
Let’s check that the movie embeddings produced make sense:
* load the movie embeddings
* load the movie titles
* build a KNN index on the embeddings
* query it with a known movie and check that its closest neighbors make sense
[14]:
import numpy as np
import pyarrow.parquet as pq
import faiss
import pyarrow.csv as pc
from IPython.display import display
import pandas as pd
[17]:
# Load the movie embeddings exported by the training job
embeddings = np.vstack(pq.read_table("average/variables/embeddings").to_pandas().to_numpy())
# mapping.txt maps each movie id to its row index in the embedding matrix
mapp = {
    int(movie_id): indice
    for indice, movie_id in enumerate(open("average/data/mapping.txt", "r").read().split("\n"))
}
inversed_map = {indice: movie_id for movie_id, indice in mapp.items()}
# Build an inner-product KNN index over the movie embeddings
index = faiss.IndexFlatIP(embeddings.shape[-1])
index.add(np.ascontiguousarray(embeddings))

def knn_query(index, query, ksearch):
    """Return the ksearch closest movies to the query embedding as (movie_id, distance) pairs."""
    D, I = index.search(np.expand_dims(query, 0), ksearch)
    distances = D[0]
    product_indices = I[0]
    product_ids = [inversed_map[i] for i in product_indices]
    return list(zip(product_ids, distances))

movies = pc.read_csv(f"{dataset_path}/movies.csv").to_pandas()

def display_results_df(results):
    """Display the genre, title and distance of each KNN result."""
    data = [
        [movies[movies.movieId == movie_id]["genres"].to_numpy()[0],
         movies[movies.movieId == movie_id]["title"].to_numpy()[0],
         distance]
        for movie_id, distance in results
    ]
    display(pd.DataFrame(data, columns=["Genre", "Title", "Distance"]))

def display_movie(movie_id):
    """Display the genre and title of a single movie."""
    data = [[movies[movies.movieId == movie_id]["genres"].to_numpy()[0],
             movies[movies.movieId == movie_id]["title"].to_numpy()[0]]]
    display(pd.DataFrame(data, columns=["Genre", "Title"]))
[18]:
p = movies[movies.title.str.lower().str.contains("star wars")]["movieId"].to_numpy()[0]
print("Query")
display_movie(p)
print("Knn results")
display_results_df(knn_query(index, embeddings[mapp[p]], 5))
Query
   Genre                     Title
0  Action|Adventure|Sci-Fi   Star Wars: Episode IV - A New Hope (1977)
Knn results
   Genre                             Title                                                Distance
0  Action|Adventure|Sci-Fi           Star Wars: Episode IV - A New Hope (1977)            16.765795
1  Action|Adventure|Sci-Fi           Star Wars: Episode VI - Return of the Jedi (1983)    14.477997
2  Action|Adventure|Sci-Fi           Star Wars: Episode V - The Empire Strikes Back...    13.837748
3  Action|Adventure|Sci-Fi|Thriller  Star Trek: First Contact (1996)                      13.092430
4  Children|Comedy|Fantasy|Musical   Willy Wonka & the Chocolate Factory (1971)           11.858896
As you can see, the genres of the nearest neighbors are similar and the retrieved movies are related to the query movie.