Skip to content

Commit

Permalink
Eliminate VersionType in favor of OpType
Browse files Browse the repository at this point in the history
  • Loading branch information
hannes-ucsc committed Oct 25, 2024
1 parent 0949201 commit 3d8cf1b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 79 deletions.
104 changes: 35 additions & 69 deletions src/azul/indexer/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -1076,20 +1076,6 @@ def filter(self, relation: str, values: list[AnyJSON]) -> list[JSON]:
FieldTypes = Mapping[str, FieldTypes1]
CataloguedFieldTypes = Mapping[CatalogName, FieldTypes]


class VersionType(Enum):
# No versioning; document is created or overwritten as needed
none = auto()

# Writing a document fails with 409 conflict if one with the same ID already
# exists in the index
create_only = auto()

# Use the Elasticsearch "internal" versioning type
# https://www.elastic.co/guide/en/elasticsearch/reference/6.8/docs-index_.html#_version_types
internal = auto()


InternalVersion = tuple[int, int]


Expand All @@ -1112,24 +1098,11 @@ class OpType(Enum):


@attr.s(frozen=False, kw_only=True, auto_attribs=True)
class Document(Generic[C]):
class Document(Generic[C], metaclass=ABCMeta):
needs_translation: ClassVar[bool] = True

coordinates: C

#: By default, instances are fixed to VersionType.none. A subclass may
#: decide to make the version_type attribute mutable but it still needs to
#: declare what VersionType its instances use initially. This is so that
#: factories of these instances can make the necessary preparations.
#:
initial_version_type: ClassVar[VersionType] = VersionType.none
version_type: VersionType = attr.ib(default=initial_version_type,
on_setattr=attr.setters.frozen)

# For VersionType.internal, version is a tuple composed of the sequence
# number and primary term. For VersionType.none and .create_only, it is
# None.
# https://www.elastic.co/guide/en/elasticsearch/reference/7.9/docs-bulk.html#bulk-api-response-body
version: Optional[InternalVersion]

# In the index, the `contents` property is always present and never null in
Expand All @@ -1144,6 +1117,22 @@ class Document(Generic[C]):
def entity(self) -> EntityReference:
return self.coordinates.entity

@property
@abstractmethod
def op_type(self) -> OpType:
"""
Get the ES client method to use when writing this document to the index.
"""
raise NotImplementedError

@op_type.setter
def op_type(self, value: OpType):
"""
Set the ES client method to use when writing this document to the index.
This is an optional operation.
"""
raise NotImplementedError

@classmethod
def field_types(cls, field_types: FieldTypes) -> FieldTypes:
return {
Expand Down Expand Up @@ -1302,14 +1291,11 @@ def from_index(cls,
document = cls.translate_fields(document,
field_types[coordinates.entity.catalog],
forward=False)
if cls.initial_version_type is VersionType.internal:
try:
version = (hit['_seq_no'], hit['_primary_term'])
except KeyError:
assert '_seq_no' not in hit
assert '_primary_term' not in hit
version = None
else:
try:
version = hit['_seq_no'], hit['_primary_term']
except KeyError:
assert '_seq_no' not in hit
assert '_primary_term' not in hit
version = None

return cls.from_json(coordinates=coordinates,
Expand All @@ -1333,31 +1319,17 @@ def to_index(self,
:return: Request parameters for indexing
"""
op_type = self.op_type
coordinates = self.coordinates.with_catalog(catalog)
result = {
'index': coordinates.index_name,
'id': self.coordinates.document_id
}
if op_type is not OpType.delete:
if self.op_type is not OpType.delete:
result['body'] = self._body(field_types[coordinates.entity.catalog])

if self.version_type is VersionType.none:
pass
elif self.version_type is VersionType.create_only:
assert op_type is OpType.create, op_type
elif self.version_type is VersionType.internal:
if self.version is not None:
# For internal versioning, self.version is None for new documents
result['if_seq_no'], result['if_primary_term'] = self.version
else:
assert False, self.version_type
if self.version is not None:
result['if_seq_no'], result['if_primary_term'] = self.version
return result

@property
def op_type(self) -> OpType:
raise NotImplementedError

def _body(self, field_types: FieldTypes) -> JSON:
body = self.to_json()
if self.needs_translation:
Expand All @@ -1377,10 +1349,17 @@ class Contribution(Document[ContributionCoordinates[E]]):
contents: JSON
source: DocumentSource

#: The version_type attribute will change to VersionType.none if writing
#: The op_type attribute will change to OpType.index if writing
#: to Elasticsearch fails with 409
initial_version_type: ClassVar[VersionType] = VersionType.create_only
version_type: VersionType = initial_version_type
_op_type: OpType = OpType.create

@property
def op_type(self) -> OpType:
return self._op_type

@op_type.setter
def op_type(self, op_type: OpType):
self._op_type = op_type

def __attrs_post_init__(self):
assert self.contents is not None
Expand Down Expand Up @@ -1438,21 +1417,9 @@ def to_json(self):
bundle_version=self.coordinates.bundle.version,
bundle_deleted=self.coordinates.deleted)

@property
def op_type(self) -> OpType:
if self.version_type is VersionType.create_only:
return OpType.create
elif self.version_type is VersionType.none:
return OpType.index
else:
assert False, self.version_type


@attr.s(frozen=False, kw_only=True, auto_attribs=True)
class Aggregate(Document[AggregateCoordinates]):
initial_version_type: ClassVar[VersionType] = VersionType.internal
version_type: VersionType = attr.ib(default=initial_version_type,
on_setattr=attr.setters.frozen)
sources: set[DocumentSource]
bundles: Optional[list[BundleFQIDJSON]]
num_contributions: int
Expand Down Expand Up @@ -1560,7 +1527,6 @@ def to_json(self) -> JSON:

@property
def op_type(self) -> OpType:
assert self.version_type is VersionType.none, self.version_type
return OpType.update

def _body(self, field_types: FieldTypes) -> JSON:
Expand Down
25 changes: 15 additions & 10 deletions src/azul/indexer/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
OpType,
Replica,
ReplicaCoordinates,
VersionType,
)
from azul.indexer.document_service import (
DocumentService,
Expand Down Expand Up @@ -438,7 +437,7 @@ def contribute(self,
else:
entity = CataloguedEntityReference.for_entity(catalog, c.coordinates.entity)
# Don't count overwrites, but ensure entry exists
was_overwrite = c.version_type is VersionType.none
was_overwrite = c.op_type is OpType.index
tallies[entity] += 0 if was_overwrite else 1
contributions = retry_contributions
writer.raise_on_errors()
Expand Down Expand Up @@ -610,8 +609,6 @@ def _read_contributions(self,
num_contributions = sum(tallies.values())
log.info('Reading %i expected contribution(s)', num_contributions)

is_internal_version = Contribution.initial_version_type is VersionType.internal

def pages() -> Iterable[JSONs]:
body = dict(query=query)
while True:
Expand All @@ -620,7 +617,7 @@ def pages() -> Iterable[JSONs]:
body=body,
size=config.contribution_page_size,
track_total_hits=False,
seq_no_primary_term=is_internal_version)
seq_no_primary_term=True)
hits = response['hits']['hits']
log.debug('Read a page with %i contribution(s)', len(hits))
if hits:
Expand Down Expand Up @@ -963,14 +960,22 @@ def _on_conflict(self, doc: Document, e: Union[Exception, JSON]):
self.retries.add(doc.coordinates)
else:
action = 'giving up'
if doc.version_type is VersionType.create_only:
log.warning('Document %r exists. Retrying with overwrite.', doc.coordinates)
# Try again but allow overwriting
doc.version_type = VersionType.none
else:

def warn():
log.warning('There was a conflict with document %r: %r. Total # of errors: %i, %s.',
doc.coordinates, e, self.conflicts[doc.coordinates], action)

if doc.op_type is OpType.create:
try:
doc.op_type = OpType.index
except AttributeError:
# We don't expect all Document types will let us modify op_type
warn()
else:
log.warning('Document %r exists. Retrying with overwrite.', doc.coordinates)
else:
warn()

def raise_on_errors(self):
if self.errors or self.conflicts:
log.warning('Failures: %r', self.errors)
Expand Down

0 comments on commit 3d8cf1b

Please sign in to comment.