From fecad0237caa569b0966f9266aa468200d7d1961 Mon Sep 17 00:00:00 2001 From: daniel Date: Tue, 7 Nov 2023 17:15:35 +0800 Subject: [PATCH 01/10] added creation of new index datahub_update_event during elasticsearch setup --- docker/elasticsearch-setup/create-indices.sh | 4 ++++ .../resources/index/usage-event/update_event_template.json | 0 2 files changed, 4 insertions(+) create mode 100644 metadata-service/restli-servlet-impl/src/main/resources/index/usage-event/update_event_template.json diff --git a/docker/elasticsearch-setup/create-indices.sh b/docker/elasticsearch-setup/create-indices.sh index 343013402394f..d3c4384ef18d9 100755 --- a/docker/elasticsearch-setup/create-indices.sh +++ b/docker/elasticsearch-setup/create-indices.sh @@ -107,6 +107,10 @@ function create_datahub_usage_event_datastream() { create_if_not_exists "_index_template/${PREFIX}datahub_usage_event_index_template" index_template.json # 3. although indexing request creates the data stream, it's not queryable before creation, causing GMS to throw exceptions create_if_not_exists "_data_stream/${PREFIX}datahub_usage_event" "datahub_usage_event" + + # Create index template for update_events + create_if_not_exists "_index_template/${PREFIX}datahub_update_event_index_template" update_event_template.json + create_if_not_exists "_data_stream/${PREFIX}datahub_update_event" "datahub_update_event" } # create indices for ES OSS (AWS) diff --git a/metadata-service/restli-servlet-impl/src/main/resources/index/usage-event/update_event_template.json b/metadata-service/restli-servlet-impl/src/main/resources/index/usage-event/update_event_template.json new file mode 100644 index 0000000000000..e69de29bb2d1d From 4e228e7570f0c11e9e34cea029bceed7fc4933ca Mon Sep 17 00:00:00 2001 From: daniel Date: Tue, 7 Nov 2023 17:22:09 +0800 Subject: [PATCH 02/10] Added function to extract information from update event into a JSON string --- .../SearchDocumentTransformer.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java b/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java index 76f4736f2746e..29fba6992c72a 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; import com.linkedin.data.schema.DataSchema; import com.linkedin.data.template.RecordTemplate; @@ -14,6 +15,7 @@ import com.linkedin.metadata.models.annotation.SearchableAnnotation.FieldType; import com.linkedin.metadata.models.extractor.FieldExtractor; +import com.linkedin.mxe.MetadataChangeLog; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -82,6 +84,23 @@ public Optional transformAspect( return Optional.of(searchDocument.toString()); } + //TODO: Transform event into a JSON-like string to upsert into search + public String transformEvent(MetadataChangeLog event) { + final ObjectNode searchDocument = JsonNodeFactory.instance.objectNode(); + searchDocument.put("urn", event.getEntityUrn().toString()); + searchDocument.put("changeType", event.getChangeType().toString()); + searchDocument.put("aspectName", event.getAspectName().toString()); + + AuditStamp eventCreationInfo = event.getCreated(); + String actor = eventCreationInfo.getActor().toString(); + Long time = eventCreationInfo.getTime(); + + searchDocument.put("actorUrn", actor); + searchDocument.put("timestamp", time); + searchDocument.put("@timestamp", time); + return searchDocument.toString(); + } + public void setSearchableValue(final SearchableFieldSpec fieldSpec, final List fieldValues, final ObjectNode searchDocument, final Boolean forDelete) { DataSchema.Type valueType = fieldSpec.getPegasusSchema().getType(); From b16f90c2d08b6992977fa608b45605e67f54a432 Mon Sep 17 00:00:00 2001 From: daniel Date: Tue, 7 Nov 2023 17:25:41 +0800 Subject: [PATCH 03/10] Added template for datahub_update_event index --- .../usage-event/update_event_template.json | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/metadata-service/restli-servlet-impl/src/main/resources/index/usage-event/update_event_template.json b/metadata-service/restli-servlet-impl/src/main/resources/index/usage-event/update_event_template.json index e69de29bb2d1d..9cbde54d0921d 100644 --- a/metadata-service/restli-servlet-impl/src/main/resources/index/usage-event/update_event_template.json +++ b/metadata-service/restli-servlet-impl/src/main/resources/index/usage-event/update_event_template.json @@ -0,0 +1,23 @@ +{ + "index_patterns": ["*PREFIXdatahub_update_event*"], + "data_stream": { }, + "priority": 499, + "template": { + "mappings": { + "properties": { + "@timestamp": { + "type": "date" + }, + "type": { + "type": "keyword" + }, + "timestamp": { + "type": "date" + } + } + }, + "settings": { + "index.lifecycle.name": "PREFIXdatahub_usage_event_policy" + } + } +} \ No newline at end of file From 47a2191ecdd89468f8dc6af10a9e84b8182cd795 Mon Sep 17 00:00:00 2001 From: daniel Date: Tue, 7 Nov 2023 17:27:09 +0800 Subject: [PATCH 04/10] Added function to generate SHA256 hashes for document IDs --- .../metadata/search/utils/SearchUtils.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/SearchUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/SearchUtils.java index 38bdef5bd3bdc..efa0ee0b08b76 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/SearchUtils.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/SearchUtils.java @@ -2,6 +2,7 @@ import com.linkedin.common.UrnArray; import com.linkedin.common.urn.Urn; +import com.linkedin.data.ByteString; import com.linkedin.data.template.LongMap; import com.linkedin.metadata.query.ListResult; import com.linkedin.metadata.query.SearchFlags; @@ -22,6 +23,10 @@ import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Base64; import java.util.Collections; import java.util.Map; import java.util.Optional; @@ -65,6 +70,16 @@ public static Optional getDocId(@Nonnull Urn urn) { } } + public static String getDocHash(@Nonnull String aspectValue) { + try { + MessageDigest hashDigest = MessageDigest.getInstance("SHA-256"); + String hashId = Base64.getEncoder().encodeToString(hashDigest.digest(aspectValue.getBytes(StandardCharsets.UTF_8))); + return hashId; + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + /** * Validates the request params and create a request map out of it. * From 64fea48c8bc6a85a6205541472a621f170f07495 Mon Sep 17 00:00:00 2001 From: daniel Date: Tue, 7 Nov 2023 17:34:30 +0800 Subject: [PATCH 05/10] Updated hook to process UPSERT events into datahub_update_index in ES --- .../metadata/search/EntitySearchService.java | 2 ++ .../elasticsearch/ElasticSearchService.java | 5 +++++ .../elasticsearch/update/ESWriteDAO.java | 13 ++++++++++++ .../kafka/hook/UpdateIndicesHook.java | 21 +++++++++++++++++++ 4 files changed, 41 insertions(+) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java index 4c5a2cc11dc8b..c5d96757c9219 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java @@ -38,6 +38,8 @@ public interface EntitySearchService { */ void upsertDocument(@Nonnull String entityName, @Nonnull String document, @Nonnull String docId); + void createUpdateDocument(@Nonnull String document, @Nonnull String docId); + /** * Deletes the document with the given document ID from the index. * diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java index f3352484455b4..aceb2ed9a41c4 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java @@ -70,6 +70,11 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String document, esWriteDAO.upsertDocument(entityName, document, docId); } + public void createUpdateDocument(@Nonnull String document, @Nonnull String docId) { + log.debug(String.format("Creating Update document document: %s, docId %s", document, docId)); + esWriteDAO.createUpdateDocument(document, docId); + } + @Override public void deleteDocument(@Nonnull String entityName, @Nonnull String docId) { log.debug(String.format("Deleting Search document entityName: %s, docId: %s", entityName, docId)); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java index 1a63f2d4d0312..a90eb1c43c3e2 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java @@ -6,7 +6,9 @@ import javax.annotation.Nonnull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; @@ -46,6 +48,17 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String document, bulkProcessor.add(updateRequest); } + public void createUpdateDocument(@Nonnull String document, @Nonnull String docId) { + final String indexName = indexConvention.getIndexName("datahub_update_event"); + final IndexRequest indexRequest = new IndexRequest( + indexName) + .id(docId) + .source(document, XContentType.JSON) + .opType(DocWriteRequest.OpType.CREATE); + + bulkProcessor.add(indexRequest); + } + /** * Deletes the document with the given document ID from the index. * diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java index 6899a69b4260a..0ec6d2ba97868 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java @@ -194,6 +194,9 @@ private void handleUpdateChangeEvent(@Nonnull final MetadataChangeLog event) { } else { updateGraphService(urn, aspectSpec, aspect, event); } + + // Populate update index with update event + updateUpdateIndex(event); } /** @@ -489,6 +492,24 @@ private void updateTimeseriesFields(String entityType, String aspectName, Urn ur }); } + /** + * Process event and update Update index in Elastic + */ + private void updateUpdateIndex(@Nonnull final MetadataChangeLog event) { + // Events are only handled if their change type is UPDATE or DELETE + // Events with different change types still go through invoke but nothing is executed + // Doing the population of the event index allows us to capture deletes as well + // Should we check if the update is successful before populating update event index? + String updateDocument = _searchDocumentTransformer.transformEvent(event); + + // Generating a hash using event urn, event content and time, to be used as a unique id for ES documents + String stringToBeHashed = event.getEntityType().toString() + '_' + + event.getAspect().getValue().asAvroString() + + '_' + event.getCreated().getTime().toString(); + String docHash = SearchUtils.getDocHash(stringToBeHashed); + _entitySearchService.createUpdateDocument(updateDocument, docHash); + } + private void updateSystemMetadata(SystemMetadata systemMetadata, Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) { _systemMetadataService.insert(systemMetadata, urn.toString(), aspectSpec.getName()); From 19b80dc2d1ec7d8014ac299094f2dba1d00bbe8e Mon Sep 17 00:00:00 2001 From: daniel Date: Tue, 7 Nov 2023 17:44:01 +0800 Subject: [PATCH 06/10] Cleaned up comments and added descriptions for functions --- .../com/linkedin/metadata/search/EntitySearchService.java | 5 +++++ .../metadata/search/elasticsearch/ElasticSearchService.java | 1 + .../metadata/search/elasticsearch/update/ESWriteDAO.java | 6 ++++++ .../search/transformer/SearchDocumentTransformer.java | 1 - 4 files changed, 12 insertions(+), 1 deletion(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java index c5d96757c9219..3ef6fb86ac12f 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java @@ -38,6 +38,11 @@ public interface EntitySearchService { */ void upsertDocument(@Nonnull String entityName, @Nonnull String document, @Nonnull String docId); + /** + * + * @param document the document to update into update index + * @param docId the ID of the document + */ void createUpdateDocument(@Nonnull String document, @Nonnull String docId); /** diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java index aceb2ed9a41c4..170285527b361 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java @@ -70,6 +70,7 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String document, esWriteDAO.upsertDocument(entityName, document, docId); } + @Override public void createUpdateDocument(@Nonnull String document, @Nonnull String docId) { log.debug(String.format("Creating Update document document: %s, docId %s", document, docId)); esWriteDAO.createUpdateDocument(document, docId); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java index a90eb1c43c3e2..eced7714b5497 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java @@ -48,6 +48,12 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String document, bulkProcessor.add(updateRequest); } + /** + * Creates a request to insert new document into datahub_update_event index + * + * @param document the document to insert + * @param docId the ID of the document + */ public void createUpdateDocument(@Nonnull String document, @Nonnull String docId) { final String indexName = indexConvention.getIndexName("datahub_update_event"); final IndexRequest indexRequest = new IndexRequest( diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java b/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java index 29fba6992c72a..e6eb31c8983cd 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java @@ -84,7 +84,6 @@ public Optional transformAspect( return Optional.of(searchDocument.toString()); } - //TODO: Transform event into a JSON-like string to upsert into search public String transformEvent(MetadataChangeLog event) { final ObjectNode searchDocument = JsonNodeFactory.instance.objectNode(); searchDocument.put("urn", event.getEntityUrn().toString()); From 3fa538508b22aa71f05bcd46f44e6009805fad31 Mon Sep 17 00:00:00 2001 From: daniel Date: Tue, 14 Nov 2023 10:09:57 +0800 Subject: [PATCH 07/10] Removed unused imports and comments --- .../java/com/linkedin/metadata/search/utils/SearchUtils.java | 1 - .../com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java | 4 ---- 2 files changed, 5 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/SearchUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/SearchUtils.java index efa0ee0b08b76..c31b19b7a546b 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/SearchUtils.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/SearchUtils.java @@ -2,7 +2,6 @@ import com.linkedin.common.UrnArray; import com.linkedin.common.urn.Urn; -import com.linkedin.data.ByteString; import com.linkedin.data.template.LongMap; import com.linkedin.metadata.query.ListResult; import com.linkedin.metadata.query.SearchFlags; diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java index 0ec6d2ba97868..eb21915ceb343 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java @@ -496,10 +496,6 @@ private void updateTimeseriesFields(String entityType, String aspectName, Urn ur * Process event and update Update index in Elastic */ private void updateUpdateIndex(@Nonnull final MetadataChangeLog event) { - // Events are only handled if their change type is UPDATE or DELETE - // Events with different change types still go through invoke but nothing is executed - // Doing the population of the event index allows us to capture deletes as well - // Should we check if the update is successful before populating update event index? String updateDocument = _searchDocumentTransformer.transformEvent(event); // Generating a hash using event urn, event content and time, to be used as a unique id for ES documents From 40b9a52686c2abce10f755d7843e78d2ce121c27 Mon Sep 17 00:00:00 2001 From: daniel Date: Tue, 14 Nov 2023 14:59:37 +0800 Subject: [PATCH 08/10] added entityType to the ES update event document --- .../metadata/search/transformer/SearchDocumentTransformer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java b/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java index e6eb31c8983cd..83a90034a2758 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java @@ -88,6 +88,7 @@ public String transformEvent(MetadataChangeLog event) { final ObjectNode searchDocument = JsonNodeFactory.instance.objectNode(); searchDocument.put("urn", event.getEntityUrn().toString()); searchDocument.put("changeType", event.getChangeType().toString()); + searchDocument.put("entityType", event.getEntityType().toString()); searchDocument.put("aspectName", event.getAspectName().toString()); AuditStamp eventCreationInfo = event.getCreated(); From 56e9cb20b825d151203febbc849b43826e60676d Mon Sep 17 00:00:00 2001 From: daniel Date: Mon, 20 Nov 2023 17:11:43 +0800 Subject: [PATCH 09/10] Removed creation of document hash for document id to let ES auto generate docID --- .../com/linkedin/metadata/search/EntitySearchService.java | 4 +--- .../search/elasticsearch/ElasticSearchService.java | 6 +++--- .../metadata/search/elasticsearch/update/ESWriteDAO.java | 4 +--- .../linkedin/metadata/kafka/hook/UpdateIndicesHook.java | 8 +------- 4 files changed, 6 insertions(+), 16 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java index 3ef6fb86ac12f..6414afa45168f 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/EntitySearchService.java @@ -39,11 +39,9 @@ public interface EntitySearchService { void upsertDocument(@Nonnull String entityName, @Nonnull String document, @Nonnull String docId); /** - * * @param document the document to update into update index - * @param docId the ID of the document */ - void createUpdateDocument(@Nonnull String document, @Nonnull String docId); + void createUpdateDocument(@Nonnull String document); /** * Deletes the document with the given document ID from the index. diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java index 170285527b361..cc57e211e2c29 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java @@ -71,9 +71,9 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String document, } @Override - public void createUpdateDocument(@Nonnull String document, @Nonnull String docId) { - log.debug(String.format("Creating Update document document: %s, docId %s", document, docId)); - esWriteDAO.createUpdateDocument(document, docId); + public void createUpdateDocument(@Nonnull String document) { + log.debug(String.format("Creating Update document document: %s", document)); + esWriteDAO.createUpdateDocument(document); } @Override diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java index eced7714b5497..c9a9ea14d5789 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java @@ -52,13 +52,11 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String document, * Creates a request to insert new document into datahub_update_event index * * @param document the document to insert - * @param docId the ID of the document */ - public void createUpdateDocument(@Nonnull String document, @Nonnull String docId) { + public void createUpdateDocument(@Nonnull String document) { final String indexName = indexConvention.getIndexName("datahub_update_event"); final IndexRequest indexRequest = new IndexRequest( indexName) - .id(docId) .source(document, XContentType.JSON) .opType(DocWriteRequest.OpType.CREATE); diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java index eb21915ceb343..72aafa67324d9 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHook.java @@ -497,13 +497,7 @@ private void updateTimeseriesFields(String entityType, String aspectName, Urn ur */ private void updateUpdateIndex(@Nonnull final MetadataChangeLog event) { String updateDocument = _searchDocumentTransformer.transformEvent(event); - - // Generating a hash using event urn, event content and time, to be used as a unique id for ES documents - String stringToBeHashed = event.getEntityType().toString() + '_' - + event.getAspect().getValue().asAvroString() - + '_' + event.getCreated().getTime().toString(); - String docHash = SearchUtils.getDocHash(stringToBeHashed); - _entitySearchService.createUpdateDocument(updateDocument, docHash); + _entitySearchService.createUpdateDocument(updateDocument); } private void updateSystemMetadata(SystemMetadata systemMetadata, Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) { From 94aef19443f52c810a00692838bcc148e1d361f1 Mon Sep 17 00:00:00 2001 From: daniel Date: Tue, 21 Nov 2023 11:47:34 +0800 Subject: [PATCH 10/10] added explicit definition of update index mappings and disabled dynamic mappings --- .../usage-event/update_event_template.json | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/metadata-service/restli-servlet-impl/src/main/resources/index/usage-event/update_event_template.json b/metadata-service/restli-servlet-impl/src/main/resources/index/usage-event/update_event_template.json index 9cbde54d0921d..d2d32e4cf17aa 100644 --- a/metadata-service/restli-servlet-impl/src/main/resources/index/usage-event/update_event_template.json +++ b/metadata-service/restli-servlet-impl/src/main/resources/index/usage-event/update_event_template.json @@ -4,18 +4,31 @@ "priority": 499, "template": { "mappings": { + "dynamic" : false, "properties": { "@timestamp": { "type": "date" }, - "type": { + "urn": { + "type": "keyword" + }, + "changeType": { + "type": "keyword" + }, + "entityType": { + "type": "keyword" + }, + "aspectName": { + "type": "keyword" + }, + "actorUrn": { "type": "keyword" }, "timestamp": { "type": "date" } } - }, + }, "settings": { "index.lifecycle.name": "PREFIXdatahub_usage_event_policy" }