Skip to content

Commit

Permalink
[Hotfix][CDC] Fix thread-unsafe collection container in cdc enumerator (
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Oct 17, 2023
1 parent 8c569b1 commit b2f70fd
Showing 1 changed file with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,17 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Assigner for snapshot split. */
public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssigner {
Expand All @@ -47,12 +52,12 @@ public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssig

private final C sourceConfig;
private final List<TableId> alreadyProcessedTables;
private final List<SnapshotSplit> remainingSplits;
private final Queue<SnapshotSplit> remainingSplits;
private final Map<String, SnapshotSplit> assignedSplits;
private final Map<String, SnapshotSplitWatermark> splitCompletedOffsets;
private boolean assignerCompleted;
private final int currentParallelism;
private final LinkedList<TableId> remainingTables;
private final Deque<TableId> remainingTables;
private final boolean isRemainingTablesCheckpointed;

private ChunkSplitter chunkSplitter;
Expand Down Expand Up @@ -115,12 +120,12 @@ private SnapshotSplitAssigner(
this.context = context;
this.sourceConfig = context.getSourceConfig();
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = remainingSplits;
this.assignedSplits = assignedSplits;
this.splitCompletedOffsets = splitCompletedOffsets;
this.alreadyProcessedTables = Collections.synchronizedList(alreadyProcessedTables);
this.remainingSplits = new ConcurrentLinkedQueue(remainingSplits);
this.assignedSplits = new ConcurrentHashMap<>(assignedSplits);
this.splitCompletedOffsets = new ConcurrentHashMap<>(splitCompletedOffsets);
this.assignerCompleted = assignerCompleted;
this.remainingTables = new LinkedList<>(remainingTables);
this.remainingTables = new ConcurrentLinkedDeque<>(remainingTables);
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.dialect = dialect;
Expand Down Expand Up @@ -211,11 +216,15 @@ public SnapshotPhaseState snapshotState(long checkpointId) {
SnapshotPhaseState state =
new SnapshotPhaseState(
alreadyProcessedTables,
remainingSplits,
remainingSplits.isEmpty()
? Collections.emptyList()
: new ArrayList<>(remainingSplits),
assignedSplits,
splitCompletedOffsets,
assignerCompleted,
remainingTables,
remainingTables.isEmpty()
? Collections.emptyList()
: new ArrayList<>(remainingTables),
isTableIdCaseSensitive,
true);
// we need a complete checkpoint before mark this assigner to be completed, to wait for all
Expand Down

0 comments on commit b2f70fd

Please sign in to comment.