  1. Flink
  2. FLINK-36674

Flink will not commit to kafka if checkpointing is not enabled

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.15.0, 1.16.0, 1.17.2, 1.18.1, 1.20.0, 1.19.1
    • Fix Version/s: None
    • Component/s: Connectors / Kafka
    • Labels: None

    Description

      Background: I recently found that with flink-sql-connector-kafka 1.13.5, using a Kafka source defined in DDL style and with checkpointing NOT enabled, I could still see the consumer group defined by properties.group.id in Kafka. After switching to flink-sql-connector-kafka 1.17.2, checkpointing must be enabled, otherwise the consumer group never appears. Developers often look at consumer groups to check whether a program is working properly.

      The expected behavior is what the Flink documentation says:

      If checkpointing is not enabled, Kafka source relies on Kafka consumer’s internal automatic periodic offset committing logic, configured by enable.auto.commit and auto.commit.interval.ms in the properties of Kafka consumer.

       

      The simplest Kafka source DDL:

       

      CREATE TABLE KafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `behavior` STRING,
        `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_behavior',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
      ) 

       

      Precondition: group.id is always provided.

       

      Here are my questions and research results:

      1. Flink will not commit offsets because enable.auto.commit is turned off by default (FLINK-20114)

      Why is enable.auto.commit turned off by default? (The Kafka documentation says it defaults to true when a group.id is provided.) As a result, users must explicitly set it to true in the Kafka source DDL, which people naturally won't think to do.

      Answered by: becket_qin 

      The idea is to associate the offset commit with the Flink checkpoint to avoid the rewind in committed offset. So the Flink job will only commit an offset after a checkpoint has succeeded. Otherwise, what could happen is after a job failover, the committed offset may go back because the job resets to the last successful checkpoint. By default checkpoint is enabled in Flink. So by default the offsets will be committed to Kafka.

      When checkpoint is disabled, to keep the same semantics as when checkpoint is enabled, by default no offset is committed unless the user explicitly specifies to do so. But I can understand the argument that if checkpoint is disabled, default Kafka behavior should be honored. So both behaviors have their merits. But from a backwards compatibility perspective, I think keeping the behavior the same as FlinkKafkaConsumer makes sense.
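The rewind described in the answer can be illustrated with a toy model. This is only a sketch: `CommitRewindSketch` and its methods are hypothetical names for illustration, not Flink or Kafka APIs, and the model ignores partitions, timing, and everything else a real job has. It compares a periodic commit that runs independently of checkpoints with a commit that happens only when a checkpoint succeeds.

```java
// Toy model (hypothetical, not Flink code) of a source that reads offsets,
// checkpoints, commits to Kafka, and restarts from the last checkpoint.
final class CommitRewindSketch {
    long position;     // next offset the job will read
    long checkpointed; // offset stored in the last successful checkpoint
    long committed;    // offset last committed to the consumer group

    void read(long records) { position += records; }

    // A successful checkpoint; optionally commits the checkpointed offset.
    void checkpoint(boolean commitOnCheckpoint) {
        checkpointed = position;
        if (commitOnCheckpoint) {
            committed = position;
        }
    }

    // Kafka-style periodic auto-commit, independent of checkpoints.
    void periodicCommit() { committed = position; }

    // Failover resets the read position to the last successful checkpoint.
    void failover() { position = checkpointed; }

    // Returns {committed offset before failover, committed offset after}.
    static long[] run(boolean commitOnCheckpoint) {
        CommitRewindSketch s = new CommitRewindSketch();
        s.read(100);
        s.checkpoint(commitOnCheckpoint);
        s.read(50);
        if (!commitOnCheckpoint) s.periodicCommit();
        long before = s.committed;
        s.failover();
        s.read(10);
        if (commitOnCheckpoint) s.checkpoint(true); else s.periodicCommit();
        return new long[] {before, s.committed};
    }

    public static void main(String[] args) {
        long[] periodic = run(false);
        long[] onCheckpoint = run(true);
        System.out.println("periodic:      " + periodic[0] + " -> " + periodic[1]);         // 150 -> 110
        System.out.println("on checkpoint: " + onCheckpoint[0] + " -> " + onCheckpoint[1]); // 100 -> 110
    }
}
```

In the periodic case the committed offset moves backwards after the failover (150 to 110), while the commit-on-checkpoint case only ever moves forward.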

       

      2. It seems that FLINK-20114 fixed a few KafkaSource-related bugs and took effect earlier than 1.13.5. What causes the difference in consumer group commit behavior between Flink 1.13.5 and Flink 1.17.2 (with no checkpoint enabled)?

      This change seems to have been made by FLINK-25368 (Use AdminClient to get offsets rather than KafkaConsumer).

      It removed the KafkaConsumer, which was replaced by `AdminClient`.

       

      So the conclusion is:

      Since Flink 1.15.0, Flink will not commit offsets to Kafka if checkpointing is not enabled.

      Since Flink 1.12.0, the Flink Kafka connector internally turns off the auto commit option by default.
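Given that conclusion, one way to get the consumer group back without checkpointing is to set auto commit explicitly in the DDL. The sketch below builds such a DDL from the table definition used in this issue. It assumes the `properties.*` options are passed through to the Kafka consumer in the same way as `properties.group.id`; `KafkaDdlSketch` and its helper methods are hypothetical names, the 5000 ms interval is just an example value, and the result has not been verified against a running cluster.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that renders the issue's DDL with auto commit enabled.
final class KafkaDdlSketch {

    // WITH options from the issue plus the two explicit auto-commit settings.
    static Map<String, String> optionsWithAutoCommit(long intervalMs) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");
        options.put("topic", "user_behavior");
        options.put("properties.bootstrap.servers", "localhost:9092");
        options.put("properties.group.id", "testGroup");
        options.put("properties.enable.auto.commit", "true");
        options.put("properties.auto.commit.interval.ms", Long.toString(intervalMs));
        options.put("scan.startup.mode", "latest-offset");
        options.put("format", "json");
        return options;
    }

    // Renders the CREATE TABLE statement with the given WITH options.
    static String createTableDdl(Map<String, String> options) {
        String with = options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE KafkaTable (\n"
                + "  `user_id` BIGINT,\n"
                + "  `item_id` BIGINT,\n"
                + "  `behavior` STRING,\n"
                + "  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n"
                + ") WITH (\n" + with + "\n)";
    }

    public static void main(String[] args) {
        System.out.println(createTableDdl(optionsWithAutoCommit(5000)));
    }
}
```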

       

      Related logic:

      GitHub PR-15161

       

      org.apache.flink.connector.kafka.source.KafkaSourceBuilder:
      
      maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false); 
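To make the effect of that call concrete, here is a minimal stand-alone sketch of "set a default unless the user already chose a value". It is not the Flink source: `AutoCommitDefaultSketch` is a hypothetical class, and the semantics of the third argument (replace a user value only when `override` is true) are an assumption based on how the call is used above.

```java
import java.util.Properties;

// Simplified, hypothetical stand-in for the builder's default handling.
final class AutoCommitDefaultSketch {
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    /**
     * Sets key to value when the user left it unset. When the user did set it,
     * the user value is replaced only if override is true.
     * Returns true when a user-supplied value was replaced.
     */
    static boolean maybeOverride(Properties props, String key, String value, boolean override) {
        String userValue = props.getProperty(key);
        if (userValue == null) {
            props.setProperty(key, value);
            return false;
        }
        if (override) {
            props.setProperty(key, value);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // User did not mention auto commit: the default becomes "false".
        Properties unset = new Properties();
        maybeOverride(unset, ENABLE_AUTO_COMMIT, "false", false);
        System.out.println(unset.getProperty(ENABLE_AUTO_COMMIT)); // false

        // User explicitly asked for auto commit: the value is kept.
        Properties explicit = new Properties();
        explicit.setProperty(ENABLE_AUTO_COMMIT, "true");
        maybeOverride(explicit, ENABLE_AUTO_COMMIT, "false", false);
        System.out.println(explicit.getProperty(ENABLE_AUTO_COMMIT)); // true
    }
}
```

Under this reading, a DDL that never mentions enable.auto.commit ends up with auto commit off, which matches the behavior reported in this issue.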

      FlinkKafkaConsumer.java respects the Kafka default value (which is true):

       

      @Override
      protected boolean getIsAutoCommitEnabled() {
          return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
                  && PropertiesUtil.getLong(
                                  properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000)
                          > 0;
      } 

       

      The OffsetCommitModes class:

      public class OffsetCommitModes {
      ...
          public static OffsetCommitMode fromConfiguration(
                  boolean enableAutoCommit,
                  boolean enableCommitOnCheckpoint,
                  boolean enableCheckpointing) {
      
              if (enableCheckpointing) {
                  // if checkpointing is enabled, the mode depends only on whether committing on
                  // checkpoints is enabled
                  return (enableCommitOnCheckpoint)
                          ? OffsetCommitMode.ON_CHECKPOINTS
                          : OffsetCommitMode.DISABLED;
              } else {
                  // else, the mode depends only on whether auto committing is enabled in the provided
                  // Kafka properties
                  return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
              }
          }
      } 
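The decision logic above can be exercised on its own. The following sketch copies the branching into a stand-alone class (`OffsetCommitModeSketch` is a hypothetical name, and the enum is redeclared locally) and evaluates it for a job without checkpointing. Whether the newer KafkaSource goes through this exact method is not established here; the sketch only shows what this logic returns for each auto-commit value.

```java
// Stand-alone copy of the branching quoted above, for experimentation only.
final class OffsetCommitModeSketch {
    enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        if (enableCheckpointing) {
            // With checkpointing, only the commit-on-checkpoint flag matters.
            return enableCommitOnCheckpoint
                    ? OffsetCommitMode.ON_CHECKPOINTS
                    : OffsetCommitMode.DISABLED;
        }
        // Without checkpointing, only the Kafka auto-commit property matters.
        return enableAutoCommit ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }

    public static void main(String[] args) {
        // No checkpointing, auto commit left at the Kafka default (true).
        System.out.println(fromConfiguration(true, true, false));  // KAFKA_PERIODIC
        // No checkpointing, auto commit defaulted to false.
        System.out.println(fromConfiguration(false, true, false)); // DISABLED
        // Checkpointing enabled: the auto-commit value is ignored.
        System.out.println(fromConfiguration(false, true, true));  // ON_CHECKPOINTS
    }
}
```

The second case is the one this issue is about: no checkpointing plus an auto-commit default of false means nothing is ever committed to the consumer group.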

            People

              Assignee: Unassigned
              Reporter: Adrian Z (slankka)