Details
- Bug
- Status: Closed
- Critical
- Resolution: Duplicate
- 1.12.2, 1.13.0
- None
Description
A user reported that the new KafkaSource fails with a NullPointerException:
Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
    at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
when setting it up like this:
val kafkaSource = buildKafkaSource(params)
val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")

private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
    val builder = KafkaSource.builder<String>()
        .setBootstrapServers(params.get("bootstrapServers"))
        .setGroupId(params.get("groupId"))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setTopics("topic")
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))

    if (params.getBoolean("boundedSource", false)) {
        builder.setBounded(OffsetsInitializer.latest())
    }

    return builder.build()
}
The problem seems to be that ValueDeserializerWrapper does not set the deserializer until the deserialize method is called, but getProducedType is called first (from env.fromSource), which results in the NullPointerException.
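The suspected failure pattern can be reproduced in isolation. The following is a minimal sketch in plain Java with no Flink dependency. The class names LazyInitSketch, LazyWrapper and StringDecoder are hypothetical stand-ins and not the actual connector code. It only assumes what the description states: the delegate is created inside deserialize, and getProducedType dereferences it.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Simplified, hypothetical model of the failure pattern; not the Flink source.
public class LazyInitSketch {

    /** Stand-in for a deserializer that also knows its output type. */
    static final class StringDecoder implements Function<byte[], String> {
        @Override
        public String apply(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }

        Class<String> producedType() {
            return String.class;
        }
    }

    /** Wrapper that creates its delegate lazily, only inside deserialize(). */
    static final class LazyWrapper {
        private StringDecoder delegate; // stays null until deserialize() runs

        String deserialize(byte[] bytes) {
            if (delegate == null) {
                delegate = new StringDecoder();
            }
            return delegate.apply(bytes);
        }

        Class<String> getProducedType() {
            // Throws NullPointerException if called before deserialize().
            return delegate.producedType();
        }
    }

    /** Returns true if getProducedType() throws when it is called first. */
    static boolean failsBeforeFirstRecord() {
        LazyWrapper wrapper = new LazyWrapper();
        try {
            wrapper.getProducedType();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("NPE before first record: " + failsBeforeFirstRecord());

        LazyWrapper wrapper = new LazyWrapper();
        wrapper.deserialize("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("Type after first record: " + wrapper.getProducedType().getSimpleName());
    }
}
```

In this sketch the type query only succeeds after at least one record has been deserialized. That ordering cannot occur in the reported setup, because the type is requested while the job graph is being built, before any record is read.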
Issue Links
- duplicates FLINK-21160 ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked (Resolved)