
KAFKA-772: System Test Transient Failure on testcase_0122

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels:

      Description

      • This test case has been failing randomly over the past few weeks. Note that a small percentage of data loss is allowed for this test case with Ack = 1, but the failure here is a mismatch of the log segment checksum across replicas (a sketch of that cross-replica checksum comparison follows the test output below).
      • Test description:
        3-broker cluster
        Replication factor = 3
        No. of topics = 2
        No. of partitions = 3
        Controlled failure (kill -15)
        Ack = 1
      • Test case output
        _test_case_name : testcase_0122
        _test_class_name : ReplicaBasicTest
        arg : auto_create_topic : true
        arg : bounce_broker : true
        arg : broker_type : leader
        arg : message_producing_free_time_sec : 15
        arg : num_iteration : 3
        arg : num_partition : 3
        arg : replica_factor : 3
        arg : sleep_seconds_between_producer_calls : 1
        validation_status :
        Leader Election Latency - iter 1 brokerid 3 : 377.00 ms
        Leader Election Latency - iter 2 brokerid 1 : 374.00 ms
        Leader Election Latency - iter 3 brokerid 2 : 384.00 ms
        Leader Election Latency MAX : 384.00
        Leader Election Latency MIN : 374.00
        Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r1.log : 1750
        Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r2.log : 1750
        Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r3.log : 1750
        Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r1.log : 1750
        Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r2.log : 1750
        Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r3.log : 1750
        Unique messages from consumer on [test_1] at simple_consumer_test_1-2_r1.log : 1500
        Unique messages from consumer on [test_1] at simple_consumer_test_1-2_r2.log : 1500
        Unique messages from consumer on [test_1] at simple_consumer_test_1-2_r3.log : 1500
        Unique messages from consumer on [test_2] : 5000
        Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r1.log : 1714
        Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r2.log : 1714
        Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r3.log : 1680
        Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r1.log : 1708
        Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r2.log : 1708
        Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r3.log : 1708
        Unique messages from consumer on [test_2] at simple_consumer_test_2-2_r1.log : 1469
        Unique messages from consumer on [test_2] at simple_consumer_test_2-2_r2.log : 1469
        Unique messages from consumer on [test_2] at simple_consumer_test_2-2_r3.log : 1469
        Unique messages from producer on [test_2] : 4900
        Validate for data matched on topic [test_1] across replicas : PASSED
        Validate for data matched on topic [test_2] : FAILED
        Validate for data matched on topic [test_2] across replicas : FAILED
        Validate for merged log segment checksum in cluster [source] : FAILED
        Validate leader election successful : PASSED
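      The "merged log segment checksum" validation above compares, per topic-partition, a checksum of the concatenated data log segments across the three replicas. The sketch below is a minimal, hypothetical illustration of that comparison; the directory layout, the use of MD5, and the object/method names are assumptions, not the actual system-test harness code.

      import java.io.{File, FileInputStream}
      import java.security.MessageDigest

      object ReplicaChecksum {
        // MD5 over all .log segment files in one topic-partition directory,
        // merged in segment (base offset) order.
        def mergedSegmentChecksum(partitionDir: File): String = {
          val md = MessageDigest.getInstance("MD5")
          val buf = new Array[Byte](8192)
          partitionDir.listFiles.filter(_.getName.endsWith(".log")).sortBy(_.getName).foreach { f =>
            val in = new FileInputStream(f)
            try {
              var n = in.read(buf)
              while (n != -1) { md.update(buf, 0, n); n = in.read(buf) }
            } finally {
              in.close()
            }
          }
          md.digest.map("%02x".format(_)).mkString
        }

        def main(args: Array[String]) {
          // args: the same topic-partition directory from each broker's data dir,
          // e.g. /tmp/kafka_server_{1,2,3}_logs/test_2-0
          val checksums = args.map(path => path -> mergedSegmentChecksum(new File(path)))
          checksums.foreach { case (path, sum) => println(path + " : " + sum) }
          println("Validate for merged log segment checksum : " +
            (if (checksums.map(_._2).distinct.size == 1) "PASSED" else "FAILED"))
        }
      }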
      1. KAFKA-772.patch
        1 kB
        Sriram Subramanian
      2. testcase_0122.tar.gz
        1.02 MB
        John Fung
      3. testcase_0125.tar.gz
        1.35 MB
        John Fung

        Activity

        John Fung added a comment -

        Attached a tar file with all the log4j messages and data log files.

        Sriram Subramanian added a comment -

        There are two issues in the given logs. Both are for topic 2, partition 0 on broker 3.

        1. Segment 1, starting at logical offset 0 on broker 3, does not have continuous logical offsets: logical offset 699 is followed by 734.
        2. Segment 2, starting at logical offset 974 on broker 3, is 0 bytes, while the corresponding segment on broker 2 has values from 974 to 1713. Broker 3 has a third segment covering logical offsets 1012 to 1713; broker 2 does not have any third segment.

        We have run the test in a loop multiple times for a day but have not been able to reproduce this on a local box. I am still investigating how the logs could end up in this state during continuous restarts with ack = 0 and replication factor = 3.
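        For reference, the gap described above (699 followed by 734) is the kind of discontinuity a simple offset-continuity check will flag. The sketch below is standalone and assumes the logical offsets have already been dumped to a text file, one per line (for example with kafka.tools.DumpLogSegments); it is not part of the test harness.

        // Standalone sketch: report gaps in the logical offsets of a segment.
        // Assumes one logical offset per line in the input file.
        object OffsetGapCheck {
          def gaps(offsets: Seq[Long]): Seq[(Long, Long)] =
            offsets.zip(offsets.drop(1)).filter { case (prev, next) => next != prev + 1 }

          def main(args: Array[String]) {
            val offsets = io.Source.fromFile(args(0)).getLines().map(_.trim.toLong).toSeq
            gaps(offsets).foreach { case (prev, next) =>
              println("gap: logical offset %d is followed by %d".format(prev, next))
            }
          }
        }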

        John Fung added a comment -

        There was a similar failure in testcase_0125 yesterday in our distributed environment. Attached are the log4j messages and data log segment files for reference.

        The failure is as follows (similar to testcase_0122):

        Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r1.log : 1715
        Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r2.log : 1715
        Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r3.log : 1715
        Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r1.log : 1711
        Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r2.log : 1711
        Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r3.log : 1711
        Unique messages from consumer on [test_1] at simple_consumer_test_1-2_r1.log : 1469
        Unique messages from consumer on [test_1] at simple_consumer_test_1-2_r2.log : 1469
        Unique messages from consumer on [test_1] at simple_consumer_test_1-2_r3.log : 1469
        Unique messages from consumer on [test_2] : 4895
        Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r1.log : 1715
        Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r2.log : 1715
        Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r3.log : 1682
        Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r1.log : 1708
        Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r2.log : 1708
        Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r3.log : 1708
        Unique messages from consumer on [test_2] at simple_consumer_test_2-2_r1.log : 1467
        Unique messages from consumer on [test_2] at simple_consumer_test_2-2_r2.log : 1467
        Unique messages from consumer on [test_2] at simple_consumer_test_2-2_r3.log : 1467
        Unique messages from producer on [test_2] : 4900
        Validate for data matched on topic [test_1] across replicas : PASSED
        Validate for data matched on topic [test_2] : PASSED
        Validate for data matched on topic [test_2] across replicas : FAILED
        Validate for merged log segment checksum in cluster [source] : FAILED
        Validate leader election successful : PASSED

        Neha Narkhede added a comment -

        It might be useful to add a WARN message that logs the topic, partition, replica id, current offset, and fetch offset when this happens. Other than that, the fix looks good.
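        For illustration, such a message might look roughly like the line below; the variable names are placeholders for whatever is in scope at that point, not actual fields.

        // Sketch only; replicaId, fetchOffset and currentOffset are placeholders for values in scope.
        warn("Replica %d for [%s,%d]: fetch offset %d does not match current offset %d"
          .format(replicaId, topic, partitionId, fetchOffset, currentOffset))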

        Sriram Subramanian added a comment -

        I would like WARN messages to be actionable. Do you think one would be useful in this case? I am wondering what we would do if we saw this message in the log, now that we know this is a valid case.

        Sriram Subramanian added a comment -

        The test failed on Monday and then again on Friday, so it was clear that the issue was timing related. We tried to reproduce the failure on a local box by running the test repeatedly, but could not. I did some code browsing but did not have much luck, so I decided to set up tracing and run the test repeatedly in a distributed environment over the weekend, hoping that it would fail. Luckily it did, and the trace logs proved useful in identifying the issue. Thanks to John for setting this up.

        What you see below are excerpts from the trace log that pertain to this failure at different points in time. In this particular failure, test_2 / partition 2 was missing logical offsets from 570 to 582 on broker 3 (3 brokers in total).

        current fetch offset = 582
        current HW = 570
        Leader for test_2 / partition 2 = broker 2

        1. The line below shows the fetch request that was issued by broker 3 to broker 2 just before broker 1 was shut down. The requested offset for [test_2,2] is 582.

        [2013-03-02 12:37:56,034] TRACE [ReplicaFetcherThread-0-2], issuing to broker 2 of fetch request Name: FetchRequest; Version: 0; CorrelationId: 121; ClientId: ReplicaFetcherThread-0-2; ReplicaId: 3; MaxWait: 500 ms; MinBytes: 4096 bytes; RequestInfo: [test_1,0] -> PartitionFetchInfo(700,1048576),[test_2,1] -> PartitionFetchInfo(677,1048576),[test_2,2] -> PartitionFetchInfo(582,1048576),[test_2,0] -> PartitionFetchInfo(679,1048576),[test_1,2] -> PartitionFetchInfo(600,1048576),[test_1,1] -> PartitionFetchInfo(699,1048576) (kafka.server.ReplicaFetcherThread)

        2. Broker 1 is shut down and broker 3 handles the leader-and-ISR request. Note that [test_2,2] still follows broker 2, but we still issue a makeFollower call for it.

        [2013-03-02 12:37:56,086] INFO Replica Manager on Broker 3: Handling leader and isr request Name: LeaderAndIsrRequest; Version: 0; CorrelationId: 2; ClientId: ; AckTimeoutMs: 1000 ms; ControllerEpoch: 2; PartitionStateInfo: (test_1,0) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", "leaderEpoch":"1" },1),3),(test_2,1) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", "leaderEpoch":"2" },2),3),(test_2,2) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", "leaderEpoch":"1" },1),3),(test_2,0) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", "leaderEpoch":"2" },2),3),(test_1,2) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", "leaderEpoch":"2" },2),3),(test_1,1) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", "leaderEpoch":"1" },1),3); Leaders: id:2,host:xxxx (kafka.server.ReplicaManager)

        3. The leader-and-ISR request results in removing the fetcher to broker 2 for [test_2,2], truncating the log to the high watermark (570), and then adding the fetcher back to the same broker.

        [2013-03-02 12:37:56,088] INFO [ReplicaFetcherManager on broker 3] removing fetcher on topic test_2, partition 2 (kafka.server.ReplicaFetcherManager)
        [2013-03-02 12:37:56,088] INFO [Kafka Log on Broker 3], Truncated log segment /tmp/kafka_server_3_logs/test_2-2/00000000000000000000.log to target offset 570 (kafka.log.Log)
        [2013-03-02 12:37:56,088] INFO [ReplicaFetcherManager on broker 3] adding fetcher on topic test_2, partion 2, initOffset 570 to broker 2 with fetcherId 0 (kafka.server.ReplicaFetcherManager)

        4. The leader-and-ISR request completes at this point.

        [2013-03-02 12:37:56,090] INFO Replica Manager on Broker 3: Completed leader and isr request Name: LeaderAndIsrRequest; Version: 0; CorrelationId: 2; ClientId: ; AckTimeoutMs: 1000 ms; ControllerEpoch: 2; PartitionStateInfo: (test_1,0) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", "leaderEpoch":"1" },1),3),(test_2,1) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", "leaderEpoch":"2" },2),3),(test_2,2) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", "leaderEpoch":"1" },1),3),(test_2,0) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", "leaderEpoch":"2" },2),3),(test_1,2) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", "leaderEpoch":"2" },2),3),(test_1,1) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", "leaderEpoch":"1" },1),3); Leaders: id:2,host:xxxx (kafka.server.ReplicaManager)

        5. A log append happens at offset 582 even though the nextOffset for the log is 570. This append actually pertains to the fetch request from step 1, which explains the gap in the log.

        [2013-03-02 12:37:56,098] TRACE [Kafka Log on Broker 3], Appending message set to test_2-2 offset: 582 nextOffset: 570 messageSet: ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0, crc = 1408289663, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=500 cap=500]),582), MessageAndOffset(Message(magic = 0, attributes = 0, crc = 3696400058, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=500 cap=500]),583), MessageAndOffset(Message(magic = 0, attributes = 0, crc = 2403920749, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=500 cap=500]),584), ) (kafka.log.Log)

        From the steps above, it is clear that something is allowing the fetch request from step 1 to complete even though steps 2 and 3 removed the fetcher for that topic/partition.

        Looking at the code, it becomes obvious: the race is between the thread that removes the fetcher, truncates the log, and adds the fetcher back, and the thread that fetches bytes from the leader. The annotated excerpts below show what is happening, and a sketch of a guard that would close the race follows them.

        Partition.scala

        replicaFetcherManager.removeFetcher(topic, partitionId)  // step 2: removes the (topic, partition) -> offset mapping from partitionMap in AbstractFetcherThread
        // make sure local replica exists
        val localReplica = getOrCreateReplica()
        localReplica.log.get.truncateTo(localReplica.highWatermark)  // step 3: truncates to offset 570
        inSyncReplicas = Set.empty[Replica]
        leaderEpoch = leaderAndIsr.leaderEpoch
        zkVersion = leaderAndIsr.zkVersion
        leaderReplicaIdOpt = Some(newLeaderBrokerId)
        // start fetcher thread to current leader
        replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)  // step 4: sets the new fetcher to fetch from the log end offset, which is 570 at this point

        AbstractFetcherThread.scala

        private def processFetchRequest(fetchRequest: FetchRequest) {
          val partitionsWithError = new mutable.HashSet[TopicAndPartition]
          var response: FetchResponse = null
          try {
            trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
            response = simpleConsumer.fetch(fetchRequest)
          } catch {
            case t =>
              debug("error in fetch %s".format(fetchRequest), t)
              if (isRunning.get) {
                partitionMapLock synchronized {
                  partitionsWithError ++= partitionMap.keys
                }
              }
          }
          fetcherStats.requestRate.mark()  // step 1: fetch completes; the fetch request is from offset 582

          if (response != null) {
            // process fetched data
            partitionMapLock.lock()  // step 5: this is where the fetch request waits while the addFetcher in Partition.scala above is executing
            try {
              response.data.foreach {
                case (topicAndPartition, partitionData) =>
                  val (topic, partitionId) = topicAndPartition.asTuple
                  val currentOffset = partitionMap.get(topicAndPartition)
                  if (currentOffset.isDefined) {
                    partitionData.error match {
                      case ErrorMapping.NoError =>
                        val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
                        val validBytes = messages.validBytes
                        val newOffset = messages.lastOption match {  // step 6: newOffset is set to 587 and partitionMap is updated
                          case Some(m: MessageAndOffset) => m.nextOffset
                          case None => currentOffset.get
                        }
                        partitionMap.put(topicAndPartition, newOffset)
                        fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
                        fetcherStats.byteRate.mark(validBytes)
                        // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                        processPartitionData(topicAndPartition, currentOffset.get, partitionData)  // step 7: appends data with logical offsets 582-587; the offset passed in is 570 (currentOffset), so all offset validation checks in processPartitionData pass
                      case ErrorMapping.OffsetOutOfRangeCode =>
                        try {
                          val newOffset = handleOffsetOutOfRange(topicAndPartition)
                          partitionMap.put(topicAndPartition, newOffset)
                          warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
                            .format(currentOffset.get, topic, partitionId, newOffset))
                        } catch {
                          case e =>
                            warn("error getting offset for %s %d to broker %d".format(topic, partitionId, sourceBroker.id), e)
                            partitionsWithError += topicAndPartition
                        }
                      case _ =>
                        warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
                          ErrorMapping.exceptionFor(partitionData.error))
                        partitionsWithError += topicAndPartition
                    }
                  }
              }
            } finally {
              partitionMapLock.unlock()
            }
          }
        }
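        One way to close the race is to have the fetcher verify, before handing the data off, that the offset this request was issued at still matches the offset currently recorded in partitionMap (which addFetcher reset to 570). The sketch below is illustrative rather than the literal KAFKA-772.patch, and it assumes the per-partition fetch offset can be read back from fetchRequest.requestInfo.

        // Illustrative guard, not necessarily the exact patch. Only process fetched data if the
        // offset this request was issued at still matches the offset recorded in partitionMap.
        // If removeFetcher/truncate/addFetcher ran while the fetch was in flight (steps 2-4
        // above), the recorded offset has changed and the stale data is dropped.
        if (currentOffset.isDefined &&
            fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
          // ... existing NoError handling: update partitionMap, call processPartitionData ...
        } else {
          debug("Discarding stale fetch data for %s: request offset no longer matches current offset %s"
            .format(topicAndPartition, currentOffset))
        }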

        Jun Rao added a comment -

        Thanks for the patch. Committed to 0.8.

        Neha Narkhede added a comment -

        Yeah, it's probably OK to skip the message.


          People

          • Assignee:
            Sriram Subramanian
            Reporter:
            John Fung
          • Votes:
            0
            Watchers:
            4
