Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docarray/array/mixins/find.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def find(

if isinstance(query, dict):
if filter is None:
return self._filter(query)
return self._filter(query, limit=limit)
else:
raise ValueError(
'filter and query cannot be both dict type, set only one for filtering'
Expand Down Expand Up @@ -256,7 +256,8 @@ def _find(

def _filter(
self,
query: Dict,
query: Union[Dict, List[Dict]],
limit: Optional[Union[int, float]] = 20,
) -> 'DocumentArray':
"""Returns a subset of documents by filtering by the given query.

Expand Down
18 changes: 10 additions & 8 deletions docarray/array/storage/annlite/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ def _map_embedding(self, embedding: 'ArrayType') -> 'ArrayType':
embedding = np.asarray(embedding).squeeze()
return embedding

def _normalize_columns(self, columns):
    """Normalize ``columns`` and translate every column type with ``_map_type``.

    The parent implementation is applied first; each resulting entry is then
    rebuilt as a ``(name, mapped_type)`` tuple.

    :param columns: column definitions, each indexable as ``(name, type)``;
        handed to the parent implementation before any mapping happens
    :return: a new list of ``(name, mapped_type)`` tuples
    """
    columns = super()._normalize_columns(columns)
    # Build a fresh list rather than assigning into ``columns`` so the list
    # object supplied by the caller is left unmodified; otherwise reusing the
    # same list would feed already-mapped types back into ``_map_type``.
    return [(column[0], self._map_type(column[1])) for column in columns]

def _init_storage(
self,
_docs: Optional['DocumentArraySourceType'] = None,
Expand All @@ -71,14 +80,7 @@ def _init_storage(

self._config = config

if self._config.columns is None:
self._config.columns = []

for i in range(len(self._config.columns)):
self._config.columns[i] = (
self._config.columns[i][0],
self._map_type(self._config.columns[i][1]),
)
self._config.columns = self._normalize_columns(self._config.columns)

config = asdict(config)
self.n_dim = config.pop('n_dim')
Expand Down
5 changes: 5 additions & 0 deletions docarray/array/storage/base/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,8 @@ def _map_embedding(self, embedding: 'ArrayType') -> 'ArrayType':

def _map_type(self, col_type: str) -> str:
return self.TYPE_MAP[col_type].type

def _normalize_columns(self, columns):
if columns is None:
return []
return columns
19 changes: 17 additions & 2 deletions docarray/array/storage/elastic/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

from ..base.backend import BaseBackendMixin
from ..base.backend import BaseBackendMixin, TypeMap
from .... import Document
from ....helper import dataclass_from_dict
from ....helper import dataclass_from_dict, _safe_cast_int

if TYPE_CHECKING:
from ....typing import (
Expand Down Expand Up @@ -48,6 +48,12 @@ class ElasticConfig:
class BackendMixin(BaseBackendMixin):
"""Provide necessary functions to enable this storage backend."""

TYPE_MAP = {
'str': TypeMap(type='text', converter=str),
'float': TypeMap(type='float', converter=float),
'int': TypeMap(type='integer', converter=_safe_cast_int),
}

def _init_storage(
self,
_docs: Optional['DocumentArraySourceType'] = None,
Expand All @@ -70,6 +76,9 @@ def _init_storage(

self._index_name_offset2id = 'offset2id__' + config.index_name
self._config = config

self._config.columns = self._normalize_columns(self._config.columns)

self.n_dim = self._config.n_dim
self._client = self._build_client()
self._build_offset2id_index()
Expand Down Expand Up @@ -112,6 +121,12 @@ def _build_schema_from_elastic_config(self, elastic_config):
'index': True,
}

for col, coltype in self._config.columns:
da_schema['mappings']['properties'][col] = {
'type': self._map_type(coltype),
'index': True,
}

if self._config.m or self._config.ef_construction:
index_options = {
'type': 'hnsw',
Expand Down
57 changes: 46 additions & 11 deletions docarray/array/storage/elastic/find.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from ....score import NamedScore
from ....array.mixins.find import FindMixin as BaseFindMixin


if TYPE_CHECKING:
import tensorflow
import torch
Expand All @@ -27,11 +28,23 @@
tensorflow.Tensor,
torch.Tensor,
Sequence[float],
Dict,
)


class FindMixin(BaseFindMixin):
def _find_similar_vectors(self, query: 'ElasticArrayType', limit=10):
def _find_similar_vectors(
self, query: 'ElasticArrayType', filter: Optional[Dict] = None, limit=10
):
"""
Return vector search results for the input query. `script_score` will be used in filter_field is set.
:param query: query vector used for vector search
:param filter: filter query used for pre-filtering
:param limit: number of items to be retrieved
:return: DocumentArray containing the closest documents to the query if it is a single query, otherwise a list of DocumentArrays containing
the closest Document objects for each of the queries in `query`.
"""

query = to_numpy_array(query)
is_all_zero = np.all(query == 0)
if is_all_zero:
Expand All @@ -45,6 +58,7 @@ def _find_similar_vectors(self, query: 'ElasticArrayType', limit=10):
'k': limit,
'num_candidates': 10000,
},
filter=filter,
)
list_of_hits = resp['hits']['hits']

Expand All @@ -62,10 +76,8 @@ def _find_similar_documents_from_text(
):
"""
Return keyword matches for the input query

:param query: text used for keyword search
:param limit: number of items to be retrieved

:return: DocumentArray containing the closest documents to the query if it is a single query, otherwise a list of DocumentArrays containing
the closest Document objects for each of the queries in `query`.
"""
Expand Down Expand Up @@ -93,7 +105,11 @@ def _find_by_text(
query = [query]

return [
self._find_similar_documents_from_text(q, index=index, limit=limit)
self._find_similar_documents_from_text(
q,
index=index,
limit=limit,
)
for q in query
]

Expand All @@ -105,21 +121,40 @@ def _find(
**kwargs,
) -> List['DocumentArray']:
"""Returns approximate nearest neighbors given a batch of input queries.

:param query: input supported to be stored in Elastic. This includes any from the list '[np.ndarray, tensorflow.Tensor, torch.Tensor, Sequence[float]]'
:param limit: number of retrieved items
:param filter: filter query used for pre-filtering
Comment thread
jemmyshin marked this conversation as resolved.

:return: DocumentArray containing the closest documents to the query if it is a single query, otherwise a list of DocumentArrays containing
the closest Document objects for each of the queries in `query`.
"""
if filter is not None:
raise ValueError(
'Filtered vector search is not supported for ElasticSearch backend'
)
query = np.array(query)
num_rows, n_dim = ndarray.get_array_rows(query)
if n_dim != 2:
query = query.reshape((num_rows, -1))

return [self._find_similar_vectors(q, limit=limit) for q in query]
return [
self._find_similar_vectors(q, filter=filter, limit=limit) for q in query
]

def _find_with_filter(self, query: Dict, limit: Optional[Union[int, float]] = 20):
    """Run ``query`` against the configured index and return the hits as Documents.

    :param query: query body forwarded unchanged to the client's ``search`` call
    :param limit: number of hits requested from the server through ``size``
    :return: DocumentArray with one Document per hit, decoded from the hit's
        ``blob`` source field and carrying the hit's ``_score`` under ``scores['score']``
    """
    resp = self._client.search(
        index=self._config.index_name,
        query=query,
        size=limit,
    )

    da = DocumentArray()
    # ``size`` already bounds the number of hits returned, so the response is
    # consumed as-is; re-slicing it with ``limit`` was redundant and raised
    # TypeError whenever ``limit`` was not an integer.
    for hit in resp['hits']['hits']:
        doc = Document.from_base64(hit['_source']['blob'])
        doc.scores['score'] = NamedScore(value=hit['_score'])
        da.append(doc)

    return da

def _filter(
self, query: Dict, limit: Optional[Union[int, float]] = 20
) -> 'DocumentArray':

return self._find_with_filter(query, limit=limit)
2 changes: 2 additions & 0 deletions docarray/array/storage/elastic/getsetdel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ class GetSetDelMixin(BaseGetSetDelMixin):
and ``__delitem__`` for ``DocumentArrayElastic``"""

def _document_to_elastic(self, doc: 'Document') -> Dict:
extra_columns = {col: doc.tags.get(col) for col, _ in self._config.columns}
request = {
'_op_type': 'index',
'_id': doc.id,
'_index': self._config.index_name,
'embedding': self._map_embedding(doc.embedding),
'blob': doc.to_base64(),
**extra_columns,
}

if self._config.tag_indices:
Expand Down
3 changes: 1 addition & 2 deletions docarray/array/storage/qdrant/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ def _init_storage(
self._config = config
self._persist = bool(self._config.collection_name)

if self._config.columns is None:
self._config.columns = []
self._config.columns = self._normalize_columns(self._config.columns)

self._config.collection_name = (
self.__class__.__name__ + random_identity()
Expand Down
3 changes: 1 addition & 2 deletions docarray/array/storage/weaviate/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ def _init_storage(
)
self._config = config

if self._config.columns is None:
self._config.columns = []
self._config.columns = self._normalize_columns(self._config.columns)

self._schemas = self._load_or_create_weaviate_schema()

Expand Down
2 changes: 1 addition & 1 deletion docs/advanced/document-store/annlite.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ Consider we want the nearest vectors to the embedding `[8. 8. 8.]`, with the res
prices must follow a filter. As an example, let's consider that retrieved documents must have a `price` value lower than
or equal to `max_price`. We can encode this information in annlite using `filter = {'price': {'$lte': max_price}}`.

Then the search with the proposed filter can implemented and used with the following code:
Then the search with the proposed filter can be implemented and used with the following code:

```python
max_price = 7
Expand Down
126 changes: 126 additions & 0 deletions docs/advanced/document-store/elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,132 @@ da2.summary()

Other functions behave the same as in-memory DocumentArray.

### Vector search with filter query
One can perform Approximate Nearest Neighbor Search and pre-filter results using a filter query that follows [ElasticSearch's DSL](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html).


Consider Documents with embeddings `[0,0,0]` up to `[9,9,9]`, where the Document with embedding `[i,i,i]`
has a tag `price` with value `i`. We can create such an example with the following code:


```python
from docarray import Document, DocumentArray
import numpy as np

n_dim = 3

da = DocumentArray(
storage='elasticsearch',
config={'n_dim': n_dim, 'columns': [('price', 'int')], 'distance': 'l2_norm'},
)

with da:
da.extend(
[
Document(id=f'r{i}', embedding=i * np.ones(n_dim), tags={'price': i})
for i in range(10)
]
)

print('\nIndexed Prices:\n')
for embedding, price in zip(da.embeddings, da[:, 'tags__price']):
print(f'\tembedding={embedding},\t price={price}')
```

Suppose we want the nearest vectors to the embedding `[8. 8. 8.]`, with the restriction that
prices must satisfy a filter. As an example, let's require that retrieved documents have a `price` value lower than
or equal to `max_price`. We can encode this information in ElasticSearch using `filter = {'range': {'price': {'lte': max_price}}}`.

Then the search with the proposed filter can be implemented and used with the following code:

```python
max_price = 7
n_limit = 4

np_query = np.ones(n_dim) * 8
print(f'\nQuery vector: \t{np_query}')

filter = {'range': {'price': {'lte': max_price}}}
results = da.find(np_query, filter=filter, limit=n_limit)

print('\nEmbeddings Nearest Neighbours with "price" at most 7:\n')
for embedding, price in zip(results.embeddings, results[:, 'tags__price']):
print(f'\tembedding={embedding},\t price={price}')
```

This would print:

```bash
Embeddings Nearest Neighbours with "price" at most 7:

embedding=[7. 7. 7.], price=7
embedding=[6. 6. 6.], price=6
embedding=[5. 5. 5.], price=5
embedding=[4. 4. 4.], price=4
```


### Search by filter query

One can search with user-defined query filters using the `.find` method. Such queries can be constructed following the
guidelines in [ElasticSearch's Documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html).

Suppose you store Documents with a certain tag `price` in ElasticSearch and you want to retrieve all Documents
with `price` lower than or equal to some `max_price` value.


You can index such Documents as follows:
```python
from docarray import Document, DocumentArray

n_dim = 3
da = DocumentArray(
storage='elasticsearch',
config={
'n_dim': n_dim,
'columns': [('price', 'float')],
},
)

with da:
da.extend([Document(id=f'r{i}', tags={'price': i}) for i in range(10)])

print('\nIndexed Prices:\n')
for price in da[:, 'tags__price']:
print(f'\t price={price}')
```

Then you can retrieve all documents whose price is lower than or equal to `max_price` by applying the following
filter:

```python
max_price = 3
n_limit = 4

filter = {
'range': {
'price': {
'lte': max_price,
}
}
}
results = da.find(filter=filter)

print('\n Returned examples that verify filter "price at most 3":\n')
for price in results[:, 'tags__price']:
print(f'\t price={price}')
```

This would print

```
Returned examples that verify filter "price at most 3":

price=0
price=1
price=2
price=3
```

### Search by `.text` field

Expand Down
Loading