FLINK-20753

Duplicates With Exactly-once Kafka -> Kafka Producer


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: 1.12.0
    • Fix Version/s: None
    • Component/s: None
    • Environment: Java 11, Flink started within IDE

    Description

      Introduction

      Based on the following statements from Flink's docs:

      1. https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

      Flink provides an Apache Kafka connector for reading data from and writing data to Kafka topics with exactly-once guarantees.

      2. https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#exactly-once-end-to-end

      To achieve exactly once end-to-end, so that every event from the sources affects the sinks exactly once, the following must be true:

      1. your sources must be replayable, and
      2. your sinks must be transactional (or idempotent)

      3. https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#caveats

      Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions that were started before taking a checkpoint, after recovering from the said checkpoint. If the time between Flink application crash and completed restart is larger than Kafka's transaction timeout there will be data loss (Kafka will automatically abort transactions that exceeded timeout time)

      4. https://issues.apache.org/jira/browse/FLINK-7210
      There are references/mentions of the two-phase commit mechanism used in the old Flink Kafka connector, so it is expected that the latest version of the connector has the same functionality.

      Together, these statements create an expectation of EXACTLY_ONCE Kafka -> Kafka end-to-end delivery guarantees.

      Moreover, it is emphasised that the Kafka cluster transaction timeout should be tuned (increased from 15 minutes to 1 hour) to avoid data loss.
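      For illustration, a minimal sketch of the producer-side knob this refers to (the values are examples only; the broker-side transaction.max.timeout.ms, which defaults to 15 minutes, has to be raised to at least the same value on the Kafka cluster):

      // Sketch only: raise the producer transaction timeout so that transactions
      // opened before a checkpoint can still be committed after a crash-and-restart
      // window; it must not exceed the broker's transaction.max.timeout.ms.
      Properties txProps = new Properties();
      txProps.setProperty(
          ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,   // "transaction.timeout.ms"
          String.valueOf(60 * 60 * 1000));             // 1 hour, as suggested by the docs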

      Moving forward, all three of these statements are satisfied by a `Kafka Source` -> `Kafka Sink` app:

      • regarding the first one -> the app reads from and writes to Kafka
      • regarding the second one -> the `Kafka Source` is replayable and the `Kafka Sink` is transactional
      • regarding the last one -> the `Kafka Sink` is transactional and, consequently, in the EXACTLY_ONCE case this operator has state; so it is expected that the transaction will be rolled back.

      But in fact there seems to be no way to achieve EXACTLY_ONCE for a simple Flink `Kafka Source` -> `Kafka Sink` application. Duplicates still exist, and as a result EXACTLY_ONCE semantics are violated.

      Details

      Steps to reproduce (STRs):

      1. Create a simple Flink `Kafka Source` -> `Kafka Sink` app
        1. Stream execution env:
          1. Parallelism -> 1
          2. Enable checkpointing -> 10000 ms (intentionally large)
          3. State backend -> RocksDB
          4. Checkpointing mode -> EXACTLY_ONCE
          5. Min pause between checkpoints -> 500 ms
          6. Max concurrent checkpoints -> 1
        2. Flink Kafka consumer
          1. Nothing special
        3. Flink Kafka producer
          1. Props:
            1. ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"
            2. ProducerConfig.ACKS_CONFIG, "all"
            3. ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"
          2. EXACTLY_ONCE Semantic
      2. Deploy `Kafka Source` Cluster
        1. Create `topic-1` with 3 partitions
      3. Deploy `Kafka Sink` Cluster
        1. Create `topic-1` with 3 partitions
      4. Spin up some Kafka client to generate data into `Kafka Source`:`topic-1` (e.g. Confluent `kafka-console-producer`)
      5. Spin up transactional Kafka consumer to drain data from `Kafka Sink`:`topic-1` (e.g. Confluent `kafka-console-consumer`)
      6. Use Flink's app described in step #1 to ship data from `Kafka Source` -> `Kafka Sink` Kafka cluster.
      7. Wait until the Flink app creates its first checkpoint (see the checkpoint-retention sketch after this list).
      8. Brutally kill Flink's app (SIGKILL)
      9. Wait 10 secs
      10. Start Flink app again.
      11. Check for duplicates in the transactional Kafka consumer (described in step #5)
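      For steps #7-#10 it is assumed that the checkpoint taken before the SIGKILL is retained on disk and can be located afterwards. A minimal sketch of that configuration (not part of the original repro code, added only for illustration; Flink 1.12 API):

      // Sketch only: retain externalized checkpoints so the state written before
      // the SIGKILL is still available when the application is started again.
      env.getCheckpointConfig().enableExternalizedCheckpoints(
          CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);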

      Actual

      Duplicates exist in the transactional Kafka consumer output.

      Expected

      • The Kafka transaction should be rolled back by the Flink Kafka producer with the EXACTLY_ONCE semantic
      • Flink should automatically replay the data from `Kafka Source` based on the offsets persisted in the latest checkpoint (see the restore sketch below)
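      Regarding the second bullet: when the whole JVM is killed and the job is later launched as a brand-new process (e.g. from the IDE), it only resumes from the previous offsets if it is explicitly pointed at the retained checkpoint. A minimal sketch, assuming the checkpoint path is known and that a local environment honours execution.savepoint.path (both the path and the approach are illustrative, not part of the original repro):

      // Sketch only: point the restarted job at a retained checkpoint.
      // The concrete checkpoint path below is hypothetical.
      Configuration conf = new Configuration();
      conf.setString("execution.savepoint.path",
          "file:///xxx/config/checkpoints/rocksdb/<job-id>/chk-1"); // hypothetical
      StreamExecutionEnvironment env =
          StreamExecutionEnvironment.createLocalEnvironment(1, conf);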

      Example

      App

      build.gradle (dependencies)
      ...
      ext {
        ...
        javaVersion = '11'
        flinkVersion = '1.12.0'
        scalaBinaryVersion = '2.11'
        ...
      }
      
      dependencies {
        ...
        implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
        implementation "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
        implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
        ...
      }
      
      App
      public static void main(String[] args) {
        ...
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
        env.setParallelism(1); // to make things simple
      
        env.enableCheckpointing(10000); // intentionally set to 10 secs to leave room to stop the app between checkpoints
      
        env.setStateBackend(new RocksDBStateBackend("file:///xxx/config/checkpoints/rocksdb", true));
      
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
      
        FlinkKafkaConsumer<Record> consumer = createConsumer();
        FlinkKafkaProducer<Record> producer = createProducer();
      
        env
          .addSource(consumer)
          .uid("kafka-consumer")
          .addSink(producer)
          .uid("kafka-producer")
        ;
      
        env.execute();
      }
      
      public static FlinkKafkaConsumer<Record> createConsumer() {
        ...
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-source-1:9091");
        ... // nothing special
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
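        // Note: with Flink checkpointing enabled, offsets are committed on completed
        // checkpoints (see setCommitOffsetsOnCheckpoints below) and this client-side
        // enable.auto.commit setting is effectively ignored by the FlinkKafkaConsumer.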
      
        FlinkKafkaConsumer<Record> consumer = new FlinkKafkaConsumer<>("topic-1", new RecordKafkaDerSchema(), props);
      
        ... // RecordKafkaDerSchema --> custom schema is used to copy not only message body but message key too
        ... // SimpleStringSchema --> can be used instead to reproduce issue
      
        consumer.setStartFromGroupOffsets();
        consumer.setCommitOffsetsOnCheckpoints(true);
      
        return consumer;
      }
      
      public static FlinkKafkaProducer<Record> createProducer() {
        ...
        Properties props = new Properties();
        ...
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094");
        props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
        props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
        props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "9000");
        props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xxx"); // ignored due to expected behaviour - https://issues.apache.org/jira/browse/FLINK-17691
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "" + (15 * 60 * 1000)); // decreased from 1 hour to 15 mins; the app will be down for less than 15 mins
        ...
        FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>("topic-1", new RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
      
        ... // RecordKafkaSerSchema --> custom schema is used to copy not only message body but message key too
        ... // SimpleStringSchema --> can be used instead to reproduce issue
      
        return producer;
      }
      
      kafka-source-1 Producer
      bash -c 'echo Producing data... && \
       for ((i=0; ;++i)); do echo "t1-k-$$i:t1-v-$$i"; sleep 2; done | kafka-console-producer --request-required-acks 1 --broker-list kafka-source-1:9091 --topic topic-1 --property parse.key=true --property key.separator=":"'
      
      kafka-target-1 Consumer - 0 partition
      bash -c 'echo Consuming data for topic-1... && \
            kafka-console-consumer --bootstrap-server kafka-target-1:9094 --topic topic-1 --partition 0 --from-beginning --property print.key=true --property key.separator=":" --value-deserializer org.apache.kafka.common.serialization.StringDeserializer --isolation-level read_committed'
      
      kafka-target-1 Consumer - 1 partition
      bash -c 'echo Consuming data for topic-1... && \
            kafka-console-consumer --bootstrap-server kafka-target-1:9094 --topic topic-1 --partition 1 --from-beginning --property print.key=true --property key.separator=":" --value-deserializer org.apache.kafka.common.serialization.StringDeserializer --isolation-level read_committed'
      
      kafka-target-1 Consumer - 2 partition
      bash -c 'echo Consuming data for topic-1... && \
            kafka-console-consumer --bootstrap-server kafka-target-1:9094 --topic topic-1 --partition 2 --from-beginning --property print.key=true --property key.separator=":" --value-deserializer org.apache.kafka.common.serialization.StringDeserializer --isolation-level read_committed'
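
      As a cross-check of the three console consumers above, the same read-committed check can be sketched directly against the Kafka client API (class name, group id and output format below are illustrative assumptions, not part of the original report):

      Read-committed verification consumer (sketch)
      import java.time.Duration;
      import java.util.Collections;
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;

      public class ReadCommittedCheck {
        public static void main(String[] args) {
          Properties props = new Properties();
          props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094");
          props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "dup-check"); // arbitrary group id
          props.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
          props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
          props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
          props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic-1"));
            while (true) {
              for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                System.out.println(record.key() + ":" + record.value()); // same key:value format as the console output
              }
            }
          }
        }
      }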
      

      Output

      kafka-target-1 Consumer - 0 partition
      ...
      t1-k-40:t1-v-40
      t1-k-43:t1-v-43
      t1-k-44:t1-v-44
      t1-k-47:t1-v-47
      t1-k-48:t1-v-48
      t1-k-49:t1-v-49
      t1-k-48:t1-v-48 // DUPLICATION!!! --> EXACTLY ONCE is violated
      t1-k-49:t1-v-49 // DUPLICATION!!! --> EXACTLY ONCE is violated
      t1-k-54:t1-v-54
      t1-k-61:t1-v-61
      t1-k-62:t1-v-62
      t1-k-66:t1-v-66
      t1-k-71:t1-v-71
      t1-k-73:t1-v-73
      ...
      
      kafka-target-1 Consumer - 1 partition
      ...
      t1-k-35:t1-v-35
      t1-k-46:t1-v-46
      t1-k-50:t1-v-50
      t1-k-51:t1-v-51
      t1-k-53:t1-v-53
      t1-k-56:t1-v-56
      t1-k-57:t1-v-57
      t1-k-59:t1-v-59
      t1-k-60:t1-v-60
      t1-k-63:t1-v-63
      t1-k-65:t1-v-65
      t1-k-69:t1-v-69
      t1-k-74:t1-v-74
      ...
      
      kafka-target-1 Consumer - 2 partition
      ...
      t1-k-39:t1-v-39
      t1-k-41:t1-v-41
      t1-k-42:t1-v-42
      t1-k-45:t1-v-45
      t1-k-52:t1-v-52
      t1-k-55:t1-v-55
      t1-k-58:t1-v-58
      t1-k-64:t1-v-64
      t1-k-67:t1-v-67
      t1-k-68:t1-v-68
      t1-k-70:t1-v-70
      t1-k-72:t1-v-72
      t1-k-75:t1-v-75
      t1-k-77:t1-v-77
      ...
      

      Summary

      As we can see from `kafka-target-1 Consumer - 0 partition`, EXACTLY_ONCE delivery has been violated.

      P.S.: If I have missed something, please let me know what, and how to achieve EXACTLY_ONCE delivery in a native way (via Flink configuration) for the particular simple application described above.

      P.S.: If it is not possible to do this in a native way (i.e. only via a manual/custom implementation), then please let me know as well.

      P.S.: Similar issue discussions:

          People

            Assignee: Unassigned
            Reporter: Nazar Volynets
