Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-21160

ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked

    XMLWordPrintableJSON

Details

    Description

      The variable deserializer in class ValueDeserializerWrapper won't be instantiated until method deserialize() is invoked in runtime, so in the job compiling stage when invoking getProducedType(), NPE will be thrown because of referencing the uninstantiated variable deserializer.

      A user reported that the new KafkaSource fails with a NullPointerException:

      Exception in thread "main" java.lang.NullPointerException
      at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
      at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
      

      when setting it up like this:

      val kafkaSource = buildKafkaSource(params)
      val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")
      
      private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
          val builder = KafkaSource.builder<String>()
              .setBootstrapServers(params.get("bootstrapServers"))
              .setGroupId(params.get("groupId"))
              .setStartingOffsets(OffsetsInitializer.earliest())
              .setTopics("topic")
              .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
      
          if (params.getBoolean("boundedSource", false)) {
              builder.setBounded(OffsetsInitializer.latest())
          }
      
          return builder.build()
      }
      

      The problem seems to be that the ValueDeserializerWrapper does not set the deserializer the deserialize method is called, but getProducedType is actually called first resulting in the NullPointerException.

      https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              renqs Qingsheng Ren
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: