""" gather functions necessary to build an index """
import logging
from typing import Dict, Optional, Tuple, Union, Callable, Any, List
import faiss
import pandas as pd
from embedding_reader import EmbeddingReader
from autofaiss.external.metadata import IndexMetadata
from autofaiss.external.optimize import check_if_index_needs_training, get_optimal_index_keys_v2, get_optimal_train_size
from autofaiss.utils.cast import cast_bytes_to_memory_string, cast_memory_to_bytes, to_readable_time
from autofaiss.utils.decorators import Timeit
from autofaiss.indices import distributed
from autofaiss.indices.index_utils import initialize_direct_map
from autofaiss.indices.training import create_and_train_new_index
from autofaiss.indices.build import add_embeddings_to_index_local
logger = logging.getLogger("autofaiss")
def estimate_memory_required_for_index_creation(
    nb_vectors: int,
    vec_dim: int,
    index_key: Optional[str] = None,
    max_index_memory_usage: Optional[str] = None,
    make_direct_map: bool = False,
    nb_indices_to_keep: int = 1,
) -> Tuple[int, str]:
    """
    Estimate the RAM necessary to create the index.

    Returns a tuple ``(estimated_memory_in_bytes, index_key)`` where ``index_key``
    is the one given, or the one inferred from ``max_index_memory_usage``.

    Raises
    ------
    ValueError: when neither ``index_key`` nor ``max_index_memory_usage`` is given.
    """
    if index_key is None:
        if max_index_memory_usage is None:
            raise ValueError("you should give max_index_memory_usage value if no index_key is given")
        # No explicit key: pick the best candidate fitting the memory budget.
        index_key = get_optimal_index_keys_v2(
            nb_vectors, vec_dim, max_index_memory_usage, make_direct_map=make_direct_map
        )[0]

    metadata = IndexMetadata(index_key, nb_vectors, vec_dim, make_direct_map)
    index_memory = metadata.estimated_index_size_in_bytes()

    # Headroom for the add phase: ~10% of the index size, capped at 1GB.
    needed_for_adding = min(index_memory * 0.1, 10**9)

    if check_if_index_needs_training(index_key):
        # Smallest number of training vectors compatible with the memory constraint.
        nb_vectors_train = get_optimal_train_size(nb_vectors, index_key, "1K", vec_dim)
        memory_for_training = metadata.compute_memory_necessary_for_training(nb_vectors_train)
    else:
        memory_for_training = 0

    # Mirrors the batch split done by _batch_loader in distributed.py:
    # one of the indices may carry the remainder on top of the even share.
    max_index_memory_in_one_index = index_memory // nb_indices_to_keep + index_memory % nb_indices_to_keep

    return int(max(max_index_memory_in_one_index + needed_for_adding, memory_for_training)), index_key
def get_estimated_construction_time_infos(nb_vectors: int, vec_dim: int, indent: int = 0) -> str:
    """
    Return a rough, human-readable estimate of the index construction time,
    with each line prefixed by ``indent`` tab characters.
    """
    nb_bytes = 4 * nb_vectors * vec_dim  # float32 embeddings
    train_seconds = 1000  # seconds, depends on the number of points for training
    add_seconds = 450 * nb_bytes / (150 * 1024**3)  # seconds, Linear approx (450s for 150GB in classic conditions)

    lines = [
        f"-> Train: {to_readable_time(train_seconds, rounding=True)}",
        f"-> Add: {to_readable_time(add_seconds, rounding=True)}",
        f"Total: {to_readable_time(train_seconds + add_seconds, rounding=True)}",
    ]
    prefix = "\t" * indent
    return "\n".join(prefix + line for line in lines)
def add_embeddings_to_index(
    embedding_reader: EmbeddingReader,
    trained_index_or_path: Union[str, faiss.Index],
    metadata: IndexMetadata,
    current_memory_available: str,
    embedding_ids_df_handler: Optional[Callable[[pd.DataFrame, int], Any]] = None,
    distributed_engine: Optional[str] = None,
    temporary_indices_folder: str = "hdfs://root/tmp/distributed_autofaiss_indices",
    nb_indices_to_keep: int = 1,
    index_optimizer: Optional[Callable] = None,
) -> Tuple[Optional[faiss.Index], Optional[Dict[str, str]]]:
    """
    Add embeddings to a trained index, either locally or through pyspark.

    Parameters
    ----------
    embedding_reader: source of the embeddings to add
    trained_index_or_path: an already-trained faiss index, or a path to one
    metadata: index metadata, used to estimate the index memory footprint
    current_memory_available: total memory budget, e.g. "32G"
    embedding_ids_df_handler: optional callback receiving the ids dataframe of each batch
    distributed_engine: None to add locally, "pyspark" to add through spark
    temporary_indices_folder: where intermediate indices are stored in distributed mode
    nb_indices_to_keep: number of indices kept in distributed mode (the budget is split accordingly)
    index_optimizer: optional callback applied once the index is filled

    Returns
    -------
    A (index, metrics) tuple; either element may be None depending on the backend used.

    Raises
    ------
    ValueError: when distributed_engine is neither None nor "pyspark".
    """
    with Timeit("-> Adding the vectors to the index", indent=2):
        # Memory left for the vector batches = total budget minus the (per-shard)
        # footprint of the index itself.
        size_per_index = metadata.estimated_index_size_in_bytes() / nb_indices_to_keep
        memory_available_for_adding = cast_bytes_to_memory_string(
            cast_memory_to_bytes(current_memory_available) - size_per_index
        )
        # Bug fix: the two adjacent string literals were concatenated without a
        # separating space, producing e.g. "...is 16G(total available...)".
        logger.info(
            f"The memory available for adding the vectors is {memory_available_for_adding} "
            "(total available - used by the index)"
        )
        if distributed_engine is None:
            return add_embeddings_to_index_local(
                embedding_reader=embedding_reader,
                trained_index_or_path=trained_index_or_path,
                memory_available_for_adding=memory_available_for_adding,
                embedding_ids_df_handler=embedding_ids_df_handler,
                index_optimizer=index_optimizer,
                add_embeddings_with_ids=False,
            )
        elif distributed_engine == "pyspark":
            return distributed.add_embeddings_to_index_distributed(
                trained_index_or_path=trained_index_or_path,
                embedding_reader=embedding_reader,
                memory_available_for_adding=memory_available_for_adding,
                embedding_ids_df_handler=embedding_ids_df_handler,
                temporary_indices_folder=temporary_indices_folder,
                nb_indices_to_keep=nb_indices_to_keep,
                index_optimizer=index_optimizer,
            )
        else:
            raise ValueError(f'Distributed by {distributed_engine} is not supported, only "pyspark" is supported')
def create_index(
    embedding_reader: EmbeddingReader,
    index_key: str,
    metric_type: Union[str, int],
    current_memory_available: str,
    embedding_ids_df_handler: Optional[Callable[[pd.DataFrame, int], Any]] = None,
    use_gpu: bool = False,
    make_direct_map: bool = False,
    distributed_engine: Optional[str] = None,
    temporary_indices_folder: str = "hdfs://root/tmp/distributed_autofaiss_indices",
    nb_indices_to_keep: int = 1,
    index_optimizer: Optional[Callable] = None,
) -> Tuple[Optional[faiss.Index], Optional[Dict[str, str]]]:
    """
    Create an index and add embeddings to the index.

    Trains a fresh index of type ``index_key`` on the embeddings, fills it
    (locally or via pyspark, see ``distributed_engine``), then optionally
    initializes the direct map for id -> vector reconstruction.

    Returns a (index, metrics) tuple as produced by ``add_embeddings_to_index``.
    """
    metadata = IndexMetadata(index_key, embedding_reader.count, embedding_reader.dimension, make_direct_map)

    # Create and train index
    trained_index = create_and_train_new_index(
        embedding_reader, index_key, metadata, metric_type, current_memory_available, use_gpu
    )

    # Add embeddings to index
    index, metrics = add_embeddings_to_index(
        embedding_reader,
        trained_index,
        metadata,
        current_memory_available,
        embedding_ids_df_handler,
        distributed_engine,
        temporary_indices_folder,
        nb_indices_to_keep,
        index_optimizer,
    )

    # Robustness fix: add_embeddings_to_index is typed Optional[faiss.Index] and
    # can return None (e.g. distributed runs keeping several indices);
    # initialize_direct_map(None) would fail, so guard against it.
    if make_direct_map and index is not None:
        initialize_direct_map(index)

    return index, metrics
def create_partitioned_indexes(
    partitions: List[str],
    output_root_dir: str,
    embedding_column_name: str = "embedding",
    index_key: Optional[str] = None,
    index_path: Optional[str] = None,
    id_columns: Optional[List[str]] = None,
    should_be_memory_mappable: bool = False,
    max_index_query_time_ms: float = 10.0,
    max_index_memory_usage: str = "16G",
    min_nearest_neighbors_to_retrieve: int = 20,
    current_memory_available: str = "32G",
    use_gpu: bool = False,
    metric_type: str = "ip",
    nb_cores: Optional[int] = None,
    make_direct_map: bool = False,
    temp_root_dir: str = "hdfs://root/tmp/distributed_autofaiss_indices",
    big_index_threshold: int = 5_000_000,
    nb_splits_per_big_index: int = 1,
    maximum_nb_threads: int = 256,
) -> List[Optional[Dict[str, str]]]:
    """
    Create one index per parquet partition in ``partitions``.

    Only supported with Pyspark: an active PySpark session must exist before
    calling this method.
    """
    # Thin delegation: all the work happens in the distributed module.
    # Keyword arguments are passed in signature order for readability.
    return distributed.create_partitioned_indexes(
        partitions=partitions,
        output_root_dir=output_root_dir,
        embedding_column_name=embedding_column_name,
        index_key=index_key,
        index_path=index_path,
        id_columns=id_columns,
        should_be_memory_mappable=should_be_memory_mappable,
        max_index_query_time_ms=max_index_query_time_ms,
        max_index_memory_usage=max_index_memory_usage,
        min_nearest_neighbors_to_retrieve=min_nearest_neighbors_to_retrieve,
        current_memory_available=current_memory_available,
        use_gpu=use_gpu,
        metric_type=metric_type,
        nb_cores=nb_cores,
        make_direct_map=make_direct_map,
        temp_root_dir=temp_root_dir,
        big_index_threshold=big_index_threshold,
        nb_splits_per_big_index=nb_splits_per_big_index,
        maximum_nb_threads=maximum_nb_threads,
    )