  Kafka / KAFKA-10477

Sink Connector fails with DataException when trying to convert Kafka record with empty key to Connect Record


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1
    • Fix Version/s: 2.2.3, 2.3.2, 2.4.2, 2.7.0, 2.5.2, 2.6.1
    • Component/s: connect
    • Labels: None

    Description

      A sink connector throws a DataException when converting a Kafka record with an empty key to Connect data format.

      Kafka's trunk branch currently depends on Jackson v2.10.5.

      The short unit test below, added to the `org.apache.kafka.connect.json.JsonConverterTest` class, reproduces the issue.

      @Test
      public void testToConnectDataEmptyKey() throws IOException {
          // Schemaless JSON, configured as a key converter (isKey = true)
          Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
          converter.configure(props, true);
          // An empty key reaches the converter as a zero-length byte array
          String str = "";
          SchemaAndValue schemaAndValue = converter.toConnectData("testTopic", str.getBytes());
          System.out.println(schemaAndValue);
      }
      

      This test code snippet fails with the following exception:

      org.apache.kafka.connect.errors.DataException: Unknown schema type: null
      
      	at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:764)
      	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:385)
      	at org.apache.kafka.connect.json.JsonConverterTest.testToConnectDataEmptyKey(JsonConverterTest.java:792)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
      	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
      	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
      	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
      	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
      	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
      	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
      	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
      	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
      	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
      	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
      	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
      

      This appears to be related to jackson-databind issue https://github.com/FasterXML/jackson-databind/issues/2211, after which `ObjectMapper.readTree(input)` returns a `MissingNode` for empty input. The exact code change can be seen here: https://github.com/FasterXML/jackson-databind/commit/f0abe41b54b36f43f96f05ab224f6e6f364fbe7a#diff-0d472011dea2aac97f0381097cd1a0bfR4094
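
The contract change can be modeled in a few lines. This is a simplified illustration, not Jackson code: `Node` is a hypothetical stand-in for `com.fasterxml.jackson.databind.JsonNode` (mimicking its real `isMissingNode()` predicate), and `readTreeOfEmptyInput` is a hypothetical helper returning what `readTree` yields for empty input under each version, as described in this report. It shows that a null check written against 2.9.x no longer catches empty input on 2.10+, while checking for a missing node as well covers both.

```java
// Simplified model of the readTree() contract change for empty input.
// Node is a hypothetical stand-in for Jackson's JsonNode; no real Jackson API is called.
final class ReadTreeContractSketch {

    interface Node {
        boolean isMissingNode();
    }

    // Stand-in for Jackson's MissingNode singleton.
    static final Node MISSING = new Node() {
        @Override
        public boolean isMissingNode() {
            return true;
        }
    };

    // Hypothetical helper: what readTree() returns for empty input,
    // null up to 2.9.x and a MissingNode from 2.10 on.
    static Node readTreeOfEmptyInput(boolean jackson210OrLater) {
        return jackson210OrLater ? MISSING : null;
    }

    // A check written against the old contract; it misses the 2.10 MissingNode.
    static boolean isAbsentOldCheck(Node node) {
        return node == null;
    }

    // A check that treats both "no node" and "missing node" as absent input.
    static boolean isAbsent(Node node) {
        return node == null || node.isMissingNode();
    }

    private ReadTreeContractSketch() {}
}
```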

      This makes our `JsonConverter` class throw an exception here: https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L764

      I suggest that when `jsonValue.getNodeType()` is `MISSING` (https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L754), the converter should fall back to the behaviour of the `NULL` case (i.e. return null), although I am not sure what further repercussions this might have.
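
A minimal sketch of that fallback, under stated assumptions: `NodeType` and `SchemaType` are hypothetical stand-ins for Jackson's `JsonNodeType` and Connect's `Schema.Type`, `IllegalStateException` stands in for `DataException`, and schema inference is reduced to two types. It is not the actual `JsonConverter` code; it only models how a `MISSING` node that maps to no schema type surfaces as "Unknown schema type: null", and how short-circuiting `MISSING` to null avoids that.

```java
// Simplified model of schemaless JSON-to-Connect conversion for the empty-key case.
// All types here are hypothetical stand-ins, not Kafka or Jackson classes.
final class MissingAsNullSketch {

    enum NodeType { NULL, MISSING, BOOLEAN, STRING }

    enum SchemaType { BOOLEAN, STRING }

    // Infer a schema type from the node type; MISSING has no mapping, so it yields null.
    static SchemaType inferSchemaType(NodeType type) {
        switch (type) {
            case BOOLEAN:
                return SchemaType.BOOLEAN;
            case STRING:
                return SchemaType.STRING;
            default:
                return null;
        }
    }

    // Current behaviour: only NULL short-circuits, so MISSING reaches the
    // converter lookup with a null schema type and fails.
    static Object convertCurrent(NodeType type, Object raw) {
        if (type == NodeType.NULL) {
            return null;
        }
        SchemaType schemaType = inferSchemaType(type);
        if (schemaType == null) {
            throw new IllegalStateException("Unknown schema type: " + schemaType);
        }
        return raw;
    }

    // Suggested behaviour: treat MISSING exactly like NULL.
    static Object convertSuggested(NodeType type, Object raw) {
        if (type == NodeType.MISSING) {
            return null;
        }
        return convertCurrent(type, raw);
    }

    private MissingAsNullSketch() {}
}
```

In this model, `convertCurrent(NodeType.MISSING, "")` throws with the message `Unknown schema type: null`, while `convertSuggested(NodeType.MISSING, "")` returns null and non-empty values pass through unchanged.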

      This worked with Jackson v2.9.10.3 and earlier, because `ObjectMapper` returned null for empty input in those versions.

      Thanks,

      Zakir


            People

              Assignee: Shaik Zakir Hussain (shaikzakir.iitm)
              Reporter: Shaik Zakir Hussain (shaikzakir.iitm)
              Votes: 0
              Watchers: 3
