Skip to content

Commit

Permalink
0.19.5
Browse files Browse the repository at this point in the history
fix some bugs in auto-purge and add a manual purge action
  • Loading branch information
d-shapiro committed Jan 15, 2020
1 parent b137f04 commit 9295e68
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 55 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ apply plugin: 'java-library'
mainClassName = 'org.dsa.iot.etsdb.Main'
sourceCompatibility = 1.7
targetCompatibility = 1.7
version = '0.19.4'
version = '0.19.5'

repositories {
mavenLocal()
Expand Down
2 changes: 1 addition & 1 deletion dslink.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dslink-java-etsdb",
"version": "0.19.4",
"version": "0.19.5",
"description": "Historian DSLink implementation for ETSDB",
"license": "Apache",
"author": {
Expand Down
43 changes: 43 additions & 0 deletions src/main/java/org/dsa/iot/etsdb/db/Db.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import org.dsa.iot.dslink.node.actions.Action;
import org.dsa.iot.dslink.node.actions.ActionResult;
import org.dsa.iot.dslink.node.actions.Parameter;
import org.dsa.iot.dslink.node.actions.table.Row;
import org.dsa.iot.dslink.node.value.Value;
import org.dsa.iot.dslink.node.value.ValueType;
import org.dsa.iot.dslink.util.NodeUtils;
import org.dsa.iot.dslink.util.Objects;
import org.dsa.iot.dslink.util.TimeUtils;
import org.dsa.iot.dslink.util.handler.CompleteHandler;
import org.dsa.iot.dslink.util.handler.Handler;
import org.dsa.iot.etsdb.serializer.ByteData;
Expand All @@ -23,6 +25,8 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -252,6 +256,32 @@ public void handle(ActionResult event) {
}));
b.build();
}

{
NodeBuilder b = parent.createChild("purge");
b.setDisplayName("Purge");
b.setSerializable(false);
b.setAction(new Action(getProvider().dbPermission(),
new Handler<ActionResult>() {
@Override
public void handle(ActionResult event) {
Value to = event.getParameter("Purge To");
if (to != null && to.getString() != null) {
long toTs = TimeUtils.decode(to.getString());
List<String> series = getSanitizedSeriesIds();
for (String s: series) {
LOGGER.info("Manually purging series " + s);
db.delete(s, 0, toTs);
db.purge(s, toTs);
}
LOGGER.info("Done manually purging");
event.getTable().addRow(Row.make(new Value(true)));
}
}
}).addParameter(new Parameter("Purge To", ValueType.STRING).setDescription("Purge everything older than this date/time"))
.addResult(new Parameter("Success", ValueType.BOOL)));
b.build();
}

{
NodeBuilder b = parent.createChild("wps");
Expand Down Expand Up @@ -558,4 +588,17 @@ public void handle(ActionResult event, Map<String, Value> params) {
setDiskSpaceRemaining(vD.getNumber().intValue());
}
}

public List<String> getSanitizedSeriesIds() {
DatabaseImpl<ByteData> db = getDb();
List<String> series = db.getSeriesIds();
if (File.separatorChar != '/') {
List<String> corrected = new ArrayList<String>();
for (String s: series) {
corrected.add(s.replace(File.separatorChar, '/'));
}
series = corrected;
}
return series;
}
}
16 changes: 13 additions & 3 deletions src/main/java/org/dsa/iot/etsdb/db/DbProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import org.dsa.iot.historian.database.DatabaseProvider;
import org.dsa.iot.historian.database.Watch;
import org.dsa.iot.historian.utils.TimeParser;
import org.etsdb.TimeRange;
import org.etsdb.TypeOverrideTypes;
import org.etsdb.impl.DatabaseImpl;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -146,7 +148,7 @@ public void handle(ActionResult event) {
path = StringUtils.decodeName(path);

DatabaseImpl<ByteData> db = ((Db) database).getDb();
db.delete(path, fromTs, toTs);
deleteRange(db, path, fromTs, toTs);
}
});
{
Expand Down Expand Up @@ -179,9 +181,17 @@ public void run() {
public void deleteRange(Watch watch, long fromTs, long toTs) {
final Database database = watch.getGroup().getDb();
DatabaseImpl<ByteData> db = ((Db) database).getDb();
db.delete(watch.getPath(), fromTs, toTs);
deleteRange(db, watch.getPath(), fromTs, toTs);
}


private void deleteRange(DatabaseImpl<ByteData> db, String path, long fromTs, long toTs) {
TimeRange tr = db.getTimeRange(Collections.singletonList(path));
if (tr != null && !tr.isUndefined() && fromTs <= tr.getFrom()) {
db.purge(path, toTs);
}
db.delete(path, fromTs, toTs);
}

private void addOverrideTypeAction(final Node node, Permission permission) {
NodeBuilder nodeBuilder = node.createChild("overrideType");
nodeBuilder.setDisplayName("Override data point type");
Expand Down
117 changes: 69 additions & 48 deletions src/main/java/org/dsa/iot/etsdb/db/DbPurger.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@ public class DbPurger {
private ScheduledFuture<?> fut;
private boolean running;

public synchronized void addDb(Db db) {
if (!databases.contains(db)) {
databases.add(db);
}
public void addDb(Db db) {
synchronized(databases) {
if (!databases.contains(db)) {
databases.add(db);
}
}
}

public synchronized void removeDb(Db db) {
databases.remove(db);
public void removeDb(Db db) {
synchronized (databases) {
databases.remove(db);
}
}

public void stop() {
Expand All @@ -45,56 +49,64 @@ public void stop() {
}
}

void setupPurger() {
public void setupPurger() {
running = true;
Runnable runner = new Runnable() {
@Override
public void run() {
for (Db db : databases) {
if (!(db.isPurgeable() && running)) {
continue;
}

File path = db.getPath();
long curr = path.getUsableSpace();
long request = db.getDiskSpaceRemaining();
long delCount = 0;
if (curr - request <= 0) {
if (!running) {
break;
synchronized (databases) {
for (Db db : databases) {
if (!(db.isPurgeable() && running)) {
continue;
}
DatabaseImpl<ByteData> realDb = db.getDb();

List<String> series = realDb.getSeriesIds();
if (File.separatorChar != '/') {
List<String> corrected = new ArrayList<String>();
for (String s: series) {
corrected.add(s.replace(File.separatorChar, '/'));
}
series = corrected;

File path = db.getPath();
long curr = path.getUsableSpace();
long request = db.getDiskSpaceRemaining();
long delCount = 0;
long shardDelCount = 0;
LOGGER.info("Deciding whether to purge");
LOGGER.info("curr = " + curr + " , request = " + request);
if (curr - request <= 0) {
if (!running) {
break;
}
LOGGER.info("Going to purge");
DatabaseImpl<ByteData> realDb = db.getDb();
List<String> series = db.getSanitizedSeriesIds();
// LOGGER.info("Purge Step 1");
while (curr - request <= 0) {
// LOGGER.info("Purge Step 2");
TimeRange range = realDb.getTimeRange(series);
if (range == null || range.isUndefined()) {
break;
}
// LOGGER.info("Purge Step 3");
long from = range.getFrom();
long to = getToFromFrom(from);
for (String s : series) {
// LOGGER.info("Purge Step 4");
delCount += realDb.delete(s, from, to);
int openShards = realDb.getOpenShards();
realDb.purge(s, to);
shardDelCount += (openShards - realDb.getOpenShards());
}
// LOGGER.info("Purge Step 5");
if (delCount <= 0 && shardDelCount <= 0) {
break;
}
// LOGGER.info("Purge Step 6");
curr = path.getUsableSpace();
LOGGER.info("Purge progress: deleted " + delCount + "records so far");
LOGGER.info("curr = " + curr + " , request = " + request);
}
}
while (curr - request <= 0) {
TimeRange range = realDb.getTimeRange(series);
if (range == null || range.isUndefined()) {
break;
}

long from = range.getFrom();
for (String s : series) {
delCount += realDb.delete(s, from, from + 3600000);
}

if (delCount <= 0) {
break;
}
curr = path.getUsableSpace();
if (delCount > 0) {
String p = path.getPath();
LOGGER.info("Deleted {} records from {}", delCount, p);
}
}
if (delCount > 0) {
String p = path.getPath();
LOGGER.info("Deleted {} records from {}", delCount, p);
}
}
}
}
};
ScheduledThreadPoolExecutor stpe = Objects.getDaemonThreadPool();
Expand All @@ -103,4 +115,13 @@ public void run() {
fut = stpe.scheduleWithFixedDelay(runner, 30, 30, u);
}
}

private static long getToFromFrom(long from) {
long diff = System.currentTimeMillis() - from;
long range = (long) (diff * .15);
if (range < 3600000) {
range = 3600000;
}
return from + range;
}
}
4 changes: 2 additions & 2 deletions src/main/java/org/etsdb/impl/DataShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ else if (scanInfo.getOffset() >= toOffset) {
long getMinTs() throws IOException {
if (!dataFile.exists()) {
if (cache == null || cache.isEmpty()) {
return Long.MIN_VALUE;
return Utils.getLastTimestamp(shardId);
}
return Utils.getTimestamp(shardId, cache.getList().get(0).getOffset());
}
Expand All @@ -342,7 +342,7 @@ long getMinTs() throws IOException {
readSample(in, scanInfo);

if (scanInfo.isEndOfShard()) {
return Long.MAX_VALUE;
return Utils.getLastTimestamp(shardId);
}
return Utils.getTimestamp(shardId, scanInfo.getOffset());
} finally {
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/etsdb/impl/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ public static long getSampleOffset(long ts) {
public static long getTimestamp(long shardFile, long offset) {
return (shardFile << SHARD_BITS) | offset;
}

public static long getLastTimestamp(long shardFile) {
return getTimestamp(shardFile, 0x3fffffff);
}

public static void closeQuietly(Closeable c) {
try {
Expand Down

0 comments on commit 9295e68

Please sign in to comment.