Skip to content

Commit

Permalink
Remove bulk request concerns from Document.to_index
Browse files Browse the repository at this point in the history
  • Loading branch information
hannes-ucsc committed Oct 24, 2024
1 parent 1b14b36 commit 90a037f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 22 deletions.
23 changes: 6 additions & 17 deletions src/azul/indexer/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -1318,8 +1318,7 @@ def from_index(cls,

def to_index(self,
catalog: Optional[CatalogName],
field_types: CataloguedFieldTypes,
bulk: bool = False
field_types: CataloguedFieldTypes
) -> JSON:
"""
Build request parameters from the document for indexing
Expand All @@ -1328,27 +1327,17 @@ def to_index(self,
coordinates must supply it. Otherwise this document's
coordinates must supply the same catalog or none at all.
:param field_types: A mapping of field paths to field type
:param bulk: If bulk indexing
:return: Request parameters for indexing
"""
op_type = self.op_type
coordinates = self.coordinates.with_catalog(catalog)
result = {
'_index' if bulk else 'index': coordinates.index_name,
**(
{}
if op_type is OpType.delete else
{
'_source' if bulk else 'body':
self._body(field_types[coordinates.entity.catalog])
}
),
'_id' if bulk else 'id': self.coordinates.document_id
'index': coordinates.index_name,
'id': self.coordinates.document_id
}
# For non-bulk updates, self.op_type determines which client
# method is invoked.
if bulk:
result['_op_type'] = op_type.name
if op_type is not OpType.delete:
result['body'] = self._body(field_types[coordinates.entity.catalog])

if self.version_type is VersionType.none:
pass
elif self.version_type is VersionType.create_only:
Expand Down
30 changes: 25 additions & 5 deletions src/azul/indexer/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
attrgetter,
)
from typing import (
Any,
MutableSet,
Optional,
TYPE_CHECKING,
Expand Down Expand Up @@ -881,11 +882,25 @@ def _write_bulk(self, documents: Iterable[Document]):
doc.coordinates: doc
for doc in documents
}
actions = [
doc.to_index(self.catalog, self.field_types, bulk=True)
for doc in documents.values()
]

def expand_action(doc: Any) -> tuple[dict[str, Any], dict[str, Any] | None]:
# Document.to_index returns the keyword arguments to the ES client
# method referenced by Document.op_type. In bulk requests, these
# methods are not invoked individually. This function converts the
# keyword arguments returned by Document.to_index to the form
# internally used by the ES client's `bulk` method: a pair
# consisting of 1) the action and associated metadata and 2) an
# optional document source.
assert isinstance(doc, Document), doc
action = dict(doc.to_index(self.catalog, self.field_types))
action['_index'] = action.pop('index')
action['_id'] = action.pop('id')
body = action.pop('body', None)
action = {doc.op_type.name: action}
return action, body

log.info('Writing documents using streaming_bulk().')

# We cannot use parallel_bulk() for 1024+ actions because Lambda doesn't
# support shared memory. See the issue below for details.
#
Expand All @@ -896,8 +911,13 @@ def _write_bulk(self, documents: Iterable[Document]):
# There is no way to split a single action and hence a single document
# into multiple requests.
#
# Technically, we're not supposed to pass Document instances in the
# `action` parameter but we're exploiting the undocumented fact that the
# method immediately maps the value of the `expand_action_callback`
# parameter over the list passed in the `actions` parameter.
response = streaming_bulk(client=self.es_client,
actions=actions,
actions=list(documents.values()),
expand_action_callback=expand_action,
refresh=self.refresh,
raise_on_error=False,
max_chunk_bytes=config.max_chunk_size)
Expand Down

0 comments on commit 90a037f

Please sign in to comment.