Skip to content

Commit

Permalink
Updated hook to process UPSERT events into datahub_update_index in ES
Browse files Browse the repository at this point in the history
  • Loading branch information
danielkoh94 committed Nov 7, 2023
1 parent 47a2191 commit 64fea48
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public interface EntitySearchService {
*/
void upsertDocument(@Nonnull String entityName, @Nonnull String document, @Nonnull String docId);

void createUpdateDocument(@Nonnull String document, @Nonnull String docId);

/**
* Deletes the document with the given document ID from the index.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String document,
esWriteDAO.upsertDocument(entityName, document, docId);
}

public void createUpdateDocument(@Nonnull String document, @Nonnull String docId) {
log.debug(String.format("Creating Update document document: %s, docId %s", document, docId));
esWriteDAO.createUpdateDocument(document, docId);
}

@Override
public void deleteDocument(@Nonnull String entityName, @Nonnull String docId) {
log.debug(String.format("Deleting Search document entityName: %s, docId: %s", entityName, docId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
Expand Down Expand Up @@ -46,6 +48,17 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String document,
bulkProcessor.add(updateRequest);
}

public void createUpdateDocument(@Nonnull String document, @Nonnull String docId) {
final String indexName = indexConvention.getIndexName("datahub_update_event");
final IndexRequest indexRequest = new IndexRequest(
indexName)
.id(docId)
.source(document, XContentType.JSON)
.opType(DocWriteRequest.OpType.CREATE);

bulkProcessor.add(indexRequest);
}

/**
* Deletes the document with the given document ID from the index.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ private void handleUpdateChangeEvent(@Nonnull final MetadataChangeLog event) {
} else {
updateGraphService(urn, aspectSpec, aspect, event);
}

// Populate update index with update event
updateUpdateIndex(event);
}

/**
Expand Down Expand Up @@ -489,6 +492,24 @@ private void updateTimeseriesFields(String entityType, String aspectName, Urn ur
});
}

/**
* Process event and update Update index in Elastic
*/
private void updateUpdateIndex(@Nonnull final MetadataChangeLog event) {
// Events are only handled if their change type is UPDATE or DELETE
// Events with different change types still go through invoke but nothing is executed
// Doing the population of the event index allows us to capture deletes as well
// Should we check if the update is successful before populating update event index?
String updateDocument = _searchDocumentTransformer.transformEvent(event);

// Generating a hash using event urn, event content and time, to be used as a unique id for ES documents
String stringToBeHashed = event.getEntityType().toString() + '_'
+ event.getAspect().getValue().asAvroString()
+ '_' + event.getCreated().getTime().toString();
String docHash = SearchUtils.getDocHash(stringToBeHashed);
_entitySearchService.createUpdateDocument(updateDocument, docHash);
}

private void updateSystemMetadata(SystemMetadata systemMetadata, Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) {
_systemMetadataService.insert(systemMetadata, urn.toString(), aspectSpec.getName());

Expand Down

0 comments on commit 64fea48

Please sign in to comment.