From a6636fa41045dd7d48e595c41a0e899d7ac0387c Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Thu, 30 May 2013 17:50:43 -0700
Subject: [PATCH] Integrate controlled shutdown into kafka shutdown hook

---
 core/src/main/scala/kafka/api/RequestKeys.scala    |    4 +-
 core/src/main/scala/kafka/cluster/Partition.scala  |    6 +-
 .../scala/kafka/controller/KafkaController.scala   |   81 +++++-------------
 core/src/main/scala/kafka/server/KafkaApis.scala   |   13 +++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   14 +++-
 core/src/main/scala/kafka/server/KafkaServer.scala |   89 +++++++++++++++++++-
 .../main/scala/kafka/server/ReplicaManager.scala   |    3 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala    |    6 +-
 .../server/HighwatermarkPersistenceTest.scala      |    5 +-
 .../unit/kafka/server/ISRExpirationTest.scala      |    3 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |    8 ++-
 11 files changed, 155 insertions(+), 77 deletions(-)

diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index 541cf84..e2ce9bd 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -28,6 +28,7 @@ object RequestKeys {
   val LeaderAndIsrKey: Short = 4
   val StopReplicaKey: Short = 5
   val UpdateMetadataKey: Short = 6
+  val ControlledShutdownKey: Short = 7
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -36,7 +37,8 @@ object RequestKeys {
         MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
         LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
         StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
-        UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom))
+        UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom),
+        ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom))
 
   def nameForKey(key: Short): String = {
     keyToNameAndDeserializerMap.get(key) match {
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 02d2c44..50a41a4 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -195,8 +195,10 @@ class Partition(val topic: String,
           leaderEpoch = leaderAndIsr.leaderEpoch
           zkVersion = leaderAndIsr.zkVersion
           leaderReplicaIdOpt = Some(newLeaderBrokerId)
-          // start fetcher thread to current leader
-          replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
+          if (!replicaManager.isShuttingDown.get()) {
+            // start fetcher thread to current leader if we are not shutting down
+            replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
+          }
         case None => // leader went down
           stateChangeLogger.trace("Broker %d aborted the become-follower state change with correlation id %d from " +
             " controller %d epoch %d since leader %d for partition [%s,%d] is unavailable during the state change operation"
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a4e96cc..7198b93 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -70,7 +70,7 @@ class ControllerContext(val zkClient: ZkClient,
 }
 
 trait KafkaControllerMBean {
-  def shutdownBroker(id: Int): Int
+  def shutdownBroker(id: Int): Set[TopicAndPartition]
 }
 
 object KafkaController {
@@ -118,17 +118,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, config.hostName, config.port)
 
   /**
-   * JMX operation to initiate clean shutdown of a broker. On clean shutdown,
-   * the controller first determines the partitions that the shutting down
-   * broker leads, and moves leadership of those partitions to another broker
-   * that is in that partition's ISR. When all partitions have been moved, the
-   * broker process can be stopped normally (i.e., by sending it a SIGTERM or
-   * SIGINT) and no data loss should be observed.
+   * On clean shutdown, the controller first determines the partitions that the
+   * shutting down broker leads, and moves leadership of those partitions to another broker
+   * that is in that partition's ISR.
    *
    * @param id Id of the broker to shutdown.
-  * @return The number of partitions that the broker still leads.
+  * @return The set of partitions that the broker still leads.
    */
-  def shutdownBroker(id: Int) = {
+  def shutdownBroker(id: Int) : Set[TopicAndPartition] = {
 
     controllerContext.brokerShutdownLock synchronized {
       info("Shutting down broker " + id)
@@ -151,68 +148,34 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         }
       }
 
-      def replicatedPartitionsBrokerLeads() = controllerContext.controllerLock.synchronized {
-        trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
-        controllerContext.partitionLeadershipInfo.filter {
-          case (topicAndPartition, leaderIsrAndControllerEpoch) =>
-            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
-        }.map(_._1)
-      }
-
-      val partitionsToMove = replicatedPartitionsBrokerLeads().toSet
-
-      debug("Partitions to move leadership from broker %d: %s".format(id, partitionsToMove.mkString(",")))
-
-      partitionsToMove.foreach { topicAndPartition =>
-        val (topic, partition) = topicAndPartition.asTuple
+      allPartitionsAndReplicationFactorOnBroker.foreach {
+        case(topicAndPartition, replicationFactor) =>
         // move leadership serially to relinquish lock.
         controllerContext.controllerLock synchronized {
           controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
             if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
+              // if the broker leads the topic partition, transition the leader and update isr. Updates zk and
+              // notifies all affected brokers
               partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
                 controlledShutdownPartitionLeaderSelector)
-              val newLeaderIsrAndControllerEpoch = controllerContext.partitionLeadershipInfo(topicAndPartition)
-
-              // mark replica offline only if leadership was moved successfully
-              if (newLeaderIsrAndControllerEpoch.leaderAndIsr.leader != currLeaderIsrAndControllerEpoch.leaderAndIsr.leader)
-                replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topic, partition, id)), OfflineReplica)
-            } else
-              debug("Partition %s moved from leader %d to new leader %d during shutdown."
-                .format(topicAndPartition, id, currLeaderIsrAndControllerEpoch.leaderAndIsr.leader))
+            }
+            else {
+              // if the broker is a follower, updates the isr in ZK and notifies the current leader
+              replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
+                topicAndPartition.partition, id)), OfflineReplica)
+            }
           }
         }
       }
 
-      val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
-
-      /*
-      * Force the shutting down broker out of the ISR of partitions that it
-      * follows, and shutdown the corresponding replica fetcher threads.
-      * This is really an optimization, so no need to register any callback
-      * to wait until completion.
-      */
-      if (partitionsRemaining.size == 0) {
-        brokerRequestBatch.newBatch()
-        allPartitionsAndReplicationFactorOnBroker foreach {
-          case(topicAndPartition, replicationFactor) =>
-            val (topic, partition) = topicAndPartition.asTuple
-            if (controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader != id) {
-              brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
-              removeReplicaFromIsr(topic, partition, id) match {
-                case Some(updatedLeaderIsrAndControllerEpoch) =>
-                  brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
-                    Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition,
-                    updatedLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(topicAndPartition))
-                case None =>
-                // ignore
-              }
-            }
-        }
-        brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+      def replicatedPartitionsBrokerLeads() = controllerContext.controllerLock.synchronized {
+        trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
+        controllerContext.partitionLeadershipInfo.filter {
+          case (topicAndPartition, leaderIsrAndControllerEpoch) =>
+            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
+        }.map(_._1)
       }
-
-      debug("Remaining partitions to move from broker %d: %s".format(id, partitionsRemaining.mkString(",")))
-      partitionsRemaining.size
+      replicatedPartitionsBrokerLeads().toSet
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index dd88ccd..208e3ef 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -30,6 +30,7 @@ import kafka.common._
 import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
 import kafka.network.RequestChannel.Response
 import kafka.cluster.Broker
+import kafka.controller.KafkaController
 
 
 /**
@@ -38,7 +39,8 @@ import kafka.cluster.Broker
 class KafkaApis(val requestChannel: RequestChannel,
                 val replicaManager: ReplicaManager,
                 val zkClient: ZkClient,
-                brokerId: Int) extends Logging {
+                brokerId: Int,
+                val controller: KafkaController) extends Logging {
 
   private val producerRequestPurgatory =
     new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests)
@@ -68,6 +70,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
         case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
         case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
+        case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
         case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
       }
     } catch {
@@ -126,6 +129,14 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
   }
 
+  def handleControlledShutdownRequest(request: RequestChannel.Request) {
+    val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
+    val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)
+    val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
+      ErrorMapping.NoError, partitionsRemaining)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse)))
+  }
+
   /**
    * Check if a partitionData from a produce request can unblock any
    * DelayedFetch requests.
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 549b4b0..b774431 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -169,4 +169,16 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the purge interval (in number of requests) of the producer request purgatory */
   val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
 
- }
+  /*********** Controlled shutdown configuration ***********/
+
+  /** Controlled shutdown can fail for multiple reasons. This determines the number of retries when such a failure happens */
+  val controlledShutdownMaxRetries = props.getInt("controlled.shutdown.max.retries", 3)
+
+  /** Before each retry, the system needs time to recover from the state that caused the previous failure (Controller
+    * fail over, replica lag etc). This config determines the amount of time to wait before retrying. */
+  val controlledShutdownRetryBackoffMs = props.getInt("controlled.shutdown.retry.backoff.ms", 5000)
+
+  /* enable controlled shutdown of the server */
+  val controlledShutdownEnable = props.getBoolean("controlled.shutdown.enable", false)
+
+}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b0348bb..d829419 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -17,13 +17,15 @@
 
 package kafka.server
 
-import kafka.network.SocketServer
+import kafka.network.{Receive, BlockingChannel, SocketServer}
 import kafka.log.LogManager
 import kafka.utils._
 import java.util.concurrent._
-import atomic.AtomicBoolean
+import atomic.{AtomicInteger, AtomicBoolean}
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
+import kafka.cluster.Broker
+import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
@@ -33,6 +35,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   this.logIdent = "[Kafka Server " + config.brokerId + "], "
   private var isShuttingDown = new AtomicBoolean(false)
   private var shutdownLatch = new CountDownLatch(1)
+  private var startupComplete = false;
+  val correlationId: AtomicInteger = new AtomicInteger(0)
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
   var logManager: LogManager = null
@@ -43,6 +47,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   val kafkaScheduler = new KafkaScheduler(4)
   var zkClient: ZkClient = null
 
+
   /**
    * Start up API for bringing up a single instance of the Kafka server.
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
@@ -79,10 +84,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
     info("Connecting to ZK: " + config.zkConnect)
 
-    replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager)
+    replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager, isShuttingDown)
 
     kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
-    apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId)
+    apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId, kafkaController)
     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
     Mx4jLoader.maybeLoad
 
@@ -92,6 +97,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     kafkaController.startup()
     // register metrics beans
     registerStats()
+    startupComplete = true;
     info("started")
   }
 
@@ -105,6 +111,80 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   }
 
   /**
+   *  Performs controlled shutdown
+   */
+  private def doControlledShutdown() {
+    if (startupComplete && config.controlledShutdownEnable) {
+      // We are going to try shutting down cleanly. If that is not possible we are going to do an unclean shutdown.
+      // In either case, we eventually shutdown. So as the first step we stop all the fetchers.
+      replicaManager.replicaFetcherManager.closeAllFetchers()
+
+      // We request the controller to do a controlled shutdown. On failure, we backoff for a configured period
+      // of time and try again for a configured number of retries. If all the attempts fail, we simply force
+      // the shutdown.
+      var remainingRetries = config.controlledShutdownMaxRetries
+      info("starting controlled shutdown")
+      var channel : BlockingChannel = null;
+      var prevController : Broker = null
+      var shutdownSuceeded : Boolean =false
+      while (!shutdownSuceeded && remainingRetries > 0) {
+        remainingRetries = remainingRetries - 1
+        // Get the current controller info. This is to ensure we use the most recent info to issue the
+        // controlled shutdown request
+        val controllerId = ZkUtils.getController(kafkaZookeeper.getZookeeperClient)
+        ZkUtils.getBrokerInfo(kafkaZookeeper.getZookeeperClient, controllerId) match {
+          case Some(broker) =>
+            if (channel == null || prevController == null || !prevController.equals(broker)) {
+              // if this is the first attempt or if the controller has changed, create a channel to the most recent
+              // controller
+              if (channel != null) {
+                channel.disconnect()
+              }
+              channel = new BlockingChannel(broker.host, broker.port,
+                BlockingChannel.UseDefaultBufferSize,
+                BlockingChannel.UseDefaultBufferSize,
+                config.controllerSocketTimeoutMs)
+              channel.connect()
+              prevController = broker
+            }
+          case None=>
+            //ignore and try again
+        }
+        if (channel != null) {
+          var response: Receive = null
+          try {
+            // send the controlled shutdown request
+            val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
+            channel.send(request)
+            response = channel.receive()
+            val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer)
+            if (shutdownResponse.partitionsRemaining != null && shutdownResponse.partitionsRemaining.size == 0) {
+              shutdownSuceeded = true
+              info ("Controlled shutdown succeeded")
+            }
+            else {
+              info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.mkString(",")))
+              info("Error code from controller: %d".format(shutdownResponse.errorCode))
+            }
+          }
+          catch {
+            case ioe: java.io.IOException =>
+              channel.disconnect()
+              channel = null
+              // ignore and try again
+          }
+        }
+        if (!shutdownSuceeded) {
+          Thread.sleep(config.controlledShutdownRetryBackoffMs)
+        }
+      }
+      if (!shutdownSuceeded) {
+        info ("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
+      }
+    }
+  }
+
+  /**
    * Shutdown API for shutting down a single instance of the Kafka server.
    * Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread
    */
@@ -112,6 +192,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     info("shutting down")
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
+      Utils.swallow(doControlledShutdown())
       if(kafkaZookeeper != null)
         Utils.swallow(kafkaZookeeper.shutdown())
       if(socketServer != null)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8e49b83..9d41e82 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -40,7 +40,8 @@ class ReplicaManager(val config: KafkaConfig,
                      time: Time, 
                      val zkClient: ZkClient, 
                      kafkaScheduler: KafkaScheduler,
-                     val logManager: LogManager) extends Logging with KafkaMetricsGroup {
+                     val logManager: LogManager,
+                     val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup {
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   private val localBrokerId = config.brokerId
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 95e7218..0d8b70f 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -345,7 +345,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       // wait for the update metadata request to trickle to the brokers
       assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() =>
         activeServers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
-      assertEquals(0, partitionsRemaining)
+      assertEquals(0, partitionsRemaining.size)
       var partitionStateInfo = activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition))
       var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
@@ -353,7 +353,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       assertEquals(List(0,1), partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr)
 
       partitionsRemaining = controller.shutdownBroker(1)
-      assertEquals(0, partitionsRemaining)
+      assertEquals(0, partitionsRemaining.size)
       activeServers = servers.filter(s => s.config.brokerId == 0)
       partitionStateInfo = activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition))
       leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
@@ -361,7 +361,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
       assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
       partitionsRemaining = controller.shutdownBroker(0)
-      assertEquals(1, partitionsRemaining)
+      assertEquals(1, partitionsRemaining.size)
       // leader doesn't change since all the replicas are shut down
       assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
     }
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index f30b097..2719055 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -25,6 +25,7 @@ import org.junit.Assert._
 import kafka.common.KafkaException
 import kafka.cluster.Replica
 import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils}
+import java.util.concurrent.atomic.AtomicBoolean
 
 class HighwatermarkPersistenceTest extends JUnit3Suite {
 
@@ -47,7 +48,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0))
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0), new AtomicBoolean(false))
     replicaManager.startup()
     replicaManager.checkpointHighWatermarks()
     var fooPartition0Hw = hwmFor(replicaManager, topic, 0)
@@ -86,7 +87,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0))
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0), new AtomicBoolean(false))
     replicaManager.startup()
     replicaManager.checkpointHighWatermarks()
     var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 6184f42..7026432 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -24,6 +24,7 @@ import org.easymock.EasyMock
 import kafka.log.Log
 import org.junit.Assert._
 import kafka.utils._
+import java.util.concurrent.atomic.AtomicBoolean
 
 class IsrExpirationTest extends JUnit3Suite {
 
@@ -80,7 +81,7 @@ class IsrExpirationTest extends JUnit3Suite {
   private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
                                                localLog: Log): Partition = {
     val leaderId=config.brokerId
-    val replicaManager = new ReplicaManager(config, time, null, null, null)
+    val replicaManager = new ReplicaManager(config, time, null, null, null, new AtomicBoolean(false))
     val partition = replicaManager.getOrCreatePartition(topic, partitionId, 1)
     val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
 
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index c7dd8a7..23a8cb5 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -87,10 +87,12 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
     EasyMock.replay(replicaManager)
 
+    val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController])
+
     // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary)
     // don't provide replica or leader callbacks since they will not be tested here
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, controller)
 
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
@@ -184,8 +186,10 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
     EasyMock.replay(replicaManager)
 
+    val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController])
+
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, controller)
 
     /**
      * This fetch, coming from a replica, requests all data at offset "15".  Because the request is coming
-- 
1.7.1

