diff --git a/extended/src/main/java/apoc/export/arrow/ExportArrow.java b/extended/src/main/java/apoc/export/arrow/ExportArrow.java new file mode 100644 index 0000000000..cb923e44c5 --- /dev/null +++ b/extended/src/main/java/apoc/export/arrow/ExportArrow.java @@ -0,0 +1,184 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apoc.export.arrow; + +import apoc.Pools; +import apoc.export.util.NodesAndRelsSubGraph; +import apoc.result.ByteArrayResult; +import apoc.result.ExportProgressInfo; +import apoc.result.VirtualGraph; +import org.neo4j.cypher.export.DatabaseSubGraph; +import org.neo4j.cypher.export.SubGraph; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.Result; +import org.neo4j.graphdb.Transaction; +import org.neo4j.kernel.api.QueryLanguage; +import org.neo4j.kernel.api.procedure.QueryLanguageScope; +import org.neo4j.logging.Log; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Description; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.NotThreadSafe; +import org.neo4j.procedure.Procedure; +import org.neo4j.procedure.TerminationGuard; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.stream.Stream; + +public class ExportArrow { + + @Context + public Transaction tx; + + @Context + public GraphDatabaseService db; + + @Context + public Pools pools; + + @Context + public Log logger; + + @Context + public TerminationGuard terminationGuard; + + @NotThreadSafe + @Procedure(name = "apoc.export.arrow.stream.all", deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Exports the full database as an arrow byte array.") + public Stream all( + @Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }") + Map config) { + return new ExportArrowService(db, pools, terminationGuard, logger) + .stream(new DatabaseSubGraph(tx), new ArrowConfig(config)); + } + + @NotThreadSafe + @Procedure( + name = "apoc.export.arrow.stream.graph", + deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Exports the given graph as an arrow byte array.") + public Stream graph( + @Name(value = "graph", description = "The graph to export.") Object graph, + @Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }") + Map config) { + final SubGraph subGraph; + if (graph instanceof Map) { + Map mGraph = (Map) graph; + if (!mGraph.containsKey("nodes")) { + throw new IllegalArgumentException( + "Graph Map must contains `nodes` field and `relationships` optionally"); + } + subGraph = new NodesAndRelsSubGraph( + tx, (Collection) mGraph.get("nodes"), (Collection) mGraph.get("relationships")); + } else if (graph instanceof VirtualGraph) { + VirtualGraph vGraph = (VirtualGraph) graph; + subGraph = new NodesAndRelsSubGraph(tx, vGraph.nodes(), vGraph.relationships()); + } else { + throw new IllegalArgumentException("Supported inputs are VirtualGraph, Map"); + } + return new ExportArrowService(db, pools, terminationGuard, logger).stream(subGraph, new ArrowConfig(config)); + } + + @NotThreadSafe + @Procedure( + name = "apoc.export.arrow.stream.query", + deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Exports the given Cypher query as an arrow byte array.") + public Stream query( + @Name(value = "query", description = "The query used to collect the data for export.") String query, + @Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }") + Map config) { + Map params = config == null + ? Collections.emptyMap() + : (Map) config.getOrDefault("params", Collections.emptyMap()); + Result result = tx.execute(query, params); + return new ExportArrowService(db, pools, terminationGuard, logger).stream(result, new ArrowConfig(config)); + } + + @NotThreadSafe + @Procedure(name = "apoc.export.arrow.all", deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Exports the full database as an arrow file.") + public Stream all( + @Name(value = "file", description = "The name of the file to export the data to.") String fileName, + @Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }") + Map config) { + return new ExportArrowService(db, pools, terminationGuard, logger) + .file(fileName, new DatabaseSubGraph(tx), new ArrowConfig(config)); + } + + @NotThreadSafe + @Procedure(name = "apoc.export.arrow.graph", deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Exports the given graph as an arrow file.") + public Stream graph( + @Name(value = "file", description = "The name of the file to export the data to.") String fileName, + @Name(value = "graph", description = "The graph to export.") Object graph, + @Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }") + Map config) { + final SubGraph subGraph; + if (graph instanceof Map) { + Map mGraph = (Map) graph; + if (!mGraph.containsKey("nodes")) { + throw new IllegalArgumentException( + "Graph Map must contains `nodes` field and `relationships` optionally"); + } + subGraph = new NodesAndRelsSubGraph( + tx, (Collection) mGraph.get("nodes"), (Collection) mGraph.get("relationships")); + } else if (graph instanceof VirtualGraph) { + VirtualGraph vGraph = (VirtualGraph) graph; + subGraph = new NodesAndRelsSubGraph(tx, vGraph.nodes(), vGraph.relationships()); + } else { + throw new IllegalArgumentException("Supported inputs are VirtualGraph, Map"); + } + return new ExportArrowService(db, pools, terminationGuard, logger) + .file(fileName, subGraph, new ArrowConfig(config)); + } + + @NotThreadSafe + @Procedure(name = "apoc.export.arrow.query", deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Exports the results from the given Cypher query as an arrow file.") + public Stream query( + @Name(value = "file", description = "The name of the file to which the data will be exported.") + String fileName, + @Name(value = "query", description = "The query to use to collect the data for export.") String query, + @Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }") + Map config) { + Map params = config == null + ? Collections.emptyMap() + : (Map) config.getOrDefault("params", Collections.emptyMap()); + Result result = tx.execute(query, params); + return new ExportArrowService(db, pools, terminationGuard, logger) + .file(fileName, result, new ArrowConfig(config)); + } +} diff --git a/extended/src/main/java/apoc/load/LoadArrow.java b/extended/src/main/java/apoc/load/LoadArrow.java new file mode 100644 index 0000000000..68bc238935 --- /dev/null +++ b/extended/src/main/java/apoc/load/LoadArrow.java @@ -0,0 +1,187 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apoc.load; + +import apoc.result.LoadDataMapResult; +import apoc.util.FileUtils; +import apoc.util.JsonUtil; +import apoc.util.Util; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateMilliVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.util.Text; +import org.neo4j.graphdb.security.URLAccessChecker; +import org.neo4j.graphdb.security.URLAccessValidationError; +import org.neo4j.kernel.api.QueryLanguage; +import org.neo4j.kernel.api.procedure.QueryLanguageScope; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Description; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; +import org.neo4j.values.storable.Values; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.channels.SeekableByteChannel; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public class LoadArrow { + + @Context + public URLAccessChecker urlAccessChecker; + + private static class ArrowSpliterator extends Spliterators.AbstractSpliterator { + + private final ArrowReader reader; + private final VectorSchemaRoot schemaRoot; + private final AtomicInteger counter; + + public ArrowSpliterator(ArrowReader reader, VectorSchemaRoot schemaRoot) throws IOException { + super(Long.MAX_VALUE, Spliterator.ORDERED); + this.reader = reader; + this.schemaRoot = schemaRoot; + this.counter = new AtomicInteger(); + this.reader.loadNextBatch(); + } + + @Override + public synchronized boolean tryAdvance(Consumer action) { + try { + if (counter.get() >= schemaRoot.getRowCount()) { + if (reader.loadNextBatch()) { + counter.set(0); + } else { + return false; + } + } + final Map row = schemaRoot.getFieldVectors().stream() + .collect( + HashMap::new, + (map, fieldVector) -> map.put(fieldVector.getName(), read(fieldVector, counter.get())), + HashMap::putAll); // please look at https://bugs.openjdk.java.net/browse/JDK-8148463 + counter.incrementAndGet(); + action.accept(new LoadDataMapResult(row)); + return true; + } catch (Exception e) { + return false; + } + } + } + + @Procedure(name = "apoc.load.arrow.stream", deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Imports `NODE` and `RELATIONSHIP` values from the provided arrow byte array.") + public Stream stream( + @Name(value = "source", description = "The data to load.") byte[] source, + @Name(value = "config", defaultValue = "{}", description = "This value is never used.") + Map config) + throws IOException { + RootAllocator allocator = new RootAllocator(); + ByteArrayInputStream inputStream = new ByteArrayInputStream(source); + ArrowStreamReader streamReader = new ArrowStreamReader(inputStream, allocator); + VectorSchemaRoot schemaRoot = streamReader.getVectorSchemaRoot(); + return StreamSupport.stream(new ArrowSpliterator(streamReader, schemaRoot), false) + .onClose(() -> { + Util.close(allocator); + Util.close(streamReader); + Util.close(schemaRoot); + Util.close(inputStream); + }); + } + + @Procedure(name = "apoc.load.arrow", deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Imports `NODE` and `RELATIONSHIP` values from the provided arrow file.") + public Stream file( + @Name(value = "file", description = "The name of the file to import data from.") String fileName, + @Name(value = "config", defaultValue = "{}", description = "This value is never used.") + Map config) + throws IOException, URISyntaxException, URLAccessValidationError { + final SeekableByteChannel channel = FileUtils.inputStreamFor(fileName, null, null, null, urlAccessChecker) + .asChannel(); + RootAllocator allocator = new RootAllocator(); + ArrowFileReader streamReader = new ArrowFileReader(channel, allocator); + VectorSchemaRoot schemaRoot = streamReader.getVectorSchemaRoot(); + return StreamSupport.stream(new ArrowSpliterator(streamReader, schemaRoot), false) + .onClose(() -> { + Util.close(allocator); + Util.close(streamReader); + Util.close(schemaRoot); + Util.close(channel); + }); + } + + private static Object read(FieldVector fieldVector, int index) { + if (fieldVector.isNull(index)) { + return null; + } else if (fieldVector instanceof DateMilliVector) { + DateMilliVector fe = (DateMilliVector) fieldVector; + return Instant.ofEpochMilli(fe.get(index)).atOffset(ZoneOffset.UTC); + } else if (fieldVector instanceof BitVector) { + BitVector fe = (BitVector) fieldVector; + return fe.get(index) == 1; + } else { + Object object = fieldVector.getObject(index); + return getObject(object); + } + } + + private static Object getObject(Object object) { + if (object instanceof Collection) { + return ((Collection) object).stream().map(LoadArrow::getObject).collect(Collectors.toList()); + } + if (object instanceof Map) { + return ((Map) object) + .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getObject(e.getValue()))); + } + if (object instanceof Text) { + return object.toString(); + } + try { + // we test if is a valid Neo4j type + return Values.of(object); + } catch (Exception e) { + // otherwise we try coerce it + return valueToString(object); + } + } + + private static String valueToString(Object value) { + return JsonUtil.writeValueAsString(value); + } +} diff --git a/extended/src/main/java/apoc/load/LoadJson.java b/extended/src/main/java/apoc/load/LoadJson.java new file mode 100644 index 0000000000..6c20e62937 --- /dev/null +++ b/extended/src/main/java/apoc/load/LoadJson.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apoc.load; + +import apoc.result.LoadDataMapResult; +import apoc.util.CompressionAlgo; +import org.neo4j.graphdb.security.URLAccessChecker; +import org.neo4j.kernel.api.QueryLanguage; +import org.neo4j.kernel.api.procedure.QueryLanguageScope; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Description; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; +import org.neo4j.procedure.TerminationGuard; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static apoc.load.LoadJsonUtils.loadJsonStream; +import static apoc.util.CompressionConfig.COMPRESSION; + +public class LoadJson { + + @Context + public TerminationGuard terminationGuard; + + @Context + public URLAccessChecker urlAccessChecker; + + @SuppressWarnings("unchecked") + @Procedure(name = "apoc.load.jsonParams", deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description( + "Loads parameters from a JSON URL (e.g. web-API) as a stream of values if the given JSON file is a `LIST`.\n" + + "If the given JSON file is a `MAP`, this procedure imports a single value instead.") + public Stream jsonParams( + @Name( + value = "urlOrKeyOrBinary", + description = "The name of the file or binary data to import the data from.") + Object urlOrKeyOrBinary, + @Name(value = "headers", description = "Headers to be used when connecting to the given URL.") + Map headers, + @Name(value = "payload", description = "The payload to send when connecting to the given URL.") + String payload, + @Name( + value = "path", + defaultValue = "", + description = "A JSON path expression used to extract a certain part from the list.") + String path, + @Name( + value = "config", + defaultValue = "{}", + description = + """ + { + failOnError = true :: BOOLEAN, + pathOptions :: LIST, + compression = ""NONE"" :: [""NONE"", ""BYTES"", ""GZIP"", ""BZIP2"", ""DEFLATE"", ""BLOCK_LZ4"", ""FRAMED_SNAPPYā€¯] + } + """) + Map config) { + if (config == null) config = Collections.emptyMap(); + boolean failOnError = (boolean) config.getOrDefault("failOnError", true); + String compressionAlgo = (String) config.getOrDefault(COMPRESSION, CompressionAlgo.NONE.name()); + List pathOptions = (List) config.get("pathOptions"); + return loadJsonStream( + urlOrKeyOrBinary, + headers, + payload, + path, + failOnError, + compressionAlgo, + pathOptions, + terminationGuard, + urlAccessChecker); + } +} diff --git a/extended/src/main/java/apoc/log/Neo4jLogStream.java b/extended/src/main/java/apoc/log/Neo4jLogStream.java new file mode 100644 index 0000000000..10ef47304b --- /dev/null +++ b/extended/src/main/java/apoc/log/Neo4jLogStream.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apoc.log; + +import apoc.util.FileUtils; +import org.neo4j.kernel.api.QueryLanguage; +import org.neo4j.kernel.api.procedure.QueryLanguageScope; +import org.neo4j.procedure.Admin; +import org.neo4j.procedure.Description; +import org.neo4j.procedure.Mode; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +/** + * @author moxious + * @since 27.02.19 + */ +public class Neo4jLogStream { + + public static class FileEntry implements Comparable { + @Description("The line number.") + public final long lineNo; + + @Description("The content of the line.") + public final String line; + + @Description("The path to the log file.") + public final String path; + + public FileEntry(long lineNumber, String data, String path) { + this.lineNo = lineNumber; + this.line = data; + this.path = path; + } + + public int compareTo(FileEntry o) { + return Long.compare(this.lineNo, o.lineNo); + } + } + + @Admin + @Procedure( + name = "apoc.log.stream", + mode = Mode.DBMS, + deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Returns the file contents from the given log, optionally returning only the last n lines.\n" + + "This procedure requires users to have an admin role.") + public Stream stream( + @Name(value = "path", description = "The name of the log file to read.") String logName, + @Name(value = "config", defaultValue = "{}", description = "{ last :: INTEGER }") + Map config) { + + File logDir = FileUtils.getLogDirectory(); + + if (logDir == null) { + throw new RuntimeException("Neo4j configured server.directories.logs points to a directory that " + + "does not exist or is not readable. Please ensure this configuration is correct."); + } + + // Prepend neo4jHome if it's a relative path, and use the user's path otherwise. + File f = new File(logDir, logName); + + try { + if (!f.getCanonicalFile().toPath().startsWith(logDir.getAbsolutePath())) { + throw new RuntimeException("The path you are trying to access has a canonical path outside of the logs " + + "directory, and this procedure is only permitted to access files in the log directory. This may " + + "occur if the path in question is a symlink or other link."); + } + } catch (IOException ioe) { + throw new RuntimeException("Unable to resolve basic log file canonical path", ioe); + } + + try { + Stream stream = Files.lines(Paths.get(f.toURI())); + final AtomicLong lineNumber = new AtomicLong(0); + final String p = f.getCanonicalPath(); + + Stream entries = stream.map(line -> new FileEntry(lineNumber.getAndIncrement(), line, p)); + + // Useful for tailing logfiles. + if (config.containsKey("last")) { + return entries.sorted(Collections.reverseOrder()) + .limit(Double.valueOf(config.get("last").toString()).longValue()); + } + + return entries; + } catch (NoSuchFileException nsf) { + // This special case we want to throw a custom message and not let this error propagate, because the + // trace exposes the full path we were checking. + throw new RuntimeException("No log file exists by that name"); + } catch (IOException exc) { + throw new RuntimeException(exc); + } + } +} diff --git a/extended/src/test/java/apoc/export/arrow/ArrowTest.java b/extended/src/test/java/apoc/export/arrow/ArrowTest.java new file mode 100644 index 0000000000..5ff4355475 --- /dev/null +++ b/extended/src/test/java/apoc/export/arrow/ArrowTest.java @@ -0,0 +1,402 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apoc.export.arrow; + +import apoc.graph.Graphs; +import apoc.load.LoadArrow; +import apoc.meta.Meta; +import apoc.util.JsonUtil; +import apoc.util.TestUtil; +import com.fasterxml.jackson.core.JsonProcessingException; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.neo4j.configuration.GraphDatabaseInternalSettings; +import org.neo4j.configuration.GraphDatabaseSettings; +import org.neo4j.graphdb.Result; +import org.neo4j.test.rule.DbmsRule; +import org.neo4j.test.rule.ImpermanentDbmsRule; + +import java.io.File; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static apoc.ApocConfig.APOC_EXPORT_FILE_ENABLED; +import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED; +import static apoc.ApocConfig.apocConfig; +import static org.junit.Assert.assertEquals; + +public class ArrowTest { + + private static File directory = new File("target/arrow import"); + + static { //noinspection ResultOfMethodCallIgnored + directory.mkdirs(); + } + + @ClassRule + public static DbmsRule db = new ImpermanentDbmsRule() + .withSetting( + GraphDatabaseSettings.load_csv_file_url_root, + directory.toPath().toAbsolutePath()) + .withSetting(GraphDatabaseInternalSettings.enable_experimental_cypher_versions, true); + + public static final List> EXPECTED = List.of( + new HashMap<>() { + { + put("name", "Adam"); + put("bffSince", null); + put("", null); + put("", 0L); + put("age", 42L); + put("labels", List.of("User")); + put("male", true); + put("", null); + put("kids", List.of("Sam", "Anna", "Grace")); + put( + "place", + Map.of("crs", "wgs-84-3d", "longitude", 33.46789D, "latitude", 13.1D, "height", 100.0D)); + put("", null); + put("since", null); + put( + "born", + LocalDateTime.parse("2015-05-18T19:32:24.000") + .atOffset(ZoneOffset.UTC) + .toZonedDateTime()); + } + }, + new HashMap<>() { + { + put("name", "Jim"); + put("bffSince", null); + put("", null); + put("", 1L); + put("age", 42L); + put("labels", List.of("User")); + put("male", null); + put("", null); + put("kids", null); + put("place", null); + put("", null); + put("since", null); + put("born", null); + } + }, + new HashMap<>() { + { + put("name", null); + put("bffSince", "P5M1DT12H"); + put("", 0L); + put("", 0L); + put("age", null); + put("labels", null); + put("male", null); + put("", "KNOWS"); + put("kids", null); + put("place", null); + put("", 1L); + put("since", 1993L); + put("born", null); + } + }); + + @BeforeClass + public static void beforeClass() { + db.executeTransactionally( + "CREATE (f:User {name:'Adam',age:42,male:true,kids:['Sam','Anna','Grace'], born:localdatetime('2015-05-18T19:32:24.000'), place:point({latitude: 13.1, longitude: 33.46789, height: 100.0})})-[:KNOWS {since: 1993, bffSince: duration('P5M1.5D')}]->(b:User {name:'Jim',age:42})"); + TestUtil.registerProcedure(db, ExportArrow.class, LoadArrow.class, Graphs.class, Meta.class); + } + + @AfterClass + public static void teardown() { + db.shutdown(); + } + + @Before + public void before() { + apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, true); + apocConfig().setProperty(APOC_EXPORT_FILE_ENABLED, true); + } + + private byte[] extractByteArray(Result result) { + return result.columnAs("byteArray").next(); + } + + private String extractFileName(Result result) { + return result.columnAs("file").next(); + } + + private T readValue(String json, Class clazz) { + if (json == null) return null; + try { + return JsonUtil.OBJECT_MAPPER.readValue(json, clazz); + } catch (JsonProcessingException e) { + return null; + } + } + + @Test + public void testStreamRoundtripArrowQuery() { + // given - when + final String returnQuery = "RETURN 1 AS intData," + "'a' AS stringData," + + "true AS boolData," + + "[1, 2, 3] AS intArray," + + "[1.1, 2.2, 3.3] AS doubleArray," + + "[true, false, true] AS boolArray," + + "[1, '2', true, null] AS mixedArray," + + "{foo: 'bar'} AS mapData," + + "localdatetime('2015-05-18T19:32:24') as dateData," + + "[[0]] AS arrayArray," + + "1.1 AS doubleData"; + final byte[] byteArray = db.executeTransactionally( + "CYPHER 25 CALL apoc.export.arrow.stream.query($query) YIELD value AS byteArray", + Map.of("query", returnQuery), + this::extractByteArray); + + // then + final String query = "CYPHER 25 CALL apoc.load.arrow.stream($byteArray) YIELD value " + "RETURN value"; + db.executeTransactionally(query, Map.of("byteArray", byteArray), result -> { + final Map row = (Map) result.next().get("value"); + assertEquals(1L, row.get("intData")); + assertEquals("a", row.get("stringData")); + assertEquals(Arrays.asList(1L, 2L, 3L), row.get("intArray")); + assertEquals(Arrays.asList(1.1D, 2.2D, 3.3), row.get("doubleArray")); + assertEquals(Arrays.asList(true, false, true), row.get("boolArray")); + assertEquals(Arrays.asList("1", "2", "true", null), row.get("mixedArray")); + assertEquals("{\"foo\":\"bar\"}", row.get("mapData")); + assertEquals( + LocalDateTime.parse("2015-05-18T19:32:24.000") + .atOffset(ZoneOffset.UTC) + .toZonedDateTime(), + row.get("dateData")); + assertEquals(Arrays.asList("[0]"), row.get("arrayArray")); + assertEquals(1.1D, row.get("doubleData")); + return true; + }); + } + + @Test + public void testFileRoundtripArrowQuery() { + // given - when + final String returnQuery = "RETURN 1 AS intData," + "'a' AS stringData," + + "true AS boolData," + + "[1, 2, 3] AS intArray," + + "[1.1, 2.2, 3.3] AS doubleArray," + + "[true, false, true] AS boolArray," + + "[1, '2', true, null] AS mixedArray," + + "{foo: 'bar'} AS mapData," + + "localdatetime('2015-05-18T19:32:24') as dateData," + + "[[0]] AS arrayArray," + + "1.1 AS doubleData"; + String file = db.executeTransactionally( + "CYPHER 25 CALL apoc.export.arrow.query('query_test.arrow', $query) YIELD file", + Map.of("query", returnQuery), + this::extractFileName); + + // then + final String query = "CYPHER 25 CALL apoc.load.arrow($file) YIELD value " + "RETURN value"; + db.executeTransactionally(query, Map.of("file", file), result -> { + final Map row = (Map) result.next().get("value"); + assertEquals(1L, row.get("intData")); + assertEquals("a", row.get("stringData")); + assertEquals(Arrays.asList(1L, 2L, 3L), row.get("intArray")); + assertEquals(Arrays.asList(1.1D, 2.2D, 3.3), row.get("doubleArray")); + assertEquals(Arrays.asList(true, false, true), row.get("boolArray")); + assertEquals(Arrays.asList("1", "2", "true", null), row.get("mixedArray")); + assertEquals("{\"foo\":\"bar\"}", row.get("mapData")); + assertEquals( + LocalDateTime.parse("2015-05-18T19:32:24.000") + .atOffset(ZoneOffset.UTC) + .toZonedDateTime(), + row.get("dateData")); + assertEquals(Arrays.asList("[0]"), row.get("arrayArray")); + assertEquals(1.1D, row.get("doubleData")); + return true; + }); + } + + @Test + public void testStreamRoundtripArrowGraph() { + // given - when + final byte[] byteArray = db.executeTransactionally( + "CYPHER 25 CALL apoc.graph.fromDB('neo4j',{}) yield graph " + + "CALL apoc.export.arrow.stream.graph(graph) YIELD value AS byteArray " + + "RETURN byteArray", + Map.of(), + this::extractByteArray); + + // then + final String query = "CYPHER 25 CALL apoc.load.arrow.stream($byteArray) YIELD value " + "RETURN value"; + db.executeTransactionally(query, Map.of("byteArray", byteArray), result -> { + final List> actual = getActual(result); + assertEquals(EXPECTED, actual); + return null; + }); + } + + private List> getActual(Result result) { + return result.stream() + .map(m -> (Map) m.get("value")) + .map(m -> { + final Map newMap = new HashMap(m); + newMap.put("place", readValue((String) m.get("place"), Map.class)); + return newMap; + }) + .collect(Collectors.toList()); + } + + @Test + public void testFileRoundtripArrowGraph() { + // given - when + String file = db.executeTransactionally( + "CYPHER 25 CALL apoc.graph.fromDB('neo4j',{}) yield graph " + + "CALL apoc.export.arrow.graph('graph_test.arrow', graph) YIELD file " + + "RETURN file", + Map.of(), + this::extractFileName); + + // then + final String query = "CYPHER 25 CALL apoc.load.arrow($file) YIELD value " + "RETURN value"; + db.executeTransactionally(query, Map.of("file", file), result -> { + final List> actual = getActual(result); + assertEquals(EXPECTED, actual); + return null; + }); + } + + @Test + public void testStreamRoundtripArrowAll() { + testStreamRoundtripAllCommon(); + } + + @Test + public void testStreamRoundtripArrowAllWithImportExportConfsDisabled() { + // disable both export and import configs + apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, false); + apocConfig().setProperty(APOC_EXPORT_FILE_ENABLED, false); + + // should work regardless of the previous config + testStreamRoundtripAllCommon(); + } + + private void testStreamRoundtripAllCommon() { + // given - when + final byte[] byteArray = db.executeTransactionally( + "CYPHER 25 CALL apoc.export.arrow.stream.all() YIELD value AS byteArray ", Map.of(), this::extractByteArray); + + // then + final String query = "CYPHER 25 CALL apoc.load.arrow.stream($byteArray) YIELD value " + "RETURN value"; + db.executeTransactionally(query, Map.of("byteArray", byteArray), result -> { + final List> actual = getActual(result); + assertEquals(EXPECTED, actual); + return null; + }); + } + + @Test + public void testFileRoundtripArrowAll() { + // given - when + String file = db.executeTransactionally( + "CYPHER 25 CALL apoc.export.arrow.all('all_test.arrow') YIELD file", Map.of(), this::extractFileName); + + // then + final String query = "CYPHER 25 CALL apoc.load.arrow($file) YIELD value " + "RETURN value"; + db.executeTransactionally(query, Map.of("file", file), result -> { + final List> actual = getActual(result); + assertEquals(EXPECTED, actual); + return null; + }); + } + + @Test + public void testStreamVolumeArrowAll() { + // given - when + db.executeTransactionally("UNWIND range(0, 10000 - 1) AS id CREATE (n:ArrowNode{id:id})"); + + final List list = db.executeTransactionally( + "CYPHER 25 CALL apoc.export.arrow.stream.query('MATCH (n:ArrowNode) RETURN n.id AS id') YIELD value AS byteArray ", + Map.of(), + result -> result.columnAs("byteArray").stream().collect(Collectors.toList())); + + final List expected = LongStream.range(0, 10000).mapToObj(l -> l).collect(Collectors.toList()); + + // then + final String query = "CYPHER 25 UNWIND $list AS byteArray " + "CALL apoc.load.arrow.stream(byteArray) YIELD value " + + "RETURN value.id AS id"; + db.executeTransactionally(query, Map.of("list", list), result -> { + final List actual = + result.stream().map(m -> (Long) m.get("id")).sorted().collect(Collectors.toList()); + assertEquals(expected, actual); + return null; + }); + + db.executeTransactionally("MATCH (n:ArrowNode) DELETE n"); + } + + @Test + public void testFileVolumeArrowAll() { + // given - when + db.executeTransactionally("UNWIND range(0, 10000 - 1) AS id CREATE (:ArrowNode{id:id})"); + + String file = db.executeTransactionally( + "CYPHER 25 CALL apoc.export.arrow.query('volume_test.arrow', 'MATCH (n:ArrowNode) RETURN n.id AS id') YIELD file ", + Map.of(), + this::extractFileName); + + final List expected = LongStream.range(0, 10000).mapToObj(l -> l).collect(Collectors.toList()); + + // then + final String query = "CYPHER 25 CALL apoc.load.arrow($file) YIELD value " + "RETURN value.id AS id"; + db.executeTransactionally(query, Map.of("file", file), result -> { + final List actual = + result.stream().map(m -> (Long) m.get("id")).sorted().collect(Collectors.toList()); + assertEquals(expected, actual); + return null; + }); + + db.executeTransactionally("MATCH (n:ArrowNode) DELETE n"); + } + + @Test + public void testValidNonStorableQuery() { + final List list = db.executeTransactionally( + "CYPHER 25 CALL apoc.export.arrow.stream.query($query) YIELD value AS byteArray ", + Map.of("query", "RETURN [1, true, 2.3, null, { name: 'Dave' }] AS array"), + result -> result.columnAs("byteArray").stream().collect(Collectors.toList())); + + final List expected = Arrays.asList("1", "true", "2.3", null, "{\"name\":\"Dave\"}"); + + // then + final String query = "CYPHER 25 UNWIND $list AS byteArray " + "CALL apoc.load.arrow.stream(byteArray) YIELD value " + + "RETURN value.array AS array"; + db.executeTransactionally(query, Map.of("list", list), result -> { + List actual = result.>columnAs("array").next(); + assertEquals(expected, actual); + return null; + }); + } +} diff --git a/extended/src/test/java/apoc/log/Neo4jLogStreamTest.java b/extended/src/test/java/apoc/log/Neo4jLogStreamTest.java new file mode 100644 index 0000000000..f0fb5ac268 --- /dev/null +++ b/extended/src/test/java/apoc/log/Neo4jLogStreamTest.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apoc.log; + +import apoc.util.TestUtil; +import apoc.util.collection.Iterators; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.neo4j.configuration.GraphDatabaseInternalSettings; +import org.neo4j.configuration.GraphDatabaseSettings; +import org.neo4j.dbms.api.DatabaseManagementService; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.test.TestDatabaseManagementServiceBuilder; + +import java.nio.file.Paths; +import java.util.UUID; +import java.util.stream.Collectors; + +import static apoc.ApocConfig.apocConfig; +import static apoc.util.TestUtil.testResult; +import static org.junit.Assert.assertTrue; + +public class Neo4jLogStreamTest { + + private GraphDatabaseService db; + private DatabaseManagementService dbManagementService; + + @Before + public void setUp() { + dbManagementService = new TestDatabaseManagementServiceBuilder( + Paths.get("target", UUID.randomUUID().toString()).toAbsolutePath()) + .setConfig(GraphDatabaseInternalSettings.enable_experimental_cypher_versions, true) + .build(); + apocConfig().setProperty("server.directories.logs", ""); + db = dbManagementService.database(GraphDatabaseSettings.DEFAULT_DATABASE_NAME); + TestUtil.registerProcedure(db, Neo4jLogStream.class); + } + + @After + public void teardown() { + dbManagementService.shutdown(); + } + + @Test + public void testLogStream() { + testResult(db, "CYPHER 25 CALL apoc.log.stream('debug.log')", res -> { + final String wholeFile = + Iterators.stream(res.columnAs("line")).collect(Collectors.joining("")); + assertTrue(wholeFile.contains("apoc.import.file.enabled=false")); + }); + } +}