"""Compute MovieLens predictions."""
import logging
from typing import Callable
from dataclasses import dataclass
import numpy as np
import tensorflow as tf
import pyarrow as pa
import deepr
try:
import pandas as pd
except ImportError as e:
print(f"Pandas needs to be installed for MovieLens {e}")
LOGGER = logging.getLogger(__name__)
COLUMNS = ["uid", "user", "input", "target"]
SCHEMA = pa.schema(
[
("uid", pa.int64()),
("user", pa.list_(pa.float32())),
("input", pa.list_(pa.int64())),
("target", pa.list_(pa.int64())),
]
)
[docs]@dataclass
class Predict(deepr.jobs.Job):
"""Compute MovieLens predictions."""
path_saved_model: str
path_predictions: str
input_fn: Callable[[], tf.data.Dataset]
prepro_fn: Callable[[tf.data.Dataset, str], tf.data.Dataset]
[docs] def run(self):
LOGGER.info(f"Computing predictions from {self.path_saved_model}")
predictor = deepr.predictors.SavedModelPredictor(
path=deepr.predictors.get_latest_saved_model(self.path_saved_model)
)
predictions = []
for preds in predictor(lambda: self.prepro_fn(self.input_fn(), tf.estimator.ModeKeys.PREDICT)):
for uid, user, input_idx, input_mask, target_idx, target_mask in zip(
preds["uid"],
preds["userEmbeddings"],
preds["inputPositives"],
preds["inputMask"],
preds["targetPositives"],
preds["targetMask"],
):
predictions.append(
(
uid,
user.astype(np.float32).tolist(),
input_idx[input_mask].astype(np.int64).tolist(),
target_idx[target_mask].astype(np.int64).tolist(),
)
)
with deepr.io.ParquetDataset(self.path_predictions).open() as ds:
df = pd.DataFrame(data=predictions, columns=COLUMNS)
ds.write_pandas(df, compression="snappy", schema=SCHEMA)
LOGGER.info(f"Wrote predictions to {self.path_predictions}")