diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java index 4c5a2cc11dc8b..c5d96757c9219 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java @@ -38,6 +38,8 @@ public interface EntitySearchService { */ void upsertDocument(@Nonnull String entityName, @Nonnull String document, @Nonnull String docId); + void createUpdateDocument(@Nonnull String document, @Nonnull String docId); + /** * Deletes the document with the given document ID from the index. * diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java index f3352484455b4..aceb2ed9a41c4 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java @@ -70,6 +70,11 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String document, esWriteDAO.upsertDocument(entityName, document, docId); } + public void createUpdateDocument(@Nonnull String document, @Nonnull String docId) { + log.debug(String.format("Creating Update document document: %s, docId %s", document, docId)); + esWriteDAO.createUpdateDocument(document, docId); + } + @Override public void deleteDocument(@Nonnull String entityName, @Nonnull String docId) { log.debug(String.format("Deleting Search document entityName: %s, docId: %s", entityName, docId)); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java index 1a63f2d4d0312..a90eb1c43c3e2 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java @@ -6,7 +6,9 @@ import javax.annotation.Nonnull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; @@ -46,6 +48,17 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String document, bulkProcessor.add(updateRequest); } + public void createUpdateDocument(@Nonnull String document, @Nonnull String docId) { + final String indexName = indexConvention.getIndexName("datahub_update_event"); + final IndexRequest indexRequest = new IndexRequest( + indexName) + .id(docId) + .source(document, XContentType.JSON) + .opType(DocWriteRequest.OpType.CREATE); + + bulkProcessor.add(indexRequest); + } + /** * Deletes the document with the given document ID from the index. * diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java index 6899a69b4260a..0ec6d2ba97868 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java @@ -194,6 +194,9 @@ private void handleUpdateChangeEvent(@Nonnull final MetadataChangeLog event) { } else { updateGraphService(urn, aspectSpec, aspect, event); } + + // Populate update index with update event + updateUpdateIndex(event); } /** @@ -489,6 +492,24 @@ private void updateTimeseriesFields(String entityType, String aspectName, Urn ur }); } + /** + * Process event and update Update index in Elastic + */ + private void updateUpdateIndex(@Nonnull final MetadataChangeLog event) { + // Events are only handled if their change type is UPDATE or DELETE + // Events with different change types still go through invoke but nothing is executed + // Doing the population of the event index allows us to capture deletes as well + // Should we check if the update is successful before populating update event index? + String updateDocument = _searchDocumentTransformer.transformEvent(event); + + // Generating a hash using event urn, event content and time, to be used as a unique id for ES documents + String stringToBeHashed = event.getEntityType().toString() + '_' + + event.getAspect().getValue().asAvroString() + + '_' + event.getCreated().getTime().toString(); + String docHash = SearchUtils.getDocHash(stringToBeHashed); + _entitySearchService.createUpdateDocument(updateDocument, docHash); + } + private void updateSystemMetadata(SystemMetadata systemMetadata, Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) { _systemMetadataService.insert(systemMetadata, urn.toString(), aspectSpec.getName());