From 23a4537d5ca197ea380fa67995fdae5358a61f25 Mon Sep 17 00:00:00 2001 From: Paul Pearcy Date: Mon, 12 Jan 2015 17:25:11 -0500 Subject: [PATCH 1/1] KAFKA-1835 - New producer updates to make blocking behavior explicit --- .../kafka/clients/producer/KafkaProducer.java | 70 ++++++- .../kafka/clients/producer/ProducerConfig.java | 10 + .../kafka/api/ProducerBlockingTest.scala | 202 +++++++++++++++++++++ .../test/scala/unit/kafka/utils/TestUtils.scala | 8 +- 4 files changed, 286 insertions(+), 4 deletions(-) create mode 100644 core/src/test/scala/integration/kafka/api/ProducerBlockingTest.scala diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index a61c56c..f8d69ba 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -14,9 +14,7 @@ package org.apache.kafka.clients.producer; import java.net.InetSocketAddress; import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.producer.internals.Metadata; @@ -78,6 +76,9 @@ public class KafkaProducer implements Producer { private final Serializer valueSerializer; private final ProducerConfig producerConfig; + private Boolean initialized = false; + ExecutorService initExecutor = null; + /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings * are documented here. Values can be @@ -219,9 +220,61 @@ public class KafkaProducer implements Producer { this.valueSerializer = valueSerializer; config.logUnused(); + + final List preInitTopics = new ArrayList(); + for (String topic : config.getString(ProducerConfig.PRE_INITIALIZE_TOPICS_CONFIG).split(",")) { + if (!topic.isEmpty()) + preInitTopics.add(topic); + } + final Long preInitTimeout = config.getLong(ProducerConfig.PRE_INITIALIZE_TIMEOUT_MS_CONFIG); + + // Kick off a thread to get meta for any specified topics. This will ensure that the producer doesn't block + if (preInitTopics.size() > 0) { + FutureTask future = + new FutureTask(new Callable() { + public Boolean call() { + return initializeProducer(preInitTopics); + } + }); + initExecutor = Executors.newSingleThreadExecutor(); + initExecutor.execute(future); + + if (preInitTimeout > 0) { + try { + future.get(preInitTimeout, TimeUnit.MILLISECONDS); + } catch (Exception e) { + log.error("Failed to initialize the producer meta for topics(" + preInitTopics + ") within preinit timeout: " + preInitTimeout.toString() + "ms. Sends will fail until initialization completes.", e); + } + } + } else { + // Nothing to initialize + initialized = true; + } + log.debug("Kafka producer started"); } + /** + * If the preinit metadata settings are used, will indicate that the producer was successfully initialized + * + */ + public Boolean isInitialized() { + return initialized; + } + + private Boolean initializeProducer(List preInitTopics) { + for (String topic : preInitTopics) { + // We wait forever here, this method should always be running in its own thread + waitOnMetadata(topic, Long.MAX_VALUE); + } + + log.info("Successfully initialized kafka producer for topics: " + preInitTopics); + + // If we got here, we are good. + initialized = true; + return true; + } + private static int parseAcks(String acksString) { try { return acksString.trim().toLowerCase().equals("all") ? -1 : Integer.parseInt(acksString.trim()); @@ -302,6 +355,10 @@ public class KafkaProducer implements Producer { @Override public Future send(ProducerRecord record, Callback callback) { try { + if (!initialized) { + return new FutureFailure(new IllegalStateException("Producer is not yet initialized")); + } + // first make sure the metadata for the topic is available waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs); byte[] serializedKey; @@ -409,11 +466,18 @@ public class KafkaProducer implements Producer { public void close() { log.trace("Closing the Kafka producer."); this.sender.initiateClose(); + + if (this.initExecutor != null) { + this.initExecutor.shutdown(); + } + try { this.ioThread.join(); } catch (InterruptedException e) { throw new KafkaException(e); } + + this.metrics.close(); this.keySerializer.close(); this.valueSerializer.close(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 8b3e565..54d7fc2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -182,6 +182,14 @@ public class ProducerConfig extends AbstractConfig { public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the Serializer interface."; + /** pre.initialize.topics */ + public static final String PRE_INITIALIZE_TOPICS_CONFIG = "pre.initialize.topics"; + private static final String PRE_INITIALIZE_TOPICS_DOC = "Comma delimited list of topics that have metadata populated on producer creation. By passing in a list and specifying pre.initialize.timeout, it can be guaranteed that the producer will never block."; + + /** pre.initialize.timeout */ + public static final String PRE_INITIALIZE_TIMEOUT_MS_CONFIG = "pre.initialize.timeout.ms"; + private static final String PRE_INITIALIZE_TIMEOUT_MS_DOC = "Only used along with pre.initialize.topics. If the producer is unable to populate meta within this amount of time producer creation will throw an exception."; + static { config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) @@ -206,6 +214,8 @@ public class ProducerConfig extends AbstractConfig { Importance.MEDIUM, MAX_REQUEST_SIZE_DOC) .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC) + .define(PRE_INITIALIZE_TOPICS_CONFIG, Type.STRING, "", Importance.LOW, PRE_INITIALIZE_TOPICS_DOC) + .define(PRE_INITIALIZE_TIMEOUT_MS_CONFIG, Type.LONG, 0L, atLeast(0L), Importance.LOW, PRE_INITIALIZE_TIMEOUT_MS_DOC) .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, RECONNECT_BACKOFF_MS_DOC) .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC) .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, RETRY_BACKOFF_MS_DOC) diff --git a/core/src/test/scala/integration/kafka/api/ProducerBlockingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBlockingTest.scala new file mode 100644 index 0000000..0e02809 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/ProducerBlockingTest.scala @@ -0,0 +1,202 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api.test + +import org.junit.Test +import org.junit.Assert._ + +import java.util.concurrent.{TimeUnit, ExecutionException} + +import kafka.server.KafkaConfig +import kafka.integration.KafkaServerTestHarness +import kafka.utils.{TestZKUtils, TestUtils} + +import org.apache.kafka.clients.producer._ + +import System.{currentTimeMillis => _time} + +class ProducerBlockingTest extends KafkaServerTestHarness { + def profile[R](code: => R, t: Long = _time) = (code, _time - t) + + val configs = + for(props <- TestUtils.createBrokerConfigs(numConfigs = 1, enableControlledShutdown = false)) + yield new KafkaConfig(props) { + override val zkConnect = TestZKUtils.zookeeperConnect + override val autoCreateTopicsEnable = false + } + + var producer: KafkaProducer[Array[Byte],Array[Byte]] = null + var duration: Long = 0 + + val topic = "topic-blockingtest" + + val expectedBlockPeriod = 5000 + val fudge = 500 + val fastResponse = 30 + + def record(t: String = topic) = new ProducerRecord[Array[Byte],Array[Byte]](t, null, "key".getBytes, new Array[Byte](100)) + + override def tearDown() { + if (producer != null) producer.close() + + super.tearDown() + } + + def assertDuration(operation: String, duration: Long, expected: Long) { + assertTrue(s"$operation - Time duration expected: $expected, actual: $duration", duration < expected + fudge && duration > expected - fudge) + } + + def assertFast(operation: String, duration: Long) { + assertTrue(s"$operation - Expected time duration for to be fast (< $fastResponse). Actual: $duration", duration < fastResponse) + } + + /** + * This test shows the default blocking behavior that can occur. This default behavior is questionable for an API that returns a future as it is not guaranteed to not block + */ + @Test + def testBlockingWhenSendAndServerDown() { + servers.map(_.shutdown()) + + producer = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, metadataFetchTimeout = expectedBlockPeriod) + assertTrue("Always indicate things are initialized unless pre-init is specified", producer.isInitialized) + + val (_, duration) = profile { + intercept[ExecutionException] { + producer.send(record()).get + } + } + + assertDuration("Default send blocking", duration, expectedBlockPeriod) + } + + /** + * If auto create == false and topic doesn't exist things should fast fail. Instead, the API blocks for metadataFetchTimeout + * Should open a different ticket for this. + */ + @Test + def testBlockingOnMetaOnMissingTopic() { + producer = TestUtils.createNewProducer( + brokerList, acks = 0, + blockOnBufferFull = false, + metadataFetchTimeout = expectedBlockPeriod) + assertTrue("Always indicate things are initialized unless pre-init is specified", producer.isInitialized) + + // Since auto create is false, let's see how long it takes for send to fail + val (_, duration) = profile { + intercept[ExecutionException] { + producer.send(record()).get(10000, TimeUnit.MILLISECONDS) + } + } + + assertDuration("Blocking when up, but topic not created", duration, expectedBlockPeriod) + } + + /** + * When preInitFailureAction is fail and kafka server is down expect producer creation to fail within preInitTimeout + */ + @Test + def testFailInitWhenPreInitSpecified() { + TestUtils.createTopic(zkClient, topic, 1, 1, servers) + + servers.map(_.shutdown()) + + val (_, duration) = profile { + producer = TestUtils.createNewProducer( + brokerList, acks = 0, + blockOnBufferFull = false, + preInitTimeout = Some(expectedBlockPeriod), + preInitTopics = Some(topic)) + } + + assertFalse("Producer should indicate it is not initialized", producer.isInitialized) + assertDuration("Blocking during producer creation failure", duration, expectedBlockPeriod) + } + + + /** + * When preInitFailureAction is ignore and kafka server is down expect sends to fast fail and then sends to succeed once server is up + */ + @Test + def testPreInitIgnoreFailAndSend() { + TestUtils.createTopic(zkClient, topic, 1, 1, servers) + + servers.map(_.shutdown()) + + val (_, duration) = profile { + producer = TestUtils.createNewProducer( + brokerList, acks = 0, + blockOnBufferFull = false, + preInitTimeout = Some(expectedBlockPeriod), + preInitTopics = Some(topic)) + } + + assertDuration("Blocking during producer init failure ignore", duration, expectedBlockPeriod) + assertFalse("Producer is not yet initialized", producer.isInitialized) + + // This send should return very quickly + val e = intercept[ExecutionException] { + producer.send(record()).get(10, TimeUnit.MILLISECONDS) + } + assertEquals(e.getCause.getMessage, "Producer is not yet initialized") + + servers.map(_.startup()) + + // Give producer time to correctly init + while (!producer.isInitialized) { + Thread.sleep(100) + } + + val (_, sendDuration) = profile { + producer.send(record()) + } + assertFast("Valid send after server started", sendDuration) + } + + @Test + def testStillBlockIfAttemptingToSendToNewTopicAfterPreInit() { + TestUtils.createTopic(zkClient, topic, 1, 1, servers) + + producer = TestUtils.createNewProducer( + brokerList, acks = 0, + blockOnBufferFull = false, + metadataFetchTimeout = expectedBlockPeriod, // This should be set low when using pre-init + preInitTimeout = Some(1000), + preInitTopics = Some(topic)) + + assertTrue("Producer is initialized", producer.isInitialized) + + servers.map(_.shutdown()) + + val (_, expectedFast) = profile { + producer.send(record()) + } + + assertFast("Valid send after servers shutdown", expectedFast) + + val (_, blockTime) = profile { + producer.send(record("nometadata")) + } + + assertDuration("Blocking in initialized client for a topic with no meta data", blockTime, expectedBlockPeriod) + + // Start the server backup so that the sends that are queued can progress + // Could alternatively just null out producer + servers.map(_.startup()) + } +} diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c9e8ba2..bc37c09 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -383,7 +383,9 @@ object TestUtils extends Logging { metadataFetchTimeout: Long = 3000L, blockOnBufferFull: Boolean = true, bufferSize: Long = 1024L * 1024L, - retries: Int = 0) : KafkaProducer[Array[Byte],Array[Byte]] = { + retries: Int = 0, + preInitTimeout: Option[Long] = None, + preInitTopics: Option[String] = None) : KafkaProducer[Array[Byte],Array[Byte]] = { import org.apache.kafka.clients.producer.ProducerConfig val producerProps = new Properties() @@ -397,6 +399,10 @@ object TestUtils extends Logging { producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200") producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + + preInitTimeout.map(v => producerProps.put(ProducerConfig.PRE_INITIALIZE_TIMEOUT_MS_CONFIG, v.toString)) + preInitTopics.map(producerProps.put(ProducerConfig.PRE_INITIALIZE_TOPICS_CONFIG, _)) + return new KafkaProducer[Array[Byte],Array[Byte]](producerProps) } -- 1.9.3 (Apple Git-50)