Kafka / KAFKA-10477

Sink Connector fails with DataException when trying to convert Kafka record with empty key to Connect Record


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1
    • Fix Version/s: 2.2.3, 2.3.2, 2.4.2, 2.7.0, 2.5.2, 2.6.1
    • Component/s: KafkaConnect
    • Labels:
      None

      Description

      A sink connector fails with a DataException when it tries to convert a Kafka record with an empty key to Connect data format.

      Kafka's trunk branch currently depends on Jackson v2.10.5.

      A short unit test (shared below) in `org.apache.kafka.connect.json.JsonConverterTest` class reproduces the issue.  

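      @Test
      public void testToConnectDataEmptyKey() throws IOException {
          // `converter` is the JsonConverter field of the test class; configure it
          // as a key converter (isKey = true) with schemas disabled.
          Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
          converter.configure(props, true);
          // An empty key reaches the converter as a zero-length byte array.
          String str = "";
          SchemaAndValue schemaAndValue = converter.toConnectData("testTopic", str.getBytes());
          System.out.println(schemaAndValue);
      }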
      

      This test code snippet fails with the following exception:

      org.apache.kafka.connect.errors.DataException: Unknown schema type: null
      
      	at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:764)
      	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:385)
      	at org.apache.kafka.connect.json.JsonConverterTest.testToConnectDataEmptyKey(JsonConverterTest.java:792)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
      	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
      	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
      	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
      	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
      	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
      	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
      	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
      	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
      	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
      	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
      

       

      This seems related to jackson-databind issue 2211 (https://github.com/FasterXML/jackson-databind/issues/2211 ), after which `ObjectMapper.readTree(input)` returns a `MissingNode` for empty input instead of null. The precise code change can be seen here: https://github.com/FasterXML/jackson-databind/commit/f0abe41b54b36f43f96f05ab224f6e6f364fbe7a#diff-0d472011dea2aac97f0381097cd1a0bfR4094 

       

      This causes our `JsonConverter` class to throw an exception here: https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L764 

       

      In my opinion, when `jsonValue.getNodeType()` is `MISSING` (https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L754 ), we should fall back to the behaviour of the `NULL` case (i.e. return null), although I am not sure what further repercussions this might have.
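      The proposed fallback can be illustrated with a minimal, self-contained Java sketch. It is a simplified model of the schemaless branch of the converter, not the actual Kafka source: the class `MissingNodeFallback`, its `convert` method and the `treatMissingAsNull` flag are hypothetical; `NodeType` and `SchemaType` are stand-ins for Jackson's `JsonNodeType` and Connect's `Schema.Type`; and `IllegalArgumentException` stands in for `DataException`.

```java
import java.util.EnumMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical, simplified model of schemaless JSON-to-Connect conversion.
// Not the actual JsonConverter source.
class MissingNodeFallback {

    // Stand-in for Jackson's JsonNodeType (subset).
    enum NodeType { NULL, MISSING, BOOLEAN, NUMBER, STRING, ARRAY, OBJECT }

    // Stand-in for Connect's Schema.Type (subset; all numbers map to FLOAT64 here).
    enum SchemaType { BOOLEAN, FLOAT64, STRING, ARRAY, MAP }

    // Stand-in lookup table: only known schema types have a converter entry.
    static final Map<SchemaType, String> CONVERTERS = new EnumMap<>(SchemaType.class);
    static {
        for (SchemaType t : SchemaType.values()) {
            CONVERTERS.put(t, t.name().toLowerCase(Locale.ROOT));
        }
    }

    // Returns the name of the converter that would handle the node, or null when
    // the value is treated as absent. treatMissingAsNull = true models the proposed
    // fix; false models the behaviour reported in this issue.
    static String convert(NodeType nodeType, boolean treatMissingAsNull) {
        SchemaType schemaType;
        switch (nodeType) {
            case NULL:
                return null; // no schema and a JSON null: the value is null
            case MISSING:
                if (treatMissingAsNull) {
                    return null; // proposed fix: empty input behaves like NULL
                }
                schemaType = null; // otherwise no schema type can be inferred
                break;
            case BOOLEAN: schemaType = SchemaType.BOOLEAN; break;
            case NUMBER:  schemaType = SchemaType.FLOAT64; break;
            case STRING:  schemaType = SchemaType.STRING;  break;
            case ARRAY:   schemaType = SchemaType.ARRAY;   break;
            case OBJECT:  schemaType = SchemaType.MAP;     break;
            default:      schemaType = null;
        }
        String converter = schemaType == null ? null : CONVERTERS.get(schemaType);
        if (converter == null) {
            // Stand-in for DataException; yields "Unknown schema type: null".
            throw new IllegalArgumentException("Unknown schema type: " + schemaType);
        }
        return converter;
    }
}
```

      With `treatMissingAsNull` set to false, a `MISSING` node falls through to an unknown schema type and produces the same "Unknown schema type: null" message as the stack trace above; with it set to true, empty input yields null exactly like the `NULL` case.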

       

      Things worked fine while Kafka depended on Jackson v2.9.10.3 or earlier, because `ObjectMapper` returned null in that case.

       

      Thanks,

      Zakir


              People

              • Assignee:
                Shaik Zakir Hussain (shaikzakir.iitm)
                Reporter:
                Shaik Zakir Hussain (shaikzakir.iitm)
              • Votes:
                0
                Watchers:
                3
