diff --git a/config/test-log4j.properties b/config/test-log4j.properties index e0bbc13..a3ae33f 100644 --- a/config/test-log4j.properties +++ b/config/test-log4j.properties @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=INFO, stdout +log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout @@ -45,8 +45,8 @@ log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n # Turn on all our debugging info #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender -log4j.logger.kafka.tools=DEBUG, kafkaAppender -log4j.logger.kafka.tools.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender +log4j.logger.kafka.perf=DEBUG, kafkaAppender +log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG log4j.logger.kafka=INFO, kafkaAppender @@ -64,3 +64,5 @@ log4j.additivity.kafka.controller=false log4j.logger.state.change.logger=TRACE, stateChangeAppender log4j.additivity.state.change.logger=false + + diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index e776423..d0cf5f1 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -249,22 +249,24 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt // Move leadership serially to relinquish lock. inLock(controllerContext.controllerLock) { controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch => - if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id && replicationFactor > 1) { - // If the broker leads the topic partition, transition the leader and update isr. Updates zk and - // notifies all affected brokers - partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, - controlledShutdownPartitionLeaderSelector) - } else { - // Stop the replica first. The state change below initiates ZK changes which should take some time - // before which the stop replica request should be completed (in most cases) - brokerRequestBatch.newBatch() - brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, - topicAndPartition.partition, deletePartition = false) - brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) - - // If the broker is a follower, updates the isr in ZK and notifies the current leader - replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, - topicAndPartition.partition, id)), OfflineReplica) + if (replicationFactor > 1) { + if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) { + // If the broker leads the topic partition, transition the leader and update isr. Updates zk and + // notifies all affected brokers + partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, + controlledShutdownPartitionLeaderSelector) + } else { + // Stop the replica first. The state change below initiates ZK changes which should take some time + // before which the stop replica request should be completed (in most cases) + brokerRequestBatch.newBatch() + brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, + topicAndPartition.partition, deletePartition = false) + brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) + + // If the broker is a follower, updates the isr in ZK and notifies the current leader + replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, + topicAndPartition.partition, id)), OfflineReplica) + } } } } diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index cd4ca2f..b9405cf 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -50,8 +50,8 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness private var producer3: KafkaProducer = null private var producer4: KafkaProducer = null - private val props1 = TestUtils.createBrokerConfig(brokerId1, port1) - private val props2 = TestUtils.createBrokerConfig(brokerId2, port2) + private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false) + private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false) props1.put("auto.create.topics.enable", "false") props2.put("auto.create.topics.enable", "false") private val config1 = new KafkaConfig(props1) @@ -333,4 +333,4 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness producer.close } } -} \ No newline at end of file +} diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 3c2bf36..34a7db4 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -45,8 +45,8 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { private var consumer1: SimpleConsumer = null private var consumer2: SimpleConsumer = null - private val props1 = TestUtils.createBrokerConfig(brokerId1, port1) - private val props2 = TestUtils.createBrokerConfig(brokerId2, port2) + private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false) + private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false) props1.put("num.partitions", "4") props2.put("num.partitions", "4") private val config1 = new KafkaConfig(props1) @@ -255,4 +255,4 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { } } } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index fcd5eee..1bf2667 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -37,10 +37,10 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { val port3 = TestUtils.choosePort() val port4 = TestUtils.choosePort() - val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1) - val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) - val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3) - val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4) + val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, false) + val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, false) + val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, false) + val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, false) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] var brokers: Seq[Broker] = Seq.empty[Broker] @@ -205,4 +205,4 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(partition6DataForTopic3.replicas(2).id, 2) assertEquals(partition6DataForTopic3.replicas(3).id, 3) } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 4f6ddca..e289798 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -145,7 +145,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) + val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 @@ -176,7 +176,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) + val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 @@ -207,7 +207,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) + val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 @@ -236,7 +236,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { def testReassigningNonExistingPartition() { val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) + val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // reassign partition 0 val newReplicas = Seq(2, 3) val partitionToBeReassigned = 0 @@ -262,7 +262,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) reassignPartitionsCommand.reassignPartitions // create brokers - val servers = TestUtils.createBrokerConfigs(2).map(b => TestUtils.createServer(new KafkaConfig(b))) + val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // wait until reassignment completes TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient), @@ -298,7 +298,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val partition = 1 val preferredReplica = 0 // create brokers - val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) + val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(new KafkaConfig(_)) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) @@ -318,7 +318,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val topic = "test" val partition = 1 // create brokers - val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) + val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(new KafkaConfig(_)) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // create the topic TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers) diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 1b3c04e..5d3c57a 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -101,7 +101,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" val topicAndPartition = TopicAndPartition(topic, 0) - val brokerConfigs = TestUtils.createBrokerConfigs(4) + val brokerConfigs = TestUtils.createBrokerConfigs(4, false) brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) // create brokers val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) @@ -258,7 +258,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topicAndPartition = TopicAndPartition(topic, 0) - val brokerConfigs = TestUtils.createBrokerConfigs(3) + val brokerConfigs = TestUtils.createBrokerConfigs(3, false) brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) // create brokers val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala index 5eee08a..eab4b5f 100644 --- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala +++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala @@ -35,15 +35,11 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness { val port3 = TestUtils.choosePort() val port4 = TestUtils.choosePort() - val enableShutdown = true + // controlled.shutdown.enable is true by default val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1) - configProps1.put("controlled.shutdown.enable", "true") val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) - configProps2.put("controlled.shutdown.enable", "true") val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3) - configProps3.put("controlled.shutdown.enable", "true") val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4) - configProps4.put("controlled.shutdown.enable", "true") configProps4.put("controlled.shutdown.retry.backoff.ms", "100") var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] @@ -111,4 +107,4 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness { // Start the server back up again servers(prevLeader).startup() } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index b61c0b8..dd71d81 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -48,10 +48,10 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) private var servers = List.empty[KafkaServer] - private val props1 = TestUtils.createBrokerConfig(brokerId1, port1) + private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false) props1.put("num.partitions", "4") private val config1 = new KafkaConfig(props1) - private val props2 = TestUtils.createBrokerConfig(brokerId2, port2) + private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false) props2.put("num.partitions", "4") private val config2 = new KafkaConfig(props2) @@ -314,7 +314,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // make sure we don't wait fewer than timeoutMs assertTrue((t2-t1) >= timeoutMs) } - + @Test def testSendNullMessage() { val producer = TestUtils.createProducer[String, String]( @@ -332,7 +332,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ "Topic new-topic not created after timeout", waitTime = zookeeper.tickTime) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0) - + producer.send(new KeyedMessage[String, String]("new-topic", "key", null)) } finally { producer.close() diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 0dec9ec..4e43c09 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -31,7 +31,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping} class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { private var messageBytes = new Array[Byte](2); - val configs = List(new KafkaConfig(TestUtils.createBrokerConfigs(1).head)) + val configs = List(new KafkaConfig(TestUtils.createBrokerConfigs(1, false).head)) val zookeeperConnect = TestZKUtils.zookeeperConnect @Test diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 25dffcf..c2ba07c 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -34,8 +34,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val port1 = TestUtils.choosePort() val port2 = TestUtils.choosePort() - val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1) - val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) + val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, false) + val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, false) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] var staleControllerEpochDetected = false @@ -146,4 +146,4 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { case _ => false } } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index b349fce..0ec120a 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -31,7 +31,7 @@ import org.junit.Assert._ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { - val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { + val configs = TestUtils.createBrokerConfigs(2, false).map(new KafkaConfig(_) { override val replicaLagTimeMaxMs = 5000L override val replicaLagMaxMessages = 10L override val replicaFetchWaitMaxMs = 1000 @@ -52,7 +52,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { var hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename)) var hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename)) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] - + override def setUp() { super.setUp() @@ -131,7 +131,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { sendMessages(1) hw += 1 - + // give some time for follower 1 to record leader HW of 60 TestUtils.waitUntilTrue(() => server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, @@ -162,7 +162,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { sendMessages(2) var hw = 2L - + // allow some time for the follower to get the leader HW TestUtils.waitUntilTrue(() => server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, @@ -188,7 +188,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { sendMessages(2) hw += 2 - + // allow some time for the follower to get the leader HW TestUtils.waitUntilTrue(() => server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw, diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index 3e0bc18..da4bafc 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -27,7 +27,7 @@ import junit.framework.Assert._ import kafka.common._ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { - val props = createBrokerConfigs(2) + val props = createBrokerConfigs(2,false) val configs = props.map(p => new KafkaConfig(p)) var brokers: Seq[KafkaServer] = null val topic1 = "foo" @@ -73,4 +73,4 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { } waitUntilTrue(logsMatch, "Broker logs should be identical") } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 4da0f2c..12f8045 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -48,7 +48,7 @@ import org.apache.kafka.clients.producer.KafkaProducer * Utility functions to help with testing */ object TestUtils extends Logging { - + val IoTmpDir = System.getProperty("java.io.tmpdir") val Letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" @@ -127,9 +127,10 @@ object TestUtils extends Logging { /** * Create a test config for the given node id */ - def createBrokerConfigs(numConfigs: Int): List[Properties] = { + def createBrokerConfigs(numConfigs: Int, + enableControlledShutdown: Boolean = true): List[Properties] = { for((port, node) <- choosePorts(numConfigs).zipWithIndex) - yield createBrokerConfig(node, port) + yield createBrokerConfig(node, port, enableControlledShutdown) } def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = { @@ -139,7 +140,8 @@ object TestUtils extends Logging { /** * Create a test config for the given node id */ - def createBrokerConfig(nodeId: Int, port: Int = choosePort()): Properties = { + def createBrokerConfig(nodeId: Int, port: Int = choosePort(), + enableControlledShutdown: Boolean = true): Properties = { val props = new Properties props.put("broker.id", nodeId.toString) props.put("host.name", "localhost") @@ -147,6 +149,7 @@ object TestUtils extends Logging { props.put("log.dir", TestUtils.tempDir().getAbsolutePath) props.put("zookeeper.connect", TestZKUtils.zookeeperConnect) props.put("replica.socket.timeout.ms", "1500") + props.put("controlled.shutdown.enable", enableControlledShutdown.toString) props } @@ -340,7 +343,7 @@ object TestUtils extends Logging { * Create a producer with a few pre-configured properties. * If certain properties need to be overridden, they can be provided in producerProps. */ - def createProducer[K, V](brokerList: String, + def createProducer[K, V](brokerList: String, encoder: String = classOf[DefaultEncoder].getName, keyEncoder: String = classOf[DefaultEncoder].getName, partitioner: String = classOf[DefaultPartitioner].getName, @@ -445,9 +448,9 @@ object TestUtils extends Logging { /** * Create a wired format request based on simple basic information */ - def produceRequest(topic: String, - partition: Int, - message: ByteBufferMessageSet, + def produceRequest(topic: String, + partition: Int, + message: ByteBufferMessageSet, acks: Int = SyncProducerConfig.DefaultRequiredAcks, timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs, correlationId: Int = 0, @@ -455,10 +458,10 @@ object TestUtils extends Logging { produceRequestWithAcks(Seq(topic), Seq(partition), message, acks, timeout, correlationId, clientId) } - def produceRequestWithAcks(topics: Seq[String], - partitions: Seq[Int], - message: ByteBufferMessageSet, - acks: Int = SyncProducerConfig.DefaultRequiredAcks, + def produceRequestWithAcks(topics: Seq[String], + partitions: Seq[Int], + message: ByteBufferMessageSet, + acks: Int = SyncProducerConfig.DefaultRequiredAcks, timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs, correlationId: Int = 0, clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = { @@ -540,7 +543,7 @@ object TestUtils extends Logging { return leader } - + /** * Execute the given block. If it throws an assert error, retry. Repeat * until no error is thrown or the time limit ellapses @@ -554,7 +557,7 @@ object TestUtils extends Logging { return } catch { case e: AssertionFailedError => - val ellapsed = System.currentTimeMillis - startTime + val ellapsed = System.currentTimeMillis - startTime if(ellapsed > maxWaitMs) { throw e } else { @@ -631,7 +634,7 @@ object TestUtils extends Logging { leader } - + def writeNonsenseToFile(fileName: File, position: Long, size: Int) { val file = new RandomAccessFile(fileName, "rw") file.seek(position) @@ -639,7 +642,7 @@ object TestUtils extends Logging { file.writeByte(random.nextInt(255)) file.close() } - + def appendNonsenseToFile(fileName: File, size: Int) { val file = new FileOutputStream(fileName, true) for(i <- 0 until size)