Kafka / KAFKA-9280

Duplicate messages are observed in ACK mode ALL

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.2.1
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

    Description

      With acks=all, the leader delivers a message to the consumer before it has received acknowledgements from the other replicas. This can lead to duplicate messages.

       

      Setup details:

      • 1 ZooKeeper node, 5 brokers
      • Producer: synchronous
      • Topic: 1 partition, replication factor 3, min ISR (min.insync.replicas) 2
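
To make the fault-tolerance arithmetic of this setup explicit, here is a minimal sketch in plain Python (no Kafka client involved). The function names are hypothetical. They only encode the rule that, with acks=all, a write is accepted while the number of in-sync replicas is at least the min ISR setting.

```python
def tolerable_replica_failures(replication_factor: int, min_isr: int) -> int:
    """Number of replicas that can be down while acks=all writes are still accepted."""
    if not 1 <= min_isr <= replication_factor:
        raise ValueError("min_isr must be between 1 and replication_factor")
    return replication_factor - min_isr


def write_accepted(in_sync_replicas: int, min_isr: int) -> bool:
    """True while the ISR is large enough to satisfy the min ISR requirement."""
    return in_sync_replicas >= min_isr


# Values from the setup above: replication factor 3, min ISR 2.
failures_ok = tolerable_replica_failures(3, 2)
after_one_kill = write_accepted(in_sync_replicas=2, min_isr=2)
after_two_kills = write_accepted(in_sync_replicas=1, min_isr=2)
```

With these values one replica can fail without blocking writes, which is why killing the second replica is harmless and killing the third one is not.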

       

      Call the three replicas of the topic the first replica (the leader), the second replica and the third replica.

       

      Sequence of events:

      a) All brokers are up and running.

      b) The clients are started.

      c) Kill the second replica of the topic.

      d) Kill the third replica. The min ISR requirement is no longer satisfied.

      e) Bring the third replica back up. The min ISR requirement is satisfied again.

       

      Breakdown of step 'd':

      1. Just before the producer sends its next message, the third replica is killed with kill -9. (The leader takes about 5 seconds to detect that the broker is down.)
      2. The producer sends a message to the leader.
      3. The leader accepts the message from the producer, because it does not yet know that the third replica is down.
      4. The leader forwards the message to the third replica.
      5. Before receiving an ACK from the third replica, the leader sends the message to the consumer.
      6. The leader never gets an ACK from the third replica.
      7. The leader now detects that the third replica is down and throws NotEnoughReplicasException.
      8. The leader stops accepting messages from the producer.
      9. Because the producer did not get an ACK from the leader, it throws a TimeoutException after the timeout (2 minutes in our case).
      10. At this point the producer believes that the leader did not receive the message, whereas the consumer has actually received it.
      11. The producer retries sending the same message. (In our application it is the next integer we send.)
      12. Once the second or third replica is back up, the leader accepts the message and sends the same message to the consumer, producing a duplicate.
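
The steps above can be condensed into a toy model in plain Python. This is only a sketch of the reported behaviour, not Kafka code: `ToyLeader`, `drop_next_ack` and `send_with_retry` are hypothetical names. The model assumes what step 5 claims, namely that the record is visible to the consumer even though the producer never receives an acknowledgement.

```python
class ToyLeader:
    """Toy stand-in for a partition leader; not Kafka's implementation."""

    def __init__(self):
        self.log = []               # records the consumer can read
        self.drop_next_ack = False  # simulate a lost/late acknowledgement

    def append(self, value):
        self.log.append(value)      # record is stored and visible (step 5)
        if self.drop_next_ack:
            self.drop_next_ack = False
            # The producer never hears back and eventually times out (step 9).
            raise TimeoutError("no acknowledgement reached the producer")
        return len(self.log) - 1    # offset, returned as the acknowledgement


def send_with_retry(leader, value, max_attempts=2):
    """Synchronous send that retries the same value after a timeout (step 11)."""
    for _ in range(max_attempts):
        try:
            return leader.append(value)
        except TimeoutError:
            continue
    raise TimeoutError("all attempts timed out")


leader = ToyLeader()
leader.drop_next_ack = True
offset = send_with_retry(leader, "p229-4")
consumer_view = list(leader.log)  # what a consumer reading the log would see
```

In this model the consumer sees `p229-4` twice, because the first attempt was stored even though it was reported to the producer as a failure.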

       

       

      Logs:

      1. 2-3 seconds before the producer sends its next message, the third replica is killed with kill -9. (The leader takes about 5 seconds to detect that the broker is down.)

      {{{

      > kill -9 49596

      }}}


      2. The producer sends a message to the leader.

      {{{

      [20190327 11:02:48.231 EDT (main-Terminating-1) Will send message: ProducerRecord(topic=t229, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=p229-4, timestamp=null)

      }}}

       

      3. The leader accepts the message from the producer, because it does not yet know that the third replica is down.
      4. The leader forwards the message to the third replica.
      5. Before receiving an ACK from the third replica, the leader sends the message to the consumer.

      {{{

       Received MessageRecord: ConsumerRecord(topic = t229, partition = 0, leaderEpoch = 1, offset = 3, CreateTime = 1553698968232, serialized key size = -1, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = p229-4)

      }}}


      6. The leader never gets an ACK from the third replica.
      7. The leader now detects that the third replica is down and throws NotEnoughReplicasException.

      {{{

      [2019-03-27 11:02:53,541] ERROR [ReplicaManager broker=0] Error processing append operation on partition t229-0 (kafka.server.ReplicaManager)

      org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition t229-0

      }}}

       

      8. The leader stops accepting messages from the producer.
      9. Because the producer did not get an ACK from the leader, it throws a TimeoutException after the timeout (2 minutes in our case).

      {{{

      java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for t229-0:120000 ms has passed since batch creation
              at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
              at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
              at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
      Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for t229-0:120000 ms has passed since batch creation

      }}}

       

      10. At this point the producer believes that the leader did not receive the message, whereas the consumer has actually received it.
      11. The producer retries sending the same message. (In our application it is the next integer we send.)
      12. Once the second or third replica is back up, the leader accepts the message and sends the same message to the consumer, producing a duplicate.

       

      With acks=all, we expect the leader to send a message to the consumer only after it has received acknowledgements from all other replicas. That is not what we observe.

       

      Question

      1) With acks=all, does the leader send a message to the consumer only after all in-sync followers have received it, or does it send the message to the consumer first and then wait for the followers' acknowledgements?
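
For reference, Kafka's documented replication design exposes a record to consumers only once its offset is below the partition's high watermark, which follows the smallest log end offset among the replicas currently in the ISR. The toy model below (plain Python, hypothetical class and method names, not broker code) illustrates one consequence. When a failed follower is removed from the ISR, the high watermark can advance on the leader alone, so a record may become visible without that follower ever acknowledging it. Whether this is what happened in the logs above is an assumption. The logs do not confirm it.

```python
class ToyPartition:
    """Toy model of ISR-based visibility; replica ids are illustrative."""

    def __init__(self, leader, followers):
        self.leader = leader
        self.isr = {leader, *followers}
        self.log_end_offset = {replica: 0 for replica in self.isr}

    def high_watermark(self):
        # Smallest log end offset among replicas still in the ISR.
        return min(self.log_end_offset[r] for r in self.isr)

    def leader_append(self):
        """Append one record on the leader and return its offset."""
        offset = self.log_end_offset[self.leader]
        self.log_end_offset[self.leader] += 1
        return offset

    def follower_fetch(self, replica):
        """Follower catches up to the leader's log end offset."""
        self.log_end_offset[replica] = self.log_end_offset[self.leader]

    def shrink_isr(self, replica):
        """Leader drops a follower it considers dead or lagging."""
        if replica == self.leader:
            raise ValueError("the leader cannot be removed from its own ISR")
        self.isr.discard(replica)

    def visible_to_consumer(self, offset):
        return offset < self.high_watermark()


# Leader is broker 0 (as in the log above); follower id 2 is a made-up id.
partition = ToyPartition(leader=0, followers=[2])
offset = partition.leader_append()
visible_before_shrink = partition.visible_to_consumer(offset)
partition.shrink_isr(2)  # follower was killed and never fetched the record
visible_after_shrink = partition.visible_to_consumer(offset)
```

In this model the record is hidden while the dead follower is still counted in the ISR and becomes visible as soon as the ISR shrinks to the leader.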

       

      Observation

      For a topic with replication factor > 1, we measured the round-trip time of messages (client1 -> kafka -> client2 -> kafka -> client1) with both acks=1 and acks=all, and observed the same latency in both cases. Is this expected?
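
A comparison like this is easier to interpret when it reports a distribution rather than a single figure. The sketch below is one way such a measurement could be structured in plain Python. `round_trip` is a hypothetical callable standing in for one full client1 -> kafka -> client2 -> kafka -> client1 exchange, and no Kafka client API is used or implied.

```python
import statistics
import time


def measure_round_trips(round_trip, samples=100):
    """Time `samples` calls of round_trip() and return latencies in milliseconds."""
    latencies_ms = []
    for _ in range(samples):
        start = time.perf_counter()
        round_trip()
        latencies_ms.append((time.perf_counter() - start) * 1000.0)
    return latencies_ms


def summarize(latencies_ms):
    """Median, approximate 99th percentile and maximum of the samples."""
    ordered = sorted(latencies_ms)
    p99_index = min(len(ordered) - 1, int(0.99 * len(ordered)))
    return {
        "median_ms": statistics.median(ordered),
        "p99_ms": ordered[p99_index],
        "max_ms": ordered[-1],
    }


# Placeholder workload; replace with the real round trip for each acks setting.
samples = measure_round_trips(lambda: None, samples=10)
summary = summarize([1.0, 2.0, 3.0])
```

Running the same harness once with acks=1 and once with acks=all, and comparing the tail values as well as the median, would show whether the two settings really behave identically.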

      People

        Assignee: Unassigned
        Reporter: VIkram (vikram484)
        Votes: 0
        Watchers: 3