Kafka / KAFKA-10477

Sink Connector fails with DataException when trying to convert Kafka record with empty key to Connect Record


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1
    • Fix Version/s: 2.2.3, 2.3.2, 2.4.2, 2.7.0, 2.5.2, 2.6.1
    • Component/s: KafkaConnect
    • Labels: None

    Description

      A sink connector fails with a DataException when it tries to convert a Kafka record with an empty key to the Connect data format.

      Kafka's trunk branch currently depends on Jackson v2.10.5.

      A short unit test (shared below) in the `org.apache.kafka.connect.json.JsonConverterTest` class reproduces the issue.

      @Test
      public void testToConnectDataEmptyKey() throws IOException {
          // Disable schema envelopes so the converter handles plain JSON.
          Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
          converter.configure(props, true); // second argument: isKey
          // Empty payload, as seen for a record with an empty key.
          String str = "";
          SchemaAndValue schemaAndValue = converter.toConnectData("testTopic", str.getBytes());
          System.out.println(schemaAndValue);
      }
      

      This test code snippet fails with the following exception:

      org.apache.kafka.connect.errors.DataException: Unknown schema type: null
      
      	at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:764)
      	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:385)
      	at org.apache.kafka.connect.json.JsonConverterTest.testToConnectDataEmptyKey(JsonConverterTest.java:792)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
      	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
      	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
      	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
      	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
      	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
      	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
      	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
      	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
      	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
      	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
      

       

      This seems related to the issue https://github.com/FasterXML/jackson-databind/issues/2211 , where the Jackson library started returning a `MissingNode` for empty input passed to `ObjectMapper.readTree(input)`. The precise code change can be seen here: https://github.com/FasterXML/jackson-databind/commit/f0abe41b54b36f43f96f05ab224f6e6f364fbe7a#diff-0d472011dea2aac97f0381097cd1a0bfR4094 

       

      This causes our JsonConverter class to throw an exception here: https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L764 

       

      In my opinion, when `jsonValue.getNodeType()` is `MISSING` (https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L754 ), we should fall back to the behaviour of the `NULL` case (i.e. return null), although I am not sure what further repercussions this might have.
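
      A minimal sketch of the fallback suggested above, not the actual JsonConverter code. To stay self-contained it does not use Jackson: the nested `NodeType` enum is a hypothetical stand-in that mirrors a few names from Jackson's `JsonNodeType`, and `MissingNodeFallbackSketch` and `convert` are illustrative names only. It just shows `MISSING` sharing the `NULL` branch so that an empty payload converts to null instead of failing.

```java
final class MissingNodeFallbackSketch {

    // Hypothetical stand-in for Jackson's JsonNodeType; only a few values are modelled.
    enum NodeType { NULL, MISSING, STRING }

    private MissingNodeFallbackSketch() { }

    // Simplified, assumed shape of a schemaless conversion step.
    // NULL and MISSING both yield a null value; STRING passes its text through.
    static Object convert(NodeType type, String text) {
        switch (type) {
            case NULL:
            case MISSING: // proposed: treat empty input like an explicit JSON null
                return null;
            case STRING:
                return text;
            default:
                // Any node type without a handler is still rejected.
                throw new IllegalStateException("Unhandled node type: " + type);
        }
    }

    public static void main(String[] args) {
        // Empty key: the parser reported a missing node, so the value is null.
        System.out.println(convert(NodeType.MISSING, null));
        // Regular string key is unaffected by the fallback.
        System.out.println(convert(NodeType.STRING, "k1"));
    }
}
```

      Whether returning null for a missing node is safe for every caller of the converter is exactly the open question raised above; the sketch does not answer it.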

       

      Things worked fine when the Jackson dependency was at v2.9.10.3 or lower, as the `ObjectMapper` returned null in that case.

       

      Thanks,

      Zakir

          People

            Assignee: Shaik Zakir Hussain (shaikzakir.iitm)
            Reporter: Shaik Zakir Hussain (shaikzakir.iitm)