diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d8f9ce6..c9ec8d5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -77,6 +77,9 @@ public class NetworkClient implements KafkaClient {
     /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
     private boolean metadataFetchInProgress;

+    /* the last time in milliseconds that a metadata update was attempted */
+    private long metadataLastUpdateAttemptMS;
+
     public NetworkClient(Selectable selector,
                          Metadata metadata,
                          String clientId,
@@ -94,6 +97,7 @@ public class NetworkClient implements KafkaClient {
         this.correlation = 0;
         this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
         this.metadataFetchInProgress = false;
+        this.metadataLastUpdateAttemptMS = 0;
     }

     /**
@@ -163,6 +167,8 @@

         // should we update our metadata?
         long metadataTimeout = metadata.timeToNextUpdate(now);
+        // if we have just tried to update metadata, back off refreshing
+        metadataTimeout = Math.max(metadataTimeout, this.metadataLastUpdateAttemptMS + metadata.refreshBackoffMs - now);
         if (!this.metadataFetchInProgress && metadataTimeout == 0)
             maybeUpdateMetadata(sends, now);

@@ -351,6 +357,7 @@
      * Add a metadata request to the list of sends if we can make one
      */
    private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
+        this.metadataLastUpdateAttemptMS = now;
        Node node = this.leastLoadedNode(now);
        if (node == null) {
            log.debug("Give up sending metadata request since no node is available");
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 4aa5b01..6da8e0a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -32,8 +32,8 @@ public final class Metadata {

     private static final Logger log = LoggerFactory.getLogger(Metadata.class);

-    private final long refreshBackoffMs;
-    private final long metadataExpireMs;
+    public final long refreshBackoffMs;
+    public final long metadataExpireMs;
     private int version;
     private long lastRefreshMs;
     private Cluster cluster;
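Note: the NetworkClient/Metadata change above boils down to a single rule: even when timeToNextUpdate(now) reports the metadata as stale, a new fetch may not be attempted until refreshBackoffMs has elapsed since the last attempt. Below is a minimal sketch of that rule, not the patch's own code; the object name and the sample timestamps are invented for illustration.

    // Sketch of the backoff rule added to NetworkClient.poll() above.
    object MetadataBackoffSketch {

      // Effective wait before the next metadata fetch: the later of "the metadata
      // is due for a refresh" and "refreshBackoffMs has passed since the last attempt".
      def effectiveTimeout(timeToNextUpdate: Long, lastAttemptMs: Long,
                           refreshBackoffMs: Long, nowMs: Long): Long =
        math.max(timeToNextUpdate, lastAttemptMs + refreshBackoffMs - nowMs)

      def main(args: Array[String]) {
        // Metadata is stale (timeToNextUpdate == 0), but the last attempt was only
        // 40 ms ago under a 100 ms backoff: the client waits another 60 ms.
        println(effectiveTimeout(0L, 1000L, 100L, 1040L)) // prints 60
        // Once the backoff window has elapsed, the timeout drops to 0 and a fetch is sent.
        println(effectiveTimeout(0L, 1000L, 100L, 1200L)) // prints 0
      }
    }

Because maybeUpdateMetadata() now records the attempt time even when no node is available, a stale-metadata client without a reachable broker appears to retry once per backoff window rather than on every poll.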
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 15fd5bc..9e0a1af 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -21,26 +21,30 @@ import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
 import org.junit.Assert._

-import java.util.{Random, Properties}
+import java.util.Random
 import java.lang.Integer
 import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{ShutdownableThread, Utils, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils}
+import kafka.integration.KafkaServerTestHarness
 import kafka.consumer.SimpleConsumer
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.clients.producer._

-class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness {
-  private val brokerId1 = 0
-  private val brokerId2 = 1
-  private val ports = TestUtils.choosePorts(2)
-  private val (port1, port2) = (ports(0), ports(1))
-  private var server1: KafkaServer = null
-  private var server2: KafkaServer = null
-  private var servers = List.empty[KafkaServer]
+class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarness {
+  private val producerBufferSize = 30000
+  private val serverMessageMaxBytes = producerBufferSize/2
+
+  val numServers = 2
+  val configs =
+    for(props <- TestUtils.createBrokerConfigs(numServers, false))
+    yield new KafkaConfig(props) {
+      override val zkConnect = TestZKUtils.zookeeperConnect
+      override val autoCreateTopicsEnable = false
+      override val messageMaxBytes = serverMessageMaxBytes
+    }

   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
@@ -50,32 +54,19 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
   private var producer3: KafkaProducer = null
   private var producer4: KafkaProducer = null

-  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
-  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
-  props1.put("auto.create.topics.enable", "false")
-  props2.put("auto.create.topics.enable", "false")
-  private val config1 = new KafkaConfig(props1)
-  private val config2 = new KafkaConfig(props2)
-  private val brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))
-
-  private val bufferSize = 2 * config1.messageMaxBytes
-
   private val topic1 = "topic-1"
   private val topic2 = "topic-2"

   override def setUp() {
     super.setUp()
-    server1 = TestUtils.createServer(config1)
-    server2 = TestUtils.createServer(config2)
-    servers = List(server1,server2)

     // TODO: we need to migrate to new consumers when 0.9 is final
-    consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "")
-    consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "")
+    consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "")
+    consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "")

-    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = bufferSize);
-    producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = bufferSize)
-    producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = bufferSize)
+    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize);
+    producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
+    producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
   }

   override def tearDown() {
@@ -87,9 +78,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     if (producer3 != null) producer3.close
     if (producer4 != null) producer4.close

-    server1.shutdown; Utils.rm(server1.config.logDirs)
-    server2.shutdown; Utils.rm(server2.config.logDirs)
-
     super.tearDown()
   }

@@ -102,7 +90,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)

     // send a too-large record
-    val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1))
+    val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
     assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L)
   }
"key".getBytes, new Array[Byte](config1.messageMaxBytes + 1)) + val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1)) assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L) } @@ -115,7 +103,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness TestUtils.createTopic(zkClient, topic1, 1, 2, servers) // send a too-large record - val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1)) + val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1)) intercept[ExecutionException] { producer2.send(record).get } @@ -149,7 +137,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness TestUtils.createTopic(zkClient, topic1, 1, 2, servers) // producer with incorrect broker list - producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = bufferSize) + producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize) // send a record with incorrect broker list val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) @@ -175,8 +163,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness // stop IO threads and request handling, but leave networking operational // any requests should be accepted and queue up, but not handled - server1.requestHandlerPool.shutdown() - server2.requestHandlerPool.shutdown() + servers.foreach(server => server.requestHandlerPool.shutdown()) producer1.send(record1).get(5000, TimeUnit.MILLISECONDS) @@ -186,11 +173,11 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness // TODO: expose producer configs after creating them // send enough messages to get buffer full - val msgSize = 10000 + val tooManyRecords = 10 + val msgSize = producerBufferSize / tooManyRecords val value = new Array[Byte](msgSize) new Random().nextBytes(value) val record2 = new ProducerRecord(topic1, null, "key".getBytes, value) - val tooManyRecords = bufferSize / ("key".getBytes.length + value.length) intercept[KafkaException] { for (i <- 1 to tooManyRecords) @@ -269,17 +256,13 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness // rolling bounce brokers for (i <- 0 until 2) { - server1.shutdown() - server1.awaitShutdown() - server1.startup + for (server <- servers) { + server.shutdown() + server.awaitShutdown() + server.startup - Thread.sleep(2000) - - server2.shutdown() - server2.awaitShutdown() - server2.startup - - Thread.sleep(2000) + Thread.sleep(2000) + } // Make sure the producer do not see any exception // in returned metadata due to broker failures @@ -298,7 +281,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness // double check that the leader info has been propagated after consecutive bounces val leader = TestUtils.waitUntilMetadataIsPropagated(servers, topic1, partition) - val fetchResponse = if(leader == server1.config.brokerId) { + val fetchResponse = if(leader == configs(0).brokerId) { consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition) } else { consumer2.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition) @@ -317,7 +300,7 @@ class 
@@ -317,7 +300,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     var sent = 0
     var failed = false

-    val producer = TestUtils.createNewProducer(brokerList, bufferSize = bufferSize, retries = 10)
+    val producer = TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10)

     override def doWork(): Unit = {
       val responses =
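Both producer test suites in this patch follow the same refactoring: instead of hand-managing two KafkaServer instances, a test declares its broker configs and lets KafkaServerTestHarness (extended further down) create the servers and the brokerList in setUp(). A hedged sketch of the shape such a suite takes; the class name, test method, and the particular overrides are hypothetical, not from the patch:

    import org.scalatest.junit.JUnit3Suite
    import org.junit.Assert._
    import kafka.integration.KafkaServerTestHarness
    import kafka.server.KafkaConfig
    import kafka.utils.{TestZKUtils, TestUtils}

    // Illustrative only -- a hypothetical suite built on the shared harness.
    class ExampleHarnessTest extends JUnit3Suite with KafkaServerTestHarness {
      val numServers = 2

      // The harness requires `configs`; per-broker settings are expressed as
      // overrides on KafkaConfig instead of mutating a Properties object.
      val configs =
        for(props <- TestUtils.createBrokerConfigs(numServers, false))
        yield new KafkaConfig(props) {
          override val zkConnect = TestZKUtils.zookeeperConnect
          override val autoCreateTopicsEnable = false
        }

      def testHarnessProvidesServersAndBrokerList() {
        // `servers` and `brokerList` are populated by the harness's setUp().
        assertEquals(numServers, servers.size)
        assertTrue(brokerList != null)
      }
    }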
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 34a7db4..d407af9 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -17,7 +17,6 @@

 package kafka.api.test

-import java.util.Properties
 import java.lang.{Integer, IllegalArgumentException}

 import org.apache.kafka.clients.producer._
@@ -25,53 +24,41 @@ import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
 import org.junit.Assert._

-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{Utils, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestZKUtils, TestUtils}
 import kafka.consumer.SimpleConsumer
 import kafka.api.FetchRequestBuilder
 import kafka.message.Message
+import kafka.integration.KafkaServerTestHarness

-class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
-  private val brokerId1 = 0
-  private val brokerId2 = 1
-  private val ports = TestUtils.choosePorts(2)
-  private val (port1, port2) = (ports(0), ports(1))
-  private var server1: KafkaServer = null
-  private var server2: KafkaServer = null
-  private var servers = List.empty[KafkaServer]
+class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
+  val numServers = 2
+  val configs =
+    for(props <- TestUtils.createBrokerConfigs(numServers, false))
+    yield new KafkaConfig(props) {
+      override val zkConnect = TestZKUtils.zookeeperConnect
+      override val numPartitions = 4
+    }

   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null

-  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
-  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
-  props1.put("num.partitions", "4")
-  props2.put("num.partitions", "4")
-  private val config1 = new KafkaConfig(props1)
-  private val config2 = new KafkaConfig(props2)
-
   private val topic = "topic"
   private val numRecords = 100

   override def setUp() {
     super.setUp()
-    // set up 2 brokers with 4 partitions each
-    server1 = TestUtils.createServer(config1)
-    server2 = TestUtils.createServer(config2)
-    servers = List(server1,server2)

     // TODO: we need to migrate to new consumers when 0.9 is final
-    consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "")
-    consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "")
+    consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "")
+    consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "")
   }

   override def tearDown() {
-    server1.shutdown
-    server2.shutdown
-    Utils.rm(server1.config.logDirs)
-    Utils.rm(server2.config.logDirs)
+    consumer1.close()
+    consumer2.close()
+
     super.tearDown()
   }

@@ -90,7 +77,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
    */
   @Test
   def testSendOffset() {
-    var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    var producer = TestUtils.createNewProducer(brokerList)

     val callback = new CheckErrorCallback

@@ -146,7 +133,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
    */
   @Test
   def testClose() {
-    var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    var producer = TestUtils.createNewProducer(brokerList)

     try {
       // create topic
@@ -182,7 +169,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
    */
   @Test
   def testSendToPartition() {
-    var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    var producer = TestUtils.createNewProducer(brokerList)

     try {
       // create topic
@@ -209,7 +196,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
       }

       // make sure the fetched messages also respect the partitioning and ordering
-      val fetchResponse1 = if(leader1.get == server1.config.brokerId) {
+      val fetchResponse1 = if(leader1.get == configs(0).brokerId) {
         consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
       } else {
         consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
@@ -237,8 +224,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
    */
   @Test
   def testAutoCreateTopic() {
-    var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
-        retries = 5)
+    var producer = TestUtils.createNewProducer(brokerList, retries = 5)

     try {
       // Send a message to auto-create the topic
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 194dd70..3cf7c9b 100644
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -30,11 +30,13 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {

   val configs: List[KafkaConfig]
   var servers: List[KafkaServer] = null
+  var brokerList: String = null

   override def setUp() {
     super.setUp
     if(configs.size <= 0)
       throw new KafkaException("Must suply at least one server config.")
+    brokerList = TestUtils.getBrokerListStrFromConfigs(configs)
     servers = configs.map(TestUtils.createServer(_))
   }

diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3faa884..4d01d25 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -385,7 +385,7 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
-    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000")
+    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
     producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
     return new KafkaProducer(producerProps)
   }
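One closing observation on the TestUtils change: with retry.backoff.ms lowered from 1000 to 100, a producer created with retries = 10 (as in the bounce test above) spends at most roughly one second in retry backoff instead of ten. A back-of-envelope sketch of that arithmetic; the object and method names are invented, not part of the patch:

    // Why the lower backoff keeps the failure-handling tests fast.
    object RetryBackoffArithmetic {

      // A send that exhausts its retries waits roughly retries * backoff ms
      // in backoff alone, ignoring request and timeout time.
      def worstCaseBackoffMs(retries: Int, backoffMs: Long): Long = retries * backoffMs

      def main(args: Array[String]) {
        println(worstCaseBackoffMs(10, 1000L)) // before the change: ~10 s
        println(worstCaseBackoffMs(10, 100L))  // after the change:  ~1 s
      }
    }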