Skip to content

Commit

Permalink
feat(lineage): mark nodes as explored (#10180)
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanHolstien authored Apr 2, 2024
1 parent 3671860 commit a89e189
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.linkedin.metadata.search.LineageSearchEntity;
import com.linkedin.metadata.search.LineageSearchResult;
import com.linkedin.metadata.search.SearchResultMetadata;
import java.util.ArrayList;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -69,7 +70,8 @@ private SearchAcrossLineageResult mapResult(
.map(p -> mapPath(context, p))
.collect(Collectors.toList()))
.setDegree(searchEntity.getDegree())
.setDegrees(searchEntity.getDegrees().stream().collect(Collectors.toList()))
.setDegrees(new ArrayList<>(searchEntity.getDegrees()))
.setExplored(Boolean.TRUE.equals(searchEntity.isExplored()))
.build();
}

Expand Down
9 changes: 7 additions & 2 deletions datahub-graphql-core/src/main/resources/search.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ type ScrollResults {
}

"""
Results returned by issueing a search across relationships query
Results returned by issuing a search across relationships query
"""
type SearchAcrossLineageResults {
"""
Expand Down Expand Up @@ -679,7 +679,7 @@ type SearchAcrossLineageResults {
}

"""
Results returned by issueing a search across relationships query using scroll API
Results returned by issuing a search across relationships query using scroll API
"""
type ScrollAcrossLineageResults {
"""
Expand Down Expand Up @@ -742,6 +742,11 @@ type SearchAcrossLineageResult {
"""
degrees: [Int!]

"""
Marks whether or not this entity was explored further for lineage
"""
explored: Boolean!

}

"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,15 +345,17 @@ private Stream<Urn> processOneHopLineage(
int i) {

// Do one hop on the lineage graph
int numHops = i + 1; // Zero indexed for loop counter, one indexed count
int remainingHops = maxHops - numHops;
List<LineageRelationship> oneHopRelationships =
getLineageRelationshipsInBatches(
currentLevel,
direction,
graphFilters,
visitedEntities,
viaEntities,
i + 1,
maxHops - (i + 1),
numHops,
remainingHops,
remainingTime,
existingPaths,
exploreMultiplePaths,
Expand Down Expand Up @@ -387,8 +389,9 @@ private Stream<Urn> processOneHopLineage(
|| platformMatches(
lineageRelationship.getEntity(),
ignoreAsHops.get(entityType)))))
.forEach(
lineageRelationship -> additionalCurrentLevel.add(lineageRelationship.getEntity()));
.map(LineageRelationship::getEntity)
.forEach(additionalCurrentLevel::add);
;
if (!additionalCurrentLevel.isEmpty()) {
Stream<Urn> ignoreAsHopUrns =
processOneHopLineage(
Expand Down Expand Up @@ -417,6 +420,14 @@ private Stream<Urn> processOneHopLineage(
.sorted(Comparator.comparing(Urn::toString))
.limit(lineageFlags.getEntitiesExploredPerHopLimit());
}
if (remainingHops > 0) {
// If there are hops remaining, we expect to explore everything getting passed back to the
// loop, barring a timeout
List<Urn> entitiesToExplore = intermediateStream.collect(Collectors.toList());
entitiesToExplore.forEach(urn -> result.get(urn).setExplored(true));
// reassign the stream after consuming it
intermediateStream = entitiesToExplore.stream();
}
}
return intermediateStream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ private LineageSearchEntity buildLineageSearchEntity(
if (lineageRelationship.hasDegrees()) {
entity.setDegrees(lineageRelationship.getDegrees());
}
entity.setExplored(Boolean.TRUE.equals(lineageRelationship.isExplored()));
}
return entity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,29 @@ public void testTimestampLineage() throws Exception {
Assert.assertEquals(Integer.valueOf(2), downstreamResult.getTotal());
}

@Test
public void testExplored() throws Exception {

List<Edge> edges =
Arrays.asList(
// One upstream edge
new Edge(dataset2Urn, dataset1Urn, downstreamOf, null, null, null, null, null),
// Two downstream
new Edge(dataset3Urn, dataset2Urn, downstreamOf, null, null, null, null, null),
new Edge(dataset4Urn, dataset2Urn, downstreamOf, null, null, null, null, null),
// One with null values, should always be returned
new Edge(dataset5Urn, dataset2Urn, downstreamOf, null, null, null, null, null));

edges.forEach(getGraphService()::addEdge);
syncAfterWrite();

EntityLineageResult result = getUpstreamLineage(dataset2Urn, null, null, 10);
Assert.assertTrue(Boolean.TRUE.equals(result.getRelationships().get(0).isExplored()));

EntityLineageResult result2 = getUpstreamLineage(dataset2Urn, null, null, 10, 0);
Assert.assertTrue(result2.getRelationships().get(0).isExplored() == null);
}

/**
* Utility method to reduce repeated parameters for lineage tests
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,9 @@ record LineageRelationship {
* Replaces the deprecated field "degree".
**/
degrees: optional array[int]

/**
* Marks this relationship as explored during the graph walk
*/
explored: optional boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,9 @@ record LineageSearchEntity includes SearchEntity {
* The degrees of separation (number of hops) between the source and this entity
*/
degrees: array[int] = []

/**
* Marks an entity as having been explored for as a part of the graph walk
*/
explored: optional boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -6205,6 +6205,11 @@
},
"doc" : "The degrees of separation (number of hops) between the source and this entity ",
"default" : [ ]
}, {
"name" : "explored",
"type" : "boolean",
"doc" : "Marks an entity as having been explored for as a part of the graph walk",
"optional" : true
} ]
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@
},
"doc" : "The different depths at which this entity is discovered in the lineage graph.\nMarked as optional to maintain backward compatibility, but is filled out by implementations. \nReplaces the deprecated field \"degree\".\n",
"optional" : true
}, {
"name" : "explored",
"type" : "boolean",
"doc" : "Marks this relationship as explored during the graph walk",
"optional" : true
} ]
}
},
Expand Down

0 comments on commit a89e189

Please sign in to comment.