diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 09df47b44521..dd5ee628f54c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -811,6 +811,12 @@ public class HoodieWriteConfig extends HoodieConfig {
    */
   public static final String WRITES_FILEID_ENCODING = "_hoodie.writes.fileid.encoding";
 
+  public static final ConfigProperty IGNORE_FAILED = ConfigProperty
+      .key("hoodie.write.ignore.failed")
+      .defaultValue(false)
+      .sinceVersion("")
+      .withDocumentation("Flag to indicate whether to ignore any non exception error (e.g. writestatus error).");
+
   private ConsistencyGuardConfig consistencyGuardConfig;
 
   private FileSystemRetryConfig fileSystemRetryConfig;
@@ -2788,6 +2794,13 @@ public int getSecondaryIndexParallelism() {
     return metadataConfig.getSecondaryIndexParallelism();
   }
 
+  /**
+   * Whether to ignore the write failed.
+   */
+  public boolean getIgnoreWriteFailed() {
+    return getBooleanOrDefault(IGNORE_FAILED);
+  }
+
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -3330,6 +3343,11 @@ public Builder withWriteRecordPositionsEnabled(boolean shouldWriteRecordPosition
       return this;
     }
 
+    public Builder withWriteIgnoreFailed(boolean ignoreFailedWriteData) {
+      writeConfig.setValue(IGNORE_FAILED, String.valueOf(ignoreFailedWriteData));
+      return this;
+    }
+
     protected void setDefaults() {
       writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
       // Check for mandatory properties
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index cb0c7dd283fa..b70415e06f70 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -499,6 +499,7 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props
       // for a single record
       writeStatus.markFailure(record, t, recordMetadata);
       LOG.error("Error writing record " + record, t);
+      ignoreWriteFailed(t);
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 12406927ae61..93f4716fae7c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -170,6 +170,7 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props
       // for a single record
       writeStatus.markFailure(record, t, recordMetadata);
       LOG.error("Error writing record " + record, t);
+      ignoreWriteFailed(t);
     }
   }
 
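The hunks above introduce the new `hoodie.write.ignore.failed` config, its getter, and a builder method, and make the create/append handles call `ignoreWriteFailed(t)` after marking a record failure. A minimal sketch (not part of this patch; the table base path is a placeholder) of setting the flag directly on a `HoodieWriteConfig`:

```java
import org.apache.hudi.config.HoodieWriteConfig;

public class IgnoreFailedConfigExample {
  public static void main(String[] args) {
    // Build a write config with the new flag set explicitly; the path is a placeholder.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/example_table")
        .withWriteIgnoreFailed(false) // new builder method; false keeps the fail-fast behavior
        .build();

    // New getter reads "hoodie.write.ignore.failed" (default false).
    System.out.println("ignore failed writes: " + writeConfig.getIgnoreWriteFailed());
  }
}
```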
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 1bf6f6b01389..f1c1473dc64f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -304,6 +304,7 @@ private boolean writeRecord(HoodieRecord newRecord, Option comb
       HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
           + newRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
       writeStatus.markFailure(newRecord, failureEx, recordMetadata);
+      ignoreWriteFailed(failureEx);
       return false;
     }
     try {
@@ -333,6 +334,7 @@ private boolean writeRecord(HoodieRecord newRecord, Option comb
     } catch (Exception e) {
       LOG.error("Error writing record " + newRecord, e);
       writeStatus.markFailure(newRecord, e, recordMetadata);
+      ignoreWriteFailed(e);
     }
     return false;
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index a251a5f76144..9aad2ecdf2b3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -174,6 +174,16 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props
     // NO_OP
   }
 
+  /**
+   * ignore failed write based on 'getIgnoreWriteFailed' config
+   * @param throwable
+   */
+  protected void ignoreWriteFailed(Throwable throwable) {
+    if (!config.getIgnoreWriteFailed()) {
+      throw new HoodieException(throwable.getMessage(), throwable);
+    }
+  }
+
   /**
    * Perform the actual writing of the given record into the backing file.
   */
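With the `ignoreWriteFailed` helper added above, each handle still records the failure on its `WriteStatus` and then either continues (flag on) or rethrows the error as a `HoodieException` (flag off, the default). A standalone sketch of that gating pattern, using illustrative class and field names rather than the actual handle classes:

```java
import org.apache.hudi.exception.HoodieException;

public class IgnoreWriteFailedSketch {
  private final boolean ignoreWriteFailed; // mirrors config.getIgnoreWriteFailed()

  public IgnoreWriteFailedSketch(boolean ignoreWriteFailed) {
    this.ignoreWriteFailed = ignoreWriteFailed;
  }

  // The caller has already marked the record failure on its WriteStatus; this decides
  // whether processing continues or the task fails immediately.
  void onRecordFailure(Throwable t) {
    if (!ignoreWriteFailed) {
      throw new HoodieException(t.getMessage(), t);
    }
    // flag enabled: keep writing, the error stays visible only on the WriteStatus
  }

  public static void main(String[] args) {
    new IgnoreWriteFailedSketch(true).onRecordFailure(new RuntimeException("bad record"));  // ignored
    new IgnoreWriteFailedSketch(false).onRecordFailure(new RuntimeException("bad record")); // throws
  }
}
```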
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index b08e814d15c5..dce64fce11b9 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -31,6 +31,7 @@
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.storage.HoodieStorage;
@@ -142,7 +143,11 @@ public void write(String recordKey, String partitionPath, RowData record) throws
           ? HoodieRecordDelegate.create(recordKey, partitionPath, null, newRecordLocation) : null;
       writeStatus.markSuccess(recordDelegate, Option.empty());
     } catch (Throwable t) {
+      LOG.error("Failed to write : key is " + recordKey + ", data is " + rowData, t);
       writeStatus.markFailure(recordKey, partitionPath, t);
+      if (!writeConfig.getIgnoreWriteFailed()) {
+        throw new HoodieException(t.getMessage(), t);
+      }
     }
   } catch (Throwable ge) {
     writeStatus.setGlobalError(ge);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index cc7f5f6783ad..8d4a3f78b7e0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -439,6 +439,7 @@ private FlinkOptions() {
       .key("write.ignore.failed")
       .booleanType()
       .defaultValue(false)
+      .withFallbackKeys("hoodie.write.ignore.failed")
       .withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch. \n"
          + "By default false. Turning this on, could hide the write status errors while the flink checkpoint moves ahead. \n"
          + "So, would recommend users to use this with caution.");
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index e9d0310d4756..f25367392fe6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -225,7 +225,8 @@ public static HoodieWriteConfig getHoodieClientConfig(
         .withAutoCommit(false)
         .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
         .withProps(flinkConf2TypedProperties(conf))
-        .withSchema(getSourceSchema(conf).toString());
+        .withSchema(getSourceSchema(conf).toString())
+        .withWriteIgnoreFailed(conf.get(FlinkOptions.IGNORE_FAILED));
 
     Option lockConfig = getLockConfig(conf);
     if (lockConfig.isPresent()) {
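On the Flink side, `write.ignore.failed` gains `hoodie.write.ignore.failed` as a fallback key and its value is now forwarded into the `HoodieWriteConfig` by `FlinkWriteClients`. An illustrative sketch (not part of this patch; the table path and option values are placeholders) of setting it on a Flink `Configuration`:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class FlinkIgnoreFailedExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(FlinkOptions.PATH, "/tmp/hudi/example_table"); // placeholder base path

    // Native Flink key, as before:
    conf.set(FlinkOptions.IGNORE_FAILED, true);
    // ...or the Hudi-style key, now honored through withFallbackKeys:
    // conf.setBoolean("hoodie.write.ignore.failed", true);

    // FlinkWriteClients.getHoodieClientConfig(conf) now carries this value through
    // withWriteIgnoreFailed(...) into the HoodieWriteConfig used by the write handles.
  }
}
```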