From ca559a2dcf1a2a29f635401111c743b5da1bd81d Mon Sep 17 00:00:00 2001 From: Shaik Zakir Hussain Date: Fri, 2 Oct 2020 19:51:11 +0530 Subject: [PATCH] KAFKA-10477: Fix JsonConverter regression to treat MISSING nodes as NULL nodes (#9306) Fixes a regression introduced in `JsonConverter` with previous upgrades from Jackson Databind 2.9.x to 2.10.x. Jackson Databind version 2.10.0 included a backward-incompatible behavioral change to use `JsonNodeType.MISSING` (and `MissingNode`, the subclass of `JsonNode` that has a type of `MISSING`) instead of `JsonNodeType.NULL` / `NullNode`. See https://github.com/FasterXML/jackson-databind/issues/2211 for details of this change. This change makes recovers the older `JsonConverter` behavior of returning null on empty input. Added two unit tests for this change. Both unit tests were independently tested with earlier released versions and passed on all versions that used Jackson 2.9.x and earlier, and failed on all versions that used 2.10.x and that did not have the fixed included in the PR. Both of the new unit tests pass with this fix to `JsonConverter`. Author: Shaik Zakir Hussain Reviewer: Randall Hauch --- .../kafka/connect/json/JsonConverter.java | 2 +- .../kafka/connect/json/JsonConverterTest.java | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java index 8a2d6768cc96..1a175381e8f1 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java @@ -729,6 +729,7 @@ private static Object convertToConnect(Schema schema, JsonNode jsonValue) { } else { switch (jsonValue.getNodeType()) { case NULL: + case MISSING: // Special case. With no schema return null; case BOOLEAN: @@ -751,7 +752,6 @@ private static Object convertToConnect(Schema schema, JsonNode jsonValue) { break; case BINARY: - case MISSING: case POJO: default: schemaType = null; diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java index 2e189e2d584a..a1ac71d4c8d5 100644 --- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java +++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java @@ -195,6 +195,36 @@ public void nullToConnect() { assertEquals(SchemaAndValue.NULL, converted); } + /** + * When schemas are disabled, empty data should be decoded to an empty envelope. + * This test verifies the case where `schemas.enable` configuration is set to false, and + * {@link JsonConverter} converts empty bytes to {@link SchemaAndValue#NULL}. + */ + @Test + public void emptyBytesToConnect() { + // This characterizes the messages with empty data when Json schemas is disabled + Map props = Collections.singletonMap("schemas.enable", false); + converter.configure(props, true); + SchemaAndValue converted = converter.toConnectData(TOPIC, "".getBytes()); + assertEquals(SchemaAndValue.NULL, converted); + } + + /** + * When schemas are disabled, fields are mapped to Connect maps. + */ + @Test + public void schemalessWithEmptyFieldValueToConnect() { + // This characterizes the messages with empty data when Json schemas is disabled + Map props = Collections.singletonMap("schemas.enable", false); + converter.configure(props, true); + String input = "{ \"a\": \"\", \"b\": null}"; + SchemaAndValue converted = converter.toConnectData(TOPIC, input.getBytes()); + Map expected = new HashMap<>(); + expected.put("a", ""); + expected.put("b", null); + assertEquals(new SchemaAndValue(null, expected), converted); + } + @Test public void nullSchemaPrimitiveToConnect() { SchemaAndValue converted = converter.toConnectData(TOPIC, "{ \"schema\": null, \"payload\": null }".getBytes());