diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java index 76f4f677..7c341b9c 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java @@ -66,8 +66,7 @@ public class PulsarOptions { public static final String KEY_DISABLED_METRICS = "key-disable-metrics"; public static final String OLD_STATE_VERSION = "old-state-version"; public static final String FAIL_ON_DATA_LOSS_OPTION_KEY = "failOnDataLoss"; - public static final String USE_EARLIEST_WHEN_DATA_LOSS_OPTION_KEY = - "use-earliest-when-data-loss"; + public static final String USE_EARLIEST_WHEN_DATA_LOSS_OPTION_KEY = "useEarliestWhenDataLoss"; public static final String SEND_DELAY_MILLISECONDS = "send-delay-millisecond"; public static final String INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE = diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java index 9db11f1f..f7b10e0e 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java @@ -235,12 +235,13 @@ public static boolean getFailOnDataLossAndRemoveKey(Map readerCo } public static boolean getUseEarliestWhenDataLossAndRemoveKey(Map readerConf) { - String failOnDataLossVal = - readerConf - .getOrDefault(PulsarOptions.USE_EARLIEST_WHEN_DATA_LOSS_OPTION_KEY, "false") - .toString(); + String key = PulsarOptions.USE_EARLIEST_WHEN_DATA_LOSS_OPTION_KEY; + if (!readerConf.containsKey(key)) { + key = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_HYPHEN, key); + } + String failOnDataLossVal = readerConf.getOrDefault(key, "false").toString(); final boolean value = Boolean.parseBoolean(failOnDataLossVal); - readerConf.remove(PulsarOptions.USE_EARLIEST_WHEN_DATA_LOSS_OPTION_KEY); + readerConf.remove(key); return value; } }