Kafka / KAFKA-1561

Data Loss for Incremented Replica Factor and Leader Election

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      This is reported on the mailing list (thanks to Jad).

      Hi,

      I have a test that continuously sends messages to one broker, brings up
      another broker, and adds it as a replica for all partitions, with it being
      the preferred replica for some. I have auto.leader.rebalance.enable=true,
      so replica election gets triggered. Data is being pumped to the old broker
      all the while. It seems that some data gets lost while switching over to
      the new leader. Is this a bug, or do I have something misconfigured? I also
      have request.required.acks=-1 on the producer.
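
      For context, a minimal sketch (not part of the original report) of the two
      settings this paragraph refers to, using the standard property names of that
      era:

          # broker: server.properties
          auto.leader.rebalance.enable=true

          # 0.8.x producer configuration
          request.required.acks=-1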

      Here's what I think is happening:

      1. Producer writes message to broker 0, [EventServiceUpsertTopic,13], w/
      broker 0 currently leader, with ISR=(0), so write returns successfully,
      even when acks = -1. Correlation id 35836

      Producer log:

      [2014-07-24 14:44:26,991] [DEBUG] [dw-97 - PATCH
      /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1]
      [kafka.producer.BrokerPartitionInfo] Partition
      [EventServiceUpsertTopic,13] has leader 0

      [2014-07-24 14:44:26,993] [DEBUG] [dw-97 - PATCH
      /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1]
      [k.producer.async.DefaultEventHandler] Producer sent messages with
      correlation id 35836 for topics [EventServiceUpsertTopic,13] to broker 0 on
      localhost:56821
      2. Broker 1 is still catching up

      Broker 0 Log:

      [2014-07-24 14:44:26,992] [DEBUG] [kafka-request-handler-3]
      [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] on broker
      0: Old hw for partition [EventServiceUpsertTopic,13] is 971. New hw is 971.
      All leo's are 975,971

      [2014-07-24 14:44:26,992] [DEBUG] [kafka-request-handler-3]
      [kafka.server.KafkaApis] [KafkaApi-0] Produce to local log in 0 ms

      [2014-07-24 14:44:26,992] [DEBUG] [kafka-processor-56821-0]
      [kafka.request.logger] Completed request:Name: ProducerRequest; Version:
      0; CorrelationId: 35836; ClientId: ; RequiredAcks: -1; AckTimeoutMs: 10000
      ms from client /127.0.0.1:57086
      ;totalTime:0,requestQueueTime:0,localTime:0,remoteTime:0,responseQueueTime:0,sendTime:0
      3. Leader election is triggered by the scheduler:

      Broker 0 Log:

      [2014-07-24 14:44:26,991] [INFO ] [kafka-scheduler-0]
      [k.c.PreferredReplicaPartitionLeaderSelector]
      [PreferredReplicaPartitionLeaderSelector]: Current leader 0 for partition [
      EventServiceUpsertTopic,13] is not the preferred replica. Trigerring
      preferred replica leader election

      [2014-07-24 14:44:26,993] [DEBUG] [kafka-scheduler-0]
      [kafka.utils.ZkUtils$] Conditional update of path
      /brokers/topics/EventServiceUpsertTopic/partitions/13/state with value
      {"controller_epoch" ...} and expected version 3 succeeded, returning the
      new version: 4

      [2014-07-24 14:44:26,994] [DEBUG] [kafka-scheduler-0]
      [k.controller.PartitionStateMachine] [Partition state machine on
      Controller 0]: After leader election, leader cache is updated to
      Map(<Snipped>(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),<EndSnip>)

      [2014-07-24 14:44:26,994] [INFO ] [kafka-scheduler-0]
      [kafka.controller.KafkaController] [Controller 0]: Partition [
      EventServiceUpsertTopic,13] completed preferred replica leader election.
      New leader is 1
      4. Broker 1 is still behind, but it sets the high water mark to 971!!!

      Broker 1 Log:

      [2014-07-24 14:44:26,999] [INFO ] [kafka-request-handler-6]
      [kafka.server.ReplicaFetcherManager] [ReplicaFetcherManager on broker 1]
      Removed fetcher for partitions [EventServiceUpsertTopic,13]

      [2014-07-24 14:44:27,000] [DEBUG] [kafka-request-handler-6]
      [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] on broker
      1: Old hw for partition [EventServiceUpsertTopic,13] is 970. New hw is -1.
      All leo's are -1,971

      [2014-07-24 14:44:27,098] [DEBUG] [kafka-request-handler-3]
      [kafka.server.KafkaApis] [KafkaApi-1] Maybe update partition HW due to
      fetch request: Name: FetchRequest; Version: 0; CorrelationId: 1; ClientId:
      ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes;
      RequestInfo: [EventServiceUpsertTopic,13] ->
      PartitionFetchInfo(971,1048576), <Snipped>

      [2014-07-24 14:44:27,098] [DEBUG] [kafka-request-handler-3]
      [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] on broker
      1: Recording follower 0 position 971 for partition [
      EventServiceUpsertTopic,13].

      [2014-07-24 14:44:27,100] [DEBUG] [kafka-request-handler-3]
      [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] on broker
      1: Highwatermark for partition [EventServiceUpsertTopic,13] updated to 971
      5. Consumer is none the wiser. All data that was in offsets 972-975 doesn't
      show up!
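
      A minimal Java sketch (not Kafka source) of the high-watermark rule the
      broker 0 log in step 2 reflects: the leader only advances the HW to the
      smallest log-end offset (LEO) among the in-sync replicas, so with LEOs of
      975 and 971 the HW stays at 971 and offsets 972-975 are not yet committed
      when broker 1 (still at 971) takes over as leader.

          import java.util.Arrays;
          import java.util.Collections;
          import java.util.List;

          final class HighWatermarkRule {
              // HW = min LEO across the in-sync replicas; everything above the HW
              // is uncommitted and can disappear if a trailing replica becomes leader.
              static long highWatermark(List<Long> inSyncReplicaLeos) {
                  return Collections.min(inSyncReplicaLeos);
              }

              public static void main(String[] args) {
                  // LEOs from the step 2 log: leader (broker 0) at 975, follower at 971
                  System.out.println(highWatermark(Arrays.asList(975L, 971L))); // prints 971
              }
          }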

      I tried this with 2 initial replicas, and adding a 3rd which is supposed to
      be the leader for some new partitions, and this problem also happens there.
      The log on the old leader gets truncated to the offset on the new leader.
      What's the solution? Can I make a new broker leader for partitions that are
      currently active without losing data?

      Thanks,
      Jad.
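
      For reference, the mitigation the comments below converge on is to require
      acknowledgement from more than one in-sync replica before a write is
      considered committed. A hedged sketch with the Java producer follows; the
      class name and bootstrap address are illustrative, and the broker-side
      settings mirror the ones used later in this thread.

          import java.util.Properties;
          import org.apache.kafka.clients.producer.KafkaProducer;
          import org.apache.kafka.clients.producer.ProducerConfig;
          import org.apache.kafka.common.serialization.StringSerializer;

          public class SafeProducerConfig {
              public static KafkaProducer<String, String> create(String bootstrapServers) {
                  Properties props = new Properties();
                  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                  props.put(ProducerConfig.ACKS_CONFIG, "all"); // same meaning as request.required.acks=-1
                  props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)); // ride out leader changes
                  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                  return new KafkaProducer<>(props);
              }
          }

          // Broker/topic side (server.properties or per-topic config):
          //   min.insync.replicas=2
          //   unclean.leader.election.enable=false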

      1. broker0.log
        9 kB
        Jad Naous
      2. broker2.log
        27 kB
        Jad Naous
      3. consumer.log
        5 kB
        Jad Naous
      4. producer.log
        7 kB
        Jad Naous

        Activity

        jnaous Jad Naous added a comment -

        Here's some more detailed info on what the latest test does (from which these logs are obtained):

        0) Start two brokers, one producer, one consumer. Topic has 20 partitions, using default partitioning scheme (which seems to send data to only a couple of partitions when the keys are null, but that doesn't matter for this test).
        1) Start a data generator sending data through Kafka continuously
        2) Start a new broker
        3) Reassign partitions:

        {"version": 1, "partitions":[
            {"topic":"EventServiceUpsertTopic","partition":0,        "replicas": [0, 1, 2]},
            {"topic":"EventServiceUpsertTopic","partition":1,        "replicas": [1, 2, 0]},
            {"topic":"EventServiceUpsertTopic","partition":2,        "replicas": [2, 0, 1]},
            {"topic":"EventServiceUpsertTopic","partition":3,        "replicas": [0, 1, 2]},
            {"topic":"EventServiceUpsertTopic","partition":4,        "replicas": [1, 2, 0]},
            {"topic":"EventServiceUpsertTopic","partition":5,        "replicas": [2, 0, 1]},
            {"topic":"EventServiceUpsertTopic","partition":6,        "replicas": [0, 1, 2]},
            {"topic":"EventServiceUpsertTopic","partition":7,        "replicas": [1, 2, 0]},
            {"topic":"EventServiceUpsertTopic","partition":8,        "replicas": [2, 0, 1]},
            {"topic":"EventServiceUpsertTopic","partition":9,        "replicas": [0, 1, 2]},
            {"topic":"EventServiceUpsertTopic","partition":10,        "replicas": [1, 2, 0]},
            {"topic":"EventServiceUpsertTopic","partition":11,        "replicas": [2, 0, 1]},
            {"topic":"EventServiceUpsertTopic","partition":12,        "replicas": [0, 1, 2]},
            {"topic":"EventServiceUpsertTopic","partition":13,        "replicas": [1, 2, 0]},
            {"topic":"EventServiceUpsertTopic","partition":14,        "replicas": [2, 0, 1]},
            {"topic":"EventServiceUpsertTopic","partition":15,        "replicas": [0, 1, 2]},
            {"topic":"EventServiceUpsertTopic","partition":16,        "replicas": [1, 2, 0]},
            {"topic":"EventServiceUpsertTopic","partition":17,        "replicas": [2, 0, 1]},
            {"topic":"EventServiceUpsertTopic","partition":18,        "replicas": [0, 1, 2]},
            {"topic":"EventServiceUpsertTopic","partition":19,        "replicas": [1, 2, 0]}]}
        

        4) Wait until reassignment is complete (i.e. until ZkUtils.getPartitionsBeingReassigned() returns empty map)
        5) Wait until all replicas are caught up (i.e. until ZkUtils.getLeaderAndIsrForPartition() returns all brokers in the ISR for each partition)
        6) Trigger leader re-election by calling the PreferredReplicaLeaderElectionCommand (a CLI sketch for steps 3 and 6 follows this list)
        7) Wait until all the leaders are the preferred leaders for partitions according to the replica reassignment from step 3
        8) Stop the data generator
        9) Check that all the data was consumed
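
        A sketch of how steps 3 and 6 are typically driven with the stock tooling
        of that era (the ZooKeeper address and file name are placeholders; the
        actual test harness may invoke the Scala command classes directly):

            # step 3: apply the assignment JSON above, then poll --verify for step 4
            bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
                --reassignment-json-file reassign.json --execute
            bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
                --reassignment-json-file reassign.json --verify

            # step 6: trigger preferred-replica election (wraps PreferredReplicaLeaderElectionCommand)
            bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181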

        You can see from the producer.log that the data

        {"field1": ["10"], "idField": "id-5-59"}

        was sent to broker0 successfully, but the consumer never sees it.

        junrao Jun Rao added a comment -

        Hmm, interesting. From the description, when the LEOs on the leader are 975 and 971, the HW on broker 0 is still at 971. This suggests that messages between 972 and 975 haven't been committed and the producer shouldn't have received a successful ack with acks=-1.

        vinayak10 Vinayak Sharma added a comment -

        Hi Jun Rao,
        I have a cluster of 3 ZooKeeper nodes and 2 brokers running on AWS instances. When I scale the brokers from 2 to 3 while simultaneously producing to and consuming from a topic, I see messages being lost.

        Topic:
        Partitions - 40
        Replication factor - 2

        I use the console producer to send 1000 messages at a time to the topic. I do this for 200 seconds and then print the total number of messages produced, while simultaneously running a script that consumes from the same topic. While these scripts are running, I reassign the topic's partitions from 2 brokers (0, 1) to 3 brokers (0, 1, 2). During the reassignment the producer logs the following warnings:
        [2017-05-31 13:31:13,311] WARN Got error produce response with correlation id 4 on topic-partition Topic6-39, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
        [2017-05-31 13:31:13,338] WARN Got error produce response with correlation id 4 on topic-partition Topic6-27, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
        [2017-05-31 13:31:13,338] WARN Got error produce response with correlation id 4 on topic-partition Topic6-21, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
        [2017-05-31 13:31:13,338] WARN Got error produce response with correlation id 4 on topic-partition Topic6-15, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
        [2017-05-31 13:31:13,339] WARN Got error produce response with correlation id 6 on topic-partition Topic6-36, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
        [2017-05-31 13:31:13,339] WARN Got error produce response with correlation id 6 on topic-partition Topic6-6, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
        [2017-05-31 13:31:13,339] WARN Got error produce response with correlation id 6 on topic-partition Topic6-0, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
        [2017-05-31 13:31:13,342] WARN Got error produce response with correlation id 6 on topic-partition Topic6-18, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
        [2017-05-31 13:31:13,343] WARN Got error produce response with correlation id 6 on topic-partition Topic6-24, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
        [2017-05-31 13:31:13,401] WARN Got error produce response with correlation id 11 on topic-partition Topic6-3, retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)

        After 200 seconds, the producer reports that 61000 messages were produced, while only 60974 were consumed.

        To check whether the consumer or the producer side is at fault, I ran another console consumer on the same topic from the beginning and observed that the topic actually contains 60974 messages. This shows the messages were lost on the produce path, not missed by the consumer.
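
        A hedged sketch (not the actual script used here) of recounting a topic
        from the beginning with the Java consumer, along the lines of the
        verification above; the bootstrap address is a placeholder and the topic
        name is taken from the warnings above:

            import java.time.Duration;
            import java.util.Collections;
            import java.util.Properties;
            import org.apache.kafka.clients.consumer.ConsumerConfig;
            import org.apache.kafka.clients.consumer.ConsumerRecords;
            import org.apache.kafka.clients.consumer.KafkaConsumer;
            import org.apache.kafka.common.serialization.StringDeserializer;

            public class TopicRecount {
                public static void main(String[] args) {
                    Properties props = new Properties();
                    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                    props.put(ConsumerConfig.GROUP_ID_CONFIG, "recount-" + System.currentTimeMillis()); // fresh group => read from the beginning
                    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
                    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    long total = 0;
                    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                        consumer.subscribe(Collections.singletonList("Topic6"));
                        int idlePolls = 0;
                        while (idlePolls < 5) { // stop after a few consecutive empty polls
                            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                            if (records.isEmpty()) { idlePolls++; } else { idlePolls = 0; total += records.count(); }
                        }
                    }
                    System.out.println("Messages currently in the topic: " + total);
                }
            }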

        I also tried the same test after adding the following property to the topic being used:
        unclean.leader.election.enable = false

        I also changed leader.imbalance.check.interval.seconds in server.properties from 30 seconds to 1 second.

        The loss of messages still persists.

        I have also posted this issue on the confluent-platform list (thread: "Loss of data while Scaling Kafka Brokers") but have not received a reply yet.
        Please tell me how I can completely avoid this loss of messages.
        Thanks.

        junrao Jun Rao added a comment -

        Vinayak Sharma, did you set acks=all in the producer?

        vinayak10 Vinayak Sharma added a comment -

        Jun Rao, I tried with acks=all and there is no loss of messages now.

        But what I see now is that some messages are produced more than once.

        This is what I did:
        I produced using the following command:
        bin/kafka-console-producer.sh --broker-list 172.31.15.135:9092,172.31.17.243:9092 --topic Topic --message-send-max-retries 1000 --request-timeout-ms 60000 --request-required-acks "all" --max-block-ms 9223372036854775807

        I also set min.insync.replicas=2 in the topic and broker configurations.

        While reassigning the partitions, I found that 730000 messages were produced while 730012 were consumed.

        Can you tell me where I went wrong?

        Thanks.

        junrao Jun Rao added a comment -

        Vinayak Sharma, currently, duplicates can be introduced during producer retry on transient failure such as leader changes. In the 0.11.0.0 release, we are introducing an idempotent producer that can avoid duplicates during retry.
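
        A hedged sketch of turning on the idempotent producer Jun Rao mentions
        (enable.idempotence is the producer config introduced in 0.11.0.0; the
        broker addresses are taken from the command above, and the topic/record
        are placeholders):

            import java.util.Properties;
            import org.apache.kafka.clients.producer.KafkaProducer;
            import org.apache.kafka.clients.producer.ProducerConfig;
            import org.apache.kafka.clients.producer.ProducerRecord;
            import org.apache.kafka.common.serialization.StringSerializer;

            public class IdempotentProducerExample {
                public static void main(String[] args) {
                    Properties props = new Properties();
                    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.31.15.135:9092,172.31.17.243:9092");
                    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // implies acks=all and retries
                    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                        // Retried sends are de-duplicated broker-side using the producer id
                        // and per-partition sequence numbers, so leader changes no longer
                        // introduce duplicates.
                        producer.send(new ProducerRecord<>("Topic", "hello"));
                    }
                }
            }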

        vinayak10 Vinayak Sharma added a comment -

        Jun Rao, I will look forward to the release.
        Thanks a lot for your replies. It really helped.


          People

          • Assignee:
            guozhang Guozhang Wang
            Reporter:
            guozhang Guozhang Wang
          • Votes:
            0
            Watchers:
            5
