Skip to content

Commit

Permalink
[FLINK-35028][runtime] Timer firing under async execution model
Browse files Browse the repository at this point in the history
  • Loading branch information
fredia committed Apr 22, 2024
1 parent 51d015b commit 0b2e988
Show file tree
Hide file tree
Showing 15 changed files with 589 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ public AsyncExecutionController(
* @return the built record context.
*/
public RecordContext<K> buildContext(Object record, K key) {
if (record == null) {
return new RecordContext<>(RecordContext.EMPTY_RECORD, key, this::disposeContext);
}
return new RecordContext<>(record, key, this::disposeContext);
}

Expand All @@ -144,7 +147,7 @@ public void setCurrentContext(RecordContext<K> switchingContext) {
*
* @param toDispose the context to dispose.
*/
public void disposeContext(RecordContext<K> toDispose) {
void disposeContext(RecordContext<K> toDispose) {
keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey());
inFlightRecordNum.decrementAndGet();
RecordContext<K> nextRecordCtx =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
* @param <K> the type of key
*/
public class KeyAccountingUnit<K> {

/** The in-flight records that are being processed, their keys are different from each other. */
private final Map<K, Object> noConflictInFlightRecords;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
* @param <K> The type of the key inside the record.
*/
public class RecordContext<K> extends ReferenceCounted {

/** The empty record for timer and non-record input usage. */
static final Object EMPTY_RECORD = new Object();
/** The record to be processed. */
private final Object record;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public abstract class AbstractStreamOperator<OUT>

private transient StreamOperatorStateHandler stateHandler;

private transient InternalTimeServiceManager<?> timeServiceManager;
protected transient InternalTimeServiceManager<?> timeServiceManager;

// --------------- Metrics ---------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public abstract class AbstractStreamOperatorV2<OUT>
protected final RecordAttributes[] lastRecordAttributes;

private StreamOperatorStateHandler stateHandler;
private InternalTimeServiceManager<?> timeServiceManager;
protected InternalTimeServiceManager<?> timeServiceManager;

public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int numberOfInputs) {
final Environment environment = parameters.getContainingTask().getEnvironment();
Expand Down Expand Up @@ -465,7 +465,6 @@ protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
* @param triggerable The {@link Triggerable} that should be invoked when timers fire
* @param <N> The type of the timer namespace.
*/
@VisibleForTesting
public <K, N> InternalTimerService<N> getInternalTimerService(
String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
if (timeServiceManager == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
Expand Down Expand Up @@ -51,6 +52,21 @@ <N> InternalTimerService<N> getInternalTimerService(
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable);

/**
* Creates an {@link InternalTimerServiceAsyncImpl} for handling a group of timers identified by
* the given {@code name}. The timers are scoped to a key and namespace. Mainly used by async
* operators.
*
* <p>Some essential order preservation will be added when the given {@link Triggerable} is
* invoked.
*/
<N> InternalTimerService<N> getAsyncInternalTimerService(
String name,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable,
AsyncExecutionController<K> asyncExecutionController);

/**
* Advances the Watermark of all managed {@link InternalTimerService timer services},
* potentially firing event time timers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
Expand Down Expand Up @@ -179,6 +180,54 @@ <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(
return timerService;
}

@Override
public <N> InternalTimerService<N> getAsyncInternalTimerService(
String name,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable,
AsyncExecutionController<K> asyncExecutionController) {
checkNotNull(keySerializer, "Timers can only be used on keyed operators.");

// the following casting is to overcome type restrictions.
TimerSerializer<K, N> timerSerializer =
new TimerSerializer<>(keySerializer, namespaceSerializer);

InternalTimerServiceAsyncImpl<K, N> timerService =
registerOrGetAsyncTimerService(name, timerSerializer, asyncExecutionController);

timerService.startTimerService(
timerSerializer.getKeySerializer(),
timerSerializer.getNamespaceSerializer(),
triggerable);

return timerService;
}

<N> InternalTimerServiceAsyncImpl<K, N> registerOrGetAsyncTimerService(
String name,
TimerSerializer<K, N> timerSerializer,
AsyncExecutionController<K> asyncExecutionController) {
InternalTimerServiceAsyncImpl<K, N> timerService =
(InternalTimerServiceAsyncImpl<K, N>) timerServices.get(name);
if (timerService == null) {
timerService =
new InternalTimerServiceAsyncImpl<>(
taskIOMetricGroup,
localKeyGroupRange,
keyContext,
processingTimeService,
createTimerPriorityQueue(
PROCESSING_TIMER_PREFIX + name, timerSerializer),
createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),
cancellationContext,
asyncExecutionController);

timerServices.put(name, timerService);
}
return timerService;
}

Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() {
return Collections.unmodifiableMap(timerServices);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.ThrowingRunnable;

/**
* An implementation of {@link InternalTimerService} that is used by {@link
* org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator}.
* The timer service will set {@link RecordContext} for the timers before invoking action to
* preserve the execution order between timer firing and records processing.
*
* @see <a
* href=https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Timers>FLIP-425
* timers section.</a>
* @param <K> Type of timer's key.
* @param <N> Type of the namespace to which timers are scoped.
*/
@Internal
public class InternalTimerServiceAsyncImpl<K, N> extends InternalTimerServiceImpl<K, N> {

private AsyncExecutionController<K> asyncExecutionController;

InternalTimerServiceAsyncImpl(
TaskIOMetricGroup taskIOMetricGroup,
KeyGroupRange localKeyGroupRange,
KeyContext keyContext,
ProcessingTimeService processingTimeService,
KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue,
StreamTaskCancellationContext cancellationContext,
AsyncExecutionController<K> asyncExecutionController) {
super(
taskIOMetricGroup,
localKeyGroupRange,
keyContext,
processingTimeService,
processingTimeTimersQueue,
eventTimeTimersQueue,
cancellationContext);
this.asyncExecutionController = asyncExecutionController;
}

@Override
void onProcessingTime(long time) throws Exception {
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;

InternalTimer<K, N> timer;

while ((timer = processingTimeTimersQueue.peek()) != null
&& timer.getTimestamp() <= time
&& !cancellationContext.isCancelled()) {
processingTimeTimersQueue.poll();
final InternalTimer<K, N> timerToTrigger = timer;
maintainContextAndProcess(
timerToTrigger, () -> triggerTarget.onProcessingTime(timerToTrigger));
taskIOMetricGroup.getNumFiredTimers().inc();
}

if (timer != null && nextTimer == null) {
nextTimer =
processingTimeService.registerTimer(
timer.getTimestamp(), this::onProcessingTime);
}
}

/**
* Advance one watermark, this will fire some event timers.
*
* @param time the time in watermark.
*/
@Override
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;

InternalTimer<K, N> timer;

while ((timer = eventTimeTimersQueue.peek()) != null
&& timer.getTimestamp() <= time
&& !cancellationContext.isCancelled()) {
eventTimeTimersQueue.poll();
final InternalTimer<K, N> timerToTrigger = timer;
maintainContextAndProcess(
timerToTrigger, () -> triggerTarget.onEventTime(timerToTrigger));
taskIOMetricGroup.getNumFiredTimers().inc();
}
}

/**
* Iterator each timer in the queue, and invoke the consumer. This function is mainly used by
* state-processor-API. TODO: Ensure state-processor-API that only uses sync state API.
*/
protected void foreachTimer(
BiConsumerWithException<N, Long, Exception> consumer,
KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue)
throws Exception {
throw new UnsupportedOperationException(
"Batch operation is not supported when using async state.");
}

private void maintainContextAndProcess(
InternalTimer<K, N> timer, ThrowingRunnable<Exception> runnable) {
RecordContext<K> recordCtx = asyncExecutionController.buildContext(null, timer.getKey());
recordCtx.retain();
asyncExecutionController.setCurrentContext(recordCtx);
keyContext.setCurrentKey(timer.getKey());
asyncExecutionController.syncPointRequestWithCallback(runnable);
recordCtx.release();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,21 @@
/** {@link InternalTimerService} that stores timers on the Java heap. */
public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {

private final ProcessingTimeService processingTimeService;
protected final ProcessingTimeService processingTimeService;

private final TaskIOMetricGroup taskIOMetricGroup;
private final KeyContext keyContext;
protected final TaskIOMetricGroup taskIOMetricGroup;
protected final KeyContext keyContext;

/** Processing time timers that are currently in-flight. */
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>
protected final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>
processingTimeTimersQueue;

/** Event time timers that are currently in-flight. */
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>
protected final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>
eventTimeTimersQueue;

/** Context that allows us to stop firing timers if the containing task has been cancelled. */
private final StreamTaskCancellationContext cancellationContext;
protected final StreamTaskCancellationContext cancellationContext;

/** Information concerning the local key-group range. */
private final KeyGroupRange localKeyGroupRange;
Expand All @@ -69,21 +69,21 @@ public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {
* The local event time, as denoted by the last received {@link
* org.apache.flink.streaming.api.watermark.Watermark Watermark}.
*/
private long currentWatermark = Long.MIN_VALUE;
protected long currentWatermark = Long.MIN_VALUE;

/**
* The one and only Future (if any) registered to execute the next {@link Triggerable} action,
* when its (processing) time arrives.
*/
private ScheduledFuture<?> nextTimer;
protected ScheduledFuture<?> nextTimer;

// Variables to be set when the service is started.

private TypeSerializer<K> keySerializer;

private TypeSerializer<N> namespaceSerializer;

private Triggerable<K, N> triggerTarget;
protected Triggerable<K, N> triggerTarget;

private volatile boolean isInitialized;

Expand Down Expand Up @@ -270,7 +270,7 @@ public void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exceptio
foreachTimer(consumer, processingTimeTimersQueue);
}

private void foreachTimer(
protected void foreachTimer(
BiConsumerWithException<N, Long, Exception> consumer,
KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue)
throws Exception {
Expand All @@ -283,7 +283,7 @@ private void foreachTimer(
}
}

private void onProcessingTime(long time) throws Exception {
void onProcessingTime(long time) throws Exception {
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators.sorted.state;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
Expand Down Expand Up @@ -72,6 +73,17 @@ public <N> InternalTimerService<N> getInternalTimerService(
return timerService;
}

@Override
public <N> InternalTimerService<N> getAsyncInternalTimerService(
String name,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable,
AsyncExecutionController<K> asyncExecutionController) {
throw new UnsupportedOperationException(
"Async timer service is not supported in BATCH execution mode.");
}

@Override
public void advanceWatermark(Watermark watermark) {
if (watermark.getTimestamp() == Long.MAX_VALUE) {
Expand Down
Loading

0 comments on commit 0b2e988

Please sign in to comment.