
feat(gms): store update events in a new index in ElasticSearch #135

Merged · 10 commits · Nov 22, 2023
4 changes: 4 additions & 0 deletions docker/elasticsearch-setup/create-indices.sh
@@ -107,6 +107,10 @@ function create_datahub_usage_event_datastream() {
create_if_not_exists "_index_template/${PREFIX}datahub_usage_event_index_template" index_template.json
# 3. although indexing request creates the data stream, it's not queryable before creation, causing GMS to throw exceptions
create_if_not_exists "_data_stream/${PREFIX}datahub_usage_event" "datahub_usage_event"

# Create index template for update_events
create_if_not_exists "_index_template/${PREFIX}datahub_update_event_index_template" update_event_template.json
create_if_not_exists "_data_stream/${PREFIX}datahub_update_event" "datahub_update_event"
}

# create indices for ES OSS (AWS)
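Side note on the script's ordering guard: as the existing comment says, a data stream is not queryable until it has been explicitly created, even though an indexing request would create it implicitly. A minimal sketch of how a client could probe for the new stream with the low-level REST client — the helper class, method name, and prefix handling are illustrative, not part of this PR:

import java.io.IOException;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;

public final class UpdateEventStreamCheck {
  /** Returns true once create-indices.sh has created the update-event data stream. */
  public static boolean streamExists(RestClient restClient, String prefix) throws IOException {
    try {
      restClient.performRequest(new Request("GET", "/_data_stream/" + prefix + "datahub_update_event"));
      return true; // 2xx: the stream exists and GMS can query it safely
    } catch (ResponseException e) {
      return false; // typically 404: the setup script has not run yet
    }
  }
}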
@@ -38,6 +38,13 @@ public interface EntitySearchService {
*/
void upsertDocument(@Nonnull String entityName, @Nonnull String document, @Nonnull String docId);

/**
* Inserts the given update-event document into the update index.
*
* @param document the document to insert into the update index
* @param docId the ID of the document
*/
void createUpdateDocument(@Nonnull String document, @Nonnull String docId);

/**
* Deletes the document with the given document ID from the index.
*
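For context, a sketch of how a caller might invoke the new interface method. The inline JSON and the docId value are illustrative only; in this PR the document is produced by SearchDocumentTransformer.transformEvent and the id by SearchUtils.getDocHash:

// entitySearchService is an injected EntitySearchService; document shape is illustrative.
String document = "{\"urn\":\"urn:li:dataset:(urn:li:dataPlatform:hive,foo,PROD)\","
    + "\"changeType\":\"UPSERT\",\"@timestamp\":1700000000000}";
String docId = "some-unique-id"; // the PR derives this via SearchUtils.getDocHash
entitySearchService.createUpdateDocument(document, docId);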
@@ -70,6 +70,12 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String document,
esWriteDAO.upsertDocument(entityName, document, docId);
}

@Override
public void createUpdateDocument(@Nonnull String document, @Nonnull String docId) {
log.debug(String.format("Creating Update document document: %s, docId %s", document, docId));
esWriteDAO.createUpdateDocument(document, docId);
}

@Override
public void deleteDocument(@Nonnull String entityName, @Nonnull String docId) {
log.debug(String.format("Deleting Search document entityName: %s, docId: %s", entityName, docId));
@@ -6,7 +6,9 @@
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
@@ -46,6 +48,23 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String document,
bulkProcessor.add(updateRequest);
}

/**
* Creates a request to insert a new document into the datahub_update_event index.
*
* @param document the document to insert
* @param docId the ID of the document
*/
public void createUpdateDocument(@Nonnull String document, @Nonnull String docId) {
final String indexName = indexConvention.getIndexName("datahub_update_event");
final IndexRequest indexRequest = new IndexRequest(indexName)
.id(docId)
.source(document, XContentType.JSON)
.opType(DocWriteRequest.OpType.CREATE);

bulkProcessor.add(indexRequest);
}

/**
* Deletes the document with the given document ID from the index.
*
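The OpType.CREATE choice is worth calling out: a create request is rejected with a version conflict if a document with the same id already exists, so a replayed event cannot silently overwrite an earlier one. A minimal sketch of the same request issued synchronously, outside the BulkProcessor (client here is an assumed RestHighLevelClient, and document/docId are assumed inputs):

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;

IndexRequest request = new IndexRequest("datahub_update_event")
    .id(docId)
    .source(document, XContentType.JSON)
    .opType(DocWriteRequest.OpType.CREATE); // fails with a version conflict if docId already exists
IndexResponse response = client.index(request, RequestOptions.DEFAULT);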
@@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.schema.DataSchema;
import com.linkedin.data.template.RecordTemplate;
@@ -14,6 +15,7 @@
import com.linkedin.metadata.models.annotation.SearchableAnnotation.FieldType;
import com.linkedin.metadata.models.extractor.FieldExtractor;

import com.linkedin.mxe.MetadataChangeLog;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -82,6 +84,23 @@ public Optional<String> transformAspect(
return Optional.of(searchDocument.toString());
}

/**
* Flattens a MetadataChangeLog event into the JSON document stored in the update-event index.
*/
public String transformEvent(MetadataChangeLog event) {
final ObjectNode searchDocument = JsonNodeFactory.instance.objectNode();
searchDocument.put("urn", event.getEntityUrn().toString());
searchDocument.put("changeType", event.getChangeType().toString());
searchDocument.put("entityType", event.getEntityType().toString());
searchDocument.put("aspectName", event.getAspectName().toString());

AuditStamp eventCreationInfo = event.getCreated();
String actor = eventCreationInfo.getActor().toString();
Long time = eventCreationInfo.getTime();

searchDocument.put("actorUrn", actor);
searchDocument.put("timestamp", time);
searchDocument.put("@timestamp", time);
return searchDocument.toString();
}

public void setSearchableValue(final SearchableFieldSpec fieldSpec, final List<Object> fieldValues,
final ObjectNode searchDocument, final Boolean forDelete) {
DataSchema.Type valueType = fieldSpec.getPegasusSchema().getType();
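A sketch of what transformEvent produces for a typical upsert. The fluent setter chain follows the generated Pegasus models, the import paths are assumed from the DataHub codebase, the urn values are illustrative, and Urn.createFromString's checked URISyntaxException is elided:

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.mxe.MetadataChangeLog;

MetadataChangeLog event = new MetadataChangeLog()
    .setEntityType("dataset")
    .setAspectName("datasetProperties")
    .setChangeType(ChangeType.UPSERT)
    .setEntityUrn(Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:hive,foo,PROD)"))
    .setCreated(new AuditStamp()
        .setActor(Urn.createFromString("urn:li:corpuser:datahub"))
        .setTime(1700000000000L));

// transformer is an instance of this SearchDocumentTransformer.
String doc = transformer.transformEvent(event);
// doc is a flat JSON object along the lines of:
// {"urn":"urn:li:dataset:(...)","changeType":"UPSERT","entityType":"dataset",
//  "aspectName":"datasetProperties","actorUrn":"urn:li:corpuser:datahub",
//  "timestamp":1700000000000,"@timestamp":1700000000000}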
@@ -22,6 +22,10 @@
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
@@ -65,6 +69,16 @@ public static Optional<String> getDocId(@Nonnull Urn urn) {
}
}

/**
* Computes a Base64-encoded SHA-256 hash of the given aspect value, used as a stable document ID.
*/
public static String getDocHash(@Nonnull String aspectValue) {
try {
MessageDigest hashDigest = MessageDigest.getInstance("SHA-256");
String hashId = Base64.getEncoder().encodeToString(hashDigest.digest(aspectValue.getBytes(StandardCharsets.UTF_8)));
return hashId;
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}

/**
* Validates the request params and create a request map out of it.
*
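Because the digest is deterministic, the same entity type, aspect payload, and timestamp always hash to the same id; combined with the OpType.CREATE write above, a replayed event resolves to the same Elasticsearch document instead of creating a duplicate. A quick illustrative check (the input string is made up):

String input = "dataset_{\"description\":\"new docs\"}_1700000000000"; // illustrative input
String first = SearchUtils.getDocHash(input);
String second = SearchUtils.getDocHash(input);
assert first.equals(second); // identical input always yields the identical document id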
@@ -194,6 +194,9 @@ private void handleUpdateChangeEvent(@Nonnull final MetadataChangeLog event) {
} else {
updateGraphService(urn, aspectSpec, aspect, event);
}

// Populate update index with update event
updateUpdateIndex(event);
}

@@ -489,6 +492,20 @@ private void updateTimeseriesFields(String entityType, String aspectName, Urn ur
});
}

/**
* Processes the event and writes it to the update index in Elasticsearch.
*/
private void updateUpdateIndex(@Nonnull final MetadataChangeLog event) {
String updateDocument = _searchDocumentTransformer.transformEvent(event);

// Generate a hash from the entity type, aspect content, and event time, used as a unique id for the ES document
String stringToBeHashed = event.getEntityType().toString() + '_'
+ event.getAspect().getValue().asAvroString()
+ '_' + event.getCreated().getTime().toString();
String docHash = SearchUtils.getDocHash(stringToBeHashed);
_entitySearchService.createUpdateDocument(updateDocument, docHash);
}

Review thread on the document id:

Collaborator: Why do we need to create a unique id for the ES documents? Are we not storing each event related to the dataset? Can we just use the _id generated by ES?

Collaborator: Since you are using time and content as the hash, it is almost certain that it will result in a new doc. It is unlikely that there will ever be an update to the document.

Author: The ES client in Java does not automatically assign an _id to the document, so I need to create a unique _id for each event; otherwise there is an error when uploading the document to the index.

Author: Yes, that is the idea here: each update event is an individual document, so we can track all updates over a time period.

Collaborator: https://www.javadoc.io/doc/org.elasticsearch/elasticsearch/7.8.0/org/elasticsearch/action/index/IndexRequest.html#id()
What if you do not set the id for the document? Will ES generate the id for you? Let's see if we can use an autogenerated ID.

Author: Hmm, the auto-generated id works. I have removed the relevant parts to make use of the auto-generated ID.
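Per the resolution above, a sketch of the auto-id variant (assumed final shape; this diff still shows the hash-based version): omitting IndexRequest.id() lets Elasticsearch generate the _id itself, making the explicit entityType/aspect/time hash unnecessary.

// No .id(...) call: Elasticsearch assigns an auto-generated _id on insert.
final IndexRequest indexRequest = new IndexRequest(indexName)
    .source(document, XContentType.JSON)
    .opType(DocWriteRequest.OpType.CREATE);
bulkProcessor.add(indexRequest);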

private void updateSystemMetadata(SystemMetadata systemMetadata, Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) {
_systemMetadataService.insert(systemMetadata, urn.toString(), aspectSpec.getName());

@@ -0,0 +1,23 @@
{
"index_patterns": ["*PREFIXdatahub_update_event*"],
"data_stream": { },
"priority": 499,
"template": {
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"type": {
"type": "keyword"
},
"timestamp": {
"type": "date"
}
}
},
"settings": {
"index.lifecycle.name": "PREFIXdatahub_usage_event_policy"
}
}
}

Review thread on the mappings:

Collaborator: Where are you storing your document? _source? _source is not indexed, and it might be hard for you to do aggregations and sorting in the future. Any reason not to store urn, event_type, and other important info as fields?

Author: Fields such as urn and other info are automatically indexed by ES when inserting the document into the index, so I did not define them here. Should I define the fields here so it is more transparent?

Collaborator: Are you saying that it is using dynamic mapping? I think we should define the fields, since the schema is known. I am wondering if we should use static mapping for this index: with dynamic mapping there is no control over what is being sent to ES, and it just takes one person sending one erroneous document with many, many fields to cause a field explosion in this index. The mapping is also not optimized; I believe each dynamic field will get both "keyword" and "text" types (wasting RAM and storage).