FLINK-21691

KafkaSource fails with NPE when setting it up


Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Duplicate
    • Affects Version/s: 1.12.2, 1.13.0
    • Fix Version/s: 1.13.0, 1.12.3
    • Component/s: Connectors / Kafka
    • Labels: None

    Description

      A user reported that the new KafkaSource fails with a NullPointerException:

      Exception in thread "main" java.lang.NullPointerException
      at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
      at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
      

      when setting it up like this:

      val kafkaSource = buildKafkaSource(params)
      val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")
      
      private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
          val builder = KafkaSource.builder<String>()
              .setBootstrapServers(params.get("bootstrapServers"))
              .setGroupId(params.get("groupId"))
              .setStartingOffsets(OffsetsInitializer.earliest())
              .setTopics("topic")
              .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
      
          if (params.getBoolean("boundedSource", false)) {
              builder.setBounded(OffsetsInitializer.latest())
          }
      
          return builder.build()
      }
      

      The problem seems to be that ValueDeserializerWrapper does not set the deserializer until the deserialize method is called, but getProducedType is called first (from StreamExecutionEnvironment.fromSource, as the stack trace shows), resulting in the NullPointerException.
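
To make the suspected ordering problem concrete, here is a minimal, self-contained sketch of the pattern. It does not use Flink or Kafka classes: `Deserializer`, `StringValueDeserializer`, `LazyWrapper`, and `SafeWrapper` are hypothetical stand-ins, and `SafeWrapper` only illustrates one possible way to avoid the failure, not necessarily how the issue was resolved upstream.

```kotlin
// Hypothetical stand-in for a value deserializer; NOT the Kafka interface.
interface Deserializer<T> {
    fun deserialize(bytes: ByteArray): T
    fun producedType(): String
}

class StringValueDeserializer : Deserializer<String> {
    override fun deserialize(bytes: ByteArray): String = String(bytes, Charsets.UTF_8)
    override fun producedType(): String = "String"
}

// Mirrors the reported pattern: the delegate is created only inside deserialize().
class LazyWrapper<T>(private val factory: () -> Deserializer<T>) {
    private var delegate: Deserializer<T>? = null

    fun deserialize(bytes: ByteArray): T {
        if (delegate == null) delegate = factory()
        return delegate!!.deserialize(bytes)
    }

    // Throws NullPointerException when called before the first deserialize().
    fun getProducedType(): String = delegate!!.producedType()
}

// One possible remedy (an assumption): create the delegate on demand in every accessor.
class SafeWrapper<T>(private val factory: () -> Deserializer<T>) {
    private var delegate: Deserializer<T>? = null

    private fun ensureDelegate(): Deserializer<T> =
        delegate ?: factory().also { delegate = it }

    fun deserialize(bytes: ByteArray): T = ensureDelegate().deserialize(bytes)

    fun getProducedType(): String = ensureDelegate().producedType()
}

fun main() {
    // Type information is requested before any record has been deserialized.
    val lazy = LazyWrapper<String> { StringValueDeserializer() }
    val threwNpe = try {
        lazy.getProducedType()
        false
    } catch (e: NullPointerException) {
        true
    }
    println("lazy wrapper threw NPE: $threwNpe")

    val safe = SafeWrapper<String> { StringValueDeserializer() }
    println("safe wrapper produced type: ${safe.getProducedType()}")
}
```

In the sketch, `LazyWrapper.getProducedType()` fails only because nothing has called `deserialize()` yet, which matches the call order in the stack trace above.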

      https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E

          People

            Assignee: Unassigned
            Reporter: Till Rohrmann (trohrmann)