Skip to content

Commit

Permalink
[lYfZxdRz] Migrate procedures and functions from Core to Cypher 25
Browse files Browse the repository at this point in the history
  • Loading branch information
gem-neo4j committed Dec 10, 2024
1 parent b24d774 commit eb31c8d
Show file tree
Hide file tree
Showing 6 changed files with 1,061 additions and 0 deletions.
184 changes: 184 additions & 0 deletions extended/src/main/java/apoc/export/arrow/ExportArrow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package apoc.export.arrow;

import apoc.Pools;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.result.ByteArrayResult;
import apoc.result.ExportProgressInfo;
import apoc.result.VirtualGraph;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.api.QueryLanguage;
import org.neo4j.kernel.api.procedure.QueryLanguageScope;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.NotThreadSafe;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.TerminationGuard;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;

public class ExportArrow {

@Context
public Transaction tx;

@Context
public GraphDatabaseService db;

@Context
public Pools pools;

@Context
public Log logger;

@Context
public TerminationGuard terminationGuard;

@NotThreadSafe
@Procedure(name = "apoc.export.arrow.stream.all", deprecatedBy = "This procedure is being moved to APOC Extended.")
@Deprecated
@QueryLanguageScope(scope = {QueryLanguage.CYPHER_25})
@Description("Exports the full database as an arrow byte array.")
public Stream<ByteArrayResult> all(
@Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }")
Map<String, Object> config) {
return new ExportArrowService(db, pools, terminationGuard, logger)
.stream(new DatabaseSubGraph(tx), new ArrowConfig(config));
}

@NotThreadSafe
@Procedure(
name = "apoc.export.arrow.stream.graph",
deprecatedBy = "This procedure is being moved to APOC Extended.")
@Deprecated
@QueryLanguageScope(scope = {QueryLanguage.CYPHER_25})
@Description("Exports the given graph as an arrow byte array.")
public Stream<ByteArrayResult> graph(
@Name(value = "graph", description = "The graph to export.") Object graph,
@Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }")
Map<String, Object> config) {
final SubGraph subGraph;
if (graph instanceof Map) {
Map<String, Object> mGraph = (Map<String, Object>) graph;
if (!mGraph.containsKey("nodes")) {
throw new IllegalArgumentException(
"Graph Map must contains `nodes` field and `relationships` optionally");
}
subGraph = new NodesAndRelsSubGraph(
tx, (Collection<Node>) mGraph.get("nodes"), (Collection<Relationship>) mGraph.get("relationships"));
} else if (graph instanceof VirtualGraph) {
VirtualGraph vGraph = (VirtualGraph) graph;
subGraph = new NodesAndRelsSubGraph(tx, vGraph.nodes(), vGraph.relationships());
} else {
throw new IllegalArgumentException("Supported inputs are VirtualGraph, Map");
}
return new ExportArrowService(db, pools, terminationGuard, logger).stream(subGraph, new ArrowConfig(config));
}

@NotThreadSafe
@Procedure(
name = "apoc.export.arrow.stream.query",
deprecatedBy = "This procedure is being moved to APOC Extended.")
@Deprecated
@QueryLanguageScope(scope = {QueryLanguage.CYPHER_25})
@Description("Exports the given Cypher query as an arrow byte array.")
public Stream<ByteArrayResult> query(
@Name(value = "query", description = "The query used to collect the data for export.") String query,
@Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }")
Map<String, Object> config) {
Map<String, Object> params = config == null
? Collections.emptyMap()
: (Map<String, Object>) config.getOrDefault("params", Collections.emptyMap());
Result result = tx.execute(query, params);
return new ExportArrowService(db, pools, terminationGuard, logger).stream(result, new ArrowConfig(config));
}

@NotThreadSafe
@Procedure(name = "apoc.export.arrow.all", deprecatedBy = "This procedure is being moved to APOC Extended.")
@Deprecated
@QueryLanguageScope(scope = {QueryLanguage.CYPHER_25})
@Description("Exports the full database as an arrow file.")
public Stream<ExportProgressInfo> all(
@Name(value = "file", description = "The name of the file to export the data to.") String fileName,
@Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }")
Map<String, Object> config) {
return new ExportArrowService(db, pools, terminationGuard, logger)
.file(fileName, new DatabaseSubGraph(tx), new ArrowConfig(config));
}

@NotThreadSafe
@Procedure(name = "apoc.export.arrow.graph", deprecatedBy = "This procedure is being moved to APOC Extended.")
@Deprecated
@QueryLanguageScope(scope = {QueryLanguage.CYPHER_25})
@Description("Exports the given graph as an arrow file.")
public Stream<ExportProgressInfo> graph(
@Name(value = "file", description = "The name of the file to export the data to.") String fileName,
@Name(value = "graph", description = "The graph to export.") Object graph,
@Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }")
Map<String, Object> config) {
final SubGraph subGraph;
if (graph instanceof Map) {
Map<String, Object> mGraph = (Map<String, Object>) graph;
if (!mGraph.containsKey("nodes")) {
throw new IllegalArgumentException(
"Graph Map must contains `nodes` field and `relationships` optionally");
}
subGraph = new NodesAndRelsSubGraph(
tx, (Collection<Node>) mGraph.get("nodes"), (Collection<Relationship>) mGraph.get("relationships"));
} else if (graph instanceof VirtualGraph) {
VirtualGraph vGraph = (VirtualGraph) graph;
subGraph = new NodesAndRelsSubGraph(tx, vGraph.nodes(), vGraph.relationships());
} else {
throw new IllegalArgumentException("Supported inputs are VirtualGraph, Map");
}
return new ExportArrowService(db, pools, terminationGuard, logger)
.file(fileName, subGraph, new ArrowConfig(config));
}

@NotThreadSafe
@Procedure(name = "apoc.export.arrow.query", deprecatedBy = "This procedure is being moved to APOC Extended.")
@Deprecated
@QueryLanguageScope(scope = {QueryLanguage.CYPHER_25})
@Description("Exports the results from the given Cypher query as an arrow file.")
public Stream<ExportProgressInfo> query(
@Name(value = "file", description = "The name of the file to which the data will be exported.")
String fileName,
@Name(value = "query", description = "The query to use to collect the data for export.") String query,
@Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }")
Map<String, Object> config) {
Map<String, Object> params = config == null
? Collections.emptyMap()
: (Map<String, Object>) config.getOrDefault("params", Collections.emptyMap());
Result result = tx.execute(query, params);
return new ExportArrowService(db, pools, terminationGuard, logger)
.file(fileName, result, new ArrowConfig(config));
}
}
187 changes: 187 additions & 0 deletions extended/src/main/java/apoc/load/LoadArrow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package apoc.load;

import apoc.result.LoadDataMapResult;
import apoc.util.FileUtils;
import apoc.util.JsonUtil;
import apoc.util.Util;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateMilliVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.util.Text;
import org.neo4j.graphdb.security.URLAccessChecker;
import org.neo4j.graphdb.security.URLAccessValidationError;
import org.neo4j.kernel.api.QueryLanguage;
import org.neo4j.kernel.api.procedure.QueryLanguageScope;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.values.storable.Values;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.channels.SeekableByteChannel;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LoadArrow {

@Context
public URLAccessChecker urlAccessChecker;

private static class ArrowSpliterator extends Spliterators.AbstractSpliterator<LoadDataMapResult> {

private final ArrowReader reader;
private final VectorSchemaRoot schemaRoot;
private final AtomicInteger counter;

public ArrowSpliterator(ArrowReader reader, VectorSchemaRoot schemaRoot) throws IOException {
super(Long.MAX_VALUE, Spliterator.ORDERED);
this.reader = reader;
this.schemaRoot = schemaRoot;
this.counter = new AtomicInteger();
this.reader.loadNextBatch();
}

@Override
public synchronized boolean tryAdvance(Consumer<? super LoadDataMapResult> action) {
try {
if (counter.get() >= schemaRoot.getRowCount()) {
if (reader.loadNextBatch()) {
counter.set(0);
} else {
return false;
}
}
final Map<String, Object> row = schemaRoot.getFieldVectors().stream()
.collect(
HashMap::new,
(map, fieldVector) -> map.put(fieldVector.getName(), read(fieldVector, counter.get())),
HashMap::putAll); // please look at https://bugs.openjdk.java.net/browse/JDK-8148463
counter.incrementAndGet();
action.accept(new LoadDataMapResult(row));
return true;
} catch (Exception e) {
return false;
}
}
}

@Procedure(name = "apoc.load.arrow.stream", deprecatedBy = "This procedure is being moved to APOC Extended.")
@Deprecated
@QueryLanguageScope(scope = {QueryLanguage.CYPHER_25})
@Description("Imports `NODE` and `RELATIONSHIP` values from the provided arrow byte array.")
public Stream<LoadDataMapResult> stream(
@Name(value = "source", description = "The data to load.") byte[] source,
@Name(value = "config", defaultValue = "{}", description = "This value is never used.")
Map<String, Object> config)
throws IOException {
RootAllocator allocator = new RootAllocator();
ByteArrayInputStream inputStream = new ByteArrayInputStream(source);
ArrowStreamReader streamReader = new ArrowStreamReader(inputStream, allocator);
VectorSchemaRoot schemaRoot = streamReader.getVectorSchemaRoot();
return StreamSupport.stream(new ArrowSpliterator(streamReader, schemaRoot), false)
.onClose(() -> {
Util.close(allocator);
Util.close(streamReader);
Util.close(schemaRoot);
Util.close(inputStream);
});
}

@Procedure(name = "apoc.load.arrow", deprecatedBy = "This procedure is being moved to APOC Extended.")
@Deprecated
@QueryLanguageScope(scope = {QueryLanguage.CYPHER_25})
@Description("Imports `NODE` and `RELATIONSHIP` values from the provided arrow file.")
public Stream<LoadDataMapResult> file(
@Name(value = "file", description = "The name of the file to import data from.") String fileName,
@Name(value = "config", defaultValue = "{}", description = "This value is never used.")
Map<String, Object> config)
throws IOException, URISyntaxException, URLAccessValidationError {
final SeekableByteChannel channel = FileUtils.inputStreamFor(fileName, null, null, null, urlAccessChecker)
.asChannel();
RootAllocator allocator = new RootAllocator();
ArrowFileReader streamReader = new ArrowFileReader(channel, allocator);
VectorSchemaRoot schemaRoot = streamReader.getVectorSchemaRoot();
return StreamSupport.stream(new ArrowSpliterator(streamReader, schemaRoot), false)
.onClose(() -> {
Util.close(allocator);
Util.close(streamReader);
Util.close(schemaRoot);
Util.close(channel);
});
}

private static Object read(FieldVector fieldVector, int index) {
if (fieldVector.isNull(index)) {
return null;
} else if (fieldVector instanceof DateMilliVector) {
DateMilliVector fe = (DateMilliVector) fieldVector;
return Instant.ofEpochMilli(fe.get(index)).atOffset(ZoneOffset.UTC);
} else if (fieldVector instanceof BitVector) {
BitVector fe = (BitVector) fieldVector;
return fe.get(index) == 1;
} else {
Object object = fieldVector.getObject(index);
return getObject(object);
}
}

private static Object getObject(Object object) {
if (object instanceof Collection) {
return ((Collection<?>) object).stream().map(LoadArrow::getObject).collect(Collectors.toList());
}
if (object instanceof Map) {
return ((Map<String, Object>) object)
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getObject(e.getValue())));
}
if (object instanceof Text) {
return object.toString();
}
try {
// we test if is a valid Neo4j type
return Values.of(object);
} catch (Exception e) {
// otherwise we try coerce it
return valueToString(object);
}
}

private static String valueToString(Object value) {
return JsonUtil.writeValueAsString(value);
}
}
Loading

0 comments on commit eb31c8d

Please sign in to comment.