diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index d1d6c4b..169a656 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -16,14 +16,19 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.utils.Time;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Deque;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.kafka.clients.producer.BufferExhaustedException;
-
 /**
  * A pool of ByteBuffers kept under a given memory limit. This class is fairly specific to the needs of the producer. In
@@ -44,6 +49,9 @@ public final class BufferPool {
     private final Deque<ByteBuffer> free;
     private final Deque<Condition> waiters;
     private long availableMemory;
+    private final Metrics metrics;
+    private final Time time;
+    private final Sensor waitTime;
 
     /**
      * Create a new buffer pool
@@ -54,7 +62,7 @@ public final class BufferPool {
      *        {@link #allocate(int)} call will block and wait for memory to be returned to the pool. If false
      *        {@link #allocate(int)} will throw an exception if the buffer is out of memory.
      */
-    public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion) {
+    public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion, Metrics metrics, Time time) {
         this.poolableSize = poolableSize;
         this.blockOnExhaustion = blockOnExhaustion;
         this.lock = new ReentrantLock();
@@ -62,7 +70,13 @@ public final class BufferPool {
         this.waiters = new ArrayDeque<Condition>();
         this.totalMemory = memory;
         this.availableMemory = memory;
-    }
+        this.metrics = metrics;
+        this.time = time;
+        this.waitTime = this.metrics.sensor("bufferpool-wait-time");
+        this.waitTime.add("bufferpool-wait-ratio",
+                          "The fraction of time an appender waits for space allocation.",
+                          new Rate(TimeUnit.NANOSECONDS));
+    }
 
     /**
      * Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool
@@ -111,7 +125,14 @@
                 // loop over and over until we have a buffer or have reserved
                 // enough memory to allocate one
                 while (accumulated < size) {
-                    moreMemory.await();
+                    try {
+                        long startWait = time.nanoseconds();
+                        moreMemory.await(300, TimeUnit.MILLISECONDS);
+                        long endWait = time.nanoseconds();
+                        this.waitTime.record(endWait - startWait, time.milliseconds());
+                    } catch (InterruptedException e) {
+                        // This should never happen. Just let it go.
+                    }
                     // check if we can satisfy this request from the free list,
                     // otherwise allocate memory
                     if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 5ededcc..4010d42 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -82,7 +82,7 @@ public final class RecordAccumulator {
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
-        this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull);
+        this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time);
         this.time = time;
         registerMetrics(metrics);
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
index f227b5c..fe3c13f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.clients.producer;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.kafka.clients.producer.internals.BufferPool;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -26,13 +28,11 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-
-import org.apache.kafka.clients.producer.BufferExhaustedException;
-import org.apache.kafka.clients.producer.internals.BufferPool;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Test;
+import static org.junit.Assert.*;
 
 public class BufferPoolTest {
+    private MockTime time = new MockTime();
+    private Metrics metrics = new Metrics(time);
     /**
      * Test the simple non-blocking allocation paths
@@ -41,7 +41,7 @@
     public void testSimple() throws Exception {
         int totalMemory = 64 * 1024;
         int size = 1024;
-        BufferPool pool = new BufferPool(totalMemory, size, false);
+        BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time);
         ByteBuffer buffer = pool.allocate(size);
         assertEquals("Buffer size should equal requested size.", size, buffer.limit());
         assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
@@ -68,7 +68,7 @@
      */
     @Test(expected = IllegalArgumentException.class)
     public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
-        BufferPool pool = new BufferPool(1024, 512, true);
+        BufferPool pool = new BufferPool(1024, 512, true, metrics, time);
         ByteBuffer buffer = pool.allocate(1024);
         assertEquals(1024, buffer.limit());
         pool.deallocate(buffer);
@@ -77,7 +77,7 @@
 
     @Test
     public void testNonblockingMode() throws Exception {
-        BufferPool pool = new BufferPool(2, 1, false);
+        BufferPool pool = new BufferPool(2, 1, false, metrics, time);
         pool.allocate(1);
         try {
             pool.allocate(2);
@@ -92,7 +92,7 @@
      */
     @Test
     public void testDelayedAllocation() throws Exception {
-        BufferPool pool = new BufferPool(5 * 1024, 1024, true);
+        BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time);
         ByteBuffer buffer = pool.allocate(1024);
         CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
         CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
@@ -141,7 +141,7 @@
         final int iterations = 50000;
         final int poolableSize = 1024;
         final int totalMemory = numThreads / 2 * poolableSize;
-        final BufferPool pool = new BufferPool(totalMemory, poolableSize, true);
+        final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time);
         List<StressTestThread> threads = new ArrayList<StressTestThread>();
         for (int i = 0; i < numThreads; i++)
             threads.add(new StressTestThread(pool, iterations));
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index a9c0465..518d2df 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,7 +18,7 @@
 package kafka.cluster
 import kafka.common._
 import kafka.admin.AdminUtils
-import kafka.utils.{ZkUtils, ReplicationUtils, Pool, Time, Logging}
+import kafka.utils.{ZkUtils, Pool, Time, Logging}
 import kafka.utils.Utils.inLock
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
@@ -216,7 +216,7 @@ class Partition(val topic: String,
       inSyncReplicas = Set.empty[Replica]
       leaderEpoch = leaderAndIsr.leaderEpoch
       zkVersion = leaderAndIsr.zkVersion
-      
+
       leaderReplicaIdOpt.foreach { leaderReplica =>
         if (topic == OffsetManager.OffsetsTopicName &&
            /* if we are making a leader->follower transition */
@@ -261,15 +261,7 @@ class Partition(val topic: String,
           info("Expanding ISR for partition [%s,%d] from %s to %s"
                .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
           // update ISR in ZK and cache
-          val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic, partitionId, localBrokerId,
-            leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas)
-          if(updateSucceeded) {
-            inSyncReplicas = newInSyncReplicas
-            zkVersion = newVersion
-            trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","), zkVersion))
-          } else {
-            info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
-          }
+          updateIsr(newInSyncReplicas)
           replicaManager.isrExpandRate.mark()
         }
         maybeIncrementLeaderHW(leaderReplica)
@@ -333,15 +325,7 @@ class Partition(val topic: String,
           info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
             inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
           // update ISR in zk and in cache
-          val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic, partitionId, localBrokerId,
-            leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas)
-          if(updateSucceeded) {
-            inSyncReplicas = newInSyncReplicas
-            zkVersion = newVersion
-            trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","), zkVersion))
-          } else {
-            info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
-          }
+          updateIsr(newInSyncReplicas)
           // we may need to increment high watermark since ISR could be down to 1
           maybeIncrementLeaderHW(leaderReplica)
           replicaManager.isrShrinkRate.mark()
@@ -389,6 +373,22 @@ class Partition(val topic: String,
     }
   }
 
+  private def updateIsr(newIsr: Set[Replica]) {
+    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
+    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
+    val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
+      ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
+    if (updateSucceeded){
+      inSyncReplicas = newIsr
+      zkVersion = newVersion
+      trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
+    } else {
+      info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
+    }
+  }
+
   override def equals(that: Any): Boolean = {
     if(!(that.isInstanceOf[Partition]))
       return false
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 8af48ab..d0cf5f1 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -983,7 +983,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
             zkClient,
             ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
             ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
-            leaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData))
+            leaderAndIsr.zkVersion)
           newLeaderAndIsr.zkVersion = newVersion
           finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
@@ -1037,7 +1037,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
             zkClient,
             ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
             ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
-            leaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData))
+            leaderAndIsr.zkVersion)
           newLeaderAndIsr.zkVersion = newVersion
           finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
           if (updateSucceeded)
@@ -1335,16 +1335,6 @@ case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controlle
     leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
     leaderAndIsrInfo.toString()
   }
-
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case null => false
-      case n: LeaderIsrAndControllerEpoch =>
-        leaderAndIsr.leader == n.leaderAndIsr.leader && leaderAndIsr.isr.sorted == n.leaderAndIsr.isr.sorted &&
-        leaderAndIsr.leaderEpoch == n.leaderAndIsr.leaderEpoch && controllerEpoch == n.controllerEpoch
-      case _ => false
-    }
-  }
 }
 
 object ControllerStats extends KafkaMetricsGroup {
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index e29e470..6457b56 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -22,7 +22,7 @@ import collection.mutable.Buffer
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
-import kafka.utils.{Logging, ZkUtils, ReplicationUtils}
+import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.log4j.Logger
@@ -336,7 +336,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
           val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
             ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-            ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData))
+            ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion)
           newLeaderAndIsr = leaderAndIsr
           newLeaderAndIsr.zkVersion = newVersion
           zookeeperPathUpdateSucceeded = updateSucceeded
@@ -521,3 +521,5 @@ case object NewPartition extends PartitionState { val state: Byte = 0 }
 case object OnlinePartition extends PartitionState { val state: Byte = 1 }
 case object OfflinePartition extends PartitionState { val state: Byte = 2 }
 case object NonExistentPartition extends PartitionState { val state: Byte = 3 }
+
+
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
deleted file mode 100644
index eb53837..0000000
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-import kafka.cluster.Replica
-import kafka.api.LeaderAndIsr
-import kafka.controller.LeaderIsrAndControllerEpoch
-import org.apache.zookeeper.data.Stat
-import org.I0Itec.zkclient.ZkClient
-
-import scala.Some
-import scala.collection._
-
-object ReplicationUtils extends Logging {
-
-  def updateIsr(zkClient: ZkClient, topic: String, partitionId: Int, brokerId: Int, leaderEpoch: Int,
-    controllerEpoch: Int, zkVersion: Int, newIsr: Set[Replica]): (Boolean,Int) = {
-    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
-    val newLeaderAndIsr = new LeaderAndIsr(brokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
-    val path = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId)
-    val newLeaderData = ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
-    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
-    ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion,
-      Some(checkLeaderAndIsrZkData))
-  }
-
-  def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String,newLeaderData: String, zkVersion: Int): (Boolean,Int) = {
-    try {
-      val newLeaderStat: Stat = new Stat()
-      newLeaderStat.setVersion(zkVersion)
-      val newLeader = parseLeaderAndIsr(newLeaderData, path, newLeaderStat)
-      val writtenLeaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient,path)
-      val writtenLeaderOpt = writtenLeaderAndIsrInfo._1
-      val writtenStat = writtenLeaderAndIsrInfo._2
-      writtenLeaderOpt match {
-        case Some(writtenData) =>
-          val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat)
-          (newLeader,writtenLeader) match {
-            case (Some(newLeader),Some(writtenLeader)) =>
-              if(newLeader.equals(writtenLeader))
-                return (true,writtenStat.getVersion())
-            case _ =>
-          }
-        case None =>
-      }
-    } catch {
-      case e1: Exception =>
-    }
-    (false,-1)
-  }
-
-  def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat)
-      : Option[LeaderIsrAndControllerEpoch] = {
-    Json.parseFull(leaderAndIsrStr) match {
-      case Some(m) =>
-        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
-        val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
-        val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
-        val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
-        val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
-        val zkPathVersion = stat.getVersion
-        debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for leaderAndIsrPath %s".format(leader, epoch,
-          isr.toString(), zkPathVersion, path))
-        Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))
-      case None => None
-    }
-  }
-
-}
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 1a23eb4..fcbe269 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -20,8 +20,7 @@ package kafka.utils
 import kafka.cluster.{Broker, Cluster}
 import kafka.consumer.TopicCount
 import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
-import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
-  ZkMarshallingError, ZkBadVersionException}
+import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import collection._
 import kafka.api.LeaderAndIsr
@@ -59,7 +58,7 @@ object ZkUtils extends Logging {
     getTopicPath(topic) + "/partitions"
   }
 
-  def getTopicConfigPath(topic: String): String = 
+  def getTopicConfigPath(topic: String): String =
     TopicConfigPath + "/" + topic
 
   def getDeleteTopicPath(topic: String): String =
@@ -92,7 +91,7 @@ object ZkUtils extends Logging {
     val leaderAndIsrOpt = leaderAndIsrInfo._1
     val stat = leaderAndIsrInfo._2
     leaderAndIsrOpt match {
-      case Some(leaderAndIsrStr) => ReplicationUtils.parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat)
+      case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr, topic, partition, stat)
       case None => None
     }
   }
@@ -100,12 +99,29 @@ object ZkUtils extends Logging {
   def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
     getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
   }
-  
+
   def setupCommonPaths(zkClient: ZkClient) {
     for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, DeleteTopicsPath))
       makeSurePersistentPathExists(zkClient, path)
   }
 
+  def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat)
+      : Option[LeaderIsrAndControllerEpoch] = {
+    Json.parseFull(leaderAndIsrStr) match {
+      case Some(m) =>
+        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
+        val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
+        val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
+        val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
+        val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
+        val zkPathVersion = stat.getVersion
+        debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for partition [%s,%d]".format(leader, epoch,
+          isr.toString(), zkPathVersion, topic, partition))
+        Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))
+      case None => None
+    }
+  }
+
   def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
     val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1
     leaderAndIsrOpt match {
@@ -364,26 +380,16 @@ object ZkUtils extends Logging {
    * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't
    * exist, the current version is not the expected version, etc.) return (false, -1)
    */
-  def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int,
-    optionalChecker:Option[(ZkClient, String,String,Int) => (Boolean,Int)] = None): (Boolean, Int) = {
+  def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
     try {
       val stat = client.writeDataReturnStat(path, data, expectVersion)
       debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
         .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
     } catch {
-      case e1: ZkBadVersionException => {
-        optionalChecker match {
-          case Some(checker) => return checker(client,path,data,expectVersion)
-          case _ => debug("Checker method is not passed skipping zkData match")
-        }
-        error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
-          expectVersion, e1.getMessage))
-        (false, -1)
-      }
-      case e2: Exception =>
+      case e: Exception =>
         error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
-          expectVersion, e2.getMessage))
+          expectVersion, e.getMessage))
         (false, -1)
     }
   }
@@ -422,7 +428,7 @@ object ZkUtils extends Logging {
       case e2: Throwable => throw e2
     }
   }
-  
+
   def deletePath(client: ZkClient, path: String): Boolean = {
     try {
       client.delete(path)
@@ -445,7 +451,7 @@ object ZkUtils extends Logging {
       case e2: Throwable => throw e2
     }
   }
-  
+
   def maybeDeletePath(zkUrl: String, dir: String) {
     try {
       val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
deleted file mode 100644
index f364980..0000000
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import kafka.cluster.{Replica, Partition}
-import kafka.server.{ReplicaFetcherManager, KafkaConfig}
-import kafka.utils.TestUtils._
-import kafka.zk.ZooKeeperTestHarness
-import kafka.log.Log
-import kafka.common.TopicAndPartition
-import org.scalatest.junit.JUnit3Suite
-import org.junit.Assert._
-import org.junit.Test
-import org.I0Itec.zkclient.ZkClient
-import org.easymock.EasyMock
-import org.apache.log4j.Logger
-
-
-class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
-  private val logger = Logger.getLogger(classOf[UtilsTest])
-  val topic = "my-topic-test"
-  val partitionId = 0
-  val brokerId = 1
-  val leaderEpoch = 1
-  val controllerEpoch = 1
-  val zkVersion = 1
-  val topicPath = "/brokers/topics/my-topic-test/partitions/0/state"
-  val topicData = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
-    "versions" -> 1, "leader_epoch" -> 1,"isr" -> List(1,2)))
-  val topicDataVersionMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
-    "versions" -> 2, "leader_epoch" -> 1,"isr" -> List(2,1)))
-  val topicDataMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
-    "versions" -> 2, "leader_epoch" -> 2,"isr" -> List(1,2)))
-
-
-  override def setUp() {
-    super.setUp()
-    ZkUtils.createPersistentPath(zkClient,topicPath,topicData)
-  }
-
-  def testCheckLeaderAndIsrZkData() {
-    //mismatched zkVersion with the same data
-    val(dataMatched1,newZkVersion1) = ReplicationUtils.checkLeaderAndIsrZkData(zkClient,topicPath,topicDataVersionMismatch,1)
-    assertTrue(dataMatched1)
-    assertEquals(newZkVersion1,0)
-
-    //mismatched zkVersion and leaderEpoch
-    val(dataMatched2,newZkVersion2) = ReplicationUtils.checkLeaderAndIsrZkData(zkClient,topicPath,topicDataMismatch,1)
-    assertFalse(dataMatched2)
-    assertEquals(newZkVersion2,-1)
-  }
-
-  def testUpdateIsr() {
-    val configs = TestUtils.createBrokerConfigs(1).map(new KafkaConfig(_))
-
-    val log = EasyMock.createMock(classOf[kafka.log.Log])
-    EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
-    EasyMock.expect(log)
-    EasyMock.replay(log)
-
-    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
-    EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes()
-    EasyMock.replay(logManager)
-
-    val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
-    EasyMock.expect(replicaManager.config).andReturn(configs.head)
-    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
-    EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
-    EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
-    EasyMock.replay(replicaManager)
-
-    val partition = new Partition(topic,0,1,new MockTime,replicaManager)
-    val replicas = Set(new Replica(1,partition),new Replica(2,partition))
-
-    // regular update
-    val (updateSucceeded1,newZkVersion1) = ReplicationUtils.updateIsr(zkClient,
-      "my-topic-test", partitionId, brokerId, leaderEpoch, controllerEpoch, 0, replicas)
-    assertTrue(updateSucceeded1)
-    assertEquals(newZkVersion1,1)
-
-    // mismatched zkVersion with the same data
-    val (updateSucceeded2,newZkVersion2) = ReplicationUtils.updateIsr(zkClient,
-      "my-topic-test", partitionId, brokerId, leaderEpoch, controllerEpoch, zkVersion + 1, replicas)
-    assertTrue(updateSucceeded2)
-    // returns true with existing zkVersion
-    assertEquals(newZkVersion2,1)
-
-    // mismatched zkVersion and leaderEpoch
-    val (updateSucceeded3,newZkVersion3) = ReplicationUtils.updateIsr(zkClient,
-      "my-topic-test", partitionId, brokerId, leaderEpoch + 1, controllerEpoch, zkVersion + 1, replicas)
-    assertFalse(updateSucceeded3)
-    assertEquals(newZkVersion3,-1)
-  }
-
-}
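
For context on the client-side part of this patch, below is a minimal, illustrative sketch (not part of the patch) of how the reworked BufferPool constructor and its "bufferpool-wait-ratio" sensor might be exercised. The Metrics/MockTime wiring mirrors what BufferPoolTest does after this change; the class name BufferPoolWaitRatioExample and the pool sizes are made up for illustration.

import java.nio.ByteBuffer;

import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;

public class BufferPoolWaitRatioExample {
    public static void main(String[] args) throws Exception {
        // MockTime is a test utility; any Time implementation can be passed the same way.
        MockTime time = new MockTime();
        Metrics metrics = new Metrics(time);

        // 64 KB pool handing out 1 KB buffers, blocking when memory is exhausted.
        BufferPool pool = new BufferPool(64 * 1024, 1024, true, metrics, time);

        ByteBuffer buffer = pool.allocate(1024);
        // ... fill and send the buffer ...
        pool.deallocate(buffer);

        // The constructor registered the "bufferpool-wait-time" sensor, so any time an
        // allocation has to wait for space it is reported through the
        // "bufferpool-wait-ratio" metric; with no contention the ratio stays at zero.
    }
}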