Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherrypick lock backoff retry to 3.9 #18485

Merged
merged 23 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,6 @@ synchronized boolean lock(final TaskId taskId) {
} else {
lockedTasksToOwner.put(taskId, Thread.currentThread());
// make sure the task directory actually exists, and create it if not
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just noticed that this comment does not make sense anymore since the following line is removed. This also applies to trunk. Could you please remove the comment here and open a PR to remove the comment for trunk?

getOrCreateDirectoryForTask(taskId);
return true;
}
}
Expand Down Expand Up @@ -680,5 +679,4 @@ public int hashCode() {
return Objects.hash(file, namedTopology);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LockException;
Expand Down Expand Up @@ -104,6 +105,8 @@ public class TaskManager {
// includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance
private final Set<TaskId> lockedTaskDirectories = new HashSet<>();

// per-task backoff bookkeeping for initialization retries: an entry is created or updated when
// initializing the task hits a LockException, and removed once initialization goes through
private Map<TaskId, BackoffRecord> taskIdToBackoffRecord = new HashMap<>();

private final ActiveTaskCreator activeTaskCreator;
private final StandbyTaskCreator standbyTaskCreator;
private final StateUpdater stateUpdater;
Expand Down Expand Up @@ -1007,14 +1010,22 @@ private void addTasksToStateUpdater() {
}

// Tries to initialize the given task and hand it over to the state updater. If initialization
// throws a LockException, the task is put back into the pending-to-init set and a backoff is
// recorded (or extended) for it so that the next attempts are spaced out.
// NOTE(review): this block is rendered from a diff hunk, so removed and added lines appear together.
private void addTaskToStateUpdater(final Task task) {
final long nowMs = time.milliseconds();
try {
// NOTE(review): the next two statements appear to be the removed (pre-change) lines of the hunk;
// the guarded branch below supersedes them -- confirm against the merged file.
task.initializeIfNeeded();
stateUpdater.add(task);
// Attempt initialization only if the task has no backoff record or its backoff allows a new attempt.
if (canTryInitializeTask(task.id(), nowMs)) {
task.initializeIfNeeded();
// initializeIfNeeded() did not throw, so the backoff record for this task is dropped.
taskIdToBackoffRecord.remove(task.id());
stateUpdater.add(task);
} else {
// Still backing off: skip the attempt and re-queue the task for a later iteration.
log.trace("Task {} is still not allowed to retry acquiring the state directory lock", task.id());
tasks.addPendingTasksToInit(Collections.singleton(task));
}
} catch (final LockException lockException) {
// The state directory may still be locked by another thread, when the rebalance just happened.
// Retry in the next iteration.
log.info("Encountered lock exception. Reattempting locking the state in the next iteration.", lockException);
tasks.addPendingTasksToInit(Collections.singleton(task));
// Remember this failed attempt (timestamped with nowMs) so that the retry is subject to backoff.
updateOrCreateBackoffRecord(task.id(), nowMs);
}
}

Expand Down Expand Up @@ -1770,7 +1781,6 @@ private Stream<Task> standbyTaskStream() {
return standbyTasksInTaskRegistry;
}
}

// For testing only.
int commitAll() {
return commit(tasks.allTasks());
Expand Down Expand Up @@ -2117,4 +2127,37 @@ boolean needsInitializationOrRestoration() {
// Adds the given task by delegating to tasks.addTask(task).
void addTask(final Task task) {
tasks.addTask(task);
}

/**
 * Decides whether an initialization attempt for the given task may be made at {@code nowMs}.
 *
 * @param taskId the task whose backoff state is consulted
 * @param nowMs  the current time in milliseconds
 * @return {@code true} if no backoff record exists for the task, otherwise whatever the
 *         task's backoff record reports for {@code nowMs}
 */
private boolean canTryInitializeTask(final TaskId taskId, final long nowMs) {
    // A task without a backoff record has nothing to wait for.
    if (!taskIdToBackoffRecord.containsKey(taskId)) {
        return true;
    }
    final BackoffRecord backoffRecord = taskIdToBackoffRecord.get(taskId);
    return backoffRecord.canAttempt(nowMs);
}

/**
 * Records an attempt for the given task at {@code nowMs}: creates a new backoff record if the
 * task has none yet, otherwise registers one more attempt on the existing record.
 *
 * @param taskId the task for which the attempt is recorded
 * @param nowMs  the time of the attempt in milliseconds
 */
private void updateOrCreateBackoffRecord(final TaskId taskId, final long nowMs) {
    // First recorded attempt for this task: start a fresh record.
    if (!taskIdToBackoffRecord.containsKey(taskId)) {
        taskIdToBackoffRecord.put(taskId, new BackoffRecord(nowMs));
        return;
    }
    final BackoffRecord existingRecord = taskIdToBackoffRecord.get(taskId);
    existingRecord.recordAttempt(nowMs);
}

/**
 * Tracks the attempts made for one task and decides when the next attempt is allowed.
 *
 * <p>The wait time comes from a shared {@link ExponentialBackoff} and is sampled once per recorded
 * attempt. The backoff is configured with a non-zero jitter, so sampling it inside
 * {@link #canAttempt(long)} would draw a new random threshold on every check; a caller that polls
 * repeatedly would then pass on the first low draw and the effective wait would collapse toward
 * the lower end of the jitter range.
 */
public static class BackoffRecord {
    // NOTE(review): arguments are presumably (initial interval ms, multiplier, max interval ms, jitter)
    // -- confirm against the ExponentialBackoff constructor documentation.
    private static final ExponentialBackoff EXPONENTIAL_BACKOFF = new ExponentialBackoff(1000, 2, 10000, 0.5);

    private long attempts;
    private long lastAttemptMs;
    // Backoff that applies after the most recent attempt; fixed until the next attempt is recorded.
    private long currentBackoffMs;

    /**
     * Creates a record for a task whose first attempt happened at {@code nowMs}.
     *
     * @param nowMs time of the first attempt in milliseconds
     */
    public BackoffRecord(final long nowMs) {
        this.attempts = 1;
        this.lastAttemptMs = nowMs;
        this.currentBackoffMs = EXPONENTIAL_BACKOFF.backoff(this.attempts);
    }

    /**
     * Registers one more attempt made at {@code nowMs} and re-samples the backoff for it.
     *
     * @param nowMs time of the attempt in milliseconds
     */
    public void recordAttempt(final long nowMs) {
        this.attempts++;
        this.lastAttemptMs = nowMs;
        this.currentBackoffMs = EXPONENTIAL_BACKOFF.backoff(this.attempts);
    }

    /**
     * Tells whether enough time has passed since the last recorded attempt.
     *
     * @param nowMs current time in milliseconds
     * @return {@code true} if at least the backoff sampled for the last attempt has elapsed
     */
    public boolean canAttempt(final long nowMs) {
        return nowMs - lastAttemptMs >= currentBackoffMs;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler.ProcessingHandlerResponse;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1143,7 +1143,7 @@ public void shouldRecordCommitLatency(final boolean stateUpdaterEnabled, final b
topologyMetadata.buildAndRewriteTopology();

final TaskManager taskManager = new TaskManager(
null,
new MockTime(),
changelogReader,
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Answers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
Expand Down Expand Up @@ -203,8 +202,6 @@ public class TaskManagerTest {
private Admin adminClient;
@Mock
private ProcessorStateManager stateManager;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ProcessorStateManager.StateStoreMetadata stateStore;
final StateUpdater stateUpdater = mock(StateUpdater.class);
final DefaultTaskManager schedulingTaskManager = mock(DefaultTaskManager.class);

Expand Down Expand Up @@ -1247,6 +1244,54 @@ public void shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}

// Verifies that a task whose initialization fails with a LockException is re-queued, is not
// re-initialized while its backoff is still pending, and is initialized and added to the state
// updater once enough (mock) time has passed. A task that initializes fine is unaffected.
@Test
public void shouldRetryInitializationWithBackoffWhenInitializationFails() {
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.RESTORING).build();
final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
// Every drain returns both tasks, so each checkStateUpdater() call below processes task00 and task01 again.
when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, task01));
doThrow(new LockException("Lock Exception!")).when(task00).initializeIfNeeded();
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);

// First pass: the initialization attempt for task00 throws, task01 goes through.
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

// task00 should not be initialized due to LockException, task01 should be initialized
verify(task00).initializeIfNeeded();
verify(task01).initializeIfNeeded();
verify(tasks).addPendingTasksToInit(
argThat(tasksToInit -> tasksToInit.contains(task00) && !tasksToInit.contains(task01))
);
verify(stateUpdater, never()).add(task00);
verify(stateUpdater).add(task01);

// Advance the mock clock by 500 ms; presumably shorter than the first backoff -- confirm against BackoffRecord.
time.sleep(500);

// Second pass: task00 is expected to be skipped and re-queued without another initialization attempt.
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

// task00 should not be initialized since the backoff period has not passed
// (the invocation counts below are cumulative across both passes)
verify(task00, times(1)).initializeIfNeeded();
verify(tasks, times(2)).addPendingTasksToInit(
argThat(tasksToInit -> tasksToInit.contains(task00))
);
verify(stateUpdater, never()).add(task00);

// Advance the mock clock by another 5000 ms so that the backoff is expected to have elapsed.
time.sleep(5000);

// task00 should call initialize since the backoff period has passed
// Re-stub task00 so that this attempt no longer throws.
doNothing().when(task00).initializeIfNeeded();
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

// Third pass: one more initialization attempt, no additional re-queue, and task00 reaches the state updater.
verify(task00, times(2)).initializeIfNeeded();
verify(tasks, times(2)).addPendingTasksToInit(
argThat(tasksToInit -> tasksToInit.contains(task00))
);
verify(stateUpdater).add(task00);
}

@Test
public void shouldRethrowRuntimeExceptionInInitTaskWithStateUpdater() {
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
Expand Down Expand Up @@ -1669,6 +1714,8 @@ public void shouldRethrowTaskCorruptedExceptionFromStateUpdater() {
assertEquals(mkSet(taskId00, taskId01), thrown.corruptedTasks());
assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be re-initialized", thrown.getMessage());
}

@Test
public void shouldAddSubscribedTopicsFromAssignmentToTopologyMetadata() {
final Map<TaskId, Set<TopicPartition>> activeTasksAssignment = mkMap(
mkEntry(taskId01, mkSet(t1p1)),
Expand Down