Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-28156

Flink KafkaSource with Bounded Throw Exception

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 1.14.3
    • None
    • None

    Description

      I want to use KafkaSource consume topic between commited offset and last-offset,

      but throw a exception

       KafkaSource.<String>builder()
      .setBootstrapServers("10.18.34.43:9092,10.18.34.44:9092,10.18.34.45:9092")
      .setTopics(topic)
      .setGroupId(groupId)
      // .setStartingOffsets(OffsetsInitializer.timestamp(1655717760000L))
      .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .setBounded(OffsetsInitializer.latest())
      .build();

       

      Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
          at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
          at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          ... 1 more
      Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
          at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1223)
          at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
          at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:99)
          at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
          at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
          ... 6 more

      Attachments

        Activity

          People

            Unassigned Unassigned
            Jiangfei Liu Jiangfei Liu
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: