{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# MovieLens Example\n", "\n", "In this notebook, we train an AverageModel on the MovieLens dataset with a BPRLoss.\n", "\n", "The pipeline is made of 4 steps\n", "\n", "- step 1: given the MovieLens ratings.csv file, create tfrecords for the training, evaluation and test sets.\n", "- step 2: train an AverageModel (optionally, use tf-yarn to distribute training and evaluation on a cluster) and export the embeddings as well as the graph.\n", "- step 3: write predictions, i.e. for all the test timelines, compute the user representation.\n", "- step 4: evaluate predictions, i.e. look how similar the recommendations provided by the model are to the actual movies seen by the user in the \"future\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Install" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install deepr[cpu] faiss_cpu" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Download the dataset\n", "\n", "First download the movielens dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!wget http://files.grouplens.org/datasets/movielens/ml-20m.zip\n", "!unzip ml-20m.zip" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import logging\n", "\n", "import tensorflow as tf\n", "\n", "import deepr\n", "from deepr.examples import movielens" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "logging.basicConfig(level=logging.INFO)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "dataset_path = os.getcwd() + \"/ml-20m\"\n", "path_ratings = f\"{dataset_path}/ratings.csv\"\n", "path_root = \"average\"\n", "path_model = path_root + \"/model\"\n", "path_data = path_root + \"/data\"\n", "path_variables = path_root + \"/variables\"\n", "path_predictions = path_root + \"/predictions.parquet.snappy\"\n", "path_saved_model = path_root + \"/saved_model\"\n", "path_mapping = path_data + \"/mapping.txt\"\n", "path_train = path_data + \"/train.tfrecord.gz\"\n", "path_eval = path_data + \"/eval.tfrecord.gz\"\n", "path_test = path_data + \"/test.tfrecord.gz\"\n", "deepr.io.Path(path_root).mkdir(exist_ok=True)\n", "deepr.io.Path(path_model).mkdir(exist_ok=True)\n", "deepr.io.Path(path_data).mkdir(exist_ok=True)\n", "max_steps = 100_000" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1 - Build TF Records\n", "\n", "The job takes as input the csv ratings and create 3 tfrecords files (train, validation, test). \n", "\n", "Each file contains timeline of user ratings split into input and target." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "build = movielens.jobs.BuildRecords(\n", " path_ratings=path_ratings,\n", " path_mapping=path_mapping,\n", " path_train=path_train,\n", " path_eval=path_eval,\n", " path_test=path_test,\n", " min_rating=4,\n", " min_length=5,\n", " num_negatives=8,\n", " target_ratio=0.2,\n", " size_test=10_000,\n", " size_eval=10_000,\n", " shuffle_timelines=True,\n", " seed=2020,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that the job is defined, let's run it. \n", "\n", "We run it before defining the other jobs because we need to know the vocabulary size (number of movies in the dataset) to build the embedding matrix." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "build.run()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2 - Training\n", "\n", "This defines a training job to build a model that transform a timeline of movies embedding to an user embedding.\n", "It takes as input the tf records and upon completion writes 3 artifacts :\n", "* dataframe of the biases and the embeddings\n", "* saved model : protobuf containing the model definition and weights\n", "\n", "In this specific instance we train an average model with a BPR loss and compute a triple precision on the validation set." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "train = deepr.jobs.Trainer(\n", " path_model=path_model,\n", " pred_fn=movielens.layers.AverageModel(vocab_size=deepr.vocab.size(path_mapping), dim=100, keep_prob=0.5),\n", " loss_fn=movielens.layers.Loss(loss=\"bpr\", vocab_size=deepr.vocab.size(path_mapping)),\n", " optimizer_fn=deepr.optimizers.TensorflowOptimizer(\"LazyAdam\", 0.001),\n", " train_input_fn=deepr.readers.TFRecordReader(path_train),\n", " eval_input_fn=deepr.readers.TFRecordReader(path_eval, shuffle=False),\n", " prepro_fn=movielens.prepros.RecordPrepro(\n", " min_input_size=3,\n", " min_target_size=3,\n", " max_input_size=50,\n", " max_target_size=50,\n", " buffer_size=1024,\n", " batch_size=128,\n", " repeat_size=None,\n", " prefetch_size=1,\n", " num_parallel_calls=8,\n", " ),\n", " train_spec=deepr.jobs.TrainSpec(max_steps=max_steps),\n", " eval_spec=deepr.jobs.EvalSpec(steps=None, start_delay_secs=30, throttle_secs=30),\n", " final_spec=deepr.jobs.FinalSpec(steps=None),\n", " exporters=[\n", " # The training will keep the model with the best triplet precision\n", " deepr.exporters.BestCheckpoint(metric=\"triplet_precision\", mode=\"increase\"),\n", " # Export biases and embeddings as a dataframe\n", " deepr.exporters.SaveVariables(path_variables=path_variables, variable_names=[\"biases\", \"embeddings\"]),\n", " # Export a saved model using specified fields as input\n", " deepr.exporters.SavedModel(\n", " path_saved_model=path_saved_model,\n", " fields=[\n", " deepr.Field(name=\"inputPositives\", shape=(None,), dtype=tf.int64),\n", " deepr.Field(name=\"inputMask\", shape=(None,), dtype=tf.bool),\n", " ],\n", " ),\n", " ],\n", " train_hooks=[\n", " # Log metrics, hyperparams, initial values to the console, and optionally mlflow and graphite\n", " deepr.hooks.LoggingTensorHookFactory(\n", " name=\"training\",\n", " functions={\n", " \"memory_gb\": deepr.hooks.ResidentMemory(unit=\"gb\"),\n", " \"max_memory_gb\": deepr.hooks.MaxResidentMemory(unit=\"gb\"),\n", " },\n", " every_n_iter=300,\n", " use_graphite=False,\n", " use_mlflow=False,\n", " ),\n", " deepr.hooks.SummarySaverHookFactory(save_steps=300),\n", " deepr.hooks.NumParamsHook(use_mlflow=False),\n", " deepr.hooks.LogVariablesInitHook(use_mlflow=False),\n", " deepr.hooks.StepsPerSecHook(\n", " name=\"training\",\n", " batch_size=128,\n", " every_n_steps=300,\n", " skip_after_step=max_steps,\n", " use_mlflow=False,\n", " use_graphite=False,\n", " ),\n", " # Stop the training if triplet precision does not improve\n", " deepr.hooks.EarlyStoppingHookFactory(\n", " metric=\"triplet_precision\",\n", " mode=\"increase\",\n", " max_steps_without_improvement=1000,\n", " min_steps=5_000,\n", " run_every_steps=300,\n", " final_step=max_steps,\n", " ),\n", " ],\n", " eval_hooks=[deepr.hooks.LoggingTensorHookFactory(name=\"validation\", 
at_end=True)],\n", " final_hooks=[deepr.hooks.LoggingTensorHookFactory(name=\"final_validation\", at_end=True)],\n", " train_metrics=[deepr.metrics.StepCounter(name=\"num_steps\"), deepr.metrics.DecayMean(tensors=[\"loss\"], decay=0.98)],\n", " eval_metrics=[deepr.metrics.Mean(tensors=[\"loss\", \"triplet_precision\"])],\n", " final_metrics=[deepr.metrics.Mean(tensors=[\"loss\", \"triplet_precision\"])],\n", " run_config=deepr.jobs.RunConfig(\n", " save_checkpoints_steps=300, save_summary_steps=300, keep_checkpoint_max=None, log_step_count_steps=300\n", " ),\n", " config_proto=deepr.jobs.ConfigProto(\n", " inter_op_parallelism_threads=8, intra_op_parallelism_threads=8, gpu_device_count=0, cpu_device_count=48,\n", " ),\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3 - Predict job\n", "\n", "The predict job reloads the test dataset and the saved model, performs the inference and writes the user embeddings in a dataframe to be reused later by the validation job." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "predict = movielens.jobs.Predict(\n", " path_saved_model=path_saved_model,\n", " path_predictions=path_predictions,\n", " input_fn=deepr.readers.TFRecordReader(path_test, shuffle=False),\n", " prepro_fn=movielens.prepros.RecordPrepro(),\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 4 - Evaluation job\n", "\n", "This job takes user embedding, and uses faiss to retrieve the k nearest neighboors in the product embedding space, \n", "and compute metrics using the target timelines." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "evaluate = [\n", " movielens.jobs.Evaluate(\n", " path_predictions=path_predictions,\n", " path_embeddings=path_variables + \"/embeddings\",\n", " path_biases=path_variables + \"/biases\",\n", " k=k,\n", " )\n", " for k in [10, 20, 50]\n", "]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run the pipeline\n", "All the jobs definition are lazy, and so is the pipeline. \n", "Calling run on it will actually perform all these steps." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "pipeline = deepr.jobs.Pipeline([train, predict] + evaluate)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline.run()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You should expecte the following metrics\n", "\n", "```\n", "INFO:deepr.examples.movielens.jobs.evaluate:precision@10 = 0.15774000000000002\n", "recall@10 = 0.22966825396825397\n", "f1@10 = 0.17782589040302665\n", "NDCG@10 = 0.218460915855854\n", "INFO:deepr.examples.movielens.jobs.evaluate:precision@20 = 0.12564500000000003\n", "recall@20 = 0.29865131767226577\n", "f1@20 = 0.16083200785947185\n", "NDCG@20 = 0.24568841120719748\n", "INFO:deepr.examples.movielens.jobs.evaluate:precision@50 = 0.08614800000000002\n", "recall@50 = 0.44487653142758526\n", "f1@50 = 0.1303801216635994\n", "NDCG@50 = 0.3022301157999229\n", "```\n", "\n", "You can find configs on [github](https://github.com/criteo/deepr/tree/master/deepr/examples/movielens/configs) that achieve better performance." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Visualize embeddings with a KNN\n", "\n", "Let's check if the movie embeddings produce make sense\n", "* load the movie embeddings\n", "* load the movie title\n", "* build a knn index on that\n", "* do a query with a known movie to check that its closest neighboors make sense" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import pyarrow.parquet as pq\n", "import faiss\n", "import pyarrow.csv as pc\n", "from IPython.display import display\n", "import pandas as pd" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "embeddings = np.vstack(pq.read_table(\"average/variables/embeddings\").to_pandas().to_numpy())\n", "mapp = {int(movie_id):indice for indice, movie_id in enumerate(open(\"average/data/mapping.txt\", \"r\").read().split(\"\\n\"))}\n", "inversed_map = {indice:movie_id for movie_id, indice in mapp.items()}\n", "index = faiss.IndexFlatIP(embeddings.shape[-1])\n", "index.add(np.ascontiguousarray(embeddings))\n", "\n", "def knn_query(index, query, ksearch):\n", " D, I = index.search(np.expand_dims(query,0), ksearch)\n", " distances = D[0]\n", " product_indices = I[0]\n", " product_ids = [inversed_map[i] for i in product_indices]\n", " return list(zip(product_ids, distances))\n", "\n", "movies = pc.read_csv(f\"{dataset_path}/movies.csv\").to_pandas()\n", " \n", "def display_results_df(results):\n", " data = [[movies[movies.movieId == movie_id][\"genres\"].to_numpy()[0], movies[movies.movieId == movie_id][\"title\"].to_numpy()[0], distance] for movie_id, distance in results]\n", " \n", " df = pd.DataFrame(data, columns = ['Genre', 'Title', 'Distance']) \n", " display(df)\n", " \n", "def display_movie(movie_id):\n", " data = [[(movies[movies.movieId == movie_id][\"genres\"].to_numpy()[0]), (movies[movies.movieId == movie_id][\"title\"].to_numpy()[0])]]\n", " \n", " df = pd.DataFrame(data, columns = ['Genre', 'Title']) \n", " display(df)" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Query\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
GenreTitle
0Action|Adventure|Sci-FiStar Wars: Episode IV - A New Hope (1977)
\n", "
" ], "text/plain": [ " Genre Title\n", "0 Action|Adventure|Sci-Fi Star Wars: Episode IV - A New Hope (1977)" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "Knn results\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
GenreTitleDistance
0Action|Adventure|Sci-FiStar Wars: Episode IV - A New Hope (1977)16.765795
1Action|Adventure|Sci-FiStar Wars: Episode VI - Return of the Jedi (1983)14.477997
2Action|Adventure|Sci-FiStar Wars: Episode V - The Empire Strikes Back...13.837748
3Action|Adventure|Sci-Fi|ThrillerStar Trek: First Contact (1996)13.092430
4Children|Comedy|Fantasy|MusicalWilly Wonka & the Chocolate Factory (1971)11.858896
\n", "
" ], "text/plain": [ " Genre \\\n", "0 Action|Adventure|Sci-Fi \n", "1 Action|Adventure|Sci-Fi \n", "2 Action|Adventure|Sci-Fi \n", "3 Action|Adventure|Sci-Fi|Thriller \n", "4 Children|Comedy|Fantasy|Musical \n", "\n", " Title Distance \n", "0 Star Wars: Episode IV - A New Hope (1977) 16.765795 \n", "1 Star Wars: Episode VI - Return of the Jedi (1983) 14.477997 \n", "2 Star Wars: Episode V - The Empire Strikes Back... 13.837748 \n", "3 Star Trek: First Contact (1996) 13.092430 \n", "4 Willy Wonka & the Chocolate Factory (1971) 11.858896 " ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "p = movies[movies.title.str.lower().str.contains(\"star wars\")][\"movieId\"].to_numpy()[0]\n", "print(\"Query\")\n", "display_movie(p)\n", "print(\"Knn results\")\n", "display_results_df(knn_query(index, embeddings[mapp[p]], 5))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As you can see, the genre of the nearest neighboors is similar and the movies are related to the movie query" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 4 }