Skip to content

Commit

Permalink
0.19.6
Browse files Browse the repository at this point in the history
* Revert to 0.19.4
  • Loading branch information
a-hansen authored Feb 21, 2020
1 parent c37de90 commit 9e3b81f
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 155 deletions.
9 changes: 0 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,3 @@ A local test run requires a broker to be actively running.

Running: <br />
`./gradlew run -Dexec.args="--broker http://localhost:8080/conn"`

## Watch Group Logging Type Settings

There are four possible values for the `Logging Type` of a watch group:
- `None`: Nothing will be logged
- `Interval`: Logging will happen on an interval (defined by the `Interval` setting)
- `All Data` and `Point Change`: Logging will happen for every update of the path's value
- The only difference between `All data` and `Point Change` is that `Point Change` will ignore an update if the value is the same as the last recorded value (in this run of the program). Since dglux only sends subscription updates on value changes (and once on the start of the subscription), this means there's not really any difference, aside from maybe some edge cases.
- Possibly important to note: Restarting the DSLink is not one of those edge cases. Both `All Data` and `Point Change` will record the initial value when the watch is started up, and so both will end up with a duplicate value in the database.
14 changes: 3 additions & 11 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
apply plugin: 'application'
apply plugin: 'findbugs'
apply plugin: 'java-library'

mainClassName = 'org.dsa.iot.etsdb.Main'
sourceCompatibility = 1.7
targetCompatibility = 1.7
version = '0.19.5'
version = '0.19.6'

repositories {
mavenLocal()
mavenCentral()
jcenter()
maven {
url 'https://oss.sonatype.org/content/repositories/snapshots/'
}
}

wrapper {
gradleVersion = '4.10'
gradleVersion = '6.1'
}

dependencies {
Expand All @@ -31,11 +30,4 @@ run {
workingDir project.buildDir
}

tasks.withType(FindBugs) {
reports {
xml.enabled = false
html.enabled = true
}
}

applicationDistribution.from(new File(project.projectDir, "/dslink.json"))
2 changes: 1 addition & 1 deletion dslink.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dslink-java-etsdb",
"version": "0.19.5",
"version": "0.19.6",
"description": "Historian DSLink implementation for ETSDB",
"license": "Apache",
"author": {
Expand Down
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.10-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
2 changes: 1 addition & 1 deletion gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`

# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS=""
DEFAULT_JVM_OPTS='"-Xmx64m"'

# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
Expand Down
2 changes: 1 addition & 1 deletion gradlew.bat
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%

@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS=
set DEFAULT_JVM_OPTS="-Xmx64m"

@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
Expand Down
43 changes: 0 additions & 43 deletions src/main/java/org/dsa/iot/etsdb/db/Db.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
import org.dsa.iot.dslink.node.actions.Action;
import org.dsa.iot.dslink.node.actions.ActionResult;
import org.dsa.iot.dslink.node.actions.Parameter;
import org.dsa.iot.dslink.node.actions.table.Row;
import org.dsa.iot.dslink.node.value.Value;
import org.dsa.iot.dslink.node.value.ValueType;
import org.dsa.iot.dslink.util.NodeUtils;
import org.dsa.iot.dslink.util.Objects;
import org.dsa.iot.dslink.util.TimeUtils;
import org.dsa.iot.dslink.util.handler.CompleteHandler;
import org.dsa.iot.dslink.util.handler.Handler;
import org.dsa.iot.etsdb.serializer.ByteData;
Expand All @@ -25,8 +23,6 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -256,32 +252,6 @@ public void handle(ActionResult event) {
}));
b.build();
}

{
NodeBuilder b = parent.createChild("purge");
b.setDisplayName("Purge");
b.setSerializable(false);
b.setAction(new Action(getProvider().dbPermission(),
new Handler<ActionResult>() {
@Override
public void handle(ActionResult event) {
Value to = event.getParameter("Purge To");
if (to != null && to.getString() != null) {
long toTs = TimeUtils.decode(to.getString());
List<String> series = getSanitizedSeriesIds();
for (String s: series) {
LOGGER.info("Manually purging series " + s);
db.delete(s, 0, toTs);
db.purge(s, toTs);
}
LOGGER.info("Done manually purging");
event.getTable().addRow(Row.make(new Value(true)));
}
}
}).addParameter(new Parameter("Purge To", ValueType.STRING).setDescription("Purge everything older than this date/time"))
.addResult(new Parameter("Success", ValueType.BOOL)));
b.build();
}

{
NodeBuilder b = parent.createChild("wps");
Expand Down Expand Up @@ -588,17 +558,4 @@ public void handle(ActionResult event, Map<String, Value> params) {
setDiskSpaceRemaining(vD.getNumber().intValue());
}
}

public List<String> getSanitizedSeriesIds() {
DatabaseImpl<ByteData> db = getDb();
List<String> series = db.getSeriesIds();
if (File.separatorChar != '/') {
List<String> corrected = new ArrayList<String>();
for (String s: series) {
corrected.add(s.replace(File.separatorChar, '/'));
}
series = corrected;
}
return series;
}
}
16 changes: 3 additions & 13 deletions src/main/java/org/dsa/iot/etsdb/db/DbProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
import org.dsa.iot.historian.database.DatabaseProvider;
import org.dsa.iot.historian.database.Watch;
import org.dsa.iot.historian.utils.TimeParser;
import org.etsdb.TimeRange;
import org.etsdb.TypeOverrideTypes;
import org.etsdb.impl.DatabaseImpl;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -148,7 +146,7 @@ public void handle(ActionResult event) {
path = StringUtils.decodeName(path);

DatabaseImpl<ByteData> db = ((Db) database).getDb();
deleteRange(db, path, fromTs, toTs);
db.delete(path, fromTs, toTs);
}
});
{
Expand Down Expand Up @@ -181,17 +179,9 @@ public void run() {
public void deleteRange(Watch watch, long fromTs, long toTs) {
final Database database = watch.getGroup().getDb();
DatabaseImpl<ByteData> db = ((Db) database).getDb();
deleteRange(db, watch.getPath(), fromTs, toTs);
db.delete(watch.getPath(), fromTs, toTs);
}

private void deleteRange(DatabaseImpl<ByteData> db, String path, long fromTs, long toTs) {
TimeRange tr = db.getTimeRange(Collections.singletonList(path));
if (tr != null && !tr.isUndefined() && fromTs <= tr.getFrom()) {
db.purge(path, toTs);
}
db.delete(path, fromTs, toTs);
}


private void addOverrideTypeAction(final Node node, Permission permission) {
NodeBuilder nodeBuilder = node.createChild("overrideType");
nodeBuilder.setDisplayName("Override data point type");
Expand Down
117 changes: 48 additions & 69 deletions src/main/java/org/dsa/iot/etsdb/db/DbPurger.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,14 @@ public class DbPurger {
private ScheduledFuture<?> fut;
private boolean running;

public void addDb(Db db) {
synchronized(databases) {
if (!databases.contains(db)) {
databases.add(db);
}
}
public synchronized void addDb(Db db) {
if (!databases.contains(db)) {
databases.add(db);
}
}

public void removeDb(Db db) {
synchronized (databases) {
databases.remove(db);
}
public synchronized void removeDb(Db db) {
databases.remove(db);
}

public void stop() {
Expand All @@ -49,64 +45,56 @@ public void stop() {
}
}

public void setupPurger() {
void setupPurger() {
running = true;
Runnable runner = new Runnable() {
@Override
public void run() {
synchronized (databases) {
for (Db db : databases) {
if (!(db.isPurgeable() && running)) {
continue;
for (Db db : databases) {
if (!(db.isPurgeable() && running)) {
continue;
}

File path = db.getPath();
long curr = path.getUsableSpace();
long request = db.getDiskSpaceRemaining();
long delCount = 0;
if (curr - request <= 0) {
if (!running) {
break;
}

File path = db.getPath();
long curr = path.getUsableSpace();
long request = db.getDiskSpaceRemaining();
long delCount = 0;
long shardDelCount = 0;
LOGGER.info("Deciding whether to purge");
LOGGER.info("curr = " + curr + " , request = " + request);
if (curr - request <= 0) {
if (!running) {
break;
}
LOGGER.info("Going to purge");
DatabaseImpl<ByteData> realDb = db.getDb();
List<String> series = db.getSanitizedSeriesIds();
// LOGGER.info("Purge Step 1");
while (curr - request <= 0) {
// LOGGER.info("Purge Step 2");
TimeRange range = realDb.getTimeRange(series);
if (range == null || range.isUndefined()) {
break;
}
// LOGGER.info("Purge Step 3");
long from = range.getFrom();
long to = getToFromFrom(from);
for (String s : series) {
// LOGGER.info("Purge Step 4");
delCount += realDb.delete(s, from, to);
int openShards = realDb.getOpenShards();
realDb.purge(s, to);
shardDelCount += (openShards - realDb.getOpenShards());
}
// LOGGER.info("Purge Step 5");
if (delCount <= 0 && shardDelCount <= 0) {
break;
}
// LOGGER.info("Purge Step 6");
curr = path.getUsableSpace();
LOGGER.info("Purge progress: deleted " + delCount + "records so far");
LOGGER.info("curr = " + curr + " , request = " + request);
}
DatabaseImpl<ByteData> realDb = db.getDb();

List<String> series = realDb.getSeriesIds();
if (File.separatorChar != '/') {
List<String> corrected = new ArrayList<String>();
for (String s: series) {
corrected.add(s.replace(File.separatorChar, '/'));
}
series = corrected;
}
if (delCount > 0) {
String p = path.getPath();
LOGGER.info("Deleted {} records from {}", delCount, p);
while (curr - request <= 0) {
TimeRange range = realDb.getTimeRange(series);
if (range == null || range.isUndefined()) {
break;
}

long from = range.getFrom();
for (String s : series) {
delCount += realDb.delete(s, from, from + 3600000);
}

if (delCount <= 0) {
break;
}
curr = path.getUsableSpace();
}
}
}
if (delCount > 0) {
String p = path.getPath();
LOGGER.info("Deleted {} records from {}", delCount, p);
}
}
}
};
ScheduledThreadPoolExecutor stpe = Objects.getDaemonThreadPool();
Expand All @@ -115,13 +103,4 @@ public void run() {
fut = stpe.scheduleWithFixedDelay(runner, 30, 30, u);
}
}

private static long getToFromFrom(long from) {
long diff = System.currentTimeMillis() - from;
long range = (long) (diff * .15);
if (range < 3600000) {
range = 3600000;
}
return from + range;
}
}
4 changes: 2 additions & 2 deletions src/main/java/org/etsdb/impl/DataShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ else if (scanInfo.getOffset() >= toOffset) {
long getMinTs() throws IOException {
if (!dataFile.exists()) {
if (cache == null || cache.isEmpty()) {
return Utils.getLastTimestamp(shardId);
return Long.MIN_VALUE;
}
return Utils.getTimestamp(shardId, cache.getList().get(0).getOffset());
}
Expand All @@ -342,7 +342,7 @@ long getMinTs() throws IOException {
readSample(in, scanInfo);

if (scanInfo.isEndOfShard()) {
return Utils.getLastTimestamp(shardId);
return Long.MAX_VALUE;
}
return Utils.getTimestamp(shardId, scanInfo.getOffset());
} finally {
Expand Down
4 changes: 0 additions & 4 deletions src/main/java/org/etsdb/impl/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ public static long getSampleOffset(long ts) {
public static long getTimestamp(long shardFile, long offset) {
return (shardFile << SHARD_BITS) | offset;
}

public static long getLastTimestamp(long shardFile) {
return getTimestamp(shardFile, 0x3fffffff);
}

public static void closeQuietly(Closeable c) {
try {
Expand Down

0 comments on commit 9e3b81f

Please sign in to comment.