diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java index 8a2d6768cc96..1a175381e8f1 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java @@ -729,6 +729,7 @@ private static Object convertToConnect(Schema schema, JsonNode jsonValue) { } else { switch (jsonValue.getNodeType()) { case NULL: + case MISSING: // Special case. With no schema return null; case BOOLEAN: @@ -751,7 +752,6 @@ private static Object convertToConnect(Schema schema, JsonNode jsonValue) { break; case BINARY: - case MISSING: case POJO: default: schemaType = null; diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java index 2e189e2d584a..a1ac71d4c8d5 100644 --- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java +++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java @@ -195,6 +195,36 @@ public void nullToConnect() { assertEquals(SchemaAndValue.NULL, converted); } + /** + * When schemas are disabled, empty data should be decoded to an empty envelope. + * This test verifies the case where `schemas.enable` configuration is set to false, and + * {@link JsonConverter} converts empty bytes to {@link SchemaAndValue#NULL}. + */ + @Test + public void emptyBytesToConnect() { + // This characterizes the messages with empty data when Json schemas is disabled + Map props = Collections.singletonMap("schemas.enable", false); + converter.configure(props, true); + SchemaAndValue converted = converter.toConnectData(TOPIC, "".getBytes()); + assertEquals(SchemaAndValue.NULL, converted); + } + + /** + * When schemas are disabled, fields are mapped to Connect maps. + */ + @Test + public void schemalessWithEmptyFieldValueToConnect() { + // This characterizes the messages with empty data when Json schemas is disabled + Map props = Collections.singletonMap("schemas.enable", false); + converter.configure(props, true); + String input = "{ \"a\": \"\", \"b\": null}"; + SchemaAndValue converted = converter.toConnectData(TOPIC, input.getBytes()); + Map expected = new HashMap<>(); + expected.put("a", ""); + expected.put("b", null); + assertEquals(new SchemaAndValue(null, expected), converted); + } + @Test public void nullSchemaPrimitiveToConnect() { SchemaAndValue converted = converter.toConnectData(TOPIC, "{ \"schema\": null, \"payload\": null }".getBytes());