Skip to content

Commit

Permalink
[FLINK-29615][metrics] Retain active subtasks in TaskMetricStore when…
Browse files Browse the repository at this point in the history
… fetching metrics to accommodate dynamic scaling

This closes apache#21051
  • Loading branch information
X-czh authored and xintongsong committed Oct 21, 2022
1 parent d17ceaf commit f11c322
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ public synchronized void updateCurrentExecutionAttempts(Collection<JobDetails> j
Map<Integer, Integer> vertexAttempts =
jobRepresentativeAttempts.compute(
vertexId, (k, overwritten) -> new HashMap<>());
TaskMetricStore taskMetricStore = getTaskMetricStore(jobId, vertexId);
JobMetricStore jobMetricStore = this.jobs.get(jobId);
TaskMetricStore taskMetricStore =
jobMetricStore.getTaskMetricStore(vertexId);
// Retains current active subtasks to accommodate dynamic scaling
taskMetricStore.retainSubtasks(subtaskAttempts.keySet());
subtaskAttempts.forEach(
(subtaskIndex, attempts) -> {
// Updates representative attempts
Expand Down Expand Up @@ -498,6 +502,10 @@ public Map<Integer, SubtaskMetricStore> getAllSubtaskMetricStores() {
return unmodifiableMap(subtasks);
}

/**
 * Removes every subtask entry whose index is not contained in {@code activeSubtasks}.
 *
 * <p>The removal is performed through the key-set view of {@code subtasks}, so it applies
 * directly to the backing map.
 *
 * @param activeSubtasks subtask indices that should be kept
 */
void retainSubtasks(Set<Integer> activeSubtasks) {
    final Set<Integer> trackedSubtaskIndices = subtasks.keySet();
    trackedSubtaskIndices.retainAll(activeSubtasks);
}

private static TaskMetricStore unmodifiable(TaskMetricStore source) {
if (source == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,40 @@ void testMalformedNameHandling() {
assertThat(store.getJobs()).isEmpty();
}

/**
 * Checks that updating the current execution attempts drops subtasks that are no longer
 * reported: the store starts with subtasks 1 and 8 for "taskid", and after an update that
 * only lists subtask 1, only subtask 1 remains.
 */
@Test
void testTaskMetricStoreCleanup() {
    final MetricStore metricStore = setupStore(new MetricStore());

    // Precondition: both subtasks are present before the update.
    assertThat(
                    metricStore
                            .getTaskMetricStore(JOB_ID.toString(), "taskid")
                            .getAllSubtaskMetricStores()
                            .keySet())
            .containsExactlyInAnyOrder(1, 8);

    // Report attempts for subtask 1 only.
    final CurrentAttempts attemptsOfSubtaskOne = new CurrentAttempts(1, new HashSet<>());
    final Map<String, Map<Integer, CurrentAttempts>> attemptsByTask =
            Collections.singletonMap(
                    "taskid", Collections.singletonMap(1, attemptsOfSubtaskOne));
    final JobDetails runningJob =
            new JobDetails(
                    JOB_ID,
                    "jobname",
                    0,
                    0,
                    0,
                    JobStatus.RUNNING,
                    0,
                    new int[10],
                    1,
                    attemptsByTask);

    metricStore.updateCurrentExecutionAttempts(Collections.singleton(runningJob));

    // Subtask 8 was not reported, so only subtask 1 is left.
    assertThat(
                    metricStore
                            .getTaskMetricStore(JOB_ID.toString(), "taskid")
                            .getAllSubtaskMetricStores()
                            .keySet())
            .containsExactlyInAnyOrder(1);
}

@Test
void testSubtaskMetricStoreCleanup() {
MetricStore store = setupStore(new MetricStore());
Expand Down

0 comments on commit f11c322

Please sign in to comment.