Module elastiknn.models

Expand source code
import json
import logging
from concurrent.futures.thread import ThreadPoolExecutor
from logging import Logger
from time import time
from typing import List, Union

import numpy as np
from elasticsearch import Elasticsearch
from scipy.sparse import csr_matrix
from tqdm import tqdm

from . import ELASTIKNN_NAME
from .api import Mapping, Vec, NearestNeighborsQuery, Similarity
from .client import ElastiknnClient
from .utils import canonical_vectors_to_elastiknn, valid_metrics_algos, dealias_metric, ndarray_to_dense_float_vectors


class ElastiknnModel(object):
    """Nearest-neighbors model whose vectors live in an Elasticsearch index queried via elastiknn.

    Construct it with an algorithm/metric pair, call ``fit`` to index the training vectors,
    then call ``kneighbors`` to query them.
    """

    def __init__(self, algorithm: str, metric: str, es: Elasticsearch = None, mapping_params: dict = None,
                 query_params: dict = None, index: str = None):
        """Configure the model. Indexing does not happen until ``fit``.

        :param algorithm: approximation algorithm ("exact", "lsh" or "permutation_lsh").
        :param metric: similarity metric; aliases are resolved with ``dealias_metric``.
        :param es: Elasticsearch client handed to ``ElastiknnClient``.
        :param mapping_params: keyword arguments forwarded to the approximate mapping constructor.
        :param query_params: keyword arguments forwarded to the approximate query constructor.
        :param index: index name; when None, ``fit`` generates one.
        :raises AssertionError: if ``(algorithm, metric)`` is not in ``valid_metrics_algos``.
        """
        # Loggers must come from getLogger so they join the logging hierarchy and reach root handlers.
        self._logger = logging.getLogger(self.__class__.__name__)
        self._vec_field = "vec"
        self._stored_id_field = "id"
        self._index = index
        # Copy to avoid shared mutable defaults and later mutation of the caller's dicts leaking in.
        self._mapping_params = {} if mapping_params is None else dict(mapping_params)
        self._query_params = {} if query_params is None else dict(query_params)
        self._eknn = ElastiknnClient(es)
        self._algorithm = algorithm
        if (self._algorithm, metric) not in valid_metrics_algos:
            # Explicit raise so validation survives `python -O`; AssertionError kept for compatibility.
            raise AssertionError(
                f"algorithm [{algorithm}] and metric [{metric}] should be one of {valid_metrics_algos}")
        self._metric = dealias_metric(metric)
        self._dims = None  # Defined in fit()
        self._query = None  # Defined in fit() and set_query_params()

    @staticmethod
    def _num_rows(X) -> int:
        """Return the number of vectors in X.

        2-D array-likes are measured via ``shape`` because ``len()`` raises TypeError on scipy sparse matrices.
        """
        return X.shape[0] if getattr(X, "ndim", None) == 2 else len(X)

    def fit(self, X: Union[np.ndarray, csr_matrix, List[Vec.SparseBool], List[Vec.DenseFloat]], shards: int = 1):
        """Index X into a fresh Elasticsearch index so it can be queried by ``kneighbors``.

        An existing index with the same name is deleted first.

        :param X: vectors to index; the dimensionality is taken from the first one.
        :param shards: number of primary shards for the new index (replicas are set to 0).
        """
        n_vecs = self._num_rows(X)
        self._dims = X.shape[1] if getattr(X, "ndim", None) == 2 else len(X[0])
        vecs = canonical_vectors_to_elastiknn(X)

        mapping, self._query = self._mk_mapping_query(self._query_params)

        if self._index is None:
            self._index = f"{ELASTIKNN_NAME}-{int(time())}"
            self._logger.warning(f"index was not given, using {self._index} instead")

        self._eknn.es.indices.delete(index=self._index, ignore_unavailable=True)
        self._eknn.es.indices.create(index=self._index, settings=dict(number_of_shards=shards, elastiknn=True, number_of_replicas=0))
        self._eknn.put_mapping(self._index, self._vec_field, mapping, self._stored_id_field)

        self._logger.info(f"indexing {n_vecs} vectors into index {self._index}")
        # Ids are 1-based because 0 is an invalid id in ES; kneighbors() subtracts the 1 again.
        ids = (str(i + 1) for i in range(n_vecs))
        self._eknn.index(self._index, self._vec_field, vecs, self._stored_id_field, ids, refresh=True)
        self._eknn.es.indices.forcemerge(index=self._index, max_num_segments=1)
        # Empty indexing call with refresh=True; presumably forces a refresh after the force-merge.
        self._eknn.index(self._index, self._vec_field, [], self._stored_id_field, [], refresh=True)

    def set_query_params(self, query_params: dict = None):
        """Replace the query parameters and rebuild the query used by ``kneighbors``.

        :param query_params: keyword arguments for the approximate query (unused by "exact");
            None means no parameters.
        """
        self._query_params = {} if query_params is None else query_params
        _, self._query = self._mk_mapping_query(self._query_params)

    def kneighbors(self, X: Union[np.ndarray, csr_matrix, List[Vec.SparseBool], List[Vec.DenseFloat], List[Vec.Base]],
                   n_neighbors: int, return_similarity: bool = False, progbar: bool = False):
        """Query the fitted index for the ``n_neighbors`` nearest neighbors of each vector in X.

        :param X: query vectors.
        :param n_neighbors: number of neighbors to return per query.
        :param return_similarity: if True, also return the hit scores.
        :param progbar: show a tqdm progress bar over the queries.
        :return: int32 array of shape (number of queries, n_neighbors) holding 0-based row indices
            into the fitted data, padded with -1 where fewer hits came back. If ``return_similarity``,
            also a float array of the same shape holding ``_score`` values (NaN for padding).
        """
        n_queries = self._num_rows(X)
        inds = np.full((n_queries, n_neighbors), -1, dtype=np.int32)
        sims = np.full(inds.shape, np.nan)

        queries = enumerate(canonical_vectors_to_elastiknn(X))
        for i, v in tqdm(queries, total=n_queries, disable=not progbar):
            res = self._eknn.nearest_neighbors(self._index, self._query.with_vec(v), self._stored_id_field,
                                               n_neighbors, fetch_source=False)
            try:
                hits = res['hits']['hits']
            except KeyError:
                self._logger.warning(f"Query returned no hits: {res}")
                continue
            for j, hit in enumerate(hits):
                # Stored ids are 1-based (0 is an invalid id in ES); shift back to row indices.
                inds[i, j] = int(hit['fields'][self._stored_id_field][0]) - 1
                sims[i, j] = float(hit['_score'])

        return (inds, sims) if return_similarity else inds

    def _mk_mapping_query(self, query_params: dict) -> (Mapping.Base, NearestNeighborsQuery.Base):
        """Build the (mapping, query template) pair for the configured algorithm and metric.

        The query carries a placeholder vector; ``kneighbors`` substitutes real ones via ``with_vec``.

        :param query_params: keyword arguments forwarded to approximate query constructors.
        :raises NameError: if the algorithm/metric combination is not handled here.
        """
        field = self._vec_field  # Must match the field the mapping was put on in fit().
        dummy = Vec.Indexed("", "", "")
        if self._algorithm == "exact":
            if self._metric == 'l1':
                return Mapping.DenseFloat(self._dims), NearestNeighborsQuery.Exact(field, dummy, Similarity.L1)
            elif self._metric == 'l2':
                return Mapping.DenseFloat(self._dims), NearestNeighborsQuery.Exact(field, dummy, Similarity.L2)
            elif self._metric == 'cosine':
                return Mapping.DenseFloat(self._dims), NearestNeighborsQuery.Exact(field, dummy, Similarity.Cosine)
            elif self._metric == 'jaccard':
                return Mapping.SparseBool(self._dims), NearestNeighborsQuery.Exact(field, dummy, Similarity.Jaccard)
            elif self._metric == 'hamming':
                return Mapping.SparseBool(self._dims), NearestNeighborsQuery.Exact(field, dummy, Similarity.Hamming)
        elif self._algorithm == 'lsh':
            if self._metric == 'l2':
                return Mapping.L2Lsh(self._dims, **self._mapping_params), \
                       NearestNeighborsQuery.L2Lsh(field, dummy, **query_params)
            elif self._metric == 'cosine':
                return Mapping.CosineLsh(self._dims, **self._mapping_params), \
                       NearestNeighborsQuery.CosineLsh(field, dummy, **query_params)
            elif self._metric == 'hamming':
                # The Hamming LSH query requires the Hamming LSH mapping (not CosineLsh).
                return Mapping.HammingLsh(self._dims, **self._mapping_params), \
                       NearestNeighborsQuery.HammingLsh(field, dummy, **query_params)
            elif self._metric == 'jaccard':
                return Mapping.JaccardLsh(self._dims, **self._mapping_params), \
                       NearestNeighborsQuery.JaccardLsh(field, dummy, **query_params)
        elif self._algorithm == 'permutation_lsh':
            if self._metric == 'cosine':
                return Mapping.PermutationLsh(self._dims, **self._mapping_params), \
                       NearestNeighborsQuery.PermutationLsh(field, dummy, Similarity.Cosine, **query_params)

        raise NameError(f"unsupported algorithm [{self._algorithm}] and metric [{self._metric}]")

Classes

class ElastiknnModel (algorithm: str, metric: str, es: elasticsearch.Elasticsearch = None, mapping_params: dict = {}, query_params: dict = {}, index: str = None)
Expand source code
class ElastiknnModel(object):
    """Nearest-neighbors model whose vectors live in an Elasticsearch index queried via elastiknn.

    Construct it with an algorithm/metric pair, call ``fit`` to index the training vectors,
    then call ``kneighbors`` to query them.
    """

    def __init__(self, algorithm: str, metric: str, es: Elasticsearch = None, mapping_params: dict = None,
                 query_params: dict = None, index: str = None):
        """Configure the model. Indexing does not happen until ``fit``.

        :param algorithm: approximation algorithm ("exact", "lsh" or "permutation_lsh").
        :param metric: similarity metric; aliases are resolved with ``dealias_metric``.
        :param es: Elasticsearch client handed to ``ElastiknnClient``.
        :param mapping_params: keyword arguments forwarded to the approximate mapping constructor.
        :param query_params: keyword arguments forwarded to the approximate query constructor.
        :param index: index name; when None, ``fit`` generates one.
        :raises AssertionError: if ``(algorithm, metric)`` is not in ``valid_metrics_algos``.
        """
        # Loggers must come from getLogger so they join the logging hierarchy and reach root handlers.
        self._logger = logging.getLogger(self.__class__.__name__)
        self._vec_field = "vec"
        self._stored_id_field = "id"
        self._index = index
        # Copy to avoid shared mutable defaults and later mutation of the caller's dicts leaking in.
        self._mapping_params = {} if mapping_params is None else dict(mapping_params)
        self._query_params = {} if query_params is None else dict(query_params)
        self._eknn = ElastiknnClient(es)
        self._algorithm = algorithm
        if (self._algorithm, metric) not in valid_metrics_algos:
            # Explicit raise so validation survives `python -O`; AssertionError kept for compatibility.
            raise AssertionError(
                f"algorithm [{algorithm}] and metric [{metric}] should be one of {valid_metrics_algos}")
        self._metric = dealias_metric(metric)
        self._dims = None  # Defined in fit()
        self._query = None  # Defined in fit() and set_query_params()

    @staticmethod
    def _num_rows(X) -> int:
        """Return the number of vectors in X.

        2-D array-likes are measured via ``shape`` because ``len()`` raises TypeError on scipy sparse matrices.
        """
        return X.shape[0] if getattr(X, "ndim", None) == 2 else len(X)

    def fit(self, X: Union[np.ndarray, csr_matrix, List[Vec.SparseBool], List[Vec.DenseFloat]], shards: int = 1):
        """Index X into a fresh Elasticsearch index so it can be queried by ``kneighbors``.

        An existing index with the same name is deleted first.

        :param X: vectors to index; the dimensionality is taken from the first one.
        :param shards: number of primary shards for the new index (replicas are set to 0).
        """
        n_vecs = self._num_rows(X)
        self._dims = X.shape[1] if getattr(X, "ndim", None) == 2 else len(X[0])
        vecs = canonical_vectors_to_elastiknn(X)

        mapping, self._query = self._mk_mapping_query(self._query_params)

        if self._index is None:
            self._index = f"{ELASTIKNN_NAME}-{int(time())}"
            self._logger.warning(f"index was not given, using {self._index} instead")

        self._eknn.es.indices.delete(index=self._index, ignore_unavailable=True)
        self._eknn.es.indices.create(index=self._index, settings=dict(number_of_shards=shards, elastiknn=True, number_of_replicas=0))
        self._eknn.put_mapping(self._index, self._vec_field, mapping, self._stored_id_field)

        self._logger.info(f"indexing {n_vecs} vectors into index {self._index}")
        # Ids are 1-based because 0 is an invalid id in ES; kneighbors() subtracts the 1 again.
        ids = (str(i + 1) for i in range(n_vecs))
        self._eknn.index(self._index, self._vec_field, vecs, self._stored_id_field, ids, refresh=True)
        self._eknn.es.indices.forcemerge(index=self._index, max_num_segments=1)
        # Empty indexing call with refresh=True; presumably forces a refresh after the force-merge.
        self._eknn.index(self._index, self._vec_field, [], self._stored_id_field, [], refresh=True)

    def set_query_params(self, query_params: dict = None):
        """Replace the query parameters and rebuild the query used by ``kneighbors``.

        :param query_params: keyword arguments for the approximate query (unused by "exact");
            None means no parameters.
        """
        self._query_params = {} if query_params is None else query_params
        _, self._query = self._mk_mapping_query(self._query_params)

    def kneighbors(self, X: Union[np.ndarray, csr_matrix, List[Vec.SparseBool], List[Vec.DenseFloat], List[Vec.Base]],
                   n_neighbors: int, return_similarity: bool = False, progbar: bool = False):
        """Query the fitted index for the ``n_neighbors`` nearest neighbors of each vector in X.

        :param X: query vectors.
        :param n_neighbors: number of neighbors to return per query.
        :param return_similarity: if True, also return the hit scores.
        :param progbar: show a tqdm progress bar over the queries.
        :return: int32 array of shape (number of queries, n_neighbors) holding 0-based row indices
            into the fitted data, padded with -1 where fewer hits came back. If ``return_similarity``,
            also a float array of the same shape holding ``_score`` values (NaN for padding).
        """
        n_queries = self._num_rows(X)
        inds = np.full((n_queries, n_neighbors), -1, dtype=np.int32)
        sims = np.full(inds.shape, np.nan)

        queries = enumerate(canonical_vectors_to_elastiknn(X))
        for i, v in tqdm(queries, total=n_queries, disable=not progbar):
            res = self._eknn.nearest_neighbors(self._index, self._query.with_vec(v), self._stored_id_field,
                                               n_neighbors, fetch_source=False)
            try:
                hits = res['hits']['hits']
            except KeyError:
                self._logger.warning(f"Query returned no hits: {res}")
                continue
            for j, hit in enumerate(hits):
                # Stored ids are 1-based (0 is an invalid id in ES); shift back to row indices.
                inds[i, j] = int(hit['fields'][self._stored_id_field][0]) - 1
                sims[i, j] = float(hit['_score'])

        return (inds, sims) if return_similarity else inds

    def _mk_mapping_query(self, query_params: dict) -> (Mapping.Base, NearestNeighborsQuery.Base):
        """Build the (mapping, query template) pair for the configured algorithm and metric.

        The query carries a placeholder vector; ``kneighbors`` substitutes real ones via ``with_vec``.

        :param query_params: keyword arguments forwarded to approximate query constructors.
        :raises NameError: if the algorithm/metric combination is not handled here.
        """
        field = self._vec_field  # Must match the field the mapping was put on in fit().
        dummy = Vec.Indexed("", "", "")
        if self._algorithm == "exact":
            if self._metric == 'l1':
                return Mapping.DenseFloat(self._dims), NearestNeighborsQuery.Exact(field, dummy, Similarity.L1)
            elif self._metric == 'l2':
                return Mapping.DenseFloat(self._dims), NearestNeighborsQuery.Exact(field, dummy, Similarity.L2)
            elif self._metric == 'cosine':
                return Mapping.DenseFloat(self._dims), NearestNeighborsQuery.Exact(field, dummy, Similarity.Cosine)
            elif self._metric == 'jaccard':
                return Mapping.SparseBool(self._dims), NearestNeighborsQuery.Exact(field, dummy, Similarity.Jaccard)
            elif self._metric == 'hamming':
                return Mapping.SparseBool(self._dims), NearestNeighborsQuery.Exact(field, dummy, Similarity.Hamming)
        elif self._algorithm == 'lsh':
            if self._metric == 'l2':
                return Mapping.L2Lsh(self._dims, **self._mapping_params), \
                       NearestNeighborsQuery.L2Lsh(field, dummy, **query_params)
            elif self._metric == 'cosine':
                return Mapping.CosineLsh(self._dims, **self._mapping_params), \
                       NearestNeighborsQuery.CosineLsh(field, dummy, **query_params)
            elif self._metric == 'hamming':
                # The Hamming LSH query requires the Hamming LSH mapping (not CosineLsh).
                return Mapping.HammingLsh(self._dims, **self._mapping_params), \
                       NearestNeighborsQuery.HammingLsh(field, dummy, **query_params)
            elif self._metric == 'jaccard':
                return Mapping.JaccardLsh(self._dims, **self._mapping_params), \
                       NearestNeighborsQuery.JaccardLsh(field, dummy, **query_params)
        elif self._algorithm == 'permutation_lsh':
            if self._metric == 'cosine':
                return Mapping.PermutationLsh(self._dims, **self._mapping_params), \
                       NearestNeighborsQuery.PermutationLsh(field, dummy, Similarity.Cosine, **query_params)

        raise NameError(f"unsupported algorithm [{self._algorithm}] and metric [{self._metric}]")

Methods

def fit(self, X: Union[numpy.ndarray, scipy.sparse.csr_matrix, List[elastiknn.api.Vec.SparseBool], List[elastiknn.api.Vec.DenseFloat]], shards: int = 1)
Expand source code
def fit(self, X: Union[np.ndarray, csr_matrix, List[Vec.SparseBool], List[Vec.DenseFloat]], shards: int = 1):
    """Index X into a fresh Elasticsearch index so it can be queried by ``kneighbors``.

    An existing index with the same name is deleted first.

    :param X: vectors to index; the dimensionality is taken from the first one.
    :param shards: number of primary shards for the new index (replicas are set to 0).
    """
    # len() raises TypeError on scipy sparse matrices, so 2-D inputs are measured via shape.
    if getattr(X, "ndim", None) == 2:
        n_vecs, self._dims = X.shape
    else:
        n_vecs, self._dims = len(X), len(X[0])
    vecs = canonical_vectors_to_elastiknn(X)

    mapping, self._query = self._mk_mapping_query(self._query_params)

    if self._index is None:
        self._index = f"{ELASTIKNN_NAME}-{int(time())}"
        self._logger.warning(f"index was not given, using {self._index} instead")

    self._eknn.es.indices.delete(index=self._index, ignore_unavailable=True)
    self._eknn.es.indices.create(index=self._index, settings=dict(number_of_shards=shards, elastiknn=True, number_of_replicas=0))
    self._eknn.put_mapping(self._index, self._vec_field, mapping, self._stored_id_field)

    self._logger.info(f"indexing {n_vecs} vectors into index {self._index}")
    # Ids are 1-based because 0 is an invalid id in ES; kneighbors() subtracts the 1 again.
    ids = (str(i + 1) for i in range(n_vecs))
    self._eknn.index(self._index, self._vec_field, vecs, self._stored_id_field, ids, refresh=True)
    self._eknn.es.indices.forcemerge(index=self._index, max_num_segments=1)
    # Empty indexing call with refresh=True; presumably forces a refresh after the force-merge.
    self._eknn.index(self._index, self._vec_field, [], self._stored_id_field, [], refresh=True)
def kneighbors(self, X: Union[numpy.ndarray, scipy.sparse.csr_matrix, List[elastiknn.api.Vec.SparseBool], List[elastiknn.api.Vec.DenseFloat], List[elastiknn.api.Vec.Base]], n_neighbors: int, return_similarity: bool = False, progbar: bool = False)
Expand source code
def kneighbors(self, X: Union[np.ndarray, csr_matrix, List[Vec.SparseBool], List[Vec.DenseFloat], List[Vec.Base]],
               n_neighbors: int, return_similarity: bool = False, progbar: bool = False):
    """Query the fitted index for the ``n_neighbors`` nearest neighbors of each vector in X.

    :param X: query vectors.
    :param n_neighbors: number of neighbors to return per query.
    :param return_similarity: if True, also return the hit scores.
    :param progbar: show a tqdm progress bar over the queries.
    :return: int32 array of shape (number of queries, n_neighbors) holding 0-based row indices
        into the fitted data, padded with -1 where fewer hits came back. If ``return_similarity``,
        also a float array of the same shape holding ``_score`` values (NaN for padding).
    """
    # len() raises TypeError on scipy sparse matrices, so 2-D inputs are measured via shape.
    n_queries = X.shape[0] if getattr(X, "ndim", None) == 2 else len(X)
    inds = np.full((n_queries, n_neighbors), -1, dtype=np.int32)
    sims = np.full(inds.shape, np.nan)

    queries = enumerate(canonical_vectors_to_elastiknn(X))
    for i, v in tqdm(queries, total=n_queries, disable=not progbar):
        res = self._eknn.nearest_neighbors(self._index, self._query.with_vec(v), self._stored_id_field,
                                           n_neighbors, fetch_source=False)
        try:
            hits = res['hits']['hits']
        except KeyError:
            self._logger.warning(f"Query returned no hits: {res}")
            continue
        for j, hit in enumerate(hits):
            # Stored ids are 1-based (0 is an invalid id in ES); shift back to row indices.
            inds[i, j] = int(hit['fields'][self._stored_id_field][0]) - 1
            sims[i, j] = float(hit['_score'])

    return (inds, sims) if return_similarity else inds
def set_query_params(self, query_params: dict = None)
Expand source code
def set_query_params(self, query_params: dict = None):
    """Replace the query parameters and rebuild the query used by ``kneighbors``.

    :param query_params: keyword arguments for the approximate query; None means no
        parameters (previously it was unpacked with ``**None`` and raised TypeError).
    """
    self._query_params = {} if query_params is None else query_params
    _, self._query = self._mk_mapping_query(self._query_params)