KAFKA-7679

With acks=all a single "stuck" non-leader replica can cause a timeout


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved

    Description

      From the documentation:

      acks=all
      
      This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.
      min.insync.replicas
      
      When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
      When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.
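      For illustration, here is that "typical scenario" expressed with kafka-python (the client used in the reproduction below); the broker address is a placeholder and this is only a sketch:

      from kafka import KafkaProducer
      from kafka.admin import KafkaAdminClient, NewTopic
      from kafka.errors import NotEnoughReplicasError, NotEnoughReplicasAfterAppendError

      # Topic with replication factor 3 and min.insync.replicas=2.
      admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
      admin.create_topics([NewTopic(name='tests-irs2', num_partitions=1,
                                    replication_factor=3,
                                    topic_configs={'min.insync.replicas': '2'})])

      # acks='all' (equivalent to acks=-1): wait for the full set of in-sync replicas.
      producer = KafkaProducer(bootstrap_servers='localhost:9092', acks='all')
      try:
          producer.send('tests-irs2', b'hello').get(timeout=10)
      except (NotEnoughReplicasError, NotEnoughReplicasAfterAppendError) as e:
          # Fewer than min.insync.replicas replicas are in sync.
          print('write rejected: %s' % e)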

      Given a replication factor of 3 and min.insync.replicas set to 2, I would expect the client to get an acknowledgment as soon as the write reaches the leader and at least one replica. This is what happens on a 3-node cluster when one of the brokers is down, for example.

      However, it looks like this is not the case when a broker is "stuck" (which happens when you have network "blips").

      Here is how I reproduced this, but you can probably do the same with iptables on your own cluster:

      # Start a cluster with 3 nodes
      $ docker-compose up -d
      $ docker-compose scale kafka=3
      Starting kafka-docker_kafka_1_dbf4109a3095 ... done
      Creating kafka-docker_kafka_2_973a373fa5b5 ... done
      Creating kafka-docker_kafka_3_3d8fab2ac44a ... done
      
      # Create topics with various settings
      $ docker-compose exec kafka bash
      $ kafka-topics.sh --create --topic tests-irs2 --config min.insync.replicas=2 --zookeeper=${KAFKA_ZOOKEEPER_CONNECT} --partitions=1 --replication-factor=3
      $ kafka-topics.sh --describe --zookeeper ${KAFKA_ZOOKEEPER_CONNECT}
      Topic:tests-irs2 PartitionCount:1 ReplicationFactor:3 Configs:min.insync.replicas=2
      Topic: tests-irs2 Partition: 0 Leader: 1003 Replicas: 1003,1002,1001 Isr: 1003,1002,1001

       

      Then start a small script that produces messages periodically.
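      The script itself is not attached; it is roughly equivalent to the following sketch (assuming kafka-python and a 2-second message interval):

      import logging
      import os
      import time

      from kafka import KafkaProducer
      from kafka.errors import KafkaTimeoutError

      MESSAGE_INTERVAL_SECS = 2

      logging.basicConfig(format='%(asctime)s - %(name)s [%(levelname)s] - %(message)s')
      log = logging.getLogger('kafka-producer')

      bootstrap = os.environ['KAFKA_BOOTSTRAP_SERVERS']
      topic = os.environ['KAFKA_TOPIC']
      acks = os.environ.get('KAFKA_ACKS', 'all')
      if acks != 'all':
          acks = int(acks)  # e.g. KAFKA_ACKS=-1, the wire value for 'all'

      producer = KafkaProducer(bootstrap_servers=bootstrap, acks=acks)

      def send_message():
          start = time.time()
          producer.send(topic, b'ping')
          # Block until the broker acknowledges, or raise KafkaTimeoutError.
          producer.flush(timeout=MESSAGE_INTERVAL_SECS)
          print('%s %s %fs' % (bootstrap, topic, time.time() - start))

      while True:
          try:
              send_message()
          except KafkaTimeoutError:
              log.exception('KafkaTimeoutError: Timeout waiting for future')
          time.sleep(MESSAGE_INTERVAL_SECS)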

       

      # Start the script to get an idea of the normal latency
      $ KAFKA_BOOTSTRAP_SERVERS=localhost:32784 KAFKA_TOPIC=tests-irs2 KAFKA_ACKS=-1 ../test.py
      localhost:32784 tests-irs2 0.068457s
      localhost:32784 tests-irs2 0.016032s
      localhost:32784 tests-irs2 0.015884s
      localhost:32784 tests-irs2 0.018244s
      localhost:32784 tests-irs2 0.008625s

       

      Then run `docker pause` on the container hosting broker 1002; the producer starts timing out:

       

       
      2018-11-27 14:07:47,608 - kafka-producer [ERROR] - KafkaTimeoutError: Timeout waiting for future
      Traceback (most recent call last):
       File "../test.py", line 27, in send_message
       producer.flush(timeout=MESSAGE_INTERVAL_SECS)
       File "/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/kafka.py", line 577, in flush
       self._accumulator.await_flush_completion(timeout=timeout)
       File "/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/record_accumulator.py", line 530, in await_flush_completion
       raise Errors.KafkaTimeoutError('Timeout waiting for future')
      KafkaTimeoutError: KafkaTimeoutError: Timeout waiting for future
      2018-11-27 14:07:49,618 - kafka-producer [ERROR] - KafkaTimeoutError: Timeout waiting for future
      Traceback (most recent call last):
       File "../test.py", line 27, in send_message
       producer.flush(timeout=MESSAGE_INTERVAL_SECS)
       File "/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/kafka.py", line 577, in flush
       self._accumulator.await_flush_completion(timeout=timeout)
       File "/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/record_accumulator.py", line 530, in await_flush_completion
       raise Errors.KafkaTimeoutError('Timeout waiting for future')
      KafkaTimeoutError: KafkaTimeoutError: Timeout waiting for future
      2018-11-27 14:07:51,628 - kafka-producer [ERROR] - KafkaTimeoutError: Timeout waiting for future
      Traceback (most recent call last):
       File "../test.py", line 27, in send_message
       producer.flush(timeout=MESSAGE_INTERVAL_SECS)
       File "/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/kafka.py", line 577, in flush
       self._accumulator.await_flush_completion(timeout=timeout)
       File "/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/record_accumulator.py", line 530, in await_flush_completion
       raise Errors.KafkaTimeoutError('Timeout waiting for future')
      KafkaTimeoutError: KafkaTimeoutError: Timeout waiting for future
      localhost:32784 tests-irs2 0.017413s
      

      We keep getting timeouts until the controller finally notices that broker 1002 is gone:

      [2018-11-27 13:15:18,020] INFO [Controller id=1001] Newly added brokers: , deleted brokers: 1002, all live brokers: 1001,1003 (kafka.controller.KafkaController)
      [2018-11-27 13:15:18,020] INFO [RequestSendThread controllerId=1001] Shutting down (kafka.controller.RequestSendThread)
      [2018-11-27 13:15:18,021] INFO [RequestSendThread controllerId=1001] Stopped (kafka.controller.RequestSendThread)
      [2018-11-27 13:15:18,021] INFO [RequestSendThread controllerId=1001] Shutdown completed (kafka.controller.RequestSendThread)
      [2018-11-27 13:15:18,024] INFO [Controller id=1001] Broker failure callback for 1002 (kafka.controller.KafkaController)
      [2018-11-27 13:15:18,030] DEBUG The stop replica request (delete = true) sent to broker 1002 is (kafka.controller.ControllerBrokerRequestBatch)
      [2018-11-27 13:15:18,030] DEBUG The stop replica request (delete = false) sent to broker 1002 is [Topic=tests-irs1,Partition=0,Replica=1002],[Topic=tests-irs2,Partition=0,Replica=1002] (kafka.controller.ControllerBrokerRequestBatch)
      [2018-11-27 13:15:18,030] WARN [Channel manager on controller 1001]: Not sending request (type=StopReplicaRequest, controllerId=1001, controllerEpoch=1, deletePartitions=false, partitions=) to broker 1002, since it is offline. (kafka.controller.ControllerChannelManager)
      [2018-11-27 13:15:18,030] WARN [Channel manager on controller 1001]: Not sending request (type=StopReplicaRequest, controllerId=1001, controllerEpoch=1, deletePartitions=false, partitions=tests-irs1-0) to broker 1002, since it is offline. (kafka.controller.ControllerChannelManager)
      [2018-11-27 13:15:18,030] WARN [Channel manager on controller 1001]: Not sending request (type=StopReplicaRequest, controllerId=1001, controllerEpoch=1, deletePartitions=false, partitions=tests-irs2-0) to broker 1002, since it is offline. (kafka.controller.ControllerChannelManager)
      [2018-11-27 13:15:18,031] DEBUG [Controller id=1001] Unregister BrokerModifications handler for Vector(1002) (kafka.controller.KafkaController)
      [2018-11-27 13:15:23,676] INFO [Controller id=1001] Newly added brokers: 1002, deleted brokers: , all live brokers: 1001,1002,1003 (kafka.controller.KafkaController)
      [2018-11-27 13:15:23,676] DEBUG [Channel manager on controller 1001]: Controller 1001 trying to connect to broker 1002 (kafka.controller.ControllerChannelManager)
      [2018-11-27 13:15:23,680] INFO [Controller id=1001] New broker startup callback for 1002 (kafka.controller.KafkaController)
      [2018-11-27 13:15:23,680] INFO [RequestSendThread controllerId=1001] Starting (kafka.controller.RequestSendThread)
      

       

       

      Looking at the code, it comes down to this check on the leader:

      val minIsr = leaderReplica.log.get.config.minInSyncReplicas
      if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
        /*
         * The topic may be configured not to accept messages if there are not enough replicas in ISR
         * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
         */
        if (minIsr <= curInSyncReplicas.size)
          (true, Errors.NONE)
        else
          (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
      } else
        (false, Errors.NONE)

      So everything should be fine as long as `curInSyncReplicas` is still large enough, except that the check above first requires the high watermark to reach `requiredOffset`, and here is how the high watermark seems to be updated:

      val allLogEndOffsets = assignedReplicas.filter { replica =>
        curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || inSyncReplicas.contains(replica)
      }.map(_.logEndOffset)
      val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)

      This would mean that when a replica is stuck, the latency of every produce request can increase by up to replicaLagTimeMaxMs (replica.lag.time.max.ms), until we basically consider this replica "dead" and stop taking it into account.
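      To make the effect concrete, here is a toy model of that computation (plain Python, not the broker code):

      # A paused follower is still within replica.lag.time.max.ms, so its log end
      # offset is kept in the min() and pins the high watermark below the offset
      # the acks=-1 producer is waiting on.
      REPLICA_LAG_TIME_MAX_MS = 10000

      replicas = [
          # (log_end_offset, ms_since_last_caught_up, in_isr)
          (120, 0,    True),  # leader
          (120, 0,    True),  # healthy follower
          (100, 3000, True),  # "stuck" follower: paused 3s ago, still in the ISR
      ]

      considered = [leo for leo, lag_ms, in_isr in replicas
                    if lag_ms <= REPLICA_LAG_TIME_MAX_MS or in_isr]
      high_watermark = min(considered)

      required_offset = 120  # offset of the record just produced
      print(high_watermark >= required_offset)
      # -> False: the ack is delayed until the stuck replica catches up or is
      #    finally dropped from consideration after replica.lag.time.max.ms.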

      I guess what we really want here is to check that the required offset has been reached on min.insync.replicas replicas (2 here), rather than waiting on the high watermark.
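      As a toy sketch of that idea (not a patch), the acknowledgment condition would count the replicas that have reached the required offset instead of looking at the high watermark:

      # Acknowledge once at least min.insync.replicas replicas (leader included)
      # have the record, regardless of how far behind the slowest replica is.
      MIN_INSYNC_REPLICAS = 2
      required_offset = 120
      log_end_offsets = [120, 120, 100]  # leader, healthy follower, stuck follower

      caught_up = sum(1 for leo in log_end_offsets if leo >= required_offset)
      print(caught_up >= MIN_INSYNC_REPLICAS)
      # -> True: the leader and one follower already have the record.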

      This also means that the producer's latency is currently the maximum of the individual replicas' latencies.

      What do you think?

       

          People

            Assignee: Unassigned
            Reporter: Corentin Chary (iksaif)