Skip to content

Commit

Permalink
Fixes #4244: Make apoc.dv.* procedures work in clusters (#4281)
Browse files Browse the repository at this point in the history
* Fixes #4244: Make apoc.dv.* procedures work in clusters

* added procs to extended*.txt

* fix tests

* cleanup
  • Loading branch information
vga91 committed Dec 17, 2024
1 parent 8d3fbec commit 582d999
Show file tree
Hide file tree
Showing 16 changed files with 898 additions and 359 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This file is generated by DocsTest, so don't change it!
= apoc.dv.catalog.add
:description: This section contains reference documentation for the apoc.dv.catalog.add procedure.

label:procedure[] label:apoc-extended[]
label:procedure[] label:apoc-extended[] label:deprecated[]

[.emphasis]
Add a virtualized resource configuration
Expand All @@ -17,6 +17,8 @@ Add a virtualized resource configuration
apoc.dv.catalog.add(name :: STRING?, config = {} :: MAP?) :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?)
----

include::partial$/dv/deprecated.adoc[]

[WARNING]
====
This procedure is not intended to be used in a cluster environment, and may act unpredictably.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This file is generated by DocsTest, so don't change it!
= apoc.dv.catalog.list
:description: This section contains reference documentation for the apoc.dv.catalog.list procedure.

label:procedure[] label:apoc-extended[]
label:procedure[] label:apoc-extended[] label:deprecated[]

[.emphasis]
List all virtualized resource configuration
Expand All @@ -17,6 +17,8 @@ List all virtualized resource configuration
apoc.dv.catalog.list() :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?)
----

include::partial$/dv/deprecated.adoc[]

== Output parameters
[.procedures, opts=header]
|===
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This file is generated by DocsTest, so don't change it!
= apoc.dv.catalog.remove
:description: This section contains reference documentation for the apoc.dv.catalog.remove procedure.

label:procedure[] label:apoc-extended[]
label:procedure[] label:apoc-extended[] label:deprecated[]

[.emphasis]
Remove a virtualized resource config by name
Expand All @@ -17,6 +17,8 @@ Remove a virtualized resource config by name
apoc.dv.catalog.remove(name :: STRING?) :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?)
----

include::partial$/dv/deprecated.adoc[]

[WARNING]
====
This procedure is not intended to be used in a cluster environment, and may act unpredictably.
Expand Down
17 changes: 10 additions & 7 deletions docs/asciidoc/modules/ROOT/pages/virtual-resource/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
= Virtual Resource
:description: This chapter describes how to handle external data sources as virtual resource without persisting them in the database

include::partial$systemdbonly.note.adoc[]

[NOTE]
====
There are situations where we would like to enrich/complement the results of a cypher query in a Neo4j graph with additional
Expand Down Expand Up @@ -40,10 +42,11 @@ image::apoc.dv.imported-graph-from-RDB.png[scaledwidth="100%"]
== Managing a Virtualized Resource via JDBC

=== Creating a Virtualized Resource (JDBC)
Before we can query a Virtualized Resource, we need to define it. We do this using the `apoc.dv.catalog.add` procedure.
The procedure takes two parameters:
Before we can query a Virtualized Resource, we need to define it. We do this using the `apoc.dv.catalog.install` procedure.
The procedure takes three parameters:

* a name that uniquely identifies the virtualized resource and can be used to query that resource
* the database name where we want to use the resource (default is `'neo4j'`)
* a set of parameters indicating the type of the resource (type), the access point (url), the parameterised query
that will be run on the access point (query) and the labels that will be applied to the generated virtual nodes (labels).

Expand All @@ -56,7 +59,7 @@ Here is the cypher that creates such virtualized resource:

[source,cypher]
----
CALL apoc.dv.catalog.add("fr-towns-by-dept", {
CALL apoc.dv.catalog.install("fr-towns-by-dept", "neo4j", {
type: "JDBC",
url: "jdbc:postgresql://localhost/communes?user=jb&password=jb",
labels: ["Town","PopulatedPlace"],
Expand Down Expand Up @@ -124,19 +127,19 @@ RETURN path
----

=== Listing the Virtualized Resource Catalog
The apoc.dv.catalog.list procedure returns a list with all the existing Virtualized resources and their descriptions. It takes no parameters.
The apoc.dv.catalog.list procedure returns a list with all the existing Virtualized resources and their descriptions. It accepts one parameter: i.e. the database name where we want to use the resource (default is 'neo4j').

[source,cypher]
----
CALL apoc.dv.catalog.list()
CALL apoc.dv.catalog.show()
----

=== Removing Virtualized Resources from the Catalog
When a Virtualized Resource is no longer needed it can be removed from the catalog by using the apoc.dv.catalog.remove procedure passing as parameter the unique name of the VR.

[source,cypher]
----
CALL apoc.dv.catalog.remove("vr-name")
CALL apoc.dv.catalog.drop("vr-name", <dbName>)
----

=== Export metadata
Expand Down Expand Up @@ -165,7 +168,7 @@ Here is the cypher that creates such virtualized resource:

[source,cypher]
----
CALL apoc.dv.catalog.add("prod-details-by-id", {
CALL apoc.dv.catalog.install("prod-details-by-id", "neo4j", {
type: "CSV",
url: "http://data.neo4j.com/northwind/products.csv",
labels: ["ProductDetails"],
Expand Down
19 changes: 19 additions & 0 deletions docs/asciidoc/modules/ROOT/partials/dv/deprecated.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[WARNING]
====
Please note that this procedure is deprecated.
Use the following ones instead, which allow for better support in a cluster:
[opts="header"]
|===
| deprecated procedure | new procedure
| `apoc.dv.catalog.add(<name>, $config)` | `apoc.dv.catalog.install('<name>', '<dbName>', $config)`
| `apoc.dv.catalog.remove('<name>')` | `apoc.dv.catalog.drop('<name>', '<dbName>')`
| `apoc.dv.catalog.list()` | `apoc.dv.catalog.show('<dbName>')`
|===
where `<dbName>` is the database where we want to execute the procedure
xref::virtual-resource/index.adoc[See here for more info].
====
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package apoc.dv;

import apoc.util.Neo4jContainerExtension;
import apoc.util.TestContainerUtil;
import apoc.util.TestcontainersCausalCluster;
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.types.Node;
import org.neo4j.driver.types.Path;
import org.neo4j.driver.types.Relationship;

import java.io.File;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import static apoc.dv.DataVirtualizationCatalogTestUtil.*;
import static apoc.util.ExtendedTestContainerUtil.dbIsWriter;
import static apoc.util.ExtendedTestContainerUtil.getBoltAddress;
import static apoc.util.ExtendedTestContainerUtil.getDriverIfNotReplica;
import static apoc.util.MapUtil.map;
import static apoc.util.SystemDbUtil.PROCEDURE_NOT_ROUTED_ERROR;
import static apoc.util.TestContainerUtil.importFolder;
import static apoc.util.TestContainerUtil.testCall;
import static apoc.util.TestContainerUtil.testCallEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.configuration.GraphDatabaseSettings.SYSTEM_DATABASE_NAME;


public class DataVirtualizationCatalogClusterRoutingTest {
private static final int NUM_CORES = 3;
private static TestcontainersCausalCluster cluster;
private static Session clusterSession;
private static List<Neo4jContainerExtension> members;

@BeforeClass
public static void setupCluster() throws Exception {
cluster = TestContainerUtil
.createEnterpriseCluster(List.of(TestContainerUtil.ApocPackage.EXTENDED, TestContainerUtil.ApocPackage.CORE), NUM_CORES, 0,
Collections.emptyMap(),
Map.of("NEO4J_dbms_routing_enabled", "true")
);
clusterSession = cluster.getSession();
members = cluster.getClusterMembers();
FileUtils.copyFileToDirectory(new File(new URI(FILE_URL).toURL().getPath()), importFolder);
assertEquals(NUM_CORES, members.size());
}

@AfterClass
public static void bringDownCluster() {
cluster.close();
}

@Test
public void testVirtualizeCSV() {
dvInSysLeaderMemberCommon(PROCEDURE_NOT_ROUTED_ERROR, SYSTEM_DATABASE_NAME,
(session) -> testCall(session, APOC_DV_INSTALL_QUERY,
APOC_DV_INSTALL_PARAMS,
(row) -> assertCatalogContent(row, CSV_TEST_FILE)), APOC_DV_INSTALL_PARAMS
);

clusterSession.executeRead(tx -> {
final Result result = tx.run(APOC_DV_QUERY,
Map.of(NAME_KEY, CSV_NAME_VALUE,
APOC_DV_QUERY_PARAMS_KEY, APOC_DV_QUERY_PARAMS,
CONFIG_KEY, CONFIG_VALUE)
);

Node node = result.single().get(NODE_KEY).asNode();
assertEquals(NAME_VALUE, node.get(NAME_KEY).asString());
assertEquals(AGE_VALUE, node.get(AGE_KEY).asString());
assertEquals(List.of(LABELS_VALUE), node.labels());

return result.consume();
}
);

clusterSession.executeWrite(tx -> tx.run(CREATE_HOOK_QUERY, CREATE_HOOK_PARAMS).consume());

clusterSession.executeRead(tx -> {
final Result result = tx.run(APOC_DV_QUERY_AND_LINK_QUERY,
map(NAME_KEY, CSV_NAME_VALUE, APOC_DV_QUERY_PARAMS_KEY, APOC_DV_QUERY_PARAMS, RELTYPE_KEY, RELTYPE_VALUE, CONFIG_KEY, CONFIG_VALUE)
);

Path path = result.single().get("path").asPath();
Node node = path.end();
assertEquals(NAME_VALUE, node.get(NAME_KEY).asString());
assertEquals(AGE_VALUE, node.get(AGE_KEY).asString());
assertEquals(List.of(LABELS_VALUE), node.labels());

Node hook = path.start();
assertEquals(HOOK_NODE_NAME_VALUE, hook.get(NAME_KEY).asString());
assertEquals(List.of("Hook"), hook.labels());

Relationship relationship = path.relationships().iterator().next();
assertEquals(hook.elementId(), relationship.startNodeElementId());
assertEquals(node.elementId(), relationship.endNodeElementId());
assertEquals(RELTYPE_VALUE, relationship.type());

return result.consume();
}
);

dvInSysLeaderMemberCommon(PROCEDURE_NOT_ROUTED_ERROR, SYSTEM_DATABASE_NAME,
(session) -> testCallEmpty(session, APOC_DV_DROP_QUERY,
APOC_DV_DROP_PARAMS), APOC_DV_DROP_PARAMS
);

}

private static void dvInSysLeaderMemberCommon(String uuidNotRoutedError, String dbName, Consumer<Session> testDv, Map<String, Object> params) {
dvInSysLeaderMemberCommon(uuidNotRoutedError, dbName, testDv, false, params);
}

private static void dvInSysLeaderMemberCommon(String uuidNotRoutedError, String dbName, Consumer<Session> testDv, boolean readOnlyOperation, Map<String, Object> params) {
final List<Neo4jContainerExtension> members = cluster.getClusterMembers();
assertEquals(NUM_CORES, members.size());
boolean writeExecuted = false;
for (Neo4jContainerExtension container: members) {
// we skip READ_REPLICA members with write operations
// instead, we consider all members with a read only operations
final Driver driver = readOnlyOperation
? container.getDriver()
: getDriverIfNotReplica(container);
if (driver == null) {
continue;
}
Session session = driver.session(SessionConfig.forDatabase(dbName));
boolean isWriter = dbIsWriter(dbName, session, getBoltAddress(container));
if (isWriter) {
testDv.accept(session);
writeExecuted = true;
} else {
try {
testDv.accept(session);
fail("Should fail because of non leader Data Virtualization addition");
} catch (Exception e) {
String errorMsg = e.getMessage();
assertTrue("The actual message is: " + errorMsg, errorMsg.contains(uuidNotRoutedError));
}
}
}
assertTrue(writeExecuted);
}
}
Loading

0 comments on commit 582d999

Please sign in to comment.