[Improve][Connector][File] Optimize files commit order (#5045)
Previously, file paths were stored in a `HashMap`, so the files of each checkpoint were committed in an arbitrary order.

Now a `LinkedHashMap` is used instead, ensuring that files are committed in the order they were generated.
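
The ordering guarantee comes purely from the map implementation: `HashMap` iterates in an order determined by key hashing, while `LinkedHashMap` iterates in insertion order. A minimal sketch of the difference (the file paths below are illustrative, not taken from the connector):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class CommitOrderSketch {
    public static void main(String[] args) {
        Map<String, String> hashed = new HashMap<>();
        Map<String, String> linked = new LinkedHashMap<>();
        // Register files in the order they were generated (hypothetical paths)
        for (int part = 0; part < 5; part++) {
            String tmp = "/tmp/seatunnel/T_job_0_" + part;
            String target = "/sink/output_" + part;
            hashed.put(tmp, target);
            linked.put(tmp, target);
        }
        // HashMap: iteration (and therefore commit) order depends on hash values
        System.out.println(hashed.keySet());
        // LinkedHashMap: iteration order matches generation order
        System.out.println(linked.keySet());
    }
}
```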
hailin0 authored Jul 24, 2023
1 parent 973d045 commit 1e18a8c
Showing 14 changed files with 68 additions and 129 deletions.

@@ -34,14 +34,14 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> {
private final WriteStrategy writeStrategy;
protected final WriteStrategy writeStrategy;
private final FileSystemUtils fileSystemUtils;

@SuppressWarnings("checkstyle:MagicNumber")
@@ -67,7 +67,7 @@ public BaseFileSinkWriter(
List<String> transactions = findTransactionList(jobId, uuidPrefix);
FileSinkAggregatedCommitter fileSinkAggregatedCommitter =
new FileSinkAggregatedCommitter(fileSystemUtils);
HashMap<String, FileSinkState> fileStatesMap = new HashMap<>();
LinkedHashMap<String, FileSinkState> fileStatesMap = new LinkedHashMap<>();
fileSinkStates.forEach(
fileSinkState ->
fileStatesMap.put(fileSinkState.getTransactionId(), fileSinkState));

@@ -21,8 +21,8 @@
import lombok.Data;

import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@Data
@AllArgsConstructor
@@ -34,7 +34,7 @@ public class FileAggregatedCommitInfo implements Serializable {
*
* <p>V is the target file path of the data file.
*/
private final Map<String, Map<String, String>> transactionMap;
private final LinkedHashMap<String, LinkedHashMap<String, String>> transactionMap;

/**
* Storage the partition information in map.
@@ -43,5 +43,5 @@ public class FileAggregatedCommitInfo implements Serializable {
*
* <p>V is the list of partition column's values.
*/
private final Map<String, List<String>> partitionDirAndValuesMap;
private final LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
}

@@ -21,8 +21,8 @@
import lombok.Data;

import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@Data
@AllArgsConstructor
@@ -34,7 +34,7 @@ public class FileCommitInfo implements Serializable {
*
* <p>V is the target file path of the data file.
*/
private final Map<String, String> needMoveFiles;
private final LinkedHashMap<String, String> needMoveFiles;

/**
* Storage the partition information in map.
@@ -43,7 +43,7 @@ public class FileCommitInfo implements Serializable {
*
* <p>V is the list of partition column's values.
*/
private final Map<String, List<String>> partitionDirAndValuesMap;
private final LinkedHashMap<String, List<String>> partitionDirAndValuesMap;

/** Storage the transaction directory */
private final String transactionDir;

@@ -24,7 +24,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@@ -44,7 +44,7 @@ public List<FileAggregatedCommitInfo> commit(
aggregatedCommitInfos.forEach(
aggregatedCommitInfo -> {
try {
for (Map.Entry<String, Map<String, String>> entry :
for (Map.Entry<String, LinkedHashMap<String, String>> entry :
aggregatedCommitInfo.getTransactionMap().entrySet()) {
for (Map.Entry<String, String> mvFileEntry :
entry.getValue().entrySet()) {
@@ -77,13 +77,14 @@ public FileAggregatedCommitInfo combine(List<FileCommitInfo> commitInfos) {
if (commitInfos == null || commitInfos.size() == 0) {
return null;
}
Map<String, Map<String, String>> aggregateCommitInfo = new HashMap<>();
Map<String, List<String>> partitionDirAndValuesMap = new HashMap<>();
LinkedHashMap<String, LinkedHashMap<String, String>> aggregateCommitInfo =
new LinkedHashMap<>();
LinkedHashMap<String, List<String>> partitionDirAndValuesMap = new LinkedHashMap<>();
commitInfos.forEach(
commitInfo -> {
Map<String, String> needMoveFileMap =
LinkedHashMap<String, String> needMoveFileMap =
aggregateCommitInfo.computeIfAbsent(
commitInfo.getTransactionDir(), k -> new HashMap<>());
commitInfo.getTransactionDir(), k -> new LinkedHashMap<>());
needMoveFileMap.putAll(commitInfo.getNeedMoveFiles());
if (commitInfo.getPartitionDirAndValuesMap() != null
&& !commitInfo.getPartitionDirAndValuesMap().isEmpty()) {
@@ -109,7 +110,7 @@ public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws E
aggregatedCommitInfos.forEach(
aggregatedCommitInfo -> {
try {
for (Map.Entry<String, Map<String, String>> entry :
for (Map.Entry<String, LinkedHashMap<String, String>> entry :
aggregatedCommitInfo.getTransactionMap().entrySet()) {
// rollback the file
for (Map.Entry<String, String> mvFileEntry :

This file was deleted.

@@ -21,16 +21,16 @@
import lombok.Data;

import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@Data
@AllArgsConstructor
public class FileSinkState implements Serializable {
private final String transactionId;
private final String uuidPrefix;
private final Long checkpointId;
private final Map<String, String> needMoveFiles;
private final Map<String, List<String>> partitionDirAndValuesMap;
private final LinkedHashMap<String, String> needMoveFiles;
private final LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
private final String transactionDir;
}

@@ -50,6 +50,7 @@
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -76,9 +77,9 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
protected String uuidPrefix;

protected String transactionDirectory;
protected Map<String, String> needMoveFiles;
protected Map<String, String> beingWrittenFile = new HashMap<>();
private Map<String, List<String>> partitionDirAndValuesMap;
protected LinkedHashMap<String, String> needMoveFiles;
protected LinkedHashMap<String, String> beingWrittenFile = new LinkedHashMap<>();
private LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
protected SeaTunnelRowType seaTunnelRowType;

// Checkpoint id from engine is start with 1
@@ -111,13 +112,18 @@ public void init(HadoopConf conf, String jobId, String uuidPrefix, int subTaskIn
@Override
public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException {
if (currentBatchSize >= batchSize) {
this.partId++;
newFilePart();
currentBatchSize = 0;
beingWrittenFile.clear();
}
currentBatchSize++;
}

public synchronized void newFilePart() {
this.partId++;
beingWrittenFile.clear();
log.debug("new file part: {}", partId);
}

protected SeaTunnelRowType buildSchemaWithRowType(
SeaTunnelRowType seaTunnelRowType, List<Integer> sinkColumnsIndex) {
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
@@ -177,9 +183,9 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
* @return the map of partition directory
*/
@Override
public Map<String, List<String>> generatorPartitionDir(SeaTunnelRow seaTunnelRow) {
public LinkedHashMap<String, List<String>> generatorPartitionDir(SeaTunnelRow seaTunnelRow) {
List<Integer> partitionFieldsIndexInRow = fileSinkConfig.getPartitionFieldsIndexInRow();
Map<String, List<String>> partitionDirAndValuesMap = new HashMap<>(1);
LinkedHashMap<String, List<String>> partitionDirAndValuesMap = new LinkedHashMap<>(1);
if (CollectionUtils.isEmpty(partitionFieldsIndexInRow)) {
partitionDirAndValuesMap.put(BaseSinkConfig.NON_PARTITION, null);
return partitionDirAndValuesMap;
@@ -258,12 +264,15 @@ public String generateFileName(String transactionId) {
@Override
public Optional<FileCommitInfo> prepareCommit() {
this.finishAndCloseFile();
Map<String, String> commitMap = new HashMap<>(this.needMoveFiles);
Map<String, List<String>> copyMap =
LinkedHashMap<String, String> commitMap = new LinkedHashMap<>(this.needMoveFiles);
LinkedHashMap<String, List<String>> copyMap =
this.partitionDirAndValuesMap.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> new ArrayList<>(e.getValue())));
Map.Entry::getKey,
e -> new ArrayList<>(e.getValue()),
(e1, e2) -> e1,
LinkedHashMap::new));
return Optional.of(new FileCommitInfo(commitMap, copyMap, transactionDirectory));
}
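
A note on the stream copy in `prepareCommit` above (and the identical one in `snapshotState` below): the three-argument `Collectors.toMap` collects into a plain `HashMap`, so keeping the partition order requires the four-argument overload that takes a map supplier. A minimal standalone sketch of that pattern, with illustrative partition keys:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OrderedCopySketch {
    public static void main(String[] args) {
        LinkedHashMap<String, List<String>> partitionDirAndValuesMap = new LinkedHashMap<>();
        partitionDirAndValuesMap.put("dt=2023-07-23", Arrays.asList("2023-07-23"));
        partitionDirAndValuesMap.put("dt=2023-07-24", Arrays.asList("2023-07-24"));

        // Copy the map (with fresh value lists) while preserving insertion order:
        // the merge function (e1, e2) -> e1 is only needed to reach the overload
        // that accepts LinkedHashMap::new as the map factory.
        LinkedHashMap<String, List<String>> copy =
                partitionDirAndValuesMap.entrySet().stream()
                        .collect(
                                Collectors.toMap(
                                        Map.Entry::getKey,
                                        e -> new ArrayList<>(e.getValue()),
                                        (e1, e2) -> e1,
                                        LinkedHashMap::new));

        System.out.println(copy.keySet()); // [dt=2023-07-23, dt=2023-07-24]
    }
}
```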

@@ -301,8 +310,8 @@ public void beginTransaction(Long checkpointId) {
this.checkpointId = checkpointId;
this.transactionId = getTransactionId(checkpointId);
this.transactionDirectory = getTransactionDir(this.transactionId);
this.needMoveFiles = new HashMap<>();
this.partitionDirAndValuesMap = new HashMap<>();
this.needMoveFiles = new LinkedHashMap<>();
this.partitionDirAndValuesMap = new LinkedHashMap<>();
}

private String getTransactionId(Long checkpointId) {
@@ -325,18 +334,21 @@ private String getTransactionId(Long checkpointId) {
*/
@Override
public List<FileSinkState> snapshotState(long checkpointId) {
Map<String, List<String>> commitMap =
LinkedHashMap<String, List<String>> commitMap =
this.partitionDirAndValuesMap.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> new ArrayList<>(e.getValue())));
Map.Entry::getKey,
e -> new ArrayList<>(e.getValue()),
(e1, e2) -> e1,
LinkedHashMap::new));
ArrayList<FileSinkState> fileState =
Lists.newArrayList(
new FileSinkState(
this.transactionId,
this.uuidPrefix,
this.checkpointId,
new HashMap<>(this.needMoveFiles),
new LinkedHashMap<>(this.needMoveFiles),
commitMap,
this.getTransactionDir(transactionId)));
this.beingWrittenFile.clear();
Expand All @@ -363,7 +375,7 @@ public static String getTransactionDirPrefix(String tmpPath, String jobId, Strin
}

public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow seaTunnelRow) {
Map<String, List<String>> dataPartitionDirAndValuesMap =
LinkedHashMap<String, List<String>> dataPartitionDirAndValuesMap =
generatorPartitionDir(seaTunnelRow);
String beingWrittenFileKey = dataPartitionDirAndValuesMap.keySet().toArray()[0].toString();
// get filePath from beingWrittenFile

@@ -28,15 +28,14 @@
import lombok.NonNull;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.LinkedHashMap;

public class ExcelWriteStrategy extends AbstractWriteStrategy {
private final Map<String, ExcelGenerator> beingWrittenWriter;
private final LinkedHashMap<String, ExcelGenerator> beingWrittenWriter;

public ExcelWriteStrategy(FileSinkConfig fileSinkConfig) {
super(fileSinkConfig);
this.beingWrittenWriter = new HashMap<>();
this.beingWrittenWriter = new LinkedHashMap<>();
}

@Override

@@ -33,17 +33,18 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class JsonWriteStrategy extends AbstractWriteStrategy {
private final byte[] rowDelimiter;
private SerializationSchema serializationSchema;
private final Map<String, FSDataOutputStream> beingWrittenOutputStream;
private final LinkedHashMap<String, FSDataOutputStream> beingWrittenOutputStream;
private final Map<String, Boolean> isFirstWrite;

public JsonWriteStrategy(FileSinkConfig textFileSinkConfig) {
super(textFileSinkConfig);
this.beingWrittenOutputStream = new HashMap<>();
this.beingWrittenOutputStream = new LinkedHashMap<>();
this.isFirstWrite = new HashMap<>();
this.rowDelimiter = textFileSinkConfig.getRowDelimiter().getBytes();
}

@@ -55,16 +55,16 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.ChronoField;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrcWriteStrategy extends AbstractWriteStrategy {
private final Map<String, Writer> beingWrittenWriter;
private final LinkedHashMap<String, Writer> beingWrittenWriter;

public OrcWriteStrategy(FileSinkConfig fileSinkConfig) {
super(fileSinkConfig);
this.beingWrittenWriter = new HashMap<>();
this.beingWrittenWriter = new LinkedHashMap<>();
}

@Override