Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-35565

Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • kafka-3.1.0
    • None
    • Connectors / Kafka
    • None
    • This is reproduced on a Flink 1.18.1 with the latest Kafka connector 3.1.0-1.18 on a session cluster.

    Description

      Summary

      Flink batch job gets into an infinite fetch loop and could not gracefully finish if the connected Kafka topic is empty and starting offset value in Flink job is lower than the current start/end offset of the related topic. See below for details:

      How to reproduce

      Flink batch job which works as a KafkaSource, will consume events from Kafka topic.

      Related Kafka topic is empty, there are no events, and the offset value is as below: 15

       

      Flink job uses a specific starting offset value, which is less than the current offset of the topic/partition.

      See below, it set as “4”

       

      package naci.grpId;
      
      import org.apache.flink.api.common.RuntimeExecutionMode;
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.connector.kafka.source.KafkaSource;
      import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.kafka.common.TopicPartition;
      
      import java.util.HashMap;
      import java.util.Map;
      
      public class KafkaSource_Print {
          public static void main(String[] args) throws Exception {
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setRuntimeMode(RuntimeExecutionMode.BATCH);
      
              // Define the specific offsets for the partitions
              Map<TopicPartition, Long> specificOffsets = new HashMap<>();
              specificOffsets.put(new TopicPartition("topic_test", 0), 4L); // Start from offset 4 for partition 0
      
              KafkaSource<String> kafkaSource = KafkaSource
                      .<String>builder()
                      .setBootstrapServers("localhost:9093")  // Make sure the port is correct
                      .setTopics("topic_test")
                      .setValueOnlyDeserializer(new SimpleStringSchema())
                      .setStartingOffsets(OffsetsInitializer.offsets(specificOffsets))
                      .setBounded(OffsetsInitializer.latest())
                      .build();
      
              DataStream<String> stream = env.fromSource(
                      kafkaSource,
                      WatermarkStrategy.noWatermarks(),
                      "Kafka Source"
              );
              stream.print();
      
              env.execute("Flink KafkaSource test job");
          }
      }

       

       

      Here are the initial logs printed related to the offset, as soon as the job gets submitted:

       

      2024-05-30 12:15:50,010 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]
      2024-05-30 12:15:50,069 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]]
      2024-05-30 12:15:50,074 TRACE org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - Seeking starting offsets to specified offsets: {topic_test-0=4}
      2024-05-30 12:15:50,074 INFO  org.apache.kafka.clients.consumer.KafkaConsumer              [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Seeking to offset 4 for partition topic_test-0
      2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - SplitsChange handling result: [topic_test-0, start:4, stop: 15]
      2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]]
      2024-05-30 12:15:50,075 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask

       

      Since the starting offset 4 is out of range for the Kafka topic, KafkaConsumer initiates an offset reset, as seen on task manager logs:

       

      2024-05-30 12:15:50,193 INFO  org.apache.kafka.clients.consumer.internals.Fetcher          [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Fetch position FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nacis-mbp-m2:9093 (id: 1 rack: null)], epoch=0}} is out of range for partition topic_test-0, resetting offset
      2024-05-30 12:15:50,195 INFO  org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=KafkaSource--2381765882724812354-0, groupId=null] Resetting offset for partition topic_test-0 to position FetchPosition{offset=15, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[nacis-mbp-m2:9093 (id: 1 rack: null)], epoch=0}}.

       

       

      Then, an infinite FetchTask loop starts:

       

      2024-05-30 12:16:00,079 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
      2024-05-30 12:16:00,079 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
      2024-05-30 12:16:00,079 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
      2024-05-30 12:16:00,080 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
      2024-05-30 12:16:06,288 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
      2024-05-30 12:16:08,755 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
      2024-05-30 12:16:10,082 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
      2024-05-30 12:16:10,082 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
      2024-05-30 12:16:10,082 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
      2024-05-30 12:16:10,082 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
      2024-05-30 12:16:10,082 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
      2024-05-30 12:16:16,290 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
      2024-05-30 12:16:17,393 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received file upload request for file LOG
      2024-05-30 12:16:17,394 DEBUG org.apache.flink.runtime.blob.BlobClient                     [] - PUT BLOB stream to /127.0.0.1:55663.
      2024-05-30 12:16:18,757 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
      2024-05-30 12:16:20,084 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
      2024-05-30 12:16:20,084 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
      2024-05-30 12:16:20,084 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
      2024-05-30 12:16:20,084 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
      2024-05-30 12:16:20,084 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
      2024-05-30 12:16:26,293 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
      2024-05-30 12:16:28,761 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
      2024-05-30 12:16:30,086 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
      2024-05-30 12:16:30,086 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
      2024-05-30 12:16:30,086 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
      2024-05-30 12:16:30,086 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
      2024-05-30 12:16:30,086 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
      2024-05-30 12:16:36,296 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
      2024-05-30 12:16:38,762 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
      2024-05-30 12:16:40,087 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask
      2024-05-30 12:16:40,087 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Getting next source data batch from queue
      2024-05-30 12:16:40,087 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare to run FetchTask
      2024-05-30 12:16:40,088 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Current fetch is finished.
      2024-05-30 12:16:40,088 TRACE org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Source reader status: NOTHING_AVAILABLE
      2024-05-30 12:16:46,297 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from df54e7abdfa0095dc5c214b056153dea.
      2024-05-30 12:16:48,765 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received heartbeat request from e1746de110bfdd23c7dba50f3b083621.
      2024-05-30 12:16:50,089 DEBUG org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished running task FetchTask

       

      The loop ends as soon as I add a new event on this Kafka topic, which will be placed in offset 15.

      Expected Result: Since this is a batch job, and since there is no event on the Kafka topic, right after offset reset, Flink connector should identify that there is no events to process, and gracefully finish the application.

      Actual Result: Flink connector infinitely tries to fetch an event from offset:15 which actually exists but no events on that offset, application keep fetching that same offset!

       

      This issue is NOT happening if the above Flink application sets a starting offset 15 or higher! If it is given as 15 or higher, no offset reset is performed, and the Flink application gracefully finishes!

      This is reproduced on a Flink 1.18.1 with the latest Kafka connector 3.1.0-1.18 on a session cluster.

      Logs are attached.

      Attachments

        1. taskmanager_localhost_54489-ac092a_log.txt
          561 kB
          Naci Simsek
        2. image-2024-06-11-11-19-09-889.png
          116 kB
          Naci Simsek

        Activity

          People

            Unassigned Unassigned
            nacisimsek Naci Simsek
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated: