Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slide-By-Tuple Window Implementation and Test #29

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ public void triggerWindows(WindowCollector aggregateWindows, long lastWatermark,
session = getWindow(0);
}
}


}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package de.tub.dima.scotty.core.windowType;

import de.tub.dima.scotty.core.WindowCollector;
import de.tub.dima.scotty.core.windowType.windowContext.WindowContext;

import java.util.ArrayList;
import java.util.Collections;

public class SlideByTupleWindow implements ForwardContextAware {

private final WindowMeasure measure;
private final long size;
private final long slide;

/**
* the measure of the Slide-by-tuple Window is time
* @param size size of the SlideByTuple window in time
* @param slide window slide step in tuple counts
*/
public SlideByTupleWindow(long size, long slide){
this.measure = WindowMeasure.Time;
this.size = size;
this.slide = slide;
}

public long getSize() {
return size;
}

public long getSlide() {
return slide;
}

@Override
public WindowMeasure getWindowMeasure() {
return measure;
}

@Override
public SlideByTupleContext createContext() {
return new SlideByTupleContext();
}

public class SlideByTupleContext extends WindowContext<Object> {

private long nextStart = 0; //start count of next window
private long count = 0; //tuple-counter
private ArrayList<Long> timestamps = new ArrayList<Long>(); //holds timestamps of tuples for out-of-order processing

@Override
public ActiveWindow updateContext(Object tuple, long position){

if (hasActiveWindows()) {
//append first window
addNewWindow(0, position, position + size);
count++;
nextStart += getSlide();
timestamps.add(position);
return getWindow(0);
}

if (position >= timestamps.get(timestamps.size() -1)) {
//processes in-order tuples
timestamps.add(position);
int windowIndex = getWindowIndex(position);

if (windowIndex == -1) {
addNewWindow(0, position, position + size);
count++;
nextStart += getSlide();

} else {
if (count == nextStart) { //new window starts
count++;
nextStart += getSlide();
return addNewWindow(windowIndex + 1, position, position + size);
} else {
ActiveWindow w = getWindow(windowIndex);
if (w.getEnd() > position) { //append to active window
count++;
return w;
} else {
/* Tuple, which does not belong to current window and where count != nextStart,
does not get included in any window instance */
count++;
}
}
}
} else {
//processes out-of-order tuples
timestamps.add(position);
Collections.sort(timestamps);
count++;
int windowIndex = getWindowIndex(position);

if(slide == 1){ // slide == 1: add new window for every new tuple
nextStart += getSlide();
return addNewWindow(windowIndex+1, position, position + getSize());
}

if (windowIndex+1 <= (numberOfActiveWindows()-1)) {
//subsequent windows have to be shifted, beginning from the next window, to which the tuples does not belong
shiftWindows(position, windowIndex);
return null;

} else {
//no subsequent windows exist, current tuple or some tuple before may start a new window
if (timestamps.size()-1 == nextStart){
// tuple that arrived before starts new window because of changed count
long positionOfNewWindow = timestamps.get((int)nextStart);
nextStart += getSlide();
return addNewWindow(windowIndex + 1, positionOfNewWindow, positionOfNewWindow + size);
} else if (timestamps.lastIndexOf(position) == nextStart) { // get count of current tuple
// current tuple starts a new window
nextStart += getSlide();
return addNewWindow(windowIndex + 1, position, position + size);
}
}
}
return null;
}

/**
* shifts all windows after the out-of-order tuple
* @param position of the out-of-order tuple
* @param windowIndex of the window the tuple belongs to
*/
private void shiftWindows(long position, int windowIndex){

for(int i = windowIndex+1; i <= (numberOfActiveWindows()-1); i++){
ActiveWindow w = getWindow(i);
int index = timestamps.indexOf(w.getStart());
long timestampBefore = timestamps.get(index-1);

if(position == timestampBefore){ //start of window has to be shifted to current tuple
// shift start of window and modify slice start, otherwise insertion into wrong slice
shiftStart(w,timestampBefore);
}else{
// shift start of window, split slice if necessary
shiftStartDontModifySlice(w,timestampBefore);
splitSlice(timestampBefore);
}

shiftEnd(w, timestampBefore + size);
splitSlice(timestampBefore + size);
}
}

public int getWindowIndex(long position) {
//returns newest window
int i = numberOfActiveWindows()-1;
for (; i >= 0 ; i--) {
ActiveWindow w = getWindow(i);
if (w.getStart() <= position) {
return i;
}
}
return -1;
}

@Override
public long assignNextWindowStart(long position) {
if (count == nextStart) {
// new window starts, create new slice
return position;
} else {
// return next window end
return getNextWindowEnd(position);
}
}

public long getNextWindowEnd(long position) {
//returns next window end after position
long nextWEnd = -1;
for (int i = numberOfActiveWindows() -1 ; i >= 0 ; i--) {
ActiveWindow w = getWindow(i);
if (position >= w.getEnd()) {
break;
}
nextWEnd = w.getEnd();
}
return nextWEnd;
}

@Override
public void triggerWindows(WindowCollector aggregateWindows, long lastWatermark, long currentWatermark) {
ActiveWindow w = getWindow(0);
while (w.getEnd() <= currentWatermark) {
aggregateWindows.trigger(w.getStart(), w.getEnd() , measure);
removeWindow(0);
if (hasActiveWindows())
return;
w = getWindow(0);
}
}
}

@Override
public String toString() {
return "SlideByTupleWindow{" +
"measure=" + measure +
", size=" + size +
", slide=" + slide +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ public void shiftEnd(ActiveWindow window, long position) {
window.setEnd(position);
}

public void shiftStartDontModifySlice(ActiveWindow window, long position) {
// does not modify the start edge of the slice, only adjusts the window start
window.setStart(position);
}

public void splitSlice(long position) {
// adds a AddModification to split the slice if necessary, regardless of changes to the window
modifiedWindowEdges.add(new AddModification(position));
}

public abstract ActiveWindow updateContext(Tuple tuple, long position);

Expand Down Expand Up @@ -104,4 +113,4 @@ public int compareTo(ActiveWindow o) {
return Long.compare(this.start, o.start);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.tub.dima.scotty.slicing;

import de.tub.dima.scotty.core.windowType.SlideByTupleWindow;
import de.tub.dima.scotty.core.windowType.windowContext.*;
import de.tub.dima.scotty.slicing.aggregationstore.*;
import de.tub.dima.scotty.slicing.slice.*;
Expand Down
Loading