Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.5.0
-
None
Description
According to the comments what is needed to extend the 'KafkaJsonTableSource' looks as follows:
A version-agnostic Kafka JSON {@link StreamTableSource}. * * <p>The version-specific Kafka consumers need to extend this class and * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}. * * <p>The field names are used to parse the JSON file and so are the types.
This will cause an NPE, since there is no default value for startupMode in the abstract class itself only in the builder of this class.
For the 'getKafkaConsumer' method the switch statement will be executed on non-initialized 'startupMode' field:
switch (startupMode) { case EARLIEST: kafkaConsumer.setStartFromEarliest(); break; case LATEST: kafkaConsumer.setStartFromLatest(); break; case GROUP_OFFSETS: kafkaConsumer.setStartFromGroupOffsets(); break; case SPECIFIC_OFFSETS: kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets); break; }
Attachments
Issue Links
- links to