From 4b619f05b0aa4d95f3e70fffdbba0c2368cdf5f1 Mon Sep 17 00:00:00 2001
From: Geser Dugarov
Date: Mon, 21 Oct 2024 19:07:26 +0700
Subject: [PATCH] [HUDI-8370] Removed excessive `DataBucket::preWrite` (#12104)

---
 .../apache/hudi/sink/StreamWriteFunction.java | 47 +++++--------
 .../ConsistentBucketStreamWriteFunction.java  |  5 +-
 2 files changed, 13 insertions(+), 39 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 57fb24a413c7..48dd225ecb97 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -19,11 +19,8 @@
 package org.apache.hudi.sink;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.ObjectSizeCalculator;
@@ -218,33 +215,18 @@ private void initMergeClass() {
   protected static class DataBucket {
     private final List<HoodieRecord> records;
     private final BufferSizeDetector detector;
-    private final String partitionPath;
-    private final String fileID;
 
-    private DataBucket(Double batchSize, HoodieRecord<?> hoodieRecord) {
+    private DataBucket(Double batchSize) {
       this.records = new ArrayList<>();
       this.detector = new BufferSizeDetector(batchSize);
-      this.partitionPath = hoodieRecord.getPartitionPath();
-      this.fileID = hoodieRecord.getCurrentLocation().getFileId();
     }
 
     public List<HoodieRecord> getRecords() {
       return records;
     }
 
-    /**
-     * Sets up before flush: patch up the first record with correct partition path and fileID.
-     *
-     * <p>Note: the method may modify the given records {@code records}.
-     */
-    public void preWrite(List<HoodieRecord> records) {
-      // rewrite the first record with expected fileID
-      HoodieRecord<?> first = records.get(0);
-      HoodieRecord<?> record = new HoodieAvroRecord<>(first.getKey(), (HoodieRecordPayload) first.getData(), first.getOperation());
-      HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID);
-      record.setCurrentLocation(newLoc);
-
-      records.set(0, record);
+    public boolean isEmpty() {
+      return records.isEmpty();
     }
 
     public void reset() {
@@ -351,7 +333,7 @@ protected void bufferRecord(HoodieRecord<?> value) {
     final String bucketID = getBucketID(value);
 
     DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
-        k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
+        k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
     bucket.records.add(value);
 
     boolean flushBucket = bucket.detector.detect(value);
@@ -392,11 +374,8 @@ private boolean flushBucket(DataBucket bucket) {
       return false;
     }
 
-    List<HoodieRecord> records = bucket.getRecords();
-    ValidationUtils.checkState(!records.isEmpty(), "Data bucket to flush has no buffering records");
-    records = deduplicateRecordsIfNeeded(records);
-    final List<WriteStatus> writeStatus = writeBucket(instant, bucket, records);
-    records.clear();
+    ValidationUtils.checkState(!bucket.isEmpty(), "Data bucket to flush has no buffering records");
+    final List<WriteStatus> writeStatus = writeRecords(instant, bucket.getRecords());
     final WriteMetadataEvent event = WriteMetadataEvent.builder()
         .taskID(taskID)
         .instantTime(instant) // the write instant may shift but the event still use the currentInstant.
@@ -425,11 +404,8 @@ private void flushRemaining(boolean endInput) {
           // The records are partitioned by the bucket ID and each batch sent to
          // the writer belongs to one bucket.
          .forEach(bucket -> {
-            List<HoodieRecord> records = bucket.getRecords();
-            if (!records.isEmpty()) {
-              records = deduplicateRecordsIfNeeded(records);
-              writeStatus.addAll(writeBucket(currentInstant, bucket, records));
-              records.clear();
+            if (!bucket.isEmpty()) {
+              writeStatus.addAll(writeRecords(currentInstant, bucket.getRecords()));
               bucket.reset();
             }
           });
@@ -463,16 +439,15 @@ private void registerMetrics() {
     writeMetrics.registerMetrics();
   }
 
-  protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, List<HoodieRecord> records) {
-    bucket.preWrite(records);
+  protected List<WriteStatus> writeRecords(String instant, List<HoodieRecord> records) {
     writeMetrics.startFileFlush();
-    List<WriteStatus> statuses = writeFunction.apply(records, instant);
+    List<WriteStatus> statuses = writeFunction.apply(deduplicateRecordsIfNeeded(records), instant);
     writeMetrics.endFileFlush();
     writeMetrics.increaseNumOfFilesWritten();
     return statuses;
   }
 
-  private List<HoodieRecord> deduplicateRecordsIfNeeded(List<HoodieRecord> records) {
+  protected List<HoodieRecord> deduplicateRecordsIfNeeded(List<HoodieRecord> records) {
     if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
       return FlinkWriteHelper.newInstance()
           .deduplicateRecords(records, null, -1, this.writeClient.getConfig().getSchema(), this.writeClient.getConfig().getProps(), recordMerger);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
index 9372d70b68e9..aa5405e2aeab 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
@@ -71,11 +71,10 @@ public void snapshotState() {
   }
 
   @Override
-  protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, List<HoodieRecord> records) {
+  protected List<WriteStatus> writeRecords(String instant, List<HoodieRecord> records) {
     updateStrategy.initialize(this.writeClient);
-    bucket.preWrite(records);
     Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>> recordListFgPair =
-        updateStrategy.handleUpdate(Collections.singletonList(Pair.of(records, instant)));
+        updateStrategy.handleUpdate(Collections.singletonList(Pair.of(deduplicateRecordsIfNeeded(records), instant)));
     return recordListFgPair.getKey().stream().flatMap(
         recordsInstantPair -> writeFunction.apply(recordsInstantPair.getLeft(), recordsInstantPair.getRight()).stream()
     ).collect(Collectors.toList());
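
Note for readers following the refactor: before this patch, flushBucket and flushRemaining each pulled the bucket's record list, deduplicated it, and passed both the bucket and the list to writeBucket, which then patched the first record's location via DataBucket::preWrite. After the patch there is a single writeRecords(instant, records) path that applies deduplicateRecordsIfNeeded internally, and DataBucket shrinks to a plain buffer with isEmpty() and reset(). Below is a minimal, self-contained Java sketch of that post-patch shape. It is an illustration under stated assumptions, not Hudi code: the class name FlushFlowSketch, the String stand-ins for records and write statuses, and the LinkedHashSet dedupe (a placeholder for FlinkWriteHelper.deduplicateRecords) are all invented for the example.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.function.BiFunction;

    // Simplified stand-in for the post-patch StreamWriteFunction shape:
    // buckets only buffer records; writeRecords owns deduplication.
    public class FlushFlowSketch {

      static class DataBucket {
        final List<String> records = new ArrayList<>();

        boolean isEmpty() {
          return records.isEmpty();
        }

        void reset() {
          records.clear();
        }
      }

      private final Map<String, DataBucket> buckets = new HashMap<>();
      // Stand-in for the write function: (records, instant) -> write statuses.
      private final BiFunction<List<String>, String, List<String>> writeFunction;
      private final boolean preCombine;

      public FlushFlowSketch(BiFunction<List<String>, String, List<String>> writeFunction,
                             boolean preCombine) {
        this.writeFunction = writeFunction;
        this.preCombine = preCombine;
      }

      public void bufferRecord(String bucketId, String record) {
        // Post-patch, the DataBucket constructor no longer needs the first
        // record, since partitionPath/fileID are not captured anymore.
        buckets.computeIfAbsent(bucketId, k -> new DataBucket()).records.add(record);
      }

      // Single write path shared by flushBucket and flushRemaining.
      public List<String> writeRecords(String instant, List<String> records) {
        return writeFunction.apply(deduplicateRecordsIfNeeded(records), instant);
      }

      // Placeholder dedupe; the real code delegates to FlinkWriteHelper
      // only when the pre-combine option is enabled.
      protected List<String> deduplicateRecordsIfNeeded(List<String> records) {
        return preCombine ? new ArrayList<>(new LinkedHashSet<>(records)) : records;
      }

      public List<String> flushRemaining(String instant) {
        List<String> writeStatus = new ArrayList<>();
        buckets.values().forEach(bucket -> {
          if (!bucket.isEmpty()) {
            writeStatus.addAll(writeRecords(instant, bucket.records));
            bucket.reset();
          }
        });
        return writeStatus;
      }
    }

The design point mirrored here is that deduplication now happens exactly once, at the single choke point every flush goes through, instead of being repeated at each call site.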
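
A second point the ConsistentBucketStreamWriteFunction hunk illustrates: deduplicateRecordsIfNeeded changes from private to protected because the subclass overrides writeRecords wholesale (routing records through updateStrategy.handleUpdate rather than the plain write function) and therefore has to invoke the shared dedupe step itself. A minimal sketch of that override pattern follows; apart from the method names writeRecords and deduplicateRecordsIfNeeded, which come from the patch, every name and type here is a simplified stand-in.

    import java.util.List;

    // Base/override pattern from the patch: the subclass replaces
    // writeRecords but reuses the now-protected dedupe hook.
    abstract class BaseWriterSketch {

      // protected (was private pre-patch) so overrides can call it.
      protected List<String> deduplicateRecordsIfNeeded(List<String> records) {
        return records; // placeholder for the pre-combine logic
      }

      protected List<String> writeRecords(String instant, List<String> records) {
        return write(deduplicateRecordsIfNeeded(records), instant);
      }

      // Stand-in for the underlying write function.
      protected abstract List<String> write(List<String> records, String instant);
    }

    class ConsistentBucketWriterSketch extends BaseWriterSketch {

      @Override
      protected List<String> writeRecords(String instant, List<String> records) {
        // The override does not call super.writeRecords, so it must apply
        // the shared dedupe itself before handing records to its strategy.
        return write(deduplicateRecordsIfNeeded(records), instant);
      }

      @Override
      protected List<String> write(List<String> records, String instant) {
        return records; // stand-in for the actual write
      }
    }

This mirrors the patched override, where handleUpdate receives Pair.of(deduplicateRecordsIfNeeded(records), instant) instead of the raw record list.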