Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-8400] apply 'write.ignore.failed' when write data failed v2 #12150

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,12 @@ public class HoodieWriteConfig extends HoodieConfig {
*/
public static final String WRITES_FILEID_ENCODING = "_hoodie.writes.fileid.encoding";

public static final ConfigProperty<Boolean> IGNORE_FAILED = ConfigProperty
.key("hoodie.write.ignore.failed")
.defaultValue(false)
.sinceVersion("")
.withDocumentation("Flag to indicate whether to ignore any non exception error (e.g. writestatus error).");

private ConsistencyGuardConfig consistencyGuardConfig;
private FileSystemRetryConfig fileSystemRetryConfig;

Expand Down Expand Up @@ -2788,6 +2794,13 @@ public int getSecondaryIndexParallelism() {
return metadataConfig.getSecondaryIndexParallelism();
}

/**
* Whether to ignore the write failed.
*/
public boolean getIgnoreWriteFailed() {
return getBooleanOrDefault(IGNORE_FAILED);
}

public static class Builder {

protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
Expand Down Expand Up @@ -3330,6 +3343,11 @@ public Builder withWriteRecordPositionsEnabled(boolean shouldWriteRecordPosition
return this;
}

public Builder withWriteIgnoreFailed(boolean ignoreFailedWriteData) {
writeConfig.setValue(IGNORE_FAILED, String.valueOf(ignoreFailedWriteData));
return this;
}

protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
// Check for mandatory properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props
// for a single record
writeStatus.markFailure(record, t, recordMetadata);
LOG.error("Error writing record " + record, t);
ignoreWriteFailed(t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props
// for a single record
writeStatus.markFailure(record, t, recordMetadata);
LOG.error("Error writing record " + record, t);
ignoreWriteFailed(t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ private boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> comb
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
+ newRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
writeStatus.markFailure(newRecord, failureEx, recordMetadata);
ignoreWriteFailed(failureEx);
return false;
}
try {
Expand Down Expand Up @@ -333,6 +334,7 @@ private boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> comb
} catch (Exception e) {
LOG.error("Error writing record " + newRecord, e);
writeStatus.markFailure(newRecord, e, recordMetadata);
ignoreWriteFailed(e);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props
// NO_OP
}

/**
* ignore failed write based on 'getIgnoreWriteFailed' config
* @param throwable
*/
protected void ignoreWriteFailed(Throwable throwable) {
if (config.getIgnoreWriteFailed()) {
throw new HoodieException(throwable.getMessage(), throwable);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we throw exception when 'ignore.write.failed' is true?

}
}

/**
* Perform the actual writing of the given record into the backing file.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.storage.HoodieStorage;
Expand Down Expand Up @@ -142,7 +143,11 @@ public void write(String recordKey, String partitionPath, RowData record) throws
? HoodieRecordDelegate.create(recordKey, partitionPath, null, newRecordLocation) : null;
writeStatus.markSuccess(recordDelegate, Option.empty());
} catch (Throwable t) {
LOG.error("Failed to write : key is " + recordKey + ", data is " + rowData, t);
writeStatus.markFailure(recordKey, partitionPath, t);
if (!writeConfig.getIgnoreWriteFailed()) {
throw new HoodieException(t.getMessage(), t);
}
}
} catch (Throwable ge) {
writeStatus.setGlobalError(ge);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ private FlinkOptions() {
.key("write.ignore.failed")
.booleanType()
.defaultValue(false)
.withFallbackKeys("hoodie.write.ignore.failed")
.withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch. \n"
+ "By default false. Turning this on, could hide the write status errors while the flink checkpoint moves ahead. \n"
+ "So, would recommend users to use this with caution.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ public static HoodieWriteConfig getHoodieClientConfig(
.withAutoCommit(false)
.withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
.withProps(flinkConf2TypedProperties(conf))
.withSchema(getSourceSchema(conf).toString());
.withSchema(getSourceSchema(conf).toString())
.withWriteIgnoreFailed(conf.get(FlinkOptions.IGNORE_FAILED));

Option<HoodieLockConfig> lockConfig = getLockConfig(conf);
if (lockConfig.isPresent()) {
Expand Down
Loading