[NOID] Backport runMany updates #4038

Open · wants to merge 1 commit into base: 4.4
4 changes: 2 additions & 2 deletions core/src/main/java/apoc/cypher/Cypher.java
@@ -239,7 +239,7 @@ private Object consumeResult(Result result, BlockingQueue<RowResult> queue, bool
}
}

private String removeShellControlCommands(String stmt) {
public static String removeShellControlCommands(String stmt) {
Matcher matcher = shellControl.matcher(stmt.trim());
if (matcher.find()) {
// an empty file get transformed into ":begin\n:commit" and that statement is not matched by the pattern
@@ -257,7 +257,7 @@ private boolean isPeriodicOperation(String stmt) {
return stmt.matches("(?is).*using\\s+periodic.*");
}

private Map<String, Object> toMap(QueryStatistics stats, long time, long rows) {
protected static Map<String, Object> toMap(QueryStatistics stats, long time, long rows) {
final Map<String, Object> map = map(
"rows", rows,
"time", time);
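Note: the two visibility changes above exist so the backported CypherExtended code further down can reuse these helpers directly (as `Cypher::removeShellControlCommands` and `CypherExtended.toMap`). As a rough illustration of the pipeline they feed, here is a minimal, self-contained sketch that splits a script on `;\r?\n` and strips leading `:begin` / `:commit` / `:rollback` tokens. The `stripControl` helper is a simplified stand-in for the real `removeShellControlCommands`, not APOC code; only the regex is taken from the diff.

```java
import java.util.List;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class StatementSplitDemo {

    // Same anchor pattern the diff declares as `shellControl` further down.
    private static final Pattern SHELL_CONTROL =
            Pattern.compile("^:?\\b(begin|commit|rollback)\\b", Pattern.CASE_INSENSITIVE);

    // Simplified stand-in for Cypher.removeShellControlCommands:
    // repeatedly drop a leading :begin / :commit / :rollback token.
    static String stripControl(String stmt) {
        String s = stmt.trim();
        Matcher m = SHELL_CONTROL.matcher(s);
        while (m.find()) {
            s = m.replaceFirst("").trim();
            m = SHELL_CONTROL.matcher(s);
        }
        return s;
    }

    public static void main(String[] args) {
        String script = ":begin\nCREATE (:Person {name: 'Ada'});\nMATCH (n) RETURN count(n);\n:commit\n";
        List<String> statements = new Scanner(script).useDelimiter(";\r?\n").tokens()
                .map(StatementSplitDemo::stripControl)
                .filter(s -> !s.isBlank())
                .collect(Collectors.toList());
        statements.forEach(System.out::println);
        // Prints the two real statements; the :begin/:commit wrappers are stripped away.
    }
}
```

The same split-strip-filter shape is what the new `runFiles` in CypherExtended.java below builds its per-statement streams from.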
237 changes: 128 additions & 109 deletions full/src/main/java/apoc/cypher/CypherExtended.java
@@ -45,10 +45,12 @@
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Spliterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -59,6 +61,7 @@
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.security.AuthorizationViolationException;
import org.neo4j.internal.helpers.collection.Iterators;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Context;
@@ -76,7 +79,6 @@
@Extended
public class CypherExtended {

public static final String COMPILED_PREFIX = "CYPHER runtime=" + Util.COMPILED;
public static final int PARTITIONS = 100 * Runtime.getRuntime().availableProcessors();
public static final int MAX_BATCH = 10000;

@@ -134,42 +136,84 @@ private Stream<RowResult> runNonSchemaFiles(
@SuppressWarnings("unchecked")
final Map<String, Object> parameters =
(Map<String, Object>) config.getOrDefault("parameters", Collections.emptyMap());
final boolean schemaOperation = false;
return runFiles(fileNames, config, parameters, schemaOperation, defaultStatistics);
return runFiles(fileNames, config, parameters, defaultStatistics);
}

// This runs the files sequentially
private Stream<RowResult> runFiles(
List<String> fileNames,
Map<String, Object> config,
Map<String, Object> parameters,
boolean schemaOperation,
boolean defaultStatistics) {
List<String> fileNames, Map<String, Object> config, Map<String, Object> params, boolean defaultStatistics) {
boolean reportError = Util.toBoolean(config.get("reportError"));
boolean addStatistics = Util.toBoolean(config.getOrDefault("statistics", defaultStatistics));
int timeout = Util.toInteger(config.getOrDefault("timeout", 10));
int queueCapacity = Util.toInteger(config.getOrDefault("queueCapacity", 100));
var result = fileNames.stream().flatMap(fileName -> {
return fileNames.stream().flatMap(fileName -> {
final Reader reader = readerForFile(fileName);
final Scanner scanner = createScannerFor(reader);
return runManyStatements(
scanner,
parameters,
schemaOperation,
addStatistics,
timeout,
queueCapacity,
reportError,
fileName)
.onClose(() -> Util.close(
scanner,
(e) -> log.info(
"Cannot close the scanner for file " + fileName
+ " because the following exception",
e)));
AtomicBoolean hasFailed = new AtomicBoolean(false);
return Iterators.stream(new Scanner(reader).useDelimiter(";\r?\n"))
.map(Cypher::removeShellControlCommands)
.filter(s -> !s.isBlank())
.flatMap(s -> streamInNewTx(s, params, addStatistics, fileName, reportError, hasFailed));
});
}

private Stream<RowResult> streamInNewTx(
String cypher,
Map<String, Object> params,
boolean stats,
String fileName,
boolean reportError,
AtomicBoolean hasFailed) {
if (hasFailed.get()) return null;
else if (isPeriodicOperation(cypher))
return streamInNewExplicitTx(cypher, params, stats, fileName, reportError);
final var innerTx = db.beginTx();
try {
// Hello fellow wanderer,
// At this point you may have questions like;
// - "Why do we execute this statement in a new transaction?"
// My guess is as good as yours. This is the way of the apoc. Safe travels.

final var results = new RunManyResultSpliterator(innerTx.execute(cypher, params), stats, fileName, tx);
return StreamSupport.stream(results, false).onClose(results::close).onClose(innerTx::commit);
} catch (AuthorizationViolationException accessModeException) {
// We meet again, few people make it this far into this world!
// I hope you're not still seeking answers, there are few to give.
// It has been written, in some long forgotten commits,
// that failures of this kind should be avoided. The ancestors
// were brave and used a regex based cypher parser to avoid
// trying to execute schema changing statements all together.
// We don't have that courage, and try to forget about it
// after the fact instead.
// One can only hope that by keeping this tradition alive,
// in some form, we make some poor souls happier.
innerTx.close();
return Stream.empty();
} catch (Throwable t) {
innerTx.close();
hasFailed.set(true);
if (reportError) {
String error = t.getMessage();
return Stream.of(new RowResult(-1, Map.of("error", error), fileName));
} else {
return null;
}
}
}

return result;
private Stream<RowResult> streamInNewExplicitTx(
String cypher, Map<String, Object> params, boolean stats, String fileName, boolean reportError) {
try {
final var results = new RunManyResultSpliterator(
db.executeTransactionally(cypher, params, result -> result), stats, fileName, tx);
return StreamSupport.stream(results, false).onClose(results::close);
} catch (AuthorizationViolationException accessModeException) {
return Stream.empty();
} catch (Throwable t) {
if (reportError) {
String error = t.getMessage();
return Stream.of(new RowResult(-1, Map.of("error", error), fileName));
} else {
return null;
}
}
}

@Procedure(mode = Mode.SCHEMA)
@@ -186,34 +230,8 @@ public Stream<RowResult> runSchemaFile(
public Stream<RowResult> runSchemaFiles(
@Name("file") List<String> fileNames,
@Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
final boolean schemaOperation = true;
final Map<String, Object> parameters = Collections.emptyMap();
return runFiles(fileNames, config, parameters, schemaOperation, true);
}

private Stream<RowResult> runManyStatements(
Scanner scanner,
Map<String, Object> params,
boolean schemaOperation,
boolean addStatistics,
int timeout,
int queueCapacity,
boolean reportError,
String fileName) {
BlockingQueue<RowResult> queue = runInSeparateThreadAndSendTombstone(
queueCapacity,
internalQueue -> {
if (schemaOperation) {
runSchemaStatementsInTx(
scanner, internalQueue, params, addStatistics, timeout, reportError, fileName);
} else {
runDataStatementsInTx(
scanner, internalQueue, params, addStatistics, timeout, reportError, fileName);
}
},
RowResult.TOMBSTONE);
return StreamSupport.stream(
new QueueBasedSpliterator<>(queue, RowResult.TOMBSTONE, terminationGuard, Integer.MAX_VALUE), false);
return runFiles(fileNames, config, parameters, true);
}

private <T> BlockingQueue<T> runInSeparateThreadAndSendTombstone(
@@ -246,7 +264,6 @@ private void runDataStatementsInTx(
BlockingQueue<RowResult> queue,
Map<String, Object> params,
boolean addStatistics,
long timeout,
boolean reportError,
String fileName) {
while (scanner.hasNext()) {
@@ -294,43 +311,6 @@ private void collectError(BlockingQueue<RowResult> queue, boolean reportError, E
QueueUtil.put(queue, result, 10);
}

private Scanner createScannerFor(Reader reader) {
Scanner scanner = new Scanner(reader);
scanner.useDelimiter(";\r?\n");
return scanner;
}

private void runSchemaStatementsInTx(
Scanner scanner,
BlockingQueue<RowResult> queue,
Map<String, Object> params,
boolean addStatistics,
long timeout,
boolean reportError,
String fileName) {
while (scanner.hasNext()) {
String stmt = removeShellControlCommands(scanner.next());
if (stmt.trim().isEmpty()) continue;
boolean schemaOperation;
try {
schemaOperation = isSchemaOperation(stmt);
} catch (Exception e) {
collectError(queue, reportError, e, fileName);
return;
}
if (schemaOperation) {
Util.inTx(db, pools, txInThread -> {
try (Result result = txInThread.execute(stmt, params)) {
return consumeResult(result, queue, addStatistics, tx, fileName);
} catch (Exception e) {
collectError(queue, reportError, e, fileName);
return null;
}
});
}
}
}

private static final Pattern shellControl =
Pattern.compile("^:?\\b(begin|commit|rollback)\\b", Pattern.CASE_INSENSITIVE);

@@ -419,10 +399,6 @@ public static String withParamMapping(String fragment, Collection<String> keys)
return declaration + fragment;
}

public static String compiled(String fragment) {
return fragment.substring(0, 6).equalsIgnoreCase("cypher") ? fragment : COMPILED_PREFIX + fragment;
}

@Procedure
@Description(
"apoc.cypher.parallel(fragment, `paramMap`, `keyList`) yield value - executes fragments in parallel through a list defined in `paramMap` with a key `keyList`")
@@ -445,18 +421,6 @@ public Stream<MapResult> parallel(
parallelParams.replace(key, v);
return tx.execute(statement, parallelParams).stream().map(MapResult::new);
});

/*
params.entrySet().stream()
.filter( e -> asCollection(e.getValue()).size() > 100)
.map( (e) -> (Map.Entry<String,Collection>)(Map.Entry)e )
.max( (max,e) -> e.getValue().size() )
.map( (e) -> e.getValue().parallelStream().map( (v) -> {
Map map = new HashMap<>(params);
map.put(e.getKey(),as)
}));
return db.execute(statement,params).stream().map(MapResult::new);
*/
}

@Procedure
@@ -590,3 +554,58 @@ private Future<List<Map<String, Object>>> submit(
});
}
}

class RunManyResultSpliterator implements Spliterator<CypherExtended.RowResult>, AutoCloseable {
private final Result result;
private final long start;
private boolean statistics;
private String fileName;
private int rowCount;

private Transaction transaction;

RunManyResultSpliterator(Result result, boolean statistics, String fileName, Transaction transaction) {
this.result = result;
this.start = System.currentTimeMillis();
this.statistics = statistics;
this.fileName = fileName;
this.transaction = transaction;
}

@Override
public boolean tryAdvance(Consumer<? super CypherExtended.RowResult> action) {
if (result.hasNext()) {
Map<String, Object> res = EntityUtil.anyRebind(transaction, result.next());
action.accept(new CypherExtended.RowResult(rowCount++, res, fileName));
return true;
} else if (statistics) {
final var stats =
CypherExtended.toMap(result.getQueryStatistics(), System.currentTimeMillis() - start, rowCount);
statistics = false;
action.accept(new CypherExtended.RowResult(-1, stats, fileName));
return true;
}
close();
return false;
}

@Override
public Spliterator<CypherExtended.RowResult> trySplit() {
return null;
}

@Override
public long estimateSize() {
return result.hasNext() ? Long.MAX_VALUE : 1;
}

@Override
public int characteristics() {
return Spliterator.ORDERED;
}

@Override
public void close() {
result.close();
}
}
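The RunManyResultSpliterator added above streams rows out of a single Result, appends one statistics row when requested, and defers cleanup to Stream.onClose, which is where the new runFiles hangs `results::close` and `innerTx::commit`. Below is a minimal sketch of that pattern using only the JDK, with a plain Iterator standing in for org.neo4j.graphdb.Result and a Runnable standing in for the transaction hand-off; the names are illustrative, not APOC API.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class RowStreamingDemo {

    // Wrap a row cursor in a Spliterator that forwards each row, emits one trailing
    // summary row, and runs a cleanup hook when the resulting stream is closed.
    static Stream<Map<String, Object>> streamRows(Iterator<Map<String, Object>> cursor, Runnable cleanup) {
        Spliterator<Map<String, Object>> split = new Spliterator<Map<String, Object>>() {
            private int rowCount;
            private boolean summaryEmitted;

            @Override
            public boolean tryAdvance(Consumer<? super Map<String, Object>> action) {
                if (cursor.hasNext()) {
                    rowCount++;
                    action.accept(cursor.next());
                    return true;
                } else if (!summaryEmitted) {
                    summaryEmitted = true;
                    Map<String, Object> summary = Map.of("rows", rowCount); // trailing statistics-style row
                    action.accept(summary);
                    return true;
                }
                return false;
            }

            @Override
            public Spliterator<Map<String, Object>> trySplit() {
                return null; // sequential only, like RunManyResultSpliterator
            }

            @Override
            public long estimateSize() {
                return Long.MAX_VALUE;
            }

            @Override
            public int characteristics() {
                return ORDERED;
            }
        };
        // Mirrors .onClose(results::close).onClose(innerTx::commit) in the diff.
        return StreamSupport.stream(split, false).onClose(cleanup);
    }

    public static void main(String[] args) {
        Iterator<Map<String, Object>> fakeResult =
                List.<Map<String, Object>>of(Map.of("n", 1), Map.of("n", 2)).iterator();
        try (Stream<Map<String, Object>> rows = streamRows(fakeResult, () -> System.out.println("closed"))) {
            rows.forEach(System.out::println);
        }
        // Prints {n=1}, {n=2}, {rows=2}, then "closed" once the stream is closed.
    }
}
```

Returning false from tryAdvance only after the summary row lets the statistics ride along in the same stream, which is why this backport can drop runManyStatements and the tombstone-terminated queue entirely.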