[HUDI-8370] Removed excessive DataBucket::preWrite (apache#12104)
geserdugarov authored Oct 21, 2024
1 parent 5ccb19b commit 4b619f0
Showing 2 changed files with 13 additions and 39 deletions.
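In short, this change drops DataBucket's cached partitionPath and fileID and the preWrite step that rewrote the first buffered record's location, replaces writeBucket(instant, bucket, records) with writeRecords(instant, records), and moves deduplication inside writeRecords (deduplicateRecordsIfNeeded is widened to protected so the subclass in the second file can reuse it). Callers now test bucket.isEmpty() and pass bucket.getRecords() straight to writeRecords instead of fetching, deduplicating, and clearing a local records list. Simplified, hedged sketches of the affected pieces follow the relevant hunks below.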
File 1 of 2:
@@ -19,11 +19,8 @@
 package org.apache.hudi.sink;

 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.ObjectSizeCalculator;
@@ -218,33 +215,18 @@ private void initMergeClass() {
   protected static class DataBucket {
     private final List<HoodieRecord> records;
     private final BufferSizeDetector detector;
-    private final String partitionPath;
-    private final String fileID;

-    private DataBucket(Double batchSize, HoodieRecord<?> hoodieRecord) {
+    private DataBucket(Double batchSize) {
       this.records = new ArrayList<>();
       this.detector = new BufferSizeDetector(batchSize);
-      this.partitionPath = hoodieRecord.getPartitionPath();
-      this.fileID = hoodieRecord.getCurrentLocation().getFileId();
     }

     public List<HoodieRecord> getRecords() {
       return records;
     }

-    /**
-     * Sets up before flush: patch up the first record with correct partition path and fileID.
-     *
-     * <p>Note: the method may modify the given records {@code records}.
-     */
-    public void preWrite(List<HoodieRecord> records) {
-      // rewrite the first record with expected fileID
-      HoodieRecord<?> first = records.get(0);
-      HoodieRecord<?> record = new HoodieAvroRecord<>(first.getKey(), (HoodieRecordPayload) first.getData(), first.getOperation());
-      HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID);
-      record.setCurrentLocation(newLoc);
-
-      records.set(0, record);
+    public boolean isEmpty() {
+      return records.isEmpty();
     }

     public void reset() {
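For orientation, here is a compact, self-contained sketch of the bucket shape this hunk leaves behind. It is a toy stand-in, not the Hudi class: the generic record type, the byte counter, and the flush threshold are simplified placeholders for HoodieRecord and BufferSizeDetector.

import java.util.ArrayList;
import java.util.List;

// Toy stand-in for the simplified DataBucket after this change: it only buffers records
// and tracks an approximate size; there is no cached partitionPath/fileID and no preWrite step.
public final class ToyDataBucket<R> {
  private final List<R> records = new ArrayList<>();
  private final double batchSizeMb; // rough stand-in for BufferSizeDetector's threshold
  private long bufferedBytes;

  public ToyDataBucket(double batchSizeMb) {
    this.batchSizeMb = batchSizeMb;
  }

  /** Buffers a record; returns true once the bucket has grown past the batch size and should flush. */
  public boolean add(R record, long approxSizeBytes) {
    records.add(record);
    bufferedBytes += approxSizeBytes;
    return bufferedBytes >= batchSizeMb * 1024 * 1024;
  }

  public List<R> getRecords() {
    return records;
  }

  public boolean isEmpty() {
    return records.isEmpty();
  }

  public void reset() {
    records.clear();
    bufferedBytes = 0;
  }
}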
@@ -351,7 +333,7 @@ protected void bufferRecord(HoodieRecord<?> value) {
     final String bucketID = getBucketID(value);

     DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
-        k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
+        k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
     bucket.records.add(value);

     boolean flushBucket = bucket.detector.detect(value);
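The only change in this hunk is that the bucket factory no longer needs the triggering record. A minimal JDK-only sketch of the same computeIfAbsent pattern (bucket IDs and records are plain strings here; getBucketID and the real DataBucket are not modeled):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class BucketMapSketch {
  public static void main(String[] args) {
    Map<String, List<String>> buckets = new HashMap<>();

    for (String record : List.of("r1@bucketA", "r2@bucketB", "r3@bucketA")) {
      String bucketId = record.substring(record.indexOf('@') + 1); // toy getBucketID
      // The factory lambda no longer needs the triggering record, only configuration,
      // so every bucket is constructed the same way regardless of which record arrives first.
      buckets.computeIfAbsent(bucketId, k -> new ArrayList<>()).add(record);
    }
    System.out.println(buckets); // e.g. {bucketA=[r1@bucketA, r3@bucketA], bucketB=[r2@bucketB]}
  }
}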
@@ -392,11 +374,8 @@ private boolean flushBucket(DataBucket bucket) {
       return false;
     }

-    List<HoodieRecord> records = bucket.getRecords();
-    ValidationUtils.checkState(!records.isEmpty(), "Data bucket to flush has no buffering records");
-    records = deduplicateRecordsIfNeeded(records);
-    final List<WriteStatus> writeStatus = writeBucket(instant, bucket, records);
-    records.clear();
+    ValidationUtils.checkState(!bucket.isEmpty(), "Data bucket to flush has no buffering records");
+    final List<WriteStatus> writeStatus = writeRecords(instant, bucket.getRecords());
     final WriteMetadataEvent event = WriteMetadataEvent.builder()
         .taskID(taskID)
         .instantTime(instant) // the write instant may shift but the event still use the currentInstant.
@@ -425,11 +404,8 @@ private void flushRemaining(boolean endInput) {
         // The records are partitioned by the bucket ID and each batch sent to
         // the writer belongs to one bucket.
         .forEach(bucket -> {
-          List<HoodieRecord> records = bucket.getRecords();
-          if (!records.isEmpty()) {
-            records = deduplicateRecordsIfNeeded(records);
-            writeStatus.addAll(writeBucket(currentInstant, bucket, records));
-            records.clear();
+          if (!bucket.isEmpty()) {
+            writeStatus.addAll(writeRecords(currentInstant, bucket.getRecords()));
             bucket.reset();
           }
         });
@@ -463,16 +439,15 @@ private void registerMetrics() {
     writeMetrics.registerMetrics();
   }

-  protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, List<HoodieRecord> records) {
-    bucket.preWrite(records);
+  protected List<WriteStatus> writeRecords(String instant, List<HoodieRecord> records) {
     writeMetrics.startFileFlush();
-    List<WriteStatus> statuses = writeFunction.apply(records, instant);
+    List<WriteStatus> statuses = writeFunction.apply(deduplicateRecordsIfNeeded(records), instant);
     writeMetrics.endFileFlush();
     writeMetrics.increaseNumOfFilesWritten();
     return statuses;
   }

-  private List<HoodieRecord> deduplicateRecordsIfNeeded(List<HoodieRecord> records) {
+  protected List<HoodieRecord> deduplicateRecordsIfNeeded(List<HoodieRecord> records) {
     if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
       return FlinkWriteHelper.newInstance()
           .deduplicateRecords(records, null, -1, this.writeClient.getConfig().getSchema(), this.writeClient.getConfig().getProps(), recordMerger);
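With this hunk, deduplication happens inside writeRecords, and deduplicateRecordsIfNeeded is widened from private to protected so the override in the second file can reuse it. A self-contained sketch of the gating pattern, where the pre-combine flag, key extraction, and "last write wins" policy are simplified stand-ins for FlinkOptions.PRE_COMBINE and FlinkWriteHelper.deduplicateRecords:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public final class DedupSketch {
  /** Deduplicates by key only when preCombine is enabled, mirroring the PRE_COMBINE gate. */
  static <R> List<R> deduplicateIfNeeded(List<R> records, boolean preCombine, Function<R, String> keyOf) {
    if (!preCombine) {
      return records; // pass-through, like the disabled branch in the real code
    }
    Map<String, R> latestByKey = new LinkedHashMap<>();
    for (R record : records) {
      latestByKey.put(keyOf.apply(record), record); // later records win (toy combine policy)
    }
    return new ArrayList<>(latestByKey.values());
  }

  public static void main(String[] args) {
    List<String> records = List.of("k1:v1", "k2:v1", "k1:v2");
    System.out.println(deduplicateIfNeeded(records, true, r -> r.split(":")[0]));  // [k1:v2, k2:v1]
    System.out.println(deduplicateIfNeeded(records, false, r -> r.split(":")[0])); // unchanged
  }
}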
File 2 of 2:
@@ -71,11 +71,10 @@ public void snapshotState() {
   }

   @Override
-  protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, List<HoodieRecord> records) {
+  protected List<WriteStatus> writeRecords(String instant, List<HoodieRecord> records) {
     updateStrategy.initialize(this.writeClient);
-    bucket.preWrite(records);
     Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>> recordListFgPair =
-        updateStrategy.handleUpdate(Collections.singletonList(Pair.of(records, instant)));
+        updateStrategy.handleUpdate(Collections.singletonList(Pair.of(deduplicateRecordsIfNeeded(records), instant)));
     return recordListFgPair.getKey().stream().flatMap(
         recordsInstantPair -> writeFunction.apply(recordsInstantPair.getLeft(), recordsInstantPair.getRight()).stream()
     ).collect(Collectors.toList());
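The second file now overrides writeRecords instead of writeBucket and calls the base class's deduplicateRecordsIfNeeded itself before handing records to the update strategy. A minimal sketch of that override shape with toy types (the update-strategy routing is reduced to a pass-through, and the class and method names here are illustrative only):

import java.util.List;
import java.util.stream.Collectors;

public final class WriterOverrideSketch {

  static class BaseWriter {
    // Mirrors the now-protected deduplicateRecordsIfNeeded: subclasses can reuse it.
    protected List<String> deduplicateRecordsIfNeeded(List<String> records) {
      return records.stream().distinct().collect(Collectors.toList()); // toy dedup policy
    }

    protected List<String> writeRecords(String instant, List<String> records) {
      return deduplicateRecordsIfNeeded(records).stream()
          .map(r -> "written@" + instant + ":" + r)
          .collect(Collectors.toList());
    }
  }

  static class UpdateAwareWriter extends BaseWriter {
    @Override
    protected List<String> writeRecords(String instant, List<String> records) {
      // The real override routes the deduplicated batch through updateStrategy.handleUpdate
      // before writing; here that routing step is reduced to a pass-through.
      List<String> routed = deduplicateRecordsIfNeeded(records);
      return routed.stream()
          .map(r -> "written@" + instant + ":" + r)
          .collect(Collectors.toList());
    }
  }

  public static void main(String[] args) {
    BaseWriter writer = new UpdateAwareWriter();
    System.out.println(writer.writeRecords("20241021000000", List.of("a", "a", "b")));
    // [written@20241021000000:a, written@20241021000000:b]
  }
}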
