  1. Flink
  2. FLINK-9627

Extending 'KafkaJsonTableSource' according to comments will result in NPE


Details

    Description

      According to the class-level comments, all that is needed to extend 'KafkaJsonTableSource' is the following:

       

      /**
       * A version-agnostic Kafka JSON {@link StreamTableSource}.
       *
       * <p>The version-specific Kafka consumers need to extend this class and
       * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
       *
       * <p>The field names are used to parse the JSON file and so are the types.
       */

      Following these comments causes a NullPointerException (NPE): the abstract class itself has no default value for 'startupMode'; a default is set only in the builder of this class.
      As a result, the switch statement in the 'getKafkaConsumer' method is executed on the uninitialized (null) 'startupMode' field:

      switch (startupMode) {
          case EARLIEST:
              kafkaConsumer.setStartFromEarliest();
              break;
          case LATEST:
              kafkaConsumer.setStartFromLatest();
              break;
          case GROUP_OFFSETS:
              kafkaConsumer.setStartFromGroupOffsets();
              break;
          case SPECIFIC_OFFSETS:
              kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
              break;
      }
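
      The failure can be reproduced without Kafka or Flink on the classpath, because a Java switch over a null enum reference throws a NullPointerException on its own. The following is a minimal sketch, not the actual Flink code: the class 'StartupModeSketch', its nested enum, and the helper methods are hypothetical stand-ins. The fallback to GROUP_OFFSETS in 'guarded' is an assumption about what a sensible default might be, not a statement about what the builder or the eventual fix uses.

```java
// Hypothetical, self-contained sketch; none of these names come from Flink itself.
class StartupModeSketch {

    // Stand-in for the connector's startup mode enum.
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS, SPECIFIC_OFFSETS }

    // Mirrors the reported pattern: the switch dereferences 'mode',
    // so a null value throws NullPointerException before any case is matched.
    static String unguarded(StartupMode mode) {
        switch (mode) {
            case EARLIEST:
                return "earliest";
            case LATEST:
                return "latest";
            case GROUP_OFFSETS:
                return "group-offsets";
            case SPECIFIC_OFFSETS:
                return "specific-offsets";
            default:
                throw new IllegalStateException("Unknown mode: " + mode);
        }
    }

    // One possible guard: fall back to an assumed default when the field was never set.
    static String guarded(StartupMode mode) {
        StartupMode effective = (mode != null) ? mode : StartupMode.GROUP_OFFSETS;
        return unguarded(effective);
    }

    // Reports whether the unguarded switch throws an NPE for the given mode.
    static boolean throwsNpe(StartupMode mode) {
        try {
            unguarded(mode);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsNpe(null)); // prints "true"
        System.out.println(guarded(null));   // prints "group-offsets"
    }
}
```

      Giving the field a default value in the abstract class, or null-checking it before the switch, would both avoid the exception; which of the two fits the connector better is a design decision for the maintainers.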

       

       


            People

              yanghua vinoyang
              Wosinsan Dominik Wosiński