From b0a3db0b265f0388f42cdbbd1532d77a35d1c705 Mon Sep 17 00:00:00 2001 From: julianev Date: Sat, 30 May 2020 18:42:29 +0200 Subject: [PATCH 1/3] Threshold Frame Implementation and Test --- core/pom.xml | 12 + .../scotty/core/windowType/SessionWindow.java | 6 + .../core/windowType/ThresholdFrame.java | 204 ++++++++++++++ .../windowContext/WindowContext.java | 12 + .../tub/dima/scotty/slicing/SliceManager.java | 11 +- .../test/windowTest/ThresholdTest.java | 261 ++++++++++++++++++ 6 files changed, 504 insertions(+), 2 deletions(-) create mode 100644 core/src/main/java/de/tub/dima/scotty/core/windowType/ThresholdFrame.java create mode 100644 slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/ThresholdTest.java diff --git a/core/pom.xml b/core/pom.xml index ffd60886..ea427299 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -12,6 +12,18 @@ core + + org.apache.flink + flink-core + 1.8.0 + compile + + + org.apache.flink + flink-core + 1.8.0 + compile + \ No newline at end of file diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java index c30128aa..8929f682 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java @@ -3,6 +3,8 @@ import de.tub.dima.scotty.core.*; import de.tub.dima.scotty.core.windowType.windowContext.*; +import java.util.ArrayList; + public class SessionWindow implements ForwardContextAware { private final WindowMeasure measure; @@ -115,6 +117,10 @@ public void triggerWindows(WindowCollector aggregateWindows, long lastWatermark, } } + @Override + public ActiveWindow updateContextWindows(Object element, long ts, ArrayList listOfTs) { + return null; + } } diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/ThresholdFrame.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/ThresholdFrame.java new file mode 100644 index 00000000..f40b2d96 --- /dev/null +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/ThresholdFrame.java @@ -0,0 +1,204 @@ +package de.tub.dima.scotty.core.windowType; + +import de.tub.dima.scotty.core.WindowCollector; +import de.tub.dima.scotty.core.windowType.windowContext.WindowContext; +import org.apache.flink.api.java.tuple.Tuple; + +import java.util.ArrayList; + +public class ThresholdFrame implements ForwardContextFree { + /* + * Implements Threshold Frame after the definition by Grossniklaus et al. 2016 + */ + + private final WindowMeasure measure; + private final int threshold; + private final int attribute; + private final long minSize; + + public ThresholdFrame(int threshold){ + this(threshold, 0, 2); + } + + public ThresholdFrame(int threshold, long minSize){ + this(threshold, 0, minSize); + } + + /** + * + * @param attribute the position of an attribute in a tuple which values should be compared to the threshold + * @param threshold the value of the threshold + * @param minSize the minimum count of tuples in the frame, 2 tuples by default + */ + public ThresholdFrame(int threshold, int attribute, long minSize){ + this.measure = WindowMeasure.Time; + this.attribute = attribute; + this.threshold = threshold; + this.minSize = minSize; + } + + @Override + public WindowMeasure getWindowMeasure() { + return this.measure; + } + + @Override + public ThresholdFrameContext createContext() { + return new ThresholdFrameContext(); + } + + public class ThresholdFrameContext extends WindowContext { + + long count = 0; + long gap = 0; + long lastEnd = 0; + + @Override + public ActiveWindow updateContext(Object o, long position) { + //tuple is in-order + int value; + if(o instanceof Tuple){ //fetching the value of the attribute in the tuple + value = (int)((Tuple) o).getField(attribute); + }else{ + value = (int)o; + } + + if (hasActiveWindows()) { + if(value > threshold){ //begin of first frame + count++; + gap = 0; + addNewWindow(0, position, position); + return getWindow(0); + }else{ + return null; + } + } + + int fIndex = getFrame(position); + + if (fIndex == -1) { + if(value > threshold){ + addNewWindow(0, position, position); + } + } else { + ActiveWindow f = getWindow(fIndex); + if (value > threshold) { + if (count == 0 && gap >= 1) { //open new frame + count++; + gap = 0; + return addNewWindow(fIndex + 1, position, position); + } else { //update frame + //append tuple to active frame + count++; + shiftEnd(f, position); + return f; + } + } else { + if (gap == 0) { + if (count >= minSize) { //close frame with first tuple that is below the threshold + shiftEnd(f, position); + count = 0; + gap++; + lastEnd = position; + return f; + } else { //discard frame if it is smaller than minSize + removeWindow(fIndex); + count = 0; + gap++; + } + } else { //tuple that is not included in any frame + count = 0; + gap++; + } + } + } + return null; + } + + @Override + public ActiveWindow updateContextWindows(Object o, long position, ArrayList listOfTs) { + //tuple is out-of-order + int value; + if(o instanceof Tuple){ + value = (int)((Tuple) o).getField(attribute); + }else{ + value = (int)o; + } + + int fIndex = getFrame(position); + + if(value > threshold){ + if((fIndex+1) < numberOfActiveWindows()) { //frame after this one exists + int index = listOfTs.indexOf(position); + long timestampAfter = (long) listOfTs.get(index + 1); + ActiveWindow fNext = getWindow(fIndex + 1); + if (timestampAfter == fNext.getStart()) { //shift frame start of next window to current tuple + shiftStart(fNext, position); + return fNext; + } + //else: simple insert, changes nothing + } + //else: current frame is the last one, tuple belongs to current frame + }else{ + //value below or equal to threshold + ActiveWindow f = getWindow(fIndex); + int index = listOfTs.indexOf(position); + long timestampAfter = (long) listOfTs.get(index + 1); + long timestampBefore = (long) listOfTs.get(index - 1); + long last_ts = f.getEnd(); + + if(fIndex+1 == numberOfActiveWindows()){ //if current frame is last frame + shiftEndAndModify(f, position); + if (timestampAfter == last_ts) { //shift frame end + return f; + } else { //split frame + return addNewWindow(fIndex + 1, timestampAfter, last_ts); + } + }else if((fIndex+1) < numberOfActiveWindows()) { //current frame is not the last frame + ActiveWindow fNext = getWindow(fIndex + 1); + if (timestampAfter != fNext.getStart() && timestampBefore < last_ts) { + //do not shift start of next frame, but shift end + shiftEndAndModify(f, position); + if (timestampAfter == last_ts) { //shift frame end + return f; + } else { //split frame + return addNewWindow(fIndex + 1, timestampAfter, last_ts); + } + } + } + } + return null; + } + + @Override + public long assignNextWindowStart(long position) { + return position; + } + + public int getFrame(long position) { + // returns newest frame + int i = numberOfActiveWindows()-1; + for (; i >= 0 ; i--) { + ActiveWindow p = getWindow(i); + if (p.getStart() <= position) { + return i; + } + } + return -1; + } + + @Override + public void triggerWindows(WindowCollector aggregateWindows, long lastWatermark, long currentWatermark) { + if(numberOfActiveWindows() > 0) { + ActiveWindow window = getWindow(0); + while (window.getEnd() <= currentWatermark && window.getEnd() <= lastEnd) { + aggregateWindows.trigger(window.getStart(), window.getEnd(), measure); + removeWindow(0); + if (hasActiveWindows()) + return; + window = getWindow(0); + } + } + } + } +} diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java index bb0730ed..627c6069 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java @@ -62,6 +62,11 @@ public void shiftEnd(ActiveWindow window, long position) { window.setEnd(position); } + public void shiftEndAndModify(ActiveWindow window, long position) { + modifiedWindowEdges.add(new ShiftModification(window.end, position)); //adds a ShiftModification for end of window + window.setEnd(position); + } + public abstract ActiveWindow updateContext(Tuple tuple, long position); @@ -70,6 +75,13 @@ public final ActiveWindow updateContext(Tuple tuple, long position, Set listOfTs); //For out-of-order processing + + public ActiveWindow updateContextWindows(Tuple element, long ts, ArrayList listOfTs, Set windowModifications) { + this.modifiedWindowEdges = windowModifications; + return updateContextWindows(element, ts, listOfTs); + } + public abstract long assignNextWindowStart(long position); public abstract void triggerWindows(WindowCollector aggregateWindows, long lastWatermark, long currentWatermark); diff --git a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java index 3c9a9a6f..b32238c3 100644 --- a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java +++ b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java @@ -1,5 +1,6 @@ package de.tub.dima.scotty.slicing; +import de.tub.dima.scotty.core.windowType.ThresholdFrame; import de.tub.dima.scotty.core.windowType.windowContext.*; import de.tub.dima.scotty.slicing.aggregationstore.*; import de.tub.dima.scotty.slicing.slice.*; @@ -11,6 +12,7 @@ public class SliceManager { private final SliceFactory sliceFactory; private final AggregationStore aggregationStore; private final WindowManager windowManager; + public ArrayList listOfTs = new ArrayList(); public SliceManager(final SliceFactory sliceFactory, final AggregationStore aggregationStore, final WindowManager windowManager) { this.sliceFactory = sliceFactory; @@ -51,7 +53,7 @@ public void processElement(InputType element, long ts) { } final Slice currentSlice = this.aggregationStore.getCurrentSlice(); - + listOfTs.add(ts); // is element in order? if (ts >= currentSlice.getTLast()) { // in order @@ -66,7 +68,12 @@ public void processElement(InputType element, long ts) { for (WindowContext windowContext : this.windowManager.getContextAwareWindows()) { Set windowModifications = new HashSet<>(); - WindowContext.ActiveWindow assignedWindow = windowContext.updateContext(element, ts, windowModifications); + if (!(windowContext instanceof ThresholdFrame.ThresholdFrameContext)){ + WindowContext.ActiveWindow assignedWindow = windowContext.updateContext(element, ts, windowModifications); + }else { + Collections.sort(listOfTs); + WindowContext.ActiveWindow assignedWindow = windowContext.updateContextWindows(element, ts, listOfTs, windowModifications); + } checkSliceEdges(windowModifications); } diff --git a/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/ThresholdTest.java b/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/ThresholdTest.java new file mode 100644 index 00000000..e209fde2 --- /dev/null +++ b/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/ThresholdTest.java @@ -0,0 +1,261 @@ +package de.tub.dima.scotty.slicing.aggregationstore.test.windowTest; + +import de.tub.dima.scotty.core.AggregateWindow; +import de.tub.dima.scotty.core.windowFunction.ReduceAggregateFunction; +import de.tub.dima.scotty.core.windowType.ThresholdFrame; +import de.tub.dima.scotty.slicing.SlicingWindowOperator; +import de.tub.dima.scotty.state.memory.MemoryStateFactory; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +public class ThresholdTest { + + private SlicingWindowOperator slicingWindowOperator; + private MemoryStateFactory stateFactory; + + @Before + public void setup() { + this.stateFactory = new MemoryStateFactory(); + this.slicingWindowOperator = new SlicingWindowOperator(stateFactory); + } + + @Test + public void inOrderTest() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new ThresholdFrame(3)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(2, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(5, 4); //begin of frame because 5>3 + slicingWindowOperator.processElement(4, 5); + slicingWindowOperator.processElement(7, 6); + slicingWindowOperator.processElement(2, 8); //end of frame because 2<3 + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(8, 12); //begin of second frame + slicingWindowOperator.processElement(4, 13); + slicingWindowOperator.processElement(1, 14); //end of second frame + + List resultWindows = slicingWindowOperator.processWatermark(20); + WindowAssert.assertEquals(resultWindows.get(0),4, 8, 16); + WindowAssert.assertEquals(resultWindows.get(1),12, 14, 12); + } + + @Test + public void inOrderSizeTest() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new ThresholdFrame(3, 3)); + + slicingWindowOperator.processElement(5, 4); //start of frame because 5 > 3 + slicingWindowOperator.processElement(4, 5); + slicingWindowOperator.processElement(7, 6); + slicingWindowOperator.processElement(3, 8); //end of frame because 3 = 3 + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(2, 11); + slicingWindowOperator.processElement(8, 12); //no frame because just 1 tuple + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(5, 16); //start of second frame + slicingWindowOperator.processElement(5,17); + slicingWindowOperator.processElement(4, 19); + slicingWindowOperator.processElement(4, 20); + slicingWindowOperator.processElement(2, 21); //end of second frame + + + List resultWindows = slicingWindowOperator.processWatermark(22); + WindowAssert.assertEquals(resultWindows.get(0),4, 8, 16); + WindowAssert.assertEquals(resultWindows.get(1),16, 21, 18); + } + + @Test + public void simpleInsert() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new ThresholdFrame(3)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(2, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(5, 4); //begin of frame because 5>3 + slicingWindowOperator.processElement(4, 5); + slicingWindowOperator.processElement(7, 6); + slicingWindowOperator.processElement(2, 8); //end of frame because 2<3 + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(8, 12); //begin of second frame + slicingWindowOperator.processElement(4, 13); //end of second frame + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(5, 7); //out-of-order, insert into first frame + + List resultWindows = slicingWindowOperator.processWatermark(20); + WindowAssert.assertEquals(resultWindows.get(0),4, 8, 21); + WindowAssert.assertEquals(resultWindows.get(1),12, 14, 12); + } + + @Test + public void simpleInsert2() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new ThresholdFrame(3)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(2, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(5, 4); //begin of frame because 5>3 + slicingWindowOperator.processElement(7, 6); + slicingWindowOperator.processElement(5, 7); + slicingWindowOperator.processElement(4, 5); //out-of-order, insert into frame + slicingWindowOperator.processElement(2, 8); //end of frame because 2<3 + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(8, 12); //begin of second frame + slicingWindowOperator.processElement(4, 13); //end of second frame + slicingWindowOperator.processElement(1, 14); + + + List resultWindows = slicingWindowOperator.processWatermark(20); + WindowAssert.assertEquals(resultWindows.get(0),4, 8, 21); + WindowAssert.assertEquals(resultWindows.get(1),12, 14, 12); + } + + @Test + public void noInsertIntoFrame() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new ThresholdFrame(3)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(2, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(5, 4); //begin of frame because 5>3 + slicingWindowOperator.processElement(4, 5); + slicingWindowOperator.processElement(7, 6); + slicingWindowOperator.processElement(2, 8); //end of frame because 2<3 + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 11); + slicingWindowOperator.processElement(8, 12); //begin of second frame + slicingWindowOperator.processElement(4, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(2, 10); //does not change anything, no insert into any frame + + List resultWindows = slicingWindowOperator.processWatermark(20); + WindowAssert.assertEquals(resultWindows.get(0),4, 8, 16); + WindowAssert.assertEquals(resultWindows.get(1),12, 14, 12); + } + + @Test + public void NoShift() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new ThresholdFrame(3)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(2, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(5, 4); //begin of frame because 5>3 + slicingWindowOperator.processElement(4, 5); + slicingWindowOperator.processElement(7, 6); + slicingWindowOperator.processElement(2, 8); //end of frame because 2<3 + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(8, 12); //begin of second frame + slicingWindowOperator.processElement(4, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(2, 11); //out-of-order, does not change anything + + List resultWindows = slicingWindowOperator.processWatermark(20); + WindowAssert.assertEquals(resultWindows.get(0),4, 8, 16); + WindowAssert.assertEquals(resultWindows.get(1),12, 14, 12); + } + + @Test + public void shiftFrameStart() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new ThresholdFrame(3)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(2, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(5, 4); //begin of frame because 5>3 + slicingWindowOperator.processElement(4, 5); + slicingWindowOperator.processElement(7, 6); + slicingWindowOperator.processElement(2, 8); //end of frame because 2<3 + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(8, 12); //begin of second frame + slicingWindowOperator.processElement(4, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(5, 11); //out-of-order, shift frame start to 11 + + List resultWindows = slicingWindowOperator.processWatermark(20); + WindowAssert.assertEquals(resultWindows.get(0),4, 8, 16); + WindowAssert.assertEquals(resultWindows.get(1),11, 14, 17); + } + + @Test + public void shiftFirstFrameStart() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new ThresholdFrame(3)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(2, 2); + slicingWindowOperator.processElement(5, 4); //begin of frame because 5>3 + slicingWindowOperator.processElement(4, 5); + slicingWindowOperator.processElement(7, 6); + slicingWindowOperator.processElement(2, 8); //end of frame because 2<3 + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(8, 12); //begin of second frame + slicingWindowOperator.processElement(4, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(5, 3); //shifts beginning of first frame + + List resultWindows = slicingWindowOperator.processWatermark(20); + WindowAssert.assertEquals(resultWindows.get(0),3, 8, 21); + WindowAssert.assertEquals(resultWindows.get(1),12, 14, 12); + } + + @Test + public void shiftFrameEnd() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new ThresholdFrame(3)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(2, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(5, 4); //begin of frame because 5>3 + slicingWindowOperator.processElement(4, 5); + slicingWindowOperator.processElement(7, 6); + slicingWindowOperator.processElement(2, 8); //end of frame because 2<3 + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(8, 12); //begin of second frame + slicingWindowOperator.processElement(4, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(2, 7); //shift end of frame from 8 to 7 + + List resultWindows = slicingWindowOperator.processWatermark(20); + WindowAssert.assertEquals(resultWindows.get(0),4, 7, 16); + WindowAssert.assertEquals(resultWindows.get(1),12, 14, 12); + } + + @Test + public void splitFrame() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new ThresholdFrame(3)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(2, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(5, 4); //begin of frame because 5>3 + slicingWindowOperator.processElement(4, 5); + slicingWindowOperator.processElement(7, 6); + slicingWindowOperator.processElement(4, 8); + slicingWindowOperator.processElement(5, 9); + slicingWindowOperator.processElement(2, 10);//end of frame because 2<3 + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(2, 7); //splits frame into 2 frames + + List resultWindows = slicingWindowOperator.processWatermark(20); + WindowAssert.assertEquals(resultWindows.get(0),4, 7, 16); + WindowAssert.assertEquals(resultWindows.get(1),8, 10, 9); + } +} From c94fa74587406e78b178c55c781e8ed3eb560108 Mon Sep 17 00:00:00 2001 From: julianev Date: Mon, 24 May 2021 10:16:55 +0200 Subject: [PATCH 2/3] revisit Threshold Frame --- .../scotty/core/windowType/SessionWindow.java | 5 - .../core/windowType/ThresholdFrame.java | 164 +++++++++--------- .../windowContext/WindowContext.java | 7 - .../tub/dima/scotty/slicing/SliceManager.java | 9 +- 4 files changed, 80 insertions(+), 105 deletions(-) diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java index 8929f682..da040113 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java @@ -117,11 +117,6 @@ public void triggerWindows(WindowCollector aggregateWindows, long lastWatermark, } } - @Override - public ActiveWindow updateContextWindows(Object element, long ts, ArrayList listOfTs) { - return null; - } - } @Override diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/ThresholdFrame.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/ThresholdFrame.java index f40b2d96..e6c4062d 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/ThresholdFrame.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/ThresholdFrame.java @@ -5,29 +5,19 @@ import org.apache.flink.api.java.tuple.Tuple; import java.util.ArrayList; +import java.util.Collections; public class ThresholdFrame implements ForwardContextFree { - /* - * Implements Threshold Frame after the definition by Grossniklaus et al. 2016 - */ private final WindowMeasure measure; private final int threshold; private final int attribute; private final long minSize; - public ThresholdFrame(int threshold){ - this(threshold, 0, 2); - } - - public ThresholdFrame(int threshold, long minSize){ - this(threshold, 0, minSize); - } - /** - * - * @param attribute the position of an attribute in a tuple which values should be compared to the threshold + * Implements Threshold Frame after the definition by Grossniklaus et al. 2016 * @param threshold the value of the threshold + * @param attribute the position of an attribute in a tuple which values should be compared to the threshold * @param minSize the minimum count of tuples in the frame, 2 tuples by default */ public ThresholdFrame(int threshold, int attribute, long minSize){ @@ -37,6 +27,14 @@ public ThresholdFrame(int threshold, int attribute, long minSize){ this.minSize = minSize; } + public ThresholdFrame(int threshold){ + this(threshold, 0, 2); + } + + public ThresholdFrame(int threshold, long minSize){ + this(threshold, 0, minSize); + } + @Override public WindowMeasure getWindowMeasure() { return this.measure; @@ -52,10 +50,10 @@ public class ThresholdFrameContext extends WindowContext { long count = 0; long gap = 0; long lastEnd = 0; + private ArrayList timestamps = new ArrayList(); //holds timestamps of tuples for out-of-order processing @Override public ActiveWindow updateContext(Object o, long position) { - //tuple is in-order int value; if(o instanceof Tuple){ //fetching the value of the attribute in the tuple value = (int)((Tuple) o).getField(attribute); @@ -67,6 +65,7 @@ public ActiveWindow updateContext(Object o, long position) { if(value > threshold){ //begin of first frame count++; gap = 0; + timestamps.add(position); addNewWindow(0, position, position); return getWindow(0); }else{ @@ -76,94 +75,89 @@ public ActiveWindow updateContext(Object o, long position) { int fIndex = getFrame(position); - if (fIndex == -1) { - if(value > threshold){ - addNewWindow(0, position, position); - } - } else { - ActiveWindow f = getWindow(fIndex); - if (value > threshold) { - if (count == 0 && gap >= 1) { //open new frame - count++; - gap = 0; - return addNewWindow(fIndex + 1, position, position); - } else { //update frame - //append tuple to active frame - count++; - shiftEnd(f, position); - return f; + if (position >= timestamps.get(timestamps.size() -1)) { + //processes in-order tuples + timestamps.add(position); + + if (fIndex == -1) { + if (value > threshold) { + addNewWindow(0, position, position); } } else { - if (gap == 0) { - if (count >= minSize) { //close frame with first tuple that is below the threshold + ActiveWindow f = getWindow(fIndex); + if (value > threshold) { + if (count == 0 && gap >= 1) { //open new frame + count++; + gap = 0; + return addNewWindow(fIndex + 1, position, position); + } else { //update frame + //append tuple to active frame + count++; shiftEnd(f, position); - count = 0; - gap++; - lastEnd = position; return f; - } else { //discard frame if it is smaller than minSize - removeWindow(fIndex); + } + } else { + if (gap == 0) { + if (count >= minSize) { //close frame with first tuple that is below the threshold + shiftEnd(f, position); + count = 0; + gap++; + lastEnd = position; + return f; + } else { //discard frame if it is smaller than minSize + removeWindow(fIndex); + count = 0; + gap++; + } + } else { //tuple that is not included in any frame count = 0; gap++; } - } else { //tuple that is not included in any frame - count = 0; - gap++; } } - } - return null; - } - - @Override - public ActiveWindow updateContextWindows(Object o, long position, ArrayList listOfTs) { - //tuple is out-of-order - int value; - if(o instanceof Tuple){ - value = (int)((Tuple) o).getField(attribute); - }else{ - value = (int)o; - } - - int fIndex = getFrame(position); + } else { + //processes out-of-order tuples + timestamps.add(position); + Collections.sort(timestamps); - if(value > threshold){ - if((fIndex+1) < numberOfActiveWindows()) { //frame after this one exists - int index = listOfTs.indexOf(position); - long timestampAfter = (long) listOfTs.get(index + 1); - ActiveWindow fNext = getWindow(fIndex + 1); - if (timestampAfter == fNext.getStart()) { //shift frame start of next window to current tuple - shiftStart(fNext, position); - return fNext; - } - //else: simple insert, changes nothing - } - //else: current frame is the last one, tuple belongs to current frame - }else{ - //value below or equal to threshold - ActiveWindow f = getWindow(fIndex); - int index = listOfTs.indexOf(position); - long timestampAfter = (long) listOfTs.get(index + 1); - long timestampBefore = (long) listOfTs.get(index - 1); - long last_ts = f.getEnd(); - - if(fIndex+1 == numberOfActiveWindows()){ //if current frame is last frame - shiftEndAndModify(f, position); - if (timestampAfter == last_ts) { //shift frame end - return f; - } else { //split frame - return addNewWindow(fIndex + 1, timestampAfter, last_ts); + if(value > threshold){ + if((fIndex+1) < numberOfActiveWindows()) { //frame after this one exists + int index = timestamps.indexOf(position); + long timestampAfter = (long) timestamps.get(index + 1); + ActiveWindow fNext = getWindow(fIndex + 1); + if (timestampAfter == fNext.getStart()) { //shift frame start of next window to current tuple + shiftStart(fNext, position); + return fNext; + } + //else: simple insert, changes nothing } - }else if((fIndex+1) < numberOfActiveWindows()) { //current frame is not the last frame - ActiveWindow fNext = getWindow(fIndex + 1); - if (timestampAfter != fNext.getStart() && timestampBefore < last_ts) { - //do not shift start of next frame, but shift end + //else: current frame is the last one, tuple belongs to current frame + }else{ + //value below or equal to threshold + ActiveWindow f = getWindow(fIndex); + int index = timestamps.indexOf(position); + long timestampAfter = (long) timestamps.get(index + 1); + long timestampBefore = (long) timestamps.get(index - 1); + long last_ts = f.getEnd(); + + if(fIndex+1 == numberOfActiveWindows()){ //if current frame is last frame shiftEndAndModify(f, position); if (timestampAfter == last_ts) { //shift frame end return f; } else { //split frame return addNewWindow(fIndex + 1, timestampAfter, last_ts); } + }else if((fIndex+1) < numberOfActiveWindows()) { //current frame is not the last frame + ActiveWindow fNext = getWindow(fIndex + 1); + if (timestampAfter != fNext.getStart() && timestampBefore < last_ts) { + //do not shift start of next frame, but shift end + shiftEndAndModify(f, position); + if (timestampAfter == last_ts) { //shift frame end + return f; + } else { //split frame + return addNewWindow(fIndex + 1, timestampAfter, last_ts); + } + } } } } diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java index 627c6069..e1db3a9e 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java @@ -75,13 +75,6 @@ public final ActiveWindow updateContext(Tuple tuple, long position, Set listOfTs); //For out-of-order processing - - public ActiveWindow updateContextWindows(Tuple element, long ts, ArrayList listOfTs, Set windowModifications) { - this.modifiedWindowEdges = windowModifications; - return updateContextWindows(element, ts, listOfTs); - } - public abstract long assignNextWindowStart(long position); public abstract void triggerWindows(WindowCollector aggregateWindows, long lastWatermark, long currentWatermark); diff --git a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java index b32238c3..df72f84c 100644 --- a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java +++ b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java @@ -12,7 +12,6 @@ public class SliceManager { private final SliceFactory sliceFactory; private final AggregationStore aggregationStore; private final WindowManager windowManager; - public ArrayList listOfTs = new ArrayList(); public SliceManager(final SliceFactory sliceFactory, final AggregationStore aggregationStore, final WindowManager windowManager) { this.sliceFactory = sliceFactory; @@ -53,7 +52,6 @@ public void processElement(InputType element, long ts) { } final Slice currentSlice = this.aggregationStore.getCurrentSlice(); - listOfTs.add(ts); // is element in order? if (ts >= currentSlice.getTLast()) { // in order @@ -68,12 +66,7 @@ public void processElement(InputType element, long ts) { for (WindowContext windowContext : this.windowManager.getContextAwareWindows()) { Set windowModifications = new HashSet<>(); - if (!(windowContext instanceof ThresholdFrame.ThresholdFrameContext)){ - WindowContext.ActiveWindow assignedWindow = windowContext.updateContext(element, ts, windowModifications); - }else { - Collections.sort(listOfTs); - WindowContext.ActiveWindow assignedWindow = windowContext.updateContextWindows(element, ts, listOfTs, windowModifications); - } + WindowContext.ActiveWindow assignedWindow = windowContext.updateContext(element, ts, windowModifications); checkSliceEdges(windowModifications); } From 8bbc677c2e8a60b7e353a9b61a5e1cd9ddf86604 Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 26 Sep 2022 16:29:56 +0200 Subject: [PATCH 3/3] add test and fix in out-of-order processing --- .../core/windowType/ThresholdFrame.java | 24 ++++++------------ .../test/windowTest/ThresholdTest.java | 25 ++++++++++++++++++- 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/ThresholdFrame.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/ThresholdFrame.java index e6c4062d..42c2a61c 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/ThresholdFrame.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/ThresholdFrame.java @@ -132,32 +132,24 @@ public ActiveWindow updateContext(Object o, long position) { //else: simple insert, changes nothing } //else: current frame is the last one, tuple belongs to current frame - }else{ + }else { //value below or equal to threshold ActiveWindow f = getWindow(fIndex); int index = timestamps.indexOf(position); long timestampAfter = (long) timestamps.get(index + 1); long timestampBefore = (long) timestamps.get(index - 1); long last_ts = f.getEnd(); + long nextStart = -1; + + if((fIndex+1) < numberOfActiveWindows()) { // current frame is not the last frame + nextStart = getWindow(fIndex + 1).getStart(); // get start of next frame + } - if(fIndex+1 == numberOfActiveWindows()){ //if current frame is last frame + if (timestampBefore < last_ts && timestampAfter != nextStart) { shiftEndAndModify(f, position); - if (timestampAfter == last_ts) { //shift frame end - return f; - } else { //split frame + if (timestampAfter != last_ts) { //shift frame end return addNewWindow(fIndex + 1, timestampAfter, last_ts); } - }else if((fIndex+1) < numberOfActiveWindows()) { //current frame is not the last frame - ActiveWindow fNext = getWindow(fIndex + 1); - if (timestampAfter != fNext.getStart() && timestampBefore < last_ts) { - //do not shift start of next frame, but shift end - shiftEndAndModify(f, position); - if (timestampAfter == last_ts) { //shift frame end - return f; - } else { //split frame - return addNewWindow(fIndex + 1, timestampAfter, last_ts); - } - } } } } diff --git a/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/ThresholdTest.java b/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/ThresholdTest.java index e209fde2..b84e2caf 100644 --- a/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/ThresholdTest.java +++ b/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/ThresholdTest.java @@ -143,7 +143,7 @@ public void noInsertIntoFrame() { } @Test - public void NoShift() { + public void NoShiftAtStartOfFrame() { slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); slicingWindowOperator.addWindowAssigner(new ThresholdFrame(3)); @@ -166,6 +166,29 @@ public void NoShift() { WindowAssert.assertEquals(resultWindows.get(1),12, 14, 12); } + @Test + public void noShiftSingleFrame() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new ThresholdFrame(3)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(2, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(5, 4); //begin of frame because 5>3 + slicingWindowOperator.processElement(4, 5); + slicingWindowOperator.processElement(7, 6); + slicingWindowOperator.processElement(2, 8); //end of frame because 2<3 + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(2, 12); + slicingWindowOperator.processElement(3, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(2, 11); + + List resultWindows = slicingWindowOperator.processWatermark(20); + WindowAssert.assertEquals(resultWindows.get(0),4, 8, 16); + } + @Test public void shiftFrameStart() { slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element);