From 64d4172e55e26fb5993d5c8a69a3a2880461b9e1 Mon Sep 17 00:00:00 2001
From: Chris Collins
Date: Mon, 1 Jul 2024 18:14:30 -0400
Subject: [PATCH] feat(forms) Handle deleting forms references when hard deleting forms (#10820)

---
 .../entity/DeleteEntityServiceTest.java       | 214 +++++++++++++-
 .../entity/DeleteEntityServiceFactory.java    |   7 +-
 .../metadata/entity/DeleteEntityService.java  | 265 +++++++++++++++++-
 3 files changed, 483 insertions(+), 3 deletions(-)

diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java
index ca0c7322337ef..fe3608a2cf71d 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java
@@ -7,6 +7,10 @@
 import com.datahub.util.RecordUtils;
 import com.google.common.collect.ImmutableList;
 import com.linkedin.common.AuditStamp;
+import com.linkedin.common.FormAssociation;
+import com.linkedin.common.FormAssociationArray;
+import com.linkedin.common.FormVerificationAssociationArray;
+import com.linkedin.common.Forms;
 import com.linkedin.common.urn.Urn;
 import com.linkedin.common.urn.UrnUtils;
 import com.linkedin.container.Container;
@@ -19,11 +23,17 @@
 import com.linkedin.metadata.event.EventProducer;
 import com.linkedin.metadata.graph.GraphService;
 import com.linkedin.metadata.graph.RelatedEntitiesResult;
+import com.linkedin.metadata.query.filter.Filter;
 import com.linkedin.metadata.query.filter.RelationshipDirection;
 import com.linkedin.metadata.run.DeleteReferencesResponse;
+import com.linkedin.metadata.search.EntitySearchService;
+import com.linkedin.metadata.search.ScrollResult;
+import com.linkedin.metadata.search.SearchEntity;
+import com.linkedin.metadata.search.SearchEntityArray;
 import com.linkedin.metadata.service.UpdateIndicesService;
 import com.linkedin.metadata.utils.AuditStampUtils;
 import com.linkedin.metadata.utils.SystemMetadataUtils;
+import com.linkedin.mxe.MetadataChangeProposal;
 import io.datahubproject.metadata.context.OperationContext;
 import io.datahubproject.test.metadata.context.TestOperationContexts;
 import java.sql.Timestamp;
@@ -39,17 +49,20 @@ public class DeleteEntityServiceTest {
   protected GraphService _graphService = Mockito.mock(GraphService.class);
   protected DeleteEntityService _deleteEntityService;
   protected UpdateIndicesService _mockUpdateIndicesService;
+  protected EntitySearchService _mockSearchService;
 
   public DeleteEntityServiceTest() {
     opContext = TestOperationContexts.systemContextNoSearchAuthorization();
     _aspectDao = mock(EbeanAspectDao.class);
     _mockUpdateIndicesService = mock(UpdateIndicesService.class);
+    _mockSearchService = mock(EntitySearchService.class);
     PreProcessHooks preProcessHooks = new PreProcessHooks();
     preProcessHooks.setUiEnabled(true);
     _entityServiceImpl =
         new EntityServiceImpl(_aspectDao, mock(EventProducer.class), true, preProcessHooks, true);
     _entityServiceImpl.setUpdateIndicesService(_mockUpdateIndicesService);
-    _deleteEntityService = new DeleteEntityService(_entityServiceImpl, _graphService);
+    _deleteEntityService =
+        new DeleteEntityService(_entityServiceImpl, _graphService, _mockSearchService);
   }
 
   /**
@@ -119,4 +132,203 @@ public void testDeleteUniqueRefGeneratesValidMCP() {
     assertEquals(1, (int) response.getTotal());
     assertFalse(response.getRelatedAspects().isEmpty());
   }
+
+  /** This test checks whether updating search references works properly (for forms only for now) */
+  @Test
+  public void testDeleteSearchReferences() {
+    EntityService mockEntityService = Mockito.mock(EntityService.class);
+    DeleteEntityService deleteEntityService =
+        new DeleteEntityService(mockEntityService, _graphService, _mockSearchService);
+
+    final Urn dataset = UrnUtils.toDatasetUrn("snowflake", "test", "DEV");
+    final Urn form = UrnUtils.getUrn("urn:li:form:12345");
+
+    ScrollResult scrollResult = new ScrollResult();
+    SearchEntityArray entities = new SearchEntityArray();
+    SearchEntity searchEntity = new SearchEntity();
+    searchEntity.setEntity(dataset);
+    entities.add(searchEntity);
+    scrollResult.setEntities(entities);
+    scrollResult.setNumEntities(1);
+    scrollResult.setScrollId("1");
+    Mockito.when(
+            _mockSearchService.structuredScroll(
+                Mockito.any(OperationContext.class),
+                Mockito.any(),
+                Mockito.eq("*"),
+                Mockito.any(Filter.class),
+                Mockito.eq(null),
+                Mockito.eq(null),
+                Mockito.eq("5m"),
+                Mockito.eq(1000)))
+        .thenReturn(scrollResult);
+
+    ScrollResult scrollResult2 = new ScrollResult();
+    scrollResult2.setNumEntities(0);
+    Mockito.when(
+            _mockSearchService.structuredScroll(
+                Mockito.any(OperationContext.class),
+                Mockito.any(),
+                Mockito.eq("*"),
+                Mockito.any(Filter.class),
+                Mockito.eq(null),
+                Mockito.eq("1"),
+                Mockito.eq("5m"),
+                Mockito.eq(1000)))
+        .thenReturn(scrollResult2);
+
+    Forms formsAspect = new Forms();
+    FormAssociationArray incompleteForms = new FormAssociationArray();
+    FormAssociation formAssociation = new FormAssociation();
+    formAssociation.setUrn(form);
+    incompleteForms.add(formAssociation);
+    formsAspect.setIncompleteForms(incompleteForms);
+    formsAspect.setCompletedForms(new FormAssociationArray());
+    formsAspect.setVerifications(new FormVerificationAssociationArray());
+    Mockito.when(
+            mockEntityService.getLatestAspect(
+                Mockito.any(OperationContext.class), Mockito.eq(dataset), Mockito.eq("forms")))
+        .thenReturn(formsAspect);
+
+    // no entities with relationships on forms
+    final RelatedEntitiesResult mockRelatedEntities =
+        new RelatedEntitiesResult(0, 0, 0, ImmutableList.of());
+    Mockito.when(
+            _graphService.findRelatedEntities(
+                null,
+                newFilter("urn", form.toString()),
+                null,
+                EMPTY_FILTER,
+                ImmutableList.of(),
+                newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING),
+                0,
+                10000))
+        .thenReturn(mockRelatedEntities);
+
+    final DeleteReferencesResponse response =
+        deleteEntityService.deleteReferencesTo(opContext, form, false);
+
+    // ensure we ingest one MCP for cleaning up forms reference
+    Mockito.verify(mockEntityService, Mockito.times(1))
+        .ingestProposal(
+            any(),
+            Mockito.any(MetadataChangeProposal.class),
+            Mockito.any(AuditStamp.class),
+            Mockito.eq(true));
+    assertEquals(1, (int) response.getTotal());
+    assertTrue(response.getRelatedAspects().isEmpty());
+  }
+
+  /** This test ensures we aren't issuing MCPs if there are no search references */
+  @Test
+  public void testDeleteNoSearchReferences() {
+    EntityService mockEntityService = Mockito.mock(EntityService.class);
+    DeleteEntityService deleteEntityService =
+        new DeleteEntityService(mockEntityService, _graphService, _mockSearchService);
+
+    final Urn dataset = UrnUtils.toDatasetUrn("snowflake", "test", "DEV");
+    final Urn form = UrnUtils.getUrn("urn:li:form:12345");
+
+    ScrollResult scrollResult = new ScrollResult();
+    scrollResult.setEntities(new SearchEntityArray());
+    scrollResult.setNumEntities(0);
+    Mockito.when(
+            _mockSearchService.structuredScroll(
+                Mockito.any(OperationContext.class),
+                Mockito.any(),
+                Mockito.eq("*"),
+                Mockito.any(Filter.class),
+                Mockito.eq(null),
+                Mockito.eq(null),
+                Mockito.eq("5m"),
+                Mockito.eq(1000)))
+        .thenReturn(scrollResult);
+
+    // no entities with relationships on forms
+    final RelatedEntitiesResult mockRelatedEntities =
+        new RelatedEntitiesResult(0, 0, 0, ImmutableList.of());
+    Mockito.when(
+            _graphService.findRelatedEntities(
+                null,
+                newFilter("urn", form.toString()),
+                null,
+                EMPTY_FILTER,
+                ImmutableList.of(),
+                newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING),
+                0,
+                10000))
+        .thenReturn(mockRelatedEntities);
+
+    final DeleteReferencesResponse response =
+        deleteEntityService.deleteReferencesTo(opContext, form, false);
+
+    // ensure we did not ingest anything if there are no references
+    Mockito.verify(mockEntityService, Mockito.times(0))
+        .ingestProposal(
+            any(),
+            Mockito.any(MetadataChangeProposal.class),
+            Mockito.any(AuditStamp.class),
+            Mockito.eq(true));
+    assertEquals(0, (int) response.getTotal());
+    assertTrue(response.getRelatedAspects().isEmpty());
+  }
+
+  /** This test checks to make sure we don't issue MCPs if this is a dry-run */
+  @Test
+  public void testDeleteSearchReferencesDryRun() {
+    EntityService mockEntityService = Mockito.mock(EntityService.class);
+    DeleteEntityService deleteEntityService =
+        new DeleteEntityService(mockEntityService, _graphService, _mockSearchService);
+
+    final Urn dataset = UrnUtils.toDatasetUrn("snowflake", "test", "DEV");
+    final Urn form = UrnUtils.getUrn("urn:li:form:12345");
+
+    ScrollResult scrollResult = new ScrollResult();
+    SearchEntityArray entities = new SearchEntityArray();
+    SearchEntity searchEntity = new SearchEntity();
+    searchEntity.setEntity(dataset);
+    entities.add(searchEntity);
+    scrollResult.setEntities(entities);
+    scrollResult.setNumEntities(1);
+    scrollResult.setScrollId("1");
+    Mockito.when(
+            _mockSearchService.structuredScroll(
+                Mockito.any(OperationContext.class),
+                Mockito.any(),
+                Mockito.eq("*"),
+                Mockito.any(Filter.class),
+                Mockito.eq(null),
+                Mockito.eq(null),
+                Mockito.eq("5m"),
+                Mockito.eq(1))) // the service scrolls with a count of 1 when dryRun is true
+        .thenReturn(scrollResult);
+
+    // no entities with relationships on forms
+    final RelatedEntitiesResult mockRelatedEntities =
+        new RelatedEntitiesResult(0, 0, 0, ImmutableList.of());
+    Mockito.when(
+            _graphService.findRelatedEntities(
+                null,
+                newFilter("urn", form.toString()),
+                null,
+                EMPTY_FILTER,
+                ImmutableList.of(),
+                newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING),
+                0,
+                10000))
+        .thenReturn(mockRelatedEntities);
+
+    final DeleteReferencesResponse response =
+        deleteEntityService.deleteReferencesTo(opContext, form, true);
+
+    // ensure we do not ingest anything since this is dry-run, but the total returns 1
+    Mockito.verify(mockEntityService, Mockito.times(0))
+        .ingestProposal(
+            any(),
+            Mockito.any(MetadataChangeProposal.class),
+            Mockito.any(AuditStamp.class),
+            Mockito.eq(true));
+    assertEquals(1, (int) response.getTotal());
+    assertTrue(response.getRelatedAspects().isEmpty());
+  }
 }
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/DeleteEntityServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/DeleteEntityServiceFactory.java
index 6bc2d3c7be63f..11eb75009dc09 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/DeleteEntityServiceFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/DeleteEntityServiceFactory.java
@@ -3,6 +3,7 @@
 import com.linkedin.metadata.entity.DeleteEntityService;
 import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.metadata.graph.GraphService;
+import com.linkedin.metadata.search.EntitySearchService;
 import javax.annotation.Nonnull;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -21,9 +22,13 @@ public class DeleteEntityServiceFactory {
   @Qualifier("graphService")
   private GraphService _graphService;
 
+  @Autowired
+  @Qualifier("entitySearchService")
+  private EntitySearchService _entitySearchService;
+
   @Bean(name = "deleteEntityService")
   @Nonnull
   protected DeleteEntityService createDeleteEntityService() {
-    return new DeleteEntityService(_entityService, _graphService);
+    return new DeleteEntityService(_entityService, _graphService, _entitySearchService);
   }
 }
diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java
index ab8786aeca8d6..aed9b97411ff6 100644
--- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java
+++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java
@@ -6,6 +6,11 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.linkedin.common.AuditStamp;
+import com.linkedin.common.FormAssociation;
+import com.linkedin.common.FormAssociationArray;
+import com.linkedin.common.FormVerificationAssociation;
+import com.linkedin.common.FormVerificationAssociationArray;
+import com.linkedin.common.Forms;
 import com.linkedin.common.urn.Urn;
 import com.linkedin.common.urn.UrnUtils;
 import com.linkedin.data.schema.PathSpec;
@@ -22,14 +27,24 @@
 import com.linkedin.metadata.models.EntitySpec;
 import com.linkedin.metadata.models.RelationshipFieldSpec;
 import com.linkedin.metadata.models.extractor.FieldExtractor;
+import com.linkedin.metadata.query.filter.Condition;
+import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
+import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
+import com.linkedin.metadata.query.filter.Criterion;
+import com.linkedin.metadata.query.filter.CriterionArray;
+import com.linkedin.metadata.query.filter.Filter;
 import com.linkedin.metadata.query.filter.RelationshipDirection;
 import com.linkedin.metadata.run.DeleteReferencesResponse;
 import com.linkedin.metadata.run.RelatedAspect;
 import com.linkedin.metadata.run.RelatedAspectArray;
+import com.linkedin.metadata.search.EntitySearchService;
+import com.linkedin.metadata.search.ScrollResult;
+import com.linkedin.metadata.search.SearchEntity;
 import com.linkedin.metadata.utils.GenericRecordUtils;
 import com.linkedin.mxe.MetadataChangeProposal;
 import io.datahubproject.metadata.context.OperationContext;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +54,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.RequiredArgsConstructor;
@@ -50,8 +66,10 @@ public class DeleteEntityService {
 
   private final EntityService _entityService;
   private final GraphService _graphService;
+  private final EntitySearchService _searchService;
 
   private static final Integer ELASTIC_BATCH_DELETE_SLEEP_SEC = 5;
+  private static final Integer BATCH_SIZE = 1000;
 
   /**
    * Public endpoint that deletes references to a given urn across DataHub's metadata graph. This is
@@ -65,7 +83,14 @@ public class DeleteEntityService {
    */
   public DeleteReferencesResponse deleteReferencesTo(
       @Nonnull OperationContext opContext, final Urn urn, final boolean dryRun) {
+    // TODO: update DeleteReferencesResponse to have searchAspects and provide more helpful comment
+    // in CLI
     final DeleteReferencesResponse result = new DeleteReferencesResponse();
+
+    // Delete references for entities referencing the deleted urn with searchables.
+    // Only works for Form deletion for now
+    int totalSearchAssetCount = deleteSearchReferences(opContext, urn, dryRun);
+
     RelatedEntitiesResult relatedEntities =
         _graphService.findRelatedEntities(
             null,
@@ -90,7 +115,7 @@ public DeleteReferencesResponse deleteReferencesTo(
             .collect(Collectors.toList());
 
     result.setRelatedAspects(new RelatedAspectArray(relatedAspects));
-    result.setTotal(relatedEntities.getTotal());
+    result.setTotal(relatedEntities.getTotal() + totalSearchAssetCount);
 
     if (dryRun) {
       return result;
@@ -489,6 +514,244 @@ private void handleError(final DeleteEntityServiceError error) {
     // NO-OP for now.
   }
 
+  private static class AssetScrollResult {
+    String scrollId; // scroll id to pass to the next structuredScroll call; null when done
+    List<Urn> assets; // urns of the assets returned by the current scroll page
+    int totalAssetCount; // number of entities reported by the scroll result
+  }
+
+  /**
+   * Method that will find references through the search index and clean aspects up. Right now this
+   * only works for deleting Forms entities. Later, we need to extend this for all searchables.
+   */
+  private int deleteSearchReferences(
+      @Nonnull OperationContext opContext, @Nonnull final Urn deletedUrn, final boolean dryRun) {
+    int totalAssetCount = 0;
+    String scrollId = null;
+    do {
+      AssetScrollResult assetScrollResult =
+          getAssetsReferencingUrn(opContext, deletedUrn, scrollId, dryRun);
+      List<Urn> assetsReferencingUrn = assetScrollResult.assets;
+      totalAssetCount += assetScrollResult.totalAssetCount;
+      // if it's a dry run, exit early and stop looping over assets
+      scrollId = dryRun ? null : assetScrollResult.scrollId;
+      if (!dryRun) {
+        assetsReferencingUrn.forEach(
+            assetUrn -> {
+              List<MetadataChangeProposal> mcps =
+                  deleteSearchReferencesForAsset(opContext, assetUrn, deletedUrn);
+              mcps.forEach(
+                  mcp -> _entityService.ingestProposal(opContext, mcp, createAuditStamp(), true));
+            });
+      }
+    } while (scrollId != null);
+    return totalAssetCount;
+  }
+
+  /**
+   * Get all of the asset urns that reference the deleted urn. This is hardcoded for forms right
+   * now; we will use a more generic field to work with all searchables later.
+   *
+   * <p>TODO: Use the new referencedUrns index field once that exists
+   */
+  private AssetScrollResult getAssetsReferencingUrn(
+      @Nonnull OperationContext opContext,
+      @Nonnull final Urn deletedUrn,
+      @Nullable String scrollId,
+      final boolean dryRun) {
+    AssetScrollResult result = new AssetScrollResult();
+    result.scrollId = null;
+    result.totalAssetCount = 0;
+    result.assets = new ArrayList<>();
+
+    if (deletedUrn.getEntityType().equals("form")) {
+      // first, get all entities with this form assigned on it
+      final CriterionArray incompleteFormsArray = new CriterionArray();
+      incompleteFormsArray.add(
+          new Criterion()
+              .setField("incompleteForms")
+              .setValue(deletedUrn.toString())
+              .setCondition(Condition.EQUAL));
+      final CriterionArray completedFormsArray = new CriterionArray();
+      completedFormsArray.add(
+          new Criterion()
+              .setField("completedForms")
+              .setValue(deletedUrn.toString())
+              .setCondition(Condition.EQUAL));
+      // next, get all metadata tests created for this form
+      final CriterionArray metadataTestSourceArray = new CriterionArray();
+      metadataTestSourceArray.add(
+          new Criterion()
+              .setField("sourceEntity")
+              .setValue(deletedUrn.toString())
+              .setCondition(Condition.EQUAL));
+      metadataTestSourceArray.add(
+          new Criterion().setField("sourceType").setValue("FORMS").setCondition(Condition.EQUAL));
+      Filter filter =
+          new Filter()
+              .setOr(
+                  new ConjunctiveCriterionArray(
+                      new ConjunctiveCriterion().setAnd(incompleteFormsArray),
+                      new ConjunctiveCriterion().setAnd(completedFormsArray),
+                      new ConjunctiveCriterion().setAnd(metadataTestSourceArray)));
+      ScrollResult scrollResult =
+          _searchService.structuredScroll(
+              opContext,
+              ImmutableList.of(
+                  "dataset",
+                  "dataJob",
+                  "dataFlow",
+                  "chart",
+                  "dashboard",
+                  "corpuser",
+                  "corpGroup",
+                  "domain",
+                  "container",
+                  "glossaryTerm",
+                  "glossaryNode",
+                  "mlModel",
+                  "mlModelGroup",
+                  "mlFeatureTable",
+                  "mlFeature",
+                  "mlPrimaryKey",
+                  "schemaField",
+                  "dataProduct",
+                  "test"),
+              "*",
+              filter,
+              null,
+              scrollId,
+              "5m",
+              dryRun ? 1 : BATCH_SIZE); // need to pass in 1 for count otherwise get index error
+      if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().size() == 0) {
+        return result;
+      }
+      result.scrollId = scrollResult.getScrollId();
+      result.totalAssetCount = scrollResult.getNumEntities();
+      result.assets =
+          scrollResult.getEntities().stream()
+              .map(SearchEntity::getEntity)
+              .collect(Collectors.toList());
+      return result;
+    }
+    return result;
+  }
+
+  /** Get all the aspects that need updating, then loop over and update them */
+  private List<MetadataChangeProposal> deleteSearchReferencesForAsset(
+      @Nonnull OperationContext opContext,
+      @Nonnull final Urn assetUrn,
+      @Nonnull final Urn deletedUrn) {
+    // delete entities that should be deleted first
+    if (shouldDeleteAssetReferencingUrn(assetUrn, deletedUrn)) {
+      _entityService.deleteUrn(opContext, assetUrn);
+    }
+
+    List<MetadataChangeProposal> mcps = new ArrayList<>();
+    List<String> aspectsToUpdate = getAspectsToUpdate(deletedUrn);
+    aspectsToUpdate.forEach(
+        aspectName -> {
+          try {
+            MetadataChangeProposal mcp =
+                updateAspectForSearchReference(opContext, assetUrn, deletedUrn, aspectName);
+            if (mcp != null) {
+              mcps.add(mcp);
+            }
+          } catch (Exception e) {
+            log.error(
+                String.format(
+                    "Error trying to update aspect %s for asset %s when deleting %s",
+                    aspectName, assetUrn, deletedUrn),
+                e);
+          }
+        });
+    return mcps;
+  }
+
+  /**
+   * Get the names of all the aspects that need to be updated when the deleted urn is removed.
+   *
+   * <p>TODO: extend this to support other types of deletes and be more dynamic depending on aspects
+   * that the asset has
+   */
+  private List<String> getAspectsToUpdate(@Nonnull final Urn deletedUrn) {
+    if (deletedUrn.getEntityType().equals("form")) {
+      return ImmutableList.of("forms");
+    }
+    return new ArrayList<>();
+  }
+
+  /**
+   * Determine whether the asset referencing the deleted urn should be deleted itself.
+   *
+   * <p>TODO: extend this to support other types of deletes and be more dynamic depending on aspects
+   * that the asset has
+   */
+  private boolean shouldDeleteAssetReferencingUrn(
+      @Nonnull final Urn assetUrn, @Nonnull final Urn deletedUrn) {
+    if (assetUrn.getEntityType().equals("test") && deletedUrn.getEntityType().equals("form")) {
+      return true;
+    }
+    return false;
+  }
+
+  @Nullable
+  private MetadataChangeProposal updateAspectForSearchReference(
+      @Nonnull OperationContext opContext,
+      @Nonnull final Urn assetUrn,
+      @Nonnull final Urn deletedUrn,
+      @Nonnull final String aspectName) {
+    if (aspectName.equals("forms")) {
+      return updateFormsAspect(opContext, assetUrn, deletedUrn);
+    }
+    return null;
+  }
+
+  @Nullable
+  private MetadataChangeProposal updateFormsAspect(
+      @Nonnull OperationContext opContext,
+      @Nonnull final Urn assetUrn,
+      @Nonnull final Urn deletedUrn) {
+    RecordTemplate record = _entityService.getLatestAspect(opContext, assetUrn, "forms");
+    if (record == null) return null;
+
+    Forms formsAspect = new Forms(record.data());
+    final AtomicReference<Forms> updatedAspect;
+    try {
+      updatedAspect = new AtomicReference<>(formsAspect.copy());
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to copy the forms aspect for updating", e);
+    }
+    // drop every entry that points at the deleted form; compare urns with equals(), not identity
+    List<FormAssociation> incompleteForms =
+        formsAspect.getIncompleteForms().stream()
+            .filter(incompleteForm -> !incompleteForm.getUrn().equals(deletedUrn))
+            .collect(Collectors.toList());
+    List<FormAssociation> completedForms =
+        formsAspect.getCompletedForms().stream()
+            .filter(completedForm -> !completedForm.getUrn().equals(deletedUrn))
+            .collect(Collectors.toList());
+    final List<FormVerificationAssociation> verifications =
+        formsAspect.getVerifications().stream()
+            .filter(verification -> !verification.getForm().equals(deletedUrn))
+            .collect(Collectors.toList());
+
+    updatedAspect.get().setIncompleteForms(new FormAssociationArray(incompleteForms));
+    updatedAspect.get().setCompletedForms(new FormAssociationArray(completedForms));
+    updatedAspect.get().setVerifications(new FormVerificationAssociationArray(verifications));
+
+    if (!formsAspect.equals(updatedAspect.get())) {
+      return AspectUtils.buildMetadataChangeProposal(assetUrn, "forms", updatedAspect.get());
+    }
+    return null;
+  }
+
+  private AuditStamp createAuditStamp() {
+    return new AuditStamp()
+        .setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR))
+        .setTime(System.currentTimeMillis());
+  }
+
   @AllArgsConstructor
   @Data
   private static class DeleteEntityServiceError {