Index: core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala =================================================================== --- core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (revision 1300793) +++ core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (working copy) @@ -26,7 +26,7 @@ import kafka.producer.async.MissingConfigException import kafka.serializer.Encoder import kafka.server.{KafkaConfig, KafkaServer} -import kafka.zk.{EmbeddedZookeeper, ZooKeeperTestHarness} +import kafka.zk.ZooKeeperTestHarness import org.apache.log4j.spi.LoggingEvent import org.apache.log4j.{PropertyConfigurator, Logger} import org.junit.{After, Before, Test} @@ -36,23 +36,17 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { var logDirZk: File = null - var logDirBl: File = null - var serverBl: KafkaServer = null var serverZk: KafkaServer = null var simpleConsumerZk: SimpleConsumer = null - var simpleConsumerBl: SimpleConsumer = null val tLogger = Logger.getLogger(getClass()) private val brokerZk = 0 - private val brokerBl = 1 private val ports = TestUtils.choosePorts(2) - private val (portZk, portBl) = (ports(0), ports(1)) + private val portZk = ports(0) - private var zkServer:EmbeddedZookeeper = null - @Before override def setUp() { super.setUp() @@ -62,26 +56,17 @@ logDirZk = new File(logDirZkPath) serverZk = TestUtils.createServer(new KafkaConfig(propsZk)); - val propsBl: Properties = TestUtils.createBrokerConfig(brokerBl, portBl) - val logDirBlPath = propsBl.getProperty("log.dir") - logDirBl = new File(logDirBlPath) - serverBl = TestUtils.createServer(new KafkaConfig(propsBl)) - Thread.sleep(100) simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024) - simpleConsumerBl = new SimpleConsumer("localhost", portBl, 1000000, 64*1024) } @After override def tearDown() { simpleConsumerZk.close - simpleConsumerBl.close serverZk.shutdown - serverBl.shutdown Utils.rm(logDirZk) - Utils.rm(logDirBl) Thread.sleep(500) super.tearDown() @@ -174,13 +159,6 @@ count = count + 1 } - val response2 = simpleConsumerBl.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build()) - val messagesFromOtherBroker = response2.messageSet("test-topic", 0) - - for(message <- messagesFromOtherBroker) { - count = count + 1 - } - assertEquals(5, count) } @@ -195,11 +173,6 @@ props.put("log4j.logger.kafka.log4j", "INFO,KAFKA") props } - - private def getLogDir(): File = { - val dir = TestUtils.tempDir() - dir - } } class AppenderStringEncoder extends Encoder[LoggingEvent] { Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala =================================================================== --- core/src/test/scala/unit/kafka/utils/TestUtils.scala (revision 1300793) +++ core/src/test/scala/unit/kafka/utils/TestUtils.scala (working copy) @@ -34,11 +34,13 @@ import scala.collection.Map import kafka.serializer.Encoder import kafka.api.{ProducerRequest, TopicData, PartitionData} +import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.TimeUnit /** * Utility functions to help with testing */ -object TestUtils { +object TestUtils extends Logging { val Letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" val Digits = "0123456789" @@ -385,6 +387,35 @@ val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data) pr } + + def waitUntilLeaderIsElected(zkClient: ZkClient, topic: String, partition: Int, timeoutMs: Long): Option[Int] = { + val leaderLock = new ReentrantLock() + val leaderExists = leaderLock.newCondition() + + info("Waiting for leader to be elected for topic %s partition %d".format(topic, partition)) + leaderLock.lock() + try { + // check if leader already exists + val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) + leader match { + case Some(l) => info("Leader %d exists for topic %s partition %d".format(l, topic, partition)) + leader + case None => zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), + new LeaderExists(topic, partition, leaderExists)) + leaderExists.await(timeoutMs, TimeUnit.MILLISECONDS) + // check if leader is elected + val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) + leader match { + case Some(l) => info("Leader %d elected for topic %s partition %d".format(l, topic, partition)) + case None => error("Timing out after %d ms since leader is not elected for topic %s partition %d" + .format(timeoutMs, topic, partition)) + } + leader + } + } finally { + leaderLock.unlock() + } + } } object TestZKUtils { Index: core/src/test/scala/unit/kafka/integration/FetcherTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/FetcherTest.scala (revision 1300793) +++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala (working copy) @@ -27,8 +27,9 @@ import kafka.server._ import org.scalatest.junit.JUnit3Suite import kafka.integration.KafkaServerTestHarness +import kafka.producer.{ProducerData, Producer} import kafka.utils.TestUtils -import kafka.producer.{ProducerData, Producer} +import kafka.utils.TestUtils._ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { @@ -43,7 +44,7 @@ val queue = new LinkedBlockingQueue[FetchedDataChunk] val topicInfos = configs.map(c => new PartitionTopicInfo(topic, c.brokerId, - new Partition(c.brokerId, 0), + 0, queue, new AtomicLong(0), new AtomicLong(0), @@ -66,6 +67,7 @@ def testFetcher() { val perNode = 2 var count = sendMessages(perNode) + waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500) fetch(count) Thread.sleep(100) assertQueueEmpty() Index: core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala (revision 1300793) +++ core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala (working copy) @@ -39,6 +39,8 @@ props.put("buffer.size", "65536") props.put("connect.timeout.ms", "100000") props.put("reconnect.interval", "10000") + props.put("producer.retry.backoff.ms", "1000") + props.put("producer.num.retries", "3") producer = new Producer(new ProducerConfig(props)) consumer = new SimpleConsumer(host, port, Index: core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (revision 1300793) +++ core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (working copy) @@ -21,11 +21,11 @@ import kafka.common.OffsetOutOfRangeException import kafka.message.{Message, ByteBufferMessageSet} import kafka.server.{KafkaRequestHandler, KafkaConfig} -import kafka.utils.TestUtils import org.apache.log4j.{Level, Logger} import org.scalatest.junit.JUnit3Suite import scala.collection._ import kafka.producer.ProducerData +import kafka.utils.TestUtils /** * End to end tests of the primitive apis against a local server @@ -61,6 +61,7 @@ val producerData = new ProducerData[String, Message](topic, topic, sentMessages) producer.send(producerData) + var fetchedMessage: ByteBufferMessageSet = null while(fetchedMessage == null || fetchedMessage.validBytes == 0) { val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/ProducerTest.scala (revision 1300793) +++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala (working copy) @@ -124,10 +124,58 @@ producer.close } +// @Test +// def testZKSendWithDeadBroker() { +// val props = new Properties() +// props.put("serializer.class", "kafka.serializer.StringEncoder") +// props.put("partitioner.class", "kafka.utils.StaticPartitioner") +// props.put("zk.connect", TestZKUtils.zookeeperConnect) +// +// // create topic +// CreateTopicCommand.createTopic(zkClient, "new-topic", 2, 1, "0,0") +// +// val config = new ProducerConfig(props) +// +// val producer = new Producer[String, String](config) +// val message = new Message("test1".getBytes) +// try { +//// // kill 2nd broker +//// server1.shutdown +//// Thread.sleep(100) +// +// // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and +// // all partitions have broker 0 as the leader. +// producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) +// Thread.sleep(100) +// +// producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) +// Thread.sleep(3000) +// +// // restart server 1 +//// server1.startup() +//// Thread.sleep(100) +// +// // cross check if brokers got the messages +// val response = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) +// val messageSet = response.messageSet("new-topic", 0).iterator +// var numMessagesReceived = 0 +// while(messageSet.hasNext) { +// val messageAndOffset = messageSet.next() +// assertEquals(message, messageSet.next.message) +// println("Received message at offset %d".format(messageAndOffset.offset)) +// numMessagesReceived += 1 +// } +// assertEquals("Message set should have 2 messages", 2, numMessagesReceived) +// } catch { +// case e: Exception => fail("Not expected", e) +// } +// producer.close +// } + // TODO: Need to rewrite when SyncProducer changes to throw timeout exceptions // and when leader logic is changed. - @Test - def testZKSendWithDeadBroker() { +// @Test +// def testZKSendWithDeadBroker2() { // val props = new Properties() // props.put("serializer.class", "kafka.serializer.StringEncoder") // props.put("partitioner.class", "kafka.utils.StaticPartitioner") @@ -172,7 +220,7 @@ // case e: Exception => fail("Not expected", e) // } // producer.close - } +// } @Test def testZKSendToExistingTopicWithNoBrokers() { @@ -227,7 +275,7 @@ } catch { case e: Exception => fail("Not expected", e) } finally { - server.shutdown + if(server != null) server.shutdown producer.close } } Index: core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala =================================================================== --- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (revision 0) +++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (revision 0) @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package kafka.server + +import org.scalatest.junit.JUnit3Suite +import kafka.zk.ZooKeeperTestHarness +import kafka.admin.CreateTopicCommand +import org.I0Itec.zkclient.ZkClient +import kafka.utils.TestUtils._ +import junit.framework.Assert._ +import kafka.utils.{ZKStringSerializer, Utils, TestUtils} + +class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { + + val brokerId1 = 0 + val brokerId2 = 1 + + val port1 = TestUtils.choosePort() + val port2 = TestUtils.choosePort() + + val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1) + val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) + + var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] + var zkClient: ZkClient = null + + override def setUp() { + super.setUp() + + zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer) + + // start both servers + val server1 = TestUtils.createServer(new KafkaConfig(configProps1)) + val server2 = TestUtils.createServer(new KafkaConfig(configProps2)) + + servers ++= List(server1, server2) + } + + override def tearDown() { + // shutdown the servers and delete data hosted on them + servers.map(server => server.shutdown()) + servers.map(server => Utils.rm(server.config.logDir)) + + super.tearDown() + } + + def testLeaderElectionWithCreateTopic { + // start 2 brokers + val topic = "new-topic" + val partitionId = 0 + + // create topic with 1 partition, 2 replicas, one on each broker + CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1") + + // wait until leader is elected + var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500) + + assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1)) + + // kill the server hosting the preferred replica + servers.head.shutdown() + + // check if leader moves to the other server + leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 5000) + assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1)) + + Thread.sleep(500) + + // bring the preferred replica back + servers.head.startup() + + leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500) + // TODO: Once the optimization for preferred replica re-election is in, this check should change to broker 0 + assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1)) + + // shutdown current leader (broker 1) + servers.last.shutdown() + leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500) + + // test if the leader is the preferred replica + assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1)) + } +} \ No newline at end of file Index: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala =================================================================== --- core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (revision 1300793) +++ core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (working copy) @@ -24,10 +24,11 @@ import kafka.message.{Message, ByteBufferMessageSet} import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness -import kafka.utils.{TestUtils, Utils} import kafka.producer._ +import kafka.utils.TestUtils._ import kafka.admin.CreateTopicCommand import kafka.api.FetchRequestBuilder +import kafka.utils.{TestUtils, Utils} class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { val port = TestUtils.choosePort @@ -72,7 +73,7 @@ val server = new KafkaServer(config) server.startup() - Thread.sleep(100) + waitUntilLeaderIsElected(zookeeper.client, topic, 0, 1000) var fetchedMessage: ByteBufferMessageSet = null while(fetchedMessage == null || fetchedMessage.validBytes == 0) { @@ -83,7 +84,6 @@ val newOffset = fetchedMessage.validBytes // send some more messages - println("Sending messages to topic " + topic) producer.send(new ProducerData[Int, Message](topic, 0, sent2)) Thread.sleep(200) Index: core/src/test/scala/unit/kafka/admin/AdminTest.scala =================================================================== --- core/src/test/scala/unit/kafka/admin/AdminTest.scala (revision 1300793) +++ core/src/test/scala/unit/kafka/admin/AdminTest.scala (working copy) @@ -169,8 +169,6 @@ case Some(metadata) => assertEquals(topic, metadata.topic) assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata) assertEquals("partition metadata list length should be 2", 2, metadata.partitionsMetadata.size) - assertEquals("leader of partition 0 should be 0", 0, metadata.partitionsMetadata.head.leader.get.id) - assertEquals("leader of partition 1 should be 1", 1, metadata.partitionsMetadata.last.leader.get.id) val actualReplicaAssignment = metadata.partitionsMetadata.map(p => p.replicas) val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList assertEquals(expectedReplicaAssignment.toList, actualReplicaList) Index: core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala =================================================================== --- core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala (revision 1300793) +++ core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala (working copy) @@ -43,7 +43,7 @@ @Test def testPartition() { - assertTrue(new Partition(10, 0) == new Partition(10, 0)) - assertTrue(new Partition(10, 1) != new Partition(10, 0)) + assertTrue(new Partition("foo", 10) == new Partition("foo", 10)) + assertTrue(new Partition("foo", 1) != new Partition("foo", 0)) } } Index: core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala =================================================================== --- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (revision 1300793) +++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (working copy) @@ -31,6 +31,7 @@ import kafka.utils._ import kafka.producer.{ProducerConfig, ProducerData, Producer} import java.util.{Collections, Properties} +import kafka.utils.TestUtils._ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging { @@ -51,10 +52,12 @@ val consumer2 = "consumer2" val consumer3 = "consumer3" val nMessages = 2 + var zkClient: ZkClient = null override def setUp() { super.setUp() dirs = new ZKGroupTopicDirs(group, topic) + zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer) } def testBasic() { @@ -94,6 +97,10 @@ TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1)) + + waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500) + waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500) + val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) assertEquals(sentMessages1.size, receivedMessages1.size) assertEquals(sentMessages1, receivedMessages1) @@ -102,7 +109,6 @@ val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir) val expected_1 = List( ("0", "group1_consumer1-0"), ("1", "group1_consumer1-0")) -// assertEquals(expected_1, actual_1) assertEquals(expected_1, actual_1) // commit consumed offsets @@ -118,7 +124,8 @@ val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum) - Thread.sleep(200) + waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500) + waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500) val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1) val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2) @@ -141,7 +148,10 @@ val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum) - Thread.sleep(200) + + waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500) + waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500) + val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1) val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2) val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum) @@ -168,6 +178,9 @@ val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum) + waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500) + waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500) + // create a consumer val consumerConfig1 = new ConsumerConfig( TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -196,7 +209,8 @@ val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum) - Thread.sleep(200) + waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500) + waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500) val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1) val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2) @@ -219,7 +233,10 @@ val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum) - Thread.sleep(200) + + waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500) + waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500) + val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1) val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2) val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum) @@ -300,11 +317,15 @@ val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) + waitUntilLeaderIsElected(zkClient, topic, 0, 500) + waitUntilLeaderIsElected(zkClient, topic, 1, 500) + val zkConsumerConnector = new ZookeeperConsumerConnector(consumerConfig, true) val topicMessageStreams = zkConsumerConnector.createMessageStreams(Predef.Map(topic -> 1), new StringDecoder) + var receivedMessages: List[String] = Nil for ((topic, messageStreams) <- topicMessageStreams) { for (messageStream <- messageStreams) { @@ -341,10 +362,10 @@ val topicRegistry = zkConsumerConnector1.getTopicRegistry assertEquals(1, topicRegistry.map(r => r._1).size) assertEquals(topic, topicRegistry.map(r => r._1).head) - val topicsAndPartitionsInRegistry = topicRegistry.map(r => (r._1, r._2.map(p => p._1))) + val topicsAndPartitionsInRegistry = topicRegistry.map(r => (r._1, r._2.map(p => p._2))) val brokerPartition = topicsAndPartitionsInRegistry.head._2.head assertEquals(0, brokerPartition.brokerId) - assertEquals(0, brokerPartition.partId) + assertEquals(0, brokerPartition.partitionId) // also check partition ownership val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir) Index: core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala =================================================================== --- core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (revision 1300793) +++ core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (working copy) @@ -20,14 +20,14 @@ import junit.framework.Assert._ import kafka.integration.KafkaServerTestHarness import kafka.server._ -import kafka.utils.{Utils, Logging} -import kafka.utils.TestUtils import org.scalatest.junit.JUnit3Suite import scala.collection.JavaConversions._ import kafka.consumer.{ConsumerConfig, KafkaMessageStream} import org.apache.log4j.{Level, Logger} import kafka.message._ import kafka.javaapi.producer.{ProducerData, Producer} +import kafka.utils.{Utils, Logging, TestUtils} +import kafka.utils.TestUtils._ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging { @@ -52,11 +52,15 @@ // send some messages to each broker val sentMessages1 = sendMessages(nMessages, "batch1") + + waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500) + waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500) + // create a consumer - val consumerConfig1 = new ConsumerConfig( - TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1)) + val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2))) + val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) assertEquals(sentMessages1, receivedMessages1) Index: core/src/test/resources/log4j.properties =================================================================== --- core/src/test/resources/log4j.properties (revision 1300793) +++ core/src/test/resources/log4j.properties (working copy) @@ -18,8 +18,8 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -log4j.logger.kafka=OFF +log4j.logger.kafka=INFO # zkclient can be verbose, during debugging it is common to adjust is separately -log4j.logger.org.I0Itec.zkclient.ZkClient=WARN -log4j.logger.org.apache.zookeeper=WARN +log4j.logger.org.I0Itec.zkclient.ZkClient=OFF +log4j.logger.org.apache.zookeeper=OFF Index: core/src/main/scala/kafka/cluster/Replica.scala =================================================================== --- core/src/main/scala/kafka/cluster/Replica.scala (revision 0) +++ core/src/main/scala/kafka/cluster/Replica.scala (revision 0) @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package kafka.cluster + +import kafka.log.Log + +case class Replica(brokerId: Int, partition: Partition, topic: String, + var log: Option[Log] = None, var hw: Long = -1, var leo: Long = -1, isLocal: Boolean = false) \ No newline at end of file Index: core/src/main/scala/kafka/cluster/Partition.scala =================================================================== --- core/src/main/scala/kafka/cluster/Partition.scala (revision 1300793) +++ core/src/main/scala/kafka/cluster/Partition.scala (working copy) @@ -16,14 +16,12 @@ */ package kafka.cluster -case class Partition(brokerId: Int, partId: Int, topic: String = "") extends Ordered[Partition] { - - def name = partId - - def compare(that: Partition) = - if (this.topic == that.topic) - this.partId - that.partId - else - this.topic.compareTo(that.topic) - -} +/** + * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR + * TODO: Commit queue to be added as part of KAFKA-46. Add AR, ISR, CUR, RAR state maintenance as part of KAFKA-302 + */ +case class Partition(topic: String, val partId: Int, var leader: Option[Replica] = None, + assignedReplicas: Set[Replica] = Set.empty[Replica], + inSyncReplicas: Set[Replica] = Set.empty[Replica], + catchUpReplicas: Set[Replica] = Set.empty[Replica], + reassignedReplicas: Set[Replica] = Set.empty[Replica]) Index: core/src/main/scala/kafka/log/Log.scala =================================================================== --- core/src/main/scala/kafka/log/Log.scala (revision 1300793) +++ core/src/main/scala/kafka/log/Log.scala (working copy) @@ -26,7 +26,7 @@ import kafka.api.OffsetRequest import java.util._ -private[log] object Log { +private[kafka] object Log { val FileSuffix = ".kafka" /** @@ -100,7 +100,7 @@ * An append-only log for storing messages. */ @threadsafe -private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends Logging { +private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends Logging { /* A lock that guards all modifications to the log */ private val lock = new Object Index: core/src/main/scala/kafka/log/LogManager.scala =================================================================== --- core/src/main/scala/kafka/log/LogManager.scala (revision 1300793) +++ core/src/main/scala/kafka/log/LogManager.scala (working copy) @@ -25,6 +25,7 @@ import kafka.common.{InvalidTopicException, InvalidPartitionException} import kafka.api.OffsetRequest import org.I0Itec.zkclient.ZkClient +import kafka.cluster.{Partition, Replica} /** * The guy who creates and hands out logs @@ -51,6 +52,7 @@ private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap) private val logRetentionSize = config.logRetentionSize private val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false) + private var replicas: Map[(String, Int), Replica] = new mutable.HashMap[(String, Int), Replica]() /* Initialize a log for each subdirectory of the main log directory */ private val logs = new Pool[String, Pool[Int, Log]]() @@ -120,7 +122,9 @@ new Log(d, maxSize, flushInterval, false) } } - + + def getReplicaForPartition(topic: String, partition: Int): Option[Replica] = replicas.get((topic, partition)) + /** * Return the Pool (partitions) for a specific log */ @@ -145,17 +149,23 @@ def getOffsets(offsetRequest: OffsetRequest): Array[Long] = { val log = getLog(offsetRequest.topic, offsetRequest.partition) - if (log != null) return log.getOffsetsBefore(offsetRequest) - Log.getEmptyOffsets(offsetRequest) + log match { + case Some(l) => l.getOffsetsBefore(offsetRequest) + case None => Log.getEmptyOffsets(offsetRequest) + } } /** * Get the log if exists */ - def getLog(topic: String, partition: Int): Log = { + def getLog(topic: String, partition: Int): Option[Log] = { val parts = getLogPool(topic, partition) - if (parts == null) return null - parts.get(partition) + if (parts == null) None + else { + val log = parts.get(partition) + if(log == null) None + else Some(log) + } } /** @@ -188,9 +198,40 @@ info("Created log for '" + topic + "'-" + partition) } + // add this log to the list of replicas hosted on this broker + addReplicaForPartition(topic, partition) log } - + + def addReplicaForPartition(topic: String, partitionId: Int): Replica = { + val replica = replicas.get((topic, partitionId)) + val log = getLog(topic, partitionId) + replica match { + case Some(r) => + r.log match { + case None => + val log = getLog(topic, partitionId) + r.log = log + case Some(l) => // nothing to do since log already exists + } + case None => + val partition = new Partition(topic, partitionId) + log match { + case Some(l) => + val replica = new Replica(config.brokerId, partition, topic, log, l.getHighwaterMark, l.maxSize, true) + replicas += (topic, partitionId) -> replica + info("Added replica for topic %s partition %s on broker %d" + .format(replica.topic, replica.partition.partId, replica.brokerId)) + case None => + val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false) + replicas += (topic, partitionId) -> replica + info("Added replica for topic %s partition %s on broker %d" + .format(replica.topic, replica.partition.partId, replica.brokerId)) + } + } + replicas.get((topic, partitionId)).get + } + /* Attemps to delete all provided segments from a log and returns how many it was able to */ private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int = { var total = 0 Index: core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala =================================================================== --- core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (revision 1300793) +++ core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (working copy) @@ -16,12 +16,11 @@ */ package kafka.producer -import kafka.cluster.{Broker, Partition} import collection.mutable.HashMap import kafka.api.{TopicMetadataRequest, TopicMetadata} import java.lang.IllegalStateException -import kafka.common.NoLeaderForPartitionException import kafka.utils.Logging +import kafka.cluster.{Replica, Partition} class BrokerPartitionInfo(producerPool: ProducerPool) extends Logging { val topicPartitionInfo = new HashMap[String, TopicMetadata]() @@ -33,29 +32,37 @@ * @return a sequence of (brokerId, numPartitions). Returns a zero-length * sequence if no brokers are available. */ - def getBrokerPartitionInfo(topic: String): Seq[(Partition, Broker)] = { + def getBrokerPartitionInfo(topic: String): Seq[Partition] = { + debug("Getting broker partition info for topic %s".format(topic)) // check if the cache has metadata for this topic val topicMetadata = topicPartitionInfo.get(topic) val metadata: TopicMetadata = - topicMetadata match { - case Some(m) => m - case None => - // refresh the topic metadata cache - info("Fetching metadata for topic %s".format(topic)) - updateInfo(topic) - val topicMetadata = topicPartitionInfo.get(topic) - topicMetadata match { - case Some(m) => m - case None => throw new IllegalStateException("Failed to fetch topic metadata for topic: " + topic) - } - } + topicMetadata match { + case Some(m) => m + case None => + // refresh the topic metadata cache + info("Fetching metadata for topic %s".format(topic)) + updateInfo(topic) + val topicMetadata = topicPartitionInfo.get(topic) + topicMetadata match { + case Some(m) => m + case None => throw new IllegalStateException("Failed to fetch topic metadata for topic: " + topic) + } + } val partitionMetadata = metadata.partitionsMetadata partitionMetadata.map { m => + val partition = new Partition(topic, m.partitionId) m.leader match { - case Some(leader) => (new Partition(leader.id, m.partitionId, topic) -> leader) - case None => throw new NoLeaderForPartitionException("No leader for topic %s, partition %d".format(topic, m.partitionId)) + case Some(leader) => + val leaderReplica = new Replica(leader.id, partition, topic) + partition.leader = Some(leaderReplica) + debug("Topic %s partition %d has leader %d".format(topic, m.partitionId, leader.id)) + partition + case None => + debug("Topic %s partition %d does not have a leader yet".format(topic, m.partitionId)) + partition } - }.sortWith((s, t) => s._1.partId < t._1.partId) + }.sortWith((s, t) => s.partId < t.partId) } /** @@ -78,8 +85,8 @@ // refresh cache for all topics val topics = topicPartitionInfo.keySet.toList val topicMetadata = producer.send(new TopicMetadataRequest(topics)) - info("Fetched metadata for topics %s".format(topicMetadata.mkString(","))) topicMetadata.foreach(metadata => topicPartitionInfo += (metadata.topic -> metadata)) + } } } Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala =================================================================== --- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1300793) +++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy) @@ -18,14 +18,14 @@ package kafka.producer.async import kafka.api.{ProducerRequest, TopicData, PartitionData} -import kafka.common.{FailedToSendMessageException, InvalidPartitionException, NoBrokersForPartitionException} -import kafka.cluster.{Partition, Broker} +import kafka.cluster.Partition import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet} import kafka.producer._ import kafka.serializer.Encoder -import kafka.utils.{Utils, Logging} import scala.collection.Map import scala.collection.mutable.{ListBuffer, HashMap} +import kafka.common.{NoLeaderForPartitionException, InvalidPartitionException, NoBrokersForPartitionException} +import kafka.utils.{Utils, Logging} class DefaultEventHandler[K,V](config: ProducerConfig, // this api is for testing private val partitioner: Partitioner[K], // use the other constructor @@ -43,41 +43,37 @@ def handle(events: Seq[ProducerData[K,V]]) { lock synchronized { val serializedData = serialize(events) - handleSerializedData(serializedData, config.producerRetries) + var outstandingProduceRequests = serializedData + var remainingRetries = config.producerRetries + Stream.continually(dispatchSerializedData(outstandingProduceRequests)) + .takeWhile(requests => (remainingRetries > 0) && (requests.size > 0)).foreach { + currentOutstandingRequests => + outstandingProduceRequests = currentOutstandingRequests + // back off and update the topic metadata cache before attempting another send operation + Thread.sleep(config.producerRetryBackoffMs) + brokerPartitionInfo.updateInfo() + remainingRetries -= 1 + } } } - private def handleSerializedData(messages: Seq[ProducerData[K,Message]], requiredRetries: Int) { + private def dispatchSerializedData(messages: Seq[ProducerData[K,Message]]): Seq[ProducerData[K, Message]] = { val partitionedData = partitionAndCollate(messages) - for ( (brokerid, eventsPerBrokerMap) <- partitionedData ) { - if (logger.isTraceEnabled) - eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s" - .format(partitionAndEvent._1, brokerid, partitionAndEvent._2))) - val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap) + val failedProduceRequests = new ListBuffer[ProducerData[K,Message]] + try { + for ( (brokerid, eventsPerBrokerMap) <- partitionedData ) { + if (logger.isTraceEnabled) + eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s" + .format(partitionAndEvent._1, brokerid, partitionAndEvent._2))) + val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap) - try { - send(brokerid, messageSetPerBroker) - } catch { - case t => - warn("error sending data to broker " + brokerid, t) - var numRetries = 0 - val eventsPerBroker = new ListBuffer[ProducerData[K,Message]] - eventsPerBrokerMap.foreach(e => eventsPerBroker.appendAll(e._2)) - while (numRetries < requiredRetries) { - numRetries +=1 - Thread.sleep(config.producerRetryBackoffMs) - try { - brokerPartitionInfo.updateInfo() - handleSerializedData(eventsPerBroker, 0) - return - } - catch { - case t => warn("error sending data to broker " + brokerid + " in " + numRetries + " retry", t) - } - } - throw new FailedToSendMessageException("can't send data after " + numRetries + " retries", t) + if((brokerid < 0) || (!send(brokerid, messageSetPerBroker))) + failedProduceRequests.appendAll(eventsPerBrokerMap.map(r => r._2).flatten) } + }catch { + case t: Throwable => error("Failed to send messages") } + failedProduceRequests } def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = { @@ -93,16 +89,22 @@ val partitionIndex = getPartition(event.getKey, totalNumPartitions) val brokerPartition = topicPartitionsList(partitionIndex) + val leaderBrokerId = brokerPartition.leader match { + case Some(leader) => leader.brokerId + case None => -1 + // postpone the failure until the send operation, so that requests for other brokers are handled correctly + } + var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null - ret.get(brokerPartition._2.id) match { + ret.get(leaderBrokerId) match { case Some(element) => dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]] case None => dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]] - ret.put(brokerPartition._2.id, dataPerBroker) + ret.put(leaderBrokerId, dataPerBroker) } - val topicAndPartition = (event.getTopic, brokerPartition._1.partId) + val topicAndPartition = (event.getTopic, brokerPartition.partId) var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null dataPerBroker.get(topicAndPartition) match { case Some(element) => @@ -116,10 +118,11 @@ ret } - private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[(Partition, Broker)] = { + private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[Partition] = { debug("Getting the number of broker partitions registered for topic: " + pd.getTopic) val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic) - debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList) + debug("Broker partitions registered for topic: %s are %s" + .format(pd.getTopic, topicPartitionsList.map(p => p.partId).mkString(","))) val totalNumPartitions = topicPartitionsList.length if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey) topicPartitionsList @@ -150,23 +153,30 @@ * @param brokerId the broker that will receive the request * @param messagesPerTopic the messages as a map from (topic, partition) -> messages */ - private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]) { - if(messagesPerTopic.size > 0) { - val topics = new HashMap[String, ListBuffer[PartitionData]]() - for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) { - topics.get(topicName) match { - case Some(x) => trace("found " + topicName) - case None => topics += topicName -> new ListBuffer[PartitionData]() //create a new listbuffer for this topic + private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]): Boolean = { + try { + if(brokerId < 0) + throw new NoLeaderForPartitionException("No leader for some partition(s) on broker %d".format(brokerId)) + if(messagesPerTopic.size > 0) { + val topics = new HashMap[String, ListBuffer[PartitionData]]() + for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) { + topics.get(topicName) match { + case Some(x) => trace("found " + topicName) + case None => topics += topicName -> new ListBuffer[PartitionData]() //create a new listbuffer for this topic + } + topics(topicName).append(new PartitionData(partitionId, messagesSet)) } - topics(topicName).append(new PartitionData(partitionId, messagesSet)) + val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)) + val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeout, topicData.toArray) + val syncProducer = producerPool.getProducer(brokerId) + val response = syncProducer.send(producerRequest) + // TODO: possibly send response to response callback handler + trace("kafka producer sent messages for topics %s to broker %d on %s:%d" + .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port)) } - val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)) - val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeout, topicData.toArray) - val syncProducer = producerPool.getProducer(brokerId) - val response = syncProducer.send(producerRequest) - // TODO: possibly send response to response callback handler - trace("kafka producer sent messages for topics %s to broker %s:%d" - .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port)) + true + }catch { + case t: Throwable => false } } Index: core/src/main/scala/kafka/producer/async/ProducerSendThread.scala =================================================================== --- core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (revision 1300793) +++ core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (working copy) @@ -63,7 +63,11 @@ // returns a null object val expired = currentQueueItem == null if(currentQueueItem != null) { - trace("Dequeued item for topic %s, partition key: %s, data: %s" + if(currentQueueItem.getKey == null) + trace("Dequeued item for topic %s, no partition key, data: %s" + .format(currentQueueItem.getTopic, currentQueueItem.getData.toString)) + else + trace("Dequeued item for topic %s, partition key: %s, data: %s" .format(currentQueueItem.getTopic, currentQueueItem.getKey.toString, currentQueueItem.getData.toString)) events += currentQueueItem Index: core/src/main/scala/kafka/admin/AdminUtils.scala =================================================================== --- core/src/main/scala/kafka/admin/AdminUtils.scala (revision 1300793) +++ core/src/main/scala/kafka/admin/AdminUtils.scala (working copy) @@ -80,10 +80,6 @@ for (i <- 0 until replicaAssignmentList.size) { val zkPath = ZkUtils.getTopicPartitionReplicasPath(topic, i.toString) ZkUtils.updatePersistentPath(zkClient, zkPath, Utils.seqToCSV(replicaAssignmentList(i))) - // TODO: Remove this with leader election patch - // assign leader for the partition i -// ZkUtils.updateEphemeralPath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, i.toString), -// replicaAssignmentList(i).head) debug("Updated path %s with %s for replica assignment".format(zkPath, Utils.seqToCSV(replicaAssignmentList(i)))) } } Index: core/src/main/scala/kafka/consumer/FetcherRunnable.scala =================================================================== --- core/src/main/scala/kafka/consumer/FetcherRunnable.scala (revision 1300793) +++ core/src/main/scala/kafka/consumer/FetcherRunnable.scala (working copy) @@ -20,7 +20,7 @@ import java.io.IOException import java.util.concurrent.CountDownLatch import kafka.api.{FetchRequestBuilder, OffsetRequest} -import kafka.cluster.{Partition, Broker} +import kafka.cluster.Broker import kafka.common.ErrorMapping import kafka.message.ByteBufferMessageSet import kafka.utils._ @@ -48,7 +48,7 @@ override def run() { for (infopti <- partitionTopicInfos) - info(name + " start fetching topic: " + infopti.topic + " part: " + infopti.partition.partId + " offset: " + info(name + " start fetching topic: " + infopti.topic + " part: " + infopti.partitionId + " offset: " + infopti.getFetchOffset + " from " + broker.host + ":" + broker.port) var reqId = 0 @@ -61,7 +61,7 @@ maxWait(0). minBytes(0) partitionTopicInfos.foreach(pti => - builder.addFetch(pti.topic, pti.partition.partId, pti.getFetchOffset(), config.fetchSize) + builder.addFetch(pti.topic, pti.partitionId, pti.getFetchOffset(), config.fetchSize) ) val fetchRequest = builder.build() @@ -70,13 +70,13 @@ var read = 0L for(infopti <- partitionTopicInfos) { - val messages = response.messageSet(infopti.topic, infopti.partition.partId).asInstanceOf[ByteBufferMessageSet] + val messages = response.messageSet(infopti.topic, infopti.partitionId).asInstanceOf[ByteBufferMessageSet] try { var done = false if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) { info("offset for " + infopti + " out of range") // see if we can fix this error - val resetOffset = resetConsumerOffsets(infopti.topic, infopti.partition) + val resetOffset = resetConsumerOffsets(infopti.topic, infopti.partitionId) if(resetOffset >= 0) { infopti.resetFetchOffset(resetOffset) infopti.resetConsumeOffset(resetOffset) @@ -126,7 +126,7 @@ private def shutdownComplete() = shutdownLatch.countDown private def resetConsumerOffsets(topic : String, - partition: Partition) : Long = { + partitionId: Int) : Long = { var offset : Long = 0 config.autoOffsetReset match { case OffsetRequest.SmallestTimeString => offset = OffsetRequest.EarliestTime @@ -135,13 +135,13 @@ } // get mentioned offset from the broker - val offsets = simpleConsumer.getOffsetsBefore(topic, partition.partId, offset, 1) + val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, offset, 1) val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) // reset manually in zookeeper - info("updating partition " + partition.name + " for topic " + topic + " with " + + info("updating partition " + partitionId + " for topic " + topic + " with " + (if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0)) - ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition.name, offsets(0).toString) + ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partitionId, offsets(0).toString) offsets(0) } Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala =================================================================== --- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (revision 1300793) +++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (working copy) @@ -20,13 +20,12 @@ import java.util.concurrent._ import java.util.concurrent.atomic._ import kafka.message._ -import kafka.cluster._ import kafka.utils.Logging import kafka.common.ErrorMapping private[consumer] class PartitionTopicInfo(val topic: String, val brokerId: Int, - val partition: Partition, + val partitionId: Int, private val chunkQueue: BlockingQueue[FetchedDataChunk], private val consumedOffset: AtomicLong, private val fetchedOffset: AtomicLong, @@ -74,6 +73,6 @@ chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset)) } - override def toString(): String = topic + ":" + partition.toString + ": fetched offset = " + fetchedOffset.get + + override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get + ": consumed offset = " + consumedOffset.get } Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1300793) +++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy) @@ -91,7 +91,7 @@ private val rebalanceLock = new Object private var fetcher: Option[Fetcher] = None private var zkClient: ZkClient = null - private val topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]] + private val topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] // queues : (topic,consumerThreadId) -> queue private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]] private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false) @@ -202,7 +202,7 @@ } // this API is used by unit tests only - def getTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]] = topicRegistry + def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = { info("begin registering consumer " + consumerIdString + " in ZK") @@ -241,7 +241,7 @@ for (info <- infos.values) { val newOffset = info.getConsumeOffset try { - updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name, + updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partitionId, newOffset.toString) } catch { case t: Throwable => @@ -261,7 +261,7 @@ val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) for(partition <- infos.values) { builder.append("\n {") - builder.append{partition.partition.name} + builder.append{partition} builder.append(",fetch offset:" + partition.getFetchOffset) builder.append(",consumer offset:" + partition.getConsumeOffset) builder.append("}") @@ -278,10 +278,9 @@ getLatestOffset(topic, brokerId, partitionId) - getConsumedOffset(topic, brokerId, partitionId) def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long = { - val partition = new Partition(brokerId, partitionId) val partitionInfos = topicRegistry.get(topic) if (partitionInfos != null) { - val partitionInfo = partitionInfos.get(partition) + val partitionInfo = partitionInfos.get(partitionId) if (partitionInfo != null) return partitionInfo.getConsumeOffset } @@ -289,7 +288,7 @@ //otherwise, try to get it from zookeeper try { val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) - val znode = topicDirs.consumerOffsetDir + "/" + partition.name + val znode = topicDirs.consumerOffsetDir + "/" + partitionId val offsetString = readDataMaybeNull(zkClient, znode) if (offsetString != null) return offsetString.toLong @@ -383,7 +382,7 @@ info("Releasing partition ownership") for ((topic, infos) <- topicRegistry) { for(partition <- infos.keys) { - val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition.partId.toString) + val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition.toString) deletePath(zkClient, partitionOwnerPath) debug("Consumer " + consumerIdString + " releasing " + partitionOwnerPath) } @@ -475,7 +474,7 @@ var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]() for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) { topicRegistry.remove(topic) - topicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo]) + topicRegistry.put(topic, new Pool[Int, PartitionTopicInfo]) val topicDirs = new ZKGroupTopicDirs(group, topic) val curConsumers = consumersPerTopicMap.get(topic).get @@ -566,7 +565,7 @@ for (partition <- partitionInfos.values) allPartitionInfos ::= partition info("Consumer " + consumerIdString + " selected partitions : " + - allPartitionInfos.sortWith((s,t) => s.partition < t.partition).map(_.toString).mkString(",")) + allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(",")) fetcher match { case Some(f) => @@ -648,18 +647,17 @@ else offset = offsetString.toLong - val partitionObject = new Partition(leader, partition.toInt, topic) val queue = queues.get((topic, consumerThreadId)) val consumedOffset = new AtomicLong(offset) val fetchedOffset = new AtomicLong(offset) val partTopicInfo = new PartitionTopicInfo(topic, leader, - partitionObject, + partition.toInt, queue, consumedOffset, fetchedOffset, new AtomicInteger(config.fetchSize)) - partTopicInfoMap.put(partitionObject, partTopicInfo) + partTopicInfoMap.put(partition.toInt, partTopicInfo) debug(partTopicInfo + " selected new offset " + offset) } } Index: core/src/main/scala/kafka/utils/Utils.scala =================================================================== --- core/src/main/scala/kafka/utils/Utils.scala (revision 1300793) +++ core/src/main/scala/kafka/utils/Utils.scala (working copy) @@ -246,6 +246,22 @@ else value } + def getLong(props: Properties, name: String, default: Long): Long = + getLongInRange(props, name, default, (Long.MinValue, Long.MaxValue)) + + def getLongInRange(props: Properties, name: String, default: Long, range: (Long, Long)): Long = { + val v = + if(props.containsKey(name)) + props.getProperty(name).toInt + else + default + if(v < range._1 || v > range._2) + throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".") + else + v + } + + def getLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = { val value = buffer.getLong if(value < range._1 || value > range._2) Index: core/src/main/scala/kafka/utils/ZkUtils.scala =================================================================== --- core/src/main/scala/kafka/utils/ZkUtils.scala (revision 1300793) +++ core/src/main/scala/kafka/utils/ZkUtils.scala (working copy) @@ -17,13 +17,14 @@ package kafka.utils -import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.serialize.ZkSerializer import kafka.cluster.{Broker, Cluster} import scala.collection._ import java.util.Properties import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError} import kafka.consumer.TopicCount +import org.I0Itec.zkclient.{IZkDataListener, ZkClient} +import java.util.concurrent.locks.Condition object ZkUtils extends Logging { val ConsumersPath = "/consumers" @@ -68,15 +69,9 @@ } def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = { - // TODO: When leader election is implemented, change this method to return the leader as follows - // until then, assume the first replica as the leader -// val leader = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString)) - val replicaListString = readDataMaybeNull(zkClient, getTopicPartitionReplicasPath(topic, partition.toString)) - val replicas = Utils.getCSVList(replicaListString) - replicas.size match { - case 0 => None - case _ => Some(replicas.head.toInt) - } + val leader = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString)) + if(leader == null) None + else Some(leader.toInt) } def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = { @@ -94,6 +89,16 @@ replicas.contains(brokerId.toString) } + def tryToBecomeLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = { + try { + createEphemeralPathExpectConflict(zkClient, getTopicPartitionLeaderPath(topic, partition.toString), brokerId.toString) + true + } catch { + case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); false + case oe => false + } + } + def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id val broker = new Broker(id, creator, host, port) @@ -317,6 +322,27 @@ ret } + def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[String, Seq[Int]] = { + val topicsAndPartitions = getPartitionsForTopics(zkClient, topics.iterator) + + topicsAndPartitions.map { tp => + val topic = tp._1 + val partitions = tp._2.map(p => p.toInt) + val relevantPartitions = partitions.filter { partition => + val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt) + assignedReplicas.contains(brokerId) + } + (topic -> relevantPartitions) + } + } + + def getPartitionsAssignedToBroker(zkClient: ZkClient, topic: String, partitions: Seq[Int], broker: Int): Seq[Int] = { + partitions.filter { p => + val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, p).map(r => r.toInt) + assignedReplicas.contains(broker) + } + } + def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) { val brokerIdPath = BrokerIdsPath + "/" + brokerId zkClient.delete(brokerIdPath) @@ -372,8 +398,31 @@ def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] = brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)) ) + + def getAllTopics(zkClient: ZkClient): Seq[String] = { + val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath) + if(topics == null) Seq.empty[String] + else topics + } + } +class LeaderExists(topic: String, partition: Int, leaderExists: Condition) extends IZkDataListener { + @throws(classOf[Exception]) + def handleDataChange(dataPath: String, data: Object) { + val t = dataPath.split("/").takeRight(3).head + val p = dataPath.split("/").takeRight(2).head.toInt + if(t == topic && p == partition) + leaderExists.signal() + } + + @throws(classOf[Exception]) + def handleDataDeleted(dataPath: String) { + leaderExists.signal() + } + +} + object ZKStringSerializer extends ZkSerializer { @throws(classOf[ZkMarshallingError]) Index: core/src/main/scala/kafka/server/KafkaZooKeeper.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaZooKeeper.scala (revision 1300793) +++ core/src/main/scala/kafka/server/KafkaZooKeeper.scala (working copy) @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -18,30 +18,37 @@ package kafka.server import kafka.utils._ -import org.I0Itec.zkclient.{IZkStateListener, ZkClient} import org.apache.zookeeper.Watcher.Event.KeeperState import kafka.log.LogManager import java.net.InetAddress import kafka.common.KafkaZookeeperClient +import kafka.cluster.Replica +import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient} /** * Handles the server's interaction with zookeeper. The server needs to register the following paths: * /topics/[topic]/[node_id-partition_num] * /brokers/[0...N] --> host:port - * + * */ class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) extends Logging { - + val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId var zkClient: ZkClient = null var topics: List[String] = Nil val lock = new Object() - + var existingTopics: Set[String] = Set.empty[String] + val leaderChangeListener = new LeaderChangeListener + val topicPartitionsChangeListener = new TopicChangeListener + private val topicListenerLock = new Object + private val leaderChangeLock = new Object + def startup() { /* start client */ info("connecting to ZK: " + config.zkConnect) zkClient = KafkaZookeeperClient.getZookeeperClient(config) zkClient.subscribeStateChanges(new SessionExpireListener) + subscribeToTopicAndPartitionsChanges } def registerBrokerInZk() { @@ -73,6 +80,12 @@ info("re-registering broker info in ZK for broker " + config.brokerId) registerBrokerInZk() info("done re-registering broker") + info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath)) + zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener) + val topics = ZkUtils.getAllTopics(zkClient) + debug("Existing topics are %s".format(topics.mkString(","))) + topics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), topicPartitionsChangeListener)) + handleNewTopics(topics) } } @@ -81,6 +94,169 @@ info("Closing zookeeper client...") zkClient.close() } - } - + } + + def handleNewTopics(topics: Seq[String]) { + // get relevant partitions to this broker + val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId) + topicsAndPartitionsOnThisBroker.foreach { tp => + val topic = tp._1 + val partitionsAssignedToThisBroker = tp._2 + // subscribe to leader changes for these partitions + subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker) + // start replicas for these partitions + startReplicasForPartitions(topic, partitionsAssignedToThisBroker) + } + } + + def handleNewPartitions(topic: String, partitions: Seq[Int]) { + info("Handling topic %s partitions %s".format(topic, partitions.mkString(","))) + // find the partitions relevant to this broker + val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topic, partitions, config.brokerId) + info("Partitions assigned to broker %d for topic %s are %s" + .format(config.brokerId, topic, partitionsAssignedToThisBroker.mkString(","))) + + // subscribe to leader changes for these partitions + subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker) + // start replicas for these partitions + startReplicasForPartitions(topic, partitionsAssignedToThisBroker) + } + + def subscribeToTopicAndPartitionsChanges { + info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath)) + zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener) + val topics = ZkUtils.getAllTopics(zkClient) + debug("Existing topics are %s".format(topics.mkString(","))) + topics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), topicPartitionsChangeListener)) + + val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId) + debug("Partitions assigned to broker %d are %s".format(config.brokerId, partitionsAssignedToThisBroker.mkString(","))) + partitionsAssignedToThisBroker.foreach { tp => + val topic = tp._1 + val partitions = tp._2.map(p => p.toInt) + partitions.foreach { partition => + // register leader change listener + zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener) + } + } + } + + private def subscribeToLeaderForPartitions(topic: String, partitions: Seq[Int]) { + partitions.foreach { partition => + info("Broker %d subscribing to leader changes for topic %s partition %d".format(config.brokerId, topic, partition)) + // register leader change listener + zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener) + } + } + + def startReplicasForTopics(topics: Seq[String]) { + val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId) + partitionsAssignedToThisBroker.foreach(tp => startReplicasForPartitions(tp._1, tp._2)) + } + + private def startReplicasForPartitions(topic: String, partitions: Seq[Int]) { + partitions.foreach { partition => + val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt) + if(assignedReplicas.contains(config.brokerId)) { + val replica = logManager.addReplicaForPartition(topic, partition) + startReplica(replica) + } else + warn("Ignoring partition %d of topic %s since broker %d doesn't host any replicas for it" + .format(partition, topic, config.brokerId)) + } + } + + private def startReplica(replica: Replica) { + info("Starting replica for topic %s partition %d on broker %d".format(replica.topic, replica.partition.partId, replica.brokerId)) + replica.log match { + case Some(log) => // log is already started + case None => + // TODO: Add log recovery upto the last checkpointed HW as part of KAFKA-46 + } + ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partId) match { + case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partId, leader)) + case None => // leader election + leaderElection(replica) + + } + } + + def leaderElection(replica: Replica) { + info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partId)) + // read the AR list for replica.partition from ZK + val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partId).map(r => r.toInt) + // TODO: read the ISR as part of KAFKA-302 + if(assignedReplicas.contains(replica.brokerId)) { + // wait for some time if it is not the preferred replica + try { + if(replica.brokerId != assignedReplicas.head) + Thread.sleep(config.preferredReplicaWaitTime) + }catch { + case e => // ignoring + } + if(ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)) { + info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId)) + // TODO: Become leader as part of KAFKA-302 + } + } + } + + class TopicChangeListener extends IZkChildListener with Logging { + + @throws(classOf[Exception]) + def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { + topicListenerLock.synchronized { + debug("Topic/partition change listener fired for path " + parentPath) + import scala.collection.JavaConversions._ + val currentChildren = asBuffer(curChilds) + // check if topic has changed or a partition for an existing topic has changed + if(parentPath == ZkUtils.BrokerTopicsPath) { + val currentTopics = currentChildren + debug("New topics " + currentTopics.mkString(",")) + // for each new topic [topic], watch the path /brokers/topics/[topic]/partitions + currentTopics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), this)) + handleNewTopics(currentTopics) + }else { + val topic = parentPath.split("/").takeRight(2).head + debug("Partitions changed for topic %s on broker %d with new value %s" + .format(topic, config.brokerId, currentChildren.mkString(","))) + handleNewPartitions(topic, currentChildren.map(p => p.toInt).toSeq) + } + } + } + } + + class LeaderChangeListener extends IZkDataListener with Logging { + + @throws(classOf[Exception]) + def handleDataChange(dataPath: String, data: Object) { + // handle leader change event for path + val newLeader: String = data.asInstanceOf[String] + debug("Leader change listener fired for path %s. New leader is %s".format(dataPath, newLeader)) + // TODO: update the leader in the list of replicas maintained by the log manager + } + + @throws(classOf[Exception]) + def handleDataDeleted(dataPath: String) { + leaderChangeLock.synchronized { + // leader is deleted for topic partition + val topic = dataPath.split("/").takeRight(4).head + val partitionId = dataPath.split("/").takeRight(2).head.toInt + debug("Leader deleted listener fired for topic %s partition %d on broker %d" + .format(topic, partitionId, config.brokerId)) + val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionId).map(r => r.toInt) + if(assignedReplicas.contains(config.brokerId)) { + val replica = logManager.getReplicaForPartition(topic, partitionId) + replica match { + case Some(r) => leaderElection(r) + case None => error("No replica exists for topic %s partition %s on broker %d" + .format(topic, partitionId, config.brokerId)) + } + } + } + } + } } + + + Index: core/src/main/scala/kafka/server/KafkaConfig.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaConfig.scala (revision 1300793) +++ core/src/main/scala/kafka/server/KafkaConfig.scala (working copy) @@ -94,6 +94,15 @@ /* enable auto creation of topic on the server */ val autoCreateTopics = Utils.getBoolean(props, "auto.create.topics", true) + /** + * Following properties are relevant to Kafka replication + */ + /* default replication factors for automatically created topics */ val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1) -} + + /* wait time in ms to allow the preferred replica for a partition to become the leader. This property is used during + * leader election on all replicas minus the preferred replica */ + val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300) + + } Index: core/src/main/scala/kafka/server/KafkaServer.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaServer.scala (revision 1300793) +++ core/src/main/scala/kafka/server/KafkaServer.scala (working copy) @@ -20,9 +20,9 @@ import java.util.concurrent._ import java.util.concurrent.atomic._ import java.io.File -import kafka.utils.{Mx4jLoader, Utils, SystemTime, Logging} import kafka.network.{SocketServerStats, SocketServer} import kafka.log.LogManager +import kafka.utils._ /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required @@ -72,6 +72,11 @@ * So this should happen after socket server start. */ logManager.startup + + // starting relevant replicas and leader election for partitions assigned to this broker + // TODO: Some part of the broker startup logic is hidden inside KafkaZookeeper, but some of it has to be done here + // since it requires the log manager to come up. Ideally log manager should not hide KafkaZookeeper inside it + logManager.kafkaZookeeper.startReplicasForTopics(ZkUtils.getAllTopics(logManager.getZookeeperClient)) info("Server started.") } @@ -82,7 +87,7 @@ def shutdown() { val canShutdown = isShuttingDown.compareAndSet(false, true); if (canShutdown) { - info("Shutting down Kafka server") + info("Shutting down Kafka server with id " + config.brokerId) if (socketServer != null) socketServer.shutdown() if(requestHandlerPool != null) @@ -108,3 +113,5 @@ def getStats(): SocketServerStats = socketServer.stats } + + Index: core/src/main/scala/kafka/server/KafkaApis.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaApis.scala (revision 1300793) +++ core/src/main/scala/kafka/server/KafkaApis.scala (working copy) @@ -130,7 +130,7 @@ try { trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) val log = logManager.getLog(topic, partition) - response = Right(if(log != null) log.read(offset, maxSize) else MessageSet.Empty) + response = Right(log match { case Some(l) => l.read(offset, maxSize) case None => MessageSet.Empty }) } catch { case e => error("error when processing request " + (topic, partition, offset, maxSize), e) @@ -168,7 +168,7 @@ if(config.autoCreateTopics) { CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor) - info("Auto creation of topic %s with partitions %d and replication factor %d is successful!" + info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" .format(topic, config.numPartitions, config.defaultReplicationFactor)) val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head newTopicMetadata match { Index: core/src/main/scala/kafka/javaapi/ProducerRequest.scala =================================================================== --- core/src/main/scala/kafka/javaapi/ProducerRequest.scala (revision 1300793) +++ core/src/main/scala/kafka/javaapi/ProducerRequest.scala (working copy) @@ -26,7 +26,6 @@ val ackTimeout: Int, val data: Array[TopicData]) extends Request(RequestKeys.Produce) { - import Implicits._ val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data) def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }