Kafka / KAFKA-628

System Test Failure Case 5005 (Mirror Maker bouncing) - Data Loss in ConsoleConsumer

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None
    1. kafka-628-reproduce-issue.patch
      0.7 kB
      John Fung
    2. log4j_and_data_logs.tar.gz
      952 kB
      John Fung

      Activity

      John Fung created issue -
      John Fung added a comment -
      • There is data loss in testcase_5005, as shown below.

      validation_status :
      Unique messages from consumer on [test_1] at console_consumer.log : 1550
      Unique messages from consumer on [test_2] at console_consumer.log : 1650
      Unique messages from producer on [test_1] : 1700
      Unique messages from producer on [test_2] : 1700
      Validate for data matched on topic [test_1] : FAILED
      Validate for data matched on topic [test_2] : FAILED
      Validate for merged log segment checksum in cluster [source] : PASSED
      Validate for merged log segment checksum in cluster [target] : PASSED
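
The size of the loss follows directly from the counts above. A minimal sketch in Python (the language of the system test runner); the dictionary layout and the function name `missing_per_topic` are illustrative assumptions, not part of the test harness:

```python
# Unique message counts copied from the validation_status output above.
produced = {"test_1": 1700, "test_2": 1700}
consumed = {"test_1": 1550, "test_2": 1650}


def missing_per_topic(produced, consumed):
    """Return, per topic, how many unique produced messages never reached the consumer."""
    return {topic: produced[topic] - consumed.get(topic, 0) for topic in produced}


missing = missing_per_topic(produced, consumed)
print(missing)                # {'test_1': 150, 'test_2': 50}
print(sum(missing.values()))  # 200
```

So 150 messages are missing on test_1 and 50 on test_2, 200 in total, while the log segment checksums in both clusters still match.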

      • The issue is reproducible in the latest 0.8 branch:
        1. Download the latest 0.8 branch.
        2. Apply the attached patch from <kafka_home>: patch -p0 -i kafka-628-reproduce-issue.patch
        3. Build Kafka from <kafka_home>: ./sbt update package
        4. Run the testcase from <kafka_home>/system_test : python -u -B system_test_runner.py 2>&1 | tee system_test_output_`date +%s`.log
      John Fung made changes -
      Field Original Value New Value
      Attachment kafka-628-reproduce-issue.patch [ 12554250 ]
      John Fung made changes -
      Attachment log4j_and_data_logs.tar.gz [ 12554259 ]
      John Fung added a comment -

      This test case fails consistently in System Test. The following errors were found in the log4j messages from the Brokers and MirrorMaker:

      • Logs Archive location: /mnt/u001/hudson_kafka_replication_system_test_archives/test_1354190469/testcase_5005/logs

      ===================

      • Errors in Source Broker :
        ===================
        $ less broker-5/kafka_server_5.log (source)
        [2012-11-29 10:18:14,307] ERROR [KafkaApi-2] error when processing request (test_2,0,0,1048576) (kafka.server.KafkaApis)
        kafka.common.UnknownTopicOrPartitionException: Topic test_2 partition 0 doesn't exist on 2
        at kafka.server.ReplicaManager.getLeaderReplicaIfLocal(ReplicaManager.scala:163)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:359)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:325)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:321)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map1.map(Map.scala:93)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:321)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:289)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:57)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
        at java.lang.Thread.run(Thread.java:619)
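
Entries like the one above can be pulled out of a log4j file mechanically. The following is a rough sketch, assuming the `[timestamp] LEVEL message` layout seen in these excerpts; the regular expression and the helper name `error_entries` are illustrative and not part of Kafka or the system test framework:

```python
import re

# Matches a log4j line such as:
# [2012-11-29 10:18:14,307] ERROR [KafkaApi-2] error when ... (kafka.server.KafkaApis)
ENTRY = re.compile(r"^\[(?P<ts>[\d\- :,]+)\] (?P<level>[A-Z]+) (?P<msg>.*)$")


def error_entries(lines):
    """Yield (timestamp, message, exception_line) for each ERROR entry.

    exception_line is the line right after the entry when it is neither a new
    log entry nor a stack frame ("at ..."); otherwise it is None.
    """
    lines = [line.strip() for line in lines]
    for i, line in enumerate(lines):
        match = ENTRY.match(line)
        if not match or match.group("level") != "ERROR":
            continue
        exception_line = None
        if i + 1 < len(lines):
            following = lines[i + 1]
            if not ENTRY.match(following) and not following.startswith("at "):
                exception_line = following
        yield match.group("ts"), match.group("msg"), exception_line


sample = [
    "[2012-11-29 10:18:14,307] ERROR [KafkaApi-2] error when processing request "
    "(test_2,0,0,1048576) (kafka.server.KafkaApis)",
    "kafka.common.UnknownTopicOrPartitionException: Topic test_2 partition 0 doesn't exist on 2",
    "at kafka.server.ReplicaManager.getLeaderReplicaIfLocal(ReplicaManager.scala:163)",
]
result = list(error_entries(sample))
for timestamp, message, exception_line in result:
    print(timestamp, "|", exception_line)
```

Run against the full `kafka_server_5.log`, the same loop would list every ERROR entry together with the exception that caused it.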

      ===================

      • Errors in Target Broker :
        ===================
        $ less broker-7/kafka_server_7.log (target)
        [2012-11-29 10:18:29,711] INFO [KafkaApi-4] Auto creation of topic test_1 with 5 partitions and replication factor 1 is successful! (kafka.server.KafkaApis)
        [2012-11-29 10:18:29,713] ERROR [KafkaApi-4] Error while retrieving topic metadata (kafka.server.KafkaApis)
        kafka.admin.AdministrationException: topic test_1 already exists
        at kafka.admin.AdminUtils$.createTopicPartitionAssignmentPathInZK(AdminUtils.scala:85)
        at kafka.admin.CreateTopicCommand$.createTopic(CreateTopicCommand.scala:95)
        at kafka.server.KafkaApis$$anonfun$handleTopicMetadataRequest$2.apply(KafkaApis.scala:437)
        at kafka.server.KafkaApis$$anonfun$handleTopicMetadataRequest$2.apply(KafkaApis.scala:430)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
        at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:429)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
        at java.lang.Thread.run(Thread.java:619)
        . . .
        [2012-11-29 10:19:48,714] ERROR [Partition state machine on Controller 5]: State change for partition [test_2, 0] from OfflinePartition to OnlinePartition failed (kafka.controller.PartitionStateMachine)
        kafka.common.PartitionOfflineException: All replicas for partition [test_2, 0] are dead. Marking this partition offline
        at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:300)
        at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:141)
        at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$1.apply(PartitionStateMachine.scala:86)
        at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$1.apply(PartitionStateMachine.scala:84)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:84)
        at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:59)
        at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:221)
        at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:85)
        at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:53)
        at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:106)
        at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
        Caused by: kafka.common.PartitionOfflineException: No replica for partition ([test_2, 0]) is alive. Live brokers are: [Set(5, 6)], Assigned replicas are: [List(4)]
        at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:62)
        at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:283)
        ... 18 more
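
The "Caused by" line explains why the election failed: the only assigned replica (broker 4) is not among the live brokers (5 and 6). The sketch below illustrates that condition only; it is not Kafka's `OfflinePartitionLeaderSelector`, and the function name `select_leader` is hypothetical:

```python
def select_leader(assigned_replicas, live_brokers):
    """Return the first assigned replica that is alive, or None if none is."""
    for broker_id in assigned_replicas:
        if broker_id in live_brokers:
            return broker_id
    return None


# Values taken from the "Caused by" line above.
leader = select_leader(assigned_replicas=[4], live_brokers={5, 6})
print(leader)  # None

# Hypothetical contrast: a second replica on a live broker would allow an election.
print(select_leader(assigned_replicas=[4, 5], live_brokers={5, 6}))  # 5
```

With a single assigned replica, the partition has no eligible leader for as long as that one broker is down.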

      ===================

      • Errors in Mirror Maker :
        ===================
        $ less mirror_maker-14/mirror_maker_14.log
        [2012-11-29 10:18:29,738] ERROR Failed to collate messages by topic, partition due to (kafka.producer.async.DefaultEventHandler)
        kafka.common.KafkaException: Failed to fetch topic metadata for topic: test_1
        at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:53)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:170)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:131)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:130)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
        at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:130)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:76)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:57)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:103)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:86)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:66)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:65)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:43)
        [2012-11-29 10:18:29,740] ERROR Error in handling batch of 200 events (kafka.producer.async.ProducerSendThread)
        kafka.common.KafkaException: Failed to fetch topic metadata for topic: test_1
        at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:53)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:170)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:131)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:130)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
        at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:130)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:76)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:57)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:103)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:86)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:66)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:65)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:43)
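
One way to relate these producer errors to the validation result is to add up the sizes of the batches that failed and compare the sum with the number of missing messages. This is a sketch under assumptions: the regular expression is based only on the message format shown above, the helper name `events_in_failed_batches` is hypothetical, and a matching total is only consistent with the failed batch being the lost data; it does not prove it.

```python
import re

# Matches the ProducerSendThread error line shown above.
BATCH_ERROR = re.compile(r"ERROR Error in handling batch of (\d+) events")


def events_in_failed_batches(lines):
    """Sum the batch sizes reported by 'Error in handling batch' log lines."""
    total = 0
    for line in lines:
        match = BATCH_ERROR.search(line)
        if match:
            total += int(match.group(1))
    return total


sample = [
    "[2012-11-29 10:18:29,740] ERROR Error in handling batch of 200 events "
    "(kafka.producer.async.ProducerSendThread)",
]
failed = events_in_failed_batches(sample)

# Missing messages per the validation_status counts: produced minus consumed.
missing = (1700 - 1550) + (1700 - 1650)

print(failed, missing, failed == missing)  # 200 200 True
```

In this excerpt the one failed batch of 200 events equals the 200 messages missing across test_1 and test_2; the complete `mirror_maker_14.log` may contain further batch errors that are not shown here.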
      John Fung added a comment -

      This issue no longer occurs in the 0.8 branch, so marking it as closed.

      John Fung made changes -
      Status Open [ 1 ] Resolved [ 5 ]
      Resolution Fixed [ 1 ]
      John Fung made changes -
      Status Resolved [ 5 ] Closed [ 6 ]

        People

        • Assignee:
          Unassigned
          Reporter:
          John Fung
        • Votes:
          0
          Watchers:
          1
