FLINK-21691

KafkaSource fails with NPE when setting it up

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Duplicate
    • Affects Version/s: 1.12.2, 1.13.0
    • Fix Version/s: 1.13.0, 1.12.3
    • Component/s: Connectors / Kafka
    • Labels:
      None

      Description

      A user reported that the new KafkaSource fails with a NullPointerException:

      Exception in thread "main" java.lang.NullPointerException
      at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
      at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
      

      when setting it up like this:

      val kafkaSource = buildKafkaSource(params)
      val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")
      
      private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
          val builder = KafkaSource.builder<String>()
              .setBootstrapServers(params.get("bootstrapServers"))
              .setGroupId(params.get("groupId"))
              .setStartingOffsets(OffsetsInitializer.earliest())
              .setTopics("topic")
              .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
      
          if (params.getBoolean("boundedSource", false)) {
              builder.setBounded(OffsetsInitializer.latest())
          }
      
          return builder.build()
      }
      

      The problem seems to be that the ValueDeserializerWrapper does not set the deserializer until the deserialize method is called, but getProducedType is called first (while the source is being added to the job via fromSource), resulting in the NullPointerException.
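
The failure mode described above can be reproduced without Flink or Kafka on the classpath. The following is a minimal sketch, not the actual connector source: `ValueDeserializer`, `StringValueDeserializer`, `LazyWrapper`, and `SafeWrapper` are hypothetical stand-ins, and the produced type is modeled as a plain string instead of a `TypeInformation`. `SafeWrapper` shows one possible way to avoid the problem and is not necessarily how the issue was fixed in Flink.

```kotlin
// Hypothetical stand-in for a Kafka value deserializer (not the real Kafka interface).
interface ValueDeserializer<T> {
    fun deserialize(bytes: ByteArray): T
    fun producedType(): String
}

class StringValueDeserializer : ValueDeserializer<String> {
    override fun deserialize(bytes: ByteArray): String = String(bytes, Charsets.UTF_8)
    override fun producedType(): String = "String"
}

// Mirrors the reported behavior: the delegate is only created inside deserialize().
class LazyWrapper<T>(private val factory: () -> ValueDeserializer<T>) {
    private var delegate: ValueDeserializer<T>? = null

    fun deserialize(bytes: ByteArray): T {
        val d = delegate ?: factory().also { delegate = it }
        return d.deserialize(bytes)
    }

    // Called while the job is being set up, before any record has been read,
    // so the delegate is still null and the dereference throws.
    fun getProducedType(): String = delegate!!.producedType()
}

// One possible remedy: create the delegate on demand from either entry point.
class SafeWrapper<T>(private val factory: () -> ValueDeserializer<T>) {
    private var delegate: ValueDeserializer<T>? = null

    private fun ensureDelegate(): ValueDeserializer<T> =
        delegate ?: factory().also { delegate = it }

    fun deserialize(bytes: ByteArray): T = ensureDelegate().deserialize(bytes)

    fun getProducedType(): String = ensureDelegate().producedType()
}

fun main() {
    // Asking for the type before deserializing anything fails with an NPE.
    val broken = LazyWrapper<String> { StringValueDeserializer() }
    val failure = runCatching { broken.getProducedType() }.exceptionOrNull()
    println("broken wrapper threw NPE: ${failure is NullPointerException}")

    // The safe variant answers the same question without any prior deserialize() call.
    val fixed = SafeWrapper<String> { StringValueDeserializer() }
    println("fixed wrapper type: ${fixed.getProducedType()}")
}
```

The sketch only illustrates the call ordering: type information is requested when the source is attached to the environment, which happens before any record reaches `deserialize`.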

      https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E

              People

              • Assignee: Unassigned
              • Reporter: Till Rohrmann (trohrmann)