KAFKA-2575

Inconsistent offset count in replication-offset-checkpoint during leader election leads to a flood of exceptions


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.8.2.1
    • Fix Version/s: None
    • Component/s: replication
    • Labels: None

    Description

      We have 3 brokers and more than 100 topics in production; the default number of partitions per topic is 24, and the replication factor is 3.

      We noticed the following errors in recent days.
      2015-09-22 22:25:12,529 ERROR Error on broker 1 while processing LeaderAndIsr request correlationId 438501 received from controller 2 epoch 12 for partition [LOGIST.DELIVERY.SUBSCRIBE,7] (state.change.logger)
      java.io.IOException: Expected 3918 entries but found only 3904
      at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:99)
      at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:91)
      at kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
      at kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
      at scala.collection.immutable.Set$Set3.foreach(Set.scala:115)
      at kafka.cluster.Partition$$anonfun$makeLeader$1.apply$mcZ$sp(Partition.scala:171)
      at kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
      at kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
      at kafka.utils.Utils$.inLock(Utils.scala:535)
      at kafka.utils.Utils$.inWriteLock(Utils.scala:543)
      at kafka.cluster.Partition.makeLeader(Partition.scala:163)
      at kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:427)
      at kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:426)
      at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
      at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
      at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
      at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
      at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
      at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:426)
      at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:378)
      at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:120)
      at kafka.server.KafkaApis.handle(KafkaApis.scala:63)
      at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
      at java.lang.Thread.run(Thread.java:745)

      The error occurs during leader election for the LOGIST.DELIVERY.SUBSCRIBE partitions; afterwards the broker repeatedly prints out error messages such as:
      2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 14943530 from client ReplicaFetcherThread-2-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,22] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,22] on broker 1 (kafka.server.ReplicaManager)
      2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 15022337 from client ReplicaFetcherThread-1-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,1] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,1] on broker 1 (kafka.server.ReplicaManager)
      2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 15078431 from client ReplicaFetcherThread-0-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,4] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,4] on broker 1 (kafka.server.ReplicaManager)
      2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 13477660 from client ReplicaFetcherThread-2-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,10] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,10] on broker 1 (kafka.server.ReplicaManager)
      2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 15022337 from client ReplicaFetcherThread-1-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,13] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,13] on broker 1 (kafka.server.ReplicaManager)
      2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 15078431 from client ReplicaFetcherThread-0-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,16] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,16] on broker 1 (kafka.server.ReplicaManager)
      2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 14988525 from client ReplicaFetcherThread-3-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,19] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,19] on broker 1 (kafka.server.ReplicaManager)
      2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: FetchRequest; Version: 0; CorrelationId: 15022337; ClientId: ReplicaFetcherThread-1-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,1] -> PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,13] -> PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
      kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 0's position -1 since the replica is not recognized to be one of the assigned replicas for partition [LOGIST.DELIVERY.SUBSCRIBE,1]
      at kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
      at kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
      at kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
      at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
      at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
      at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
      at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
      at kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
      at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
      at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
      at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
      at java.lang.Thread.run(Thread.java:745)
      2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: FetchRequest; Version: 0; CorrelationId: 15078431; ClientId: ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,4] -> PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,16] -> PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
      kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 0's position -1 since the replica is not recognized to be one of the assigned replicas for partition [LOGIST.DELIVERY.SUBSCRIBE,4]
      at kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
      at kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
      at kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
      at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
      at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
      at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
      at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
      at kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
      at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
      at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
      at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
      at java.lang.Thread.run(Thread.java:745)

      I checked the Kafka source code (OffsetCheckpoint.scala, ReplicaManager.scala, LogManager.scala). The replica manager writes the offsets of all partitions to replication-offset-checkpoint every 5 seconds, and OffsetCheckpoint takes an internal lock on the file for every read and write, so it should be impossible for the header to claim 3918 offset entries while the file actually contains only 3904. Is this a multithreading issue where some other thread flushes content to the same file despite the internal lock in OffsetCheckpoint? I'm not familiar with Scala, but I looked at the declaration:

      class OffsetCheckpoint(val file: File) extends Logging {
        private val lock = new Object()

      To me this looks like a per-instance lock rather than a static class-level lock, so two OffsetCheckpoint instances opened on the same file would not exclude each other.
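
      For reference, here is a minimal sketch (my own simplification, not the actual Kafka code) of how such a checkpoint file is written and read. It illustrates why the "Expected N entries but found only M" check fires when the count declared in the header disagrees with the number of entry lines, and why a lock held per instance only serializes callers that share the same OffsetCheckpoint object; a second instance, or any other writer, pointing at the same path is not excluded by it.

      import java.io._
      import scala.io.Source

      // Simplified sketch for discussion, NOT the real kafka.server.OffsetCheckpoint.
      // Assumed format: line 0 = version, line 1 = entry count,
      // then one "topic partition offset" line per entry.
      class SimpleOffsetCheckpoint(val file: File) {
        // Per-INSTANCE lock: a second SimpleOffsetCheckpoint created for the same
        // file has its own lock object, so two writers would not exclude each other.
        private val lock = new Object()

        def write(offsets: Map[(String, Int), Long]): Unit = lock synchronized {
          val tmp = new File(file.getAbsolutePath + ".tmp")
          val writer = new BufferedWriter(new FileWriter(tmp))
          try {
            writer.write("0"); writer.newLine()                    // version
            writer.write(offsets.size.toString); writer.newLine()  // declared entry count
            for (((topic, partition), offset) <- offsets) {
              writer.write(s"$topic $partition $offset"); writer.newLine()
            }
            writer.flush()
          } finally {
            writer.close()
          }
          tmp.renameTo(file) // swap into place; concurrent writers race on this rename
        }

        def read(): Map[(String, Int), Long] = lock synchronized {
          val source = Source.fromFile(file)
          try {
            val lines = source.getLines().toVector
            val declared = lines(1).trim.toInt
            val entries = lines.drop(2).filter(_.trim.nonEmpty).map { line =>
              val Array(topic, partition, offset) = line.trim.split("\\s+")
              (topic, partition.toInt) -> offset.toLong
            }.toMap
            // The consistency check that produced the IOException above.
            if (entries.size != declared)
              throw new IOException(s"Expected $declared entries but found only ${entries.size}")
            entries
          } finally {
            source.close()
          }
        }
      }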

      If this is an issue in Kafka, is there any quick workaround for the problem? We restarted this broker and the error disappeared, but the replica state for this topic still does not look entirely correct, although we can still produce and consume messages.
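
      In the meantime, a small diagnostic along the following lines (my own hypothetical sketch, not a Kafka tool) could confirm whether a replication-offset-checkpoint file is in this inconsistent state before a broker is restarted, by comparing the entry count declared in the header against the entry lines actually on disk.

      import scala.io.Source

      // Hypothetical diagnostic, not part of Kafka: checks whether the count declared
      // on the second line of a checkpoint file matches the entry lines that follow.
      object CheckpointCheck {
        def main(args: Array[String]): Unit = {
          val path = args.headOption.getOrElse("replication-offset-checkpoint")
          val lines = Source.fromFile(path).getLines().toVector
          val declared = lines(1).trim.toInt                  // line 0 is the version, line 1 the count
          val actual = lines.drop(2).count(_.trim.nonEmpty)   // remaining non-empty lines are entries
          val status = if (declared == actual) "OK" else "MISMATCH"
          println(s"$path: declared=$declared actual=$actual $status")
        }
      }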

      Please let me know if you need more information.

      Thanks and best regards,
      Warren

          People

            Assignee: Unassigned
            Reporter: Warren Jin
            Votes: 1
            Watchers: 3
