Skip to content

Commit

Permalink
Fixes neo4j/apoc#126: apoc.periodic.submit fails with schema operatio…
Browse files Browse the repository at this point in the history
…ns (neo4j/apoc#208) (#3234) (#3246)

* Fixes neo4j/apoc#126: apoc.periodic.submit fails with schema operations (neo4j/apoc#208)
* Fixes neo4j/apoc#126: Improve validateQuery (neo4j/apoc#218)
* removed unused imports
* fixed full compilation error
  • Loading branch information
vga91 authored Oct 25, 2022
1 parent 212a4fc commit bc9e62f
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 118 deletions.
123 changes: 13 additions & 110 deletions core/src/main/java/apoc/periodic/Periodic.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import apoc.Pools;
import apoc.util.Util;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.Result;
Expand All @@ -17,11 +16,6 @@
import org.neo4j.logging.Log;
import org.neo4j.procedure.*;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.util.*;
import java.util.concurrent.*;
Expand All @@ -33,11 +27,14 @@
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static apoc.periodic.PeriodicUtils.getJobInfo;
import static apoc.periodic.PeriodicUtils.schedule;
import static org.neo4j.graphdb.QueryExecutionType.QueryType;
import static apoc.periodic.PeriodicUtils.submitJob;
import static apoc.periodic.PeriodicUtils.submitProc;
import static apoc.util.Util.merge;

public class Periodic {

public static final String ERROR_DATE_BEFORE = "The provided date is before current date";

enum Planner {DEFAULT, COST, IDP, DP }

Expand Down Expand Up @@ -175,45 +172,7 @@ public Stream<JobInfo> cancel(@Name("name") String name) {
@Description("apoc.periodic.submit('name',statement,params) - submit a one-off background statement; parameter 'params' is optional and can contain query parameters for Cypher statement")
public Stream<JobInfo> submit(@Name("name") String name, @Name("statement") String statement, @Name(value = "params", defaultValue = "{}") Map<String,Object> config) {
validateQuery(statement);
Map<String,Object> params = (Map)config.getOrDefault("params", Collections.emptyMap());

final Temporal atTime = (Temporal) (config.get("atTime"));

final Runnable task = () -> {
try {
db.executeTransactionally(statement, params);
} catch (Exception e) {
log.warn("in background task via submit", e);
throw new RuntimeException(e);
}
};

JobInfo info = atTime != null
? getJobInfo(name, atTime, task, ScheduleType.DEFAULT)
: submit(name, task);

return Stream.of(info);
}

private JobInfo getJobInfo(String name, Temporal atTime, Runnable task, ScheduleType scheduleType) {
if (atTime instanceof LocalDate) {
atTime = ((LocalDate) atTime).atStartOfDay();
}
final boolean isTime = atTime instanceof OffsetTime || atTime instanceof LocalTime;
Temporal now = isTime
? LocalTime.now()
: LocalDateTime.now();

final long secPerDay = DateUtils.MILLIS_PER_DAY / 1000L;
long delay = now.until(atTime, ChronoUnit.SECONDS);
if (isTime && delay < 0) {
// we consider the day after
delay = delay + secPerDay;
}
if (delay < 0) {
throw new RuntimeException(ERROR_DATE_BEFORE);
}
return schedule(name, task, delay, secPerDay, scheduleType);
return submitProc(name, statement, config, db, log, pools);
}

@Procedure(mode = Mode.WRITE)
Expand All @@ -227,9 +186,9 @@ public Stream<JobInfo> repeat(@Name("name") String name, @Name("statement") Stri
};
final JobInfo info;
if (rateOrTime instanceof Long) {
info = schedule(name, runnable,0, (long) rateOrTime);
info = schedule(name, runnable,0, (long) rateOrTime, log, pools);
} else if(rateOrTime instanceof Temporal) {
info = getJobInfo(name, (Temporal) rateOrTime, runnable, ScheduleType.FIXED_RATE);
info = getJobInfo(name, (Temporal) rateOrTime, runnable, log, pools, PeriodicUtils.ScheduleType.FIXED_RATE);
} else {
throw new RuntimeException("invalid type of rateOrTime parameter");
}
Expand All @@ -238,77 +197,21 @@ public Stream<JobInfo> repeat(@Name("name") String name, @Name("statement") Stri
}

private void validateQuery(String statement) {
Util.validateQuery(db, statement);
Util.validateQuery(db, statement,
Set.of(Mode.WRITE, Mode.READ, Mode.DEFAULT),
QueryType.READ_ONLY, QueryType.WRITE, QueryType.READ_WRITE);
}

@Procedure(mode = Mode.WRITE)
@Description("apoc.periodic.countdown('name',statement,repeat-rate-in-seconds) submit a repeatedly-called background statement until it returns 0")
public Stream<JobInfo> countdown(@Name("name") String name, @Name("statement") String statement, @Name("rate") long rate) {
validateQuery(statement);
JobInfo info = submit(name, new Countdown(name, statement, rate, log));
JobInfo info = submitJob(name, new Countdown(name, statement, rate, log), log, pools);
info.rate = rate;
return Stream.of(info);
}

/**
* Call from a procedure that gets a <code>@Context GraphDatbaseAPI db;</code> injected and provide that db to the runnable.
*/
public <T> JobInfo submit(String name, Runnable task) {
JobInfo info = new JobInfo(name);
Future<T> future = pools.getJobList().remove(info);
if (future != null && !future.isDone()) future.cancel(false);

Runnable wrappingTask = wrapTask(name, task, log);
Future newFuture = pools.getScheduledExecutorService().submit(wrappingTask);
pools.getJobList().put(info,newFuture);
return info;
}

private enum ScheduleType { DEFAULT, FIXED_DELAY, FIXED_RATE }

public JobInfo schedule(String name, Runnable task, long delay, long repeat) {
return schedule(name, task, delay, repeat, ScheduleType.FIXED_DELAY);
}

/**
* Call from a procedure that gets a <code>@Context GraphDatbaseAPI db;</code> injected and provide that db to the runnable.
*/
public JobInfo schedule(String name, Runnable task, long delay, long repeat, ScheduleType isFixedDelay) {
JobInfo info = new JobInfo(name, delay, isFixedDelay.equals(ScheduleType.DEFAULT) ? 0 : repeat);
Future future = pools.getJobList().remove(info);
if (future != null && !future.isDone()) future.cancel(false);

Runnable wrappingTask = wrapTask(name, task, log);
ScheduledFuture<?> newFuture = getScheduledFuture(wrappingTask, delay, repeat, isFixedDelay);
pools.getJobList().put(info,newFuture);
return info;
}

private ScheduledFuture<?> getScheduledFuture(Runnable wrappingTask, long delay, long repeat, ScheduleType isFixedDelay) {
final ScheduledExecutorService service = pools.getScheduledExecutorService();
final TimeUnit timeUnit = TimeUnit.SECONDS;
switch (isFixedDelay) {
case FIXED_DELAY:
return service.scheduleWithFixedDelay(wrappingTask, delay, repeat, timeUnit);
case FIXED_RATE:
return service.scheduleAtFixedRate(wrappingTask, delay, repeat, timeUnit);
default:
return service.schedule(wrappingTask, delay, timeUnit);
}
}

private static Runnable wrapTask(String name, Runnable task, Log log) {
return () -> {
log.debug("Executing task " + name);
try {
task.run();
} catch (Exception e) {
log.error("Error while executing task " + name + " because of the following exception (the task will be killed):", e);
throw e;
}
log.debug("Executed task " + name);
};
}

/**
* Invoke cypherAction in batched transactions being fed from cypherIteration running in main thread
Expand Down Expand Up @@ -520,7 +423,7 @@ public Countdown(String name, String statement, long rate, Log log) {
@Override
public void run() {
if (Periodic.this.executeNumericResultStatement(statement, Collections.emptyMap()) > 0) {
pools.getScheduledExecutorService().schedule(() -> submit(name, this), rate, TimeUnit.SECONDS);
pools.getScheduledExecutorService().schedule(() -> submitJob(name, this, log, pools), rate, TimeUnit.SECONDS);
}
}
}
Expand Down
116 changes: 116 additions & 0 deletions core/src/main/java/apoc/periodic/PeriodicUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,30 @@

import apoc.Pools;
import apoc.util.Util;
import org.apache.commons.lang3.time.DateUtils;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.Transaction;
import org.neo4j.internal.helpers.collection.Pair;
import org.neo4j.logging.Log;
import org.neo4j.procedure.TerminationGuard;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
Expand All @@ -23,11 +34,16 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static apoc.periodic.Periodic.JobInfo;

public class PeriodicUtils {

private PeriodicUtils() {

}
public enum ScheduleType { DEFAULT, FIXED_DELAY, FIXED_RATE }

public static final String ERROR_DATE_BEFORE = "The provided date is before current date";

public static Pair<String,Boolean> prepareInnerStatement(String cypherAction, BatchMode batchMode, List<String> columns, String iteratorVariableName) {
String names = columns.stream().map(Util::quote).collect(Collectors.joining("|"));
Expand Down Expand Up @@ -116,6 +132,106 @@ public static Stream<BatchAndTotalResult> iterateAndExecuteBatchedInSeparateThre
}
return Stream.of(collector.getResult());
}

public static Stream<JobInfo> submitProc(String name, String statement, Map<String, Object> config, GraphDatabaseService db, Log log, Pools pools) {
Map<String,Object> params = (Map)config.getOrDefault("params", Collections.emptyMap());

final Temporal atTime = (Temporal) (config.get("atTime"));

final Runnable task = () -> {
try {
db.executeTransactionally(statement, params);
} catch (Exception e) {
log.warn("in background task via submit", e);
throw new RuntimeException(e);
}
};

JobInfo info = atTime != null
? getJobInfo(name, atTime, task, log, pools, ScheduleType.DEFAULT)
: submitJob(name, task, log, pools);

return Stream.of(info);
}

public static JobInfo getJobInfo(String name, Temporal atTime, Runnable task, Log log, Pools pools, ScheduleType scheduleType) {
if (atTime instanceof LocalDate) {
atTime = ((LocalDate) atTime).atStartOfDay();
}
final boolean isTime = atTime instanceof OffsetTime || atTime instanceof LocalTime;
Temporal now = isTime
? LocalTime.now()
: LocalDateTime.now();

final long secPerDay = DateUtils.MILLIS_PER_DAY / 1000L;
long delay = now.until(atTime, ChronoUnit.SECONDS);
if (isTime && delay < 0) {
// we consider the day after
delay = delay + secPerDay;
}
if (delay < 0) {
throw new RuntimeException(ERROR_DATE_BEFORE);
}
return schedule(name, task, delay, secPerDay, log, pools, scheduleType);
}

/**
* Call from a procedure that gets a <code>@Context GraphDatbaseAPI db;</code> injected and provide that db to the runnable.
*/
public static <T> JobInfo submitJob(String name, Runnable task, Log log, Pools pools) {
JobInfo info = new JobInfo(name);
Future<T> future = pools.getJobList().remove(info);
if (future != null && !future.isDone()) future.cancel(false);

Runnable wrappingTask = wrapTask(name, task, log);
Future newFuture = pools.getScheduledExecutorService().submit(wrappingTask);
pools.getJobList().put(info,newFuture);
return info;
}

public static JobInfo schedule(String name, Runnable task, long delay, long repeat, Log log, Pools pools) {
return schedule(name, task, delay, repeat, log, pools, ScheduleType.FIXED_DELAY);
}

/**
* Call from a procedure that gets a <code>@Context GraphDatbaseAPI db;</code> injected and provide that db to the runnable.
*/
public static JobInfo schedule(String name, Runnable task, long delay, long repeat, Log log, Pools pools, ScheduleType isFixedDelay) {
JobInfo info = new JobInfo(name, delay, isFixedDelay.equals(ScheduleType.DEFAULT) ? 0 : repeat);
Future future = pools.getJobList().remove(info);
if (future != null && !future.isDone()) future.cancel(false);

Runnable wrappingTask = wrapTask(name, task, log);
ScheduledFuture<?> newFuture = getScheduledFuture(wrappingTask, delay, repeat, pools, isFixedDelay);
pools.getJobList().put(info,newFuture);
return info;
}

private static ScheduledFuture<?> getScheduledFuture(Runnable wrappingTask, long delay, long repeat, Pools pools, ScheduleType isFixedDelay) {
final ScheduledExecutorService service = pools.getScheduledExecutorService();
final TimeUnit timeUnit = TimeUnit.SECONDS;
switch (isFixedDelay) {
case FIXED_DELAY:
return service.scheduleWithFixedDelay(wrappingTask, delay, repeat, timeUnit);
case FIXED_RATE:
return service.scheduleAtFixedRate(wrappingTask, delay, repeat, timeUnit);
default:
return service.schedule(wrappingTask, delay, timeUnit);
}
}

public static Runnable wrapTask(String name, Runnable task, Log log) {
return () -> {
log.debug("Executing task " + name);
try {
task.run();
} catch (Exception e) {
log.error("Error while executing task " + name + " because of the following exception (the task will be killed):", e);
throw e;
}
log.debug("Executed task " + name);
};
}
}

/*
Expand Down
Loading

0 comments on commit bc9e62f

Please sign in to comment.