FLINK-9627

Extending 'KafkaJsonTableSource' according to comments will result in NPE

    Details

      Description

      According to the class-level Javadoc, extending 'KafkaJsonTableSource' requires only the following:

       

      /**
       * A version-agnostic Kafka JSON {@link StreamTableSource}.
       *
       * <p>The version-specific Kafka consumers need to extend this class and
       * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
       *
       * <p>The field names are used to parse the JSON file and so are the types.
       */

      Following this description causes an NPE, because the abstract class itself assigns no default value to 'startupMode'; only the builder of this class does.
      A subclass that bypasses the builder therefore leaves the field null, and the switch statement in the 'getKafkaConsumer' method is executed on the uninitialized 'startupMode' field:

      // A switch on a null enum reference throws NullPointerException.
      switch (startupMode) {
          case EARLIEST:
              kafkaConsumer.setStartFromEarliest();
              break;
          case LATEST:
              kafkaConsumer.setStartFromLatest();
              break;
          case GROUP_OFFSETS:
              kafkaConsumer.setStartFromGroupOffsets();
              break;
          case SPECIFIC_OFFSETS:
              kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
              break;
      }
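
The failure mode can be reproduced without Flink on the classpath. The following is a minimal sketch, not the actual connector code: 'StartupMode', 'SourceWithoutDefault', 'SourceWithDefault' and 'describe' are hypothetical stand-ins, and using GROUP_OFFSETS as the field default is an assumption made for illustration.

```java
public class StartupModeNpeDemo {

    // Hypothetical stand-in for the connector's startup mode enum.
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS, SPECIFIC_OFFSETS }

    // Mirrors the reported situation: the field has no default value.
    static class SourceWithoutDefault {
        protected StartupMode startupMode; // stays null unless something sets it

        String describeStartup() {
            return describe(startupMode);
        }
    }

    // One possible remedy: initialize the field where it is declared.
    static class SourceWithDefault {
        protected StartupMode startupMode = StartupMode.GROUP_OFFSETS; // assumed default

        String describeStartup() {
            return describe(startupMode);
        }
    }

    // Same shape as the switch in the report; switching on null throws an NPE.
    static String describe(StartupMode mode) {
        switch (mode) {
            case EARLIEST:
                return "earliest";
            case LATEST:
                return "latest";
            case GROUP_OFFSETS:
                return "group-offsets";
            case SPECIFIC_OFFSETS:
                return "specific-offsets";
            default:
                throw new IllegalStateException("unexpected mode: " + mode);
        }
    }

    // Runs the action and reports whether it failed with a NullPointerException.
    static boolean throwsNpe(Runnable action) {
        try {
            action.run();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        boolean npe = throwsNpe(() -> new SourceWithoutDefault().describeStartup());
        System.out.println("without default, NPE thrown: " + npe);
        System.out.println("with default: " + new SourceWithDefault().describeStartup());
    }
}
```

Initializing the field at its declaration keeps subclasses that bypass the builder working; an explicit null check with a descriptive error message would be an alternative if no default is desired.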

       

       

              People

              • Assignee: vinoyang (yanghua)
              • Reporter: Dominik Wosiński (Wosinsan)