Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-2325

PersistentKafkaSource throws ArrayIndexOutOfBoundsException if reading from a topic that is created after starting the Source

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 0.9
    • 0.9.1, 0.10.0
    • Connectors / Kafka
    • None

    Description

      I'm creating a PersistentKafkaSource reading from a specified topic from Kafka, that is at the time the PersistentKafkaSource is started (via open(.)) not yet present. That's why the number of partitions, that is read in the open(.) function is 0, which leads to arrays of length 0 (lastOffsets and committedOffsets).
      May be it is better to check, whether numberOfPartitions returns 0 and if so, to take the default number of partitions from Kafka config?

      Stacktrace:
      java.lang.ArrayIndexOutOfBoundsException: 0
      at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:180)
      at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49)
      at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
      at java.lang.Thread.run(Thread.java:745)

      Attachments

        Activity

          People

            rmetzger Robert Metzger
            bergmann Rico Bergmann
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: