Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-21691

KafkaSource fails with NPE when setting it up

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Critical
    • Resolution: Duplicate
    • 1.12.2, 1.13.0
    • 1.13.0, 1.12.3
    • Connectors / Kafka
    • None

    Description

      A user reported that the new KafkaSource fails with a NullPointerException:

      Exception in thread "main" java.lang.NullPointerException
      at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
      at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
      

      when setting it up like this:

      val kafkaSource = buildKafkaSource(params)
      val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")
      
      private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
          val builder = KafkaSource.builder<String>()
              .setBootstrapServers(params.get("bootstrapServers"))
              .setGroupId(params.get("groupId"))
              .setStartingOffsets(OffsetsInitializer.earliest())
              .setTopics("topic")
              .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
      
          if (params.getBoolean("boundedSource", false)) {
              builder.setBounded(OffsetsInitializer.latest())
          }
      
          return builder.build()
      }
      

      The problem seems to be that the ValueDeserializerWrapper does not set the deserializer the deserialize method is called, but getProducedType is actually called first resulting in the NullPointerException.

      https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              trohrmann Till Rohrmann
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: