diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index e4bc972..721403c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -210,17 +211,26 @@ public class KafkaProducer implements Producer {
     @Override
     public Future send(ProducerRecord record, Callback callback) {
         try {
+            System.out.println("Finding Dest for record " + record.toString());
             Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
+            System.out.println("Cluster for record " + record.toString());
             int partition = partitioner.partition(record, cluster);
             ensureValidSize(record.key(), record.value());
             TopicPartition tp = new TopicPartition(record.topic(), partition);
+            System.out.println("Found Dest Partition: " + tp.toString() + " for record " + record.toString());
             FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
             this.sender.wakeup();
             return future;
-        } catch (Exception e) {
+        // For API exceptions return them in the future;
+        // for other exceptions throw directly
+        } catch (ApiException e) {
             if (callback != null)
                 callback.onCompletion(null, e);
             return new FutureFailure(e);
+        } catch (InterruptedException e) {
+            throw new KafkaException(e);
+        } catch (KafkaException e) {
+            throw e;
         }
     }
 
@@ -255,7 +265,6 @@ public class KafkaProducer implements Producer {
      */
     @Override
     public void close() {
-        this.accumulator.close();
         this.sender.initiateClose();
         try {
             this.ioThread.join();
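Note on the KafkaProducer change above: send() now has a split error contract. ApiException subclasses are handed to the callback and returned through the future, while an interrupt is wrapped in a KafkaException and thrown from send() itself; other KafkaExceptions are rethrown directly. The System.out.println calls read like leftover debug output rather than intended behavior. Below is a minimal caller-side sketch of the resulting contract, assuming a producer already configured elsewhere; the topic name and payloads are illustrative, and the ProducerRecord constructor shape is the four-argument form used by the tests later in this patch.

    // Illustrative only: how a caller sees the two failure paths after this patch.
    ProducerRecord record = new ProducerRecord("test-topic", null, "key".getBytes(), "value".getBytes());
    try {
        Future<RecordMetadata> future = producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null)
                    exception.printStackTrace();   // ApiException path, e.g. RecordTooLargeException
            }
        });
        RecordMetadata metadata = future.get();    // rethrows the same ApiException wrapped in ExecutionException
    } catch (ExecutionException e) {
        // API error for this record, also delivered to the callback above
    } catch (Exception e) {
        // non-API failure thrown directly from send(), e.g. KafkaException on interruption
    }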
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
index 8c77698..e752997 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -32,6 +32,12 @@ public final class RecordMetadata {
         this.topicPartition = topicPartition;
     }
 
+    public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) {
+        // ignore the relativeOffset if the base offset is -1,
+        // since this indicates the offset is unknown
+        this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset);
+    }
+
     /**
      * The offset of the record in the topic/partition.
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index 22d4c79..9444c87 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -20,6 +20,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.clients.producer.RecordMetadata;
 
@@ -60,7 +61,7 @@ public final class FutureRecordMetadata implements Future {
         if (this.result.error() != null)
             throw new ExecutionException(this.result.error());
         else
-            return new RecordMetadata(result.topicPartition(), this.result.baseOffset() + this.relativeOffset);
+            return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
     }
 
     public long relativeOffset() {
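The RecordMetadata and FutureRecordMetadata changes above are two halves of one fix: the broker assigns a single base offset to each batch, every record keeps its relative position, and the absolute offset is only computed when the metadata is materialized. The new constructor also keeps an unknown base offset (-1, for example from a failed batch) from being reported as -1 plus the relative offset. A tiny self-contained sketch of the arithmetic, with illustrative names:

    // Sketch of the offset arithmetic now centralized in RecordMetadata (names illustrative).
    static long absoluteOffset(long baseOffset, long relativeOffset) {
        // -1 means "offset unknown" (e.g. the batch failed) and must be preserved as-is
        return baseOffset == -1 ? -1 : baseOffset + relativeOffset;
    }
    // absoluteOffset(100L, 3L) == 103L; absoluteOffset(-1L, 3L) == -1L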
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 62613a3..ce23168 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -74,10 +74,10 @@ public final class Metadata {
      */
     public synchronized Cluster fetch(String topic, long maxWaitMs) {
         List partitions = null;
+        long begin = System.currentTimeMillis();
         do {
             partitions = cluster.partitionsFor(topic);
             if (partitions == null) {
-                long begin = System.currentTimeMillis();
                 topics.add(topic);
                 forceUpdate = true;
                 try {
@@ -85,7 +85,7 @@
                 } catch (InterruptedException e) { /* this is fine, just try again */ }
                 long ellapsed = System.currentTimeMillis() - begin;
-                if (ellapsed > maxWaitMs)
+                if (ellapsed >= maxWaitMs)
                     throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
             } else {
                 return cluster;
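The Metadata.fetch() change above moves the start-of-wait timestamp outside the retry loop. Previously the timer restarted on every iteration, so a topic whose metadata never arrived could block the caller well past maxWaitMs; the switch to >= also makes an exhausted budget fail rather than retry once more. A simplified, self-contained sketch of the bounded-wait pattern follows; it is not the real Metadata class, and the ready check and lock are stand-ins.

    import java.util.concurrent.TimeoutException;
    import java.util.function.BooleanSupplier;

    final class BoundedWait {
        // Wait until ready() is true, but never longer than maxWaitMs in total across all retries.
        static void await(BooleanSupplier ready, Object lock, long maxWaitMs)
                throws InterruptedException, TimeoutException {
            long begin = System.currentTimeMillis();          // captured once, outside the loop
            synchronized (lock) {
                while (!ready.getAsBoolean()) {
                    long elapsed = System.currentTimeMillis() - begin;
                    if (elapsed >= maxWaitMs)
                        throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
                    lock.wait(maxWaitMs - elapsed);           // wake up early if metadata arrives
                }
            }
        }
    }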
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index eb16f6d..4e21b0d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -55,7 +55,7 @@ public final class RecordBatch {
         this.records.append(0L, key, value, compression);
         FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
         if (callback != null)
-            thunks.add(new Thunk(callback, this.recordCount));
+            thunks.add(new Thunk(callback, future));
         this.recordCount++;
         return future;
     }
@@ -74,8 +74,8 @@ public final class RecordBatch {
             try {
                 Thunk thunk = this.thunks.get(i);
                 if (exception == null)
-                    thunk.callback.onCompletion(new RecordMetadata(topicPartition, this.produceFuture.baseOffset() + thunk.relativeOffset),
-                                                null);
+                    // get() should not throw any exception here
+                    thunk.callback.onCompletion(thunk.future.get(), null);
                 else
                     thunk.callback.onCompletion(null, exception);
             } catch (Exception e) {
@@ -85,15 +85,15 @@
         }
     }
 
     /**
-     * A callback and the associated RecordSend argument to pass to it.
+     * A callback and the associated FutureRecordMetadata argument to pass to it.
      */
     final private static class Thunk {
         final Callback callback;
-        final long relativeOffset;
+        final FutureRecordMetadata future;
 
-        public Thunk(Callback callback, long relativeOffset) {
+        public Thunk(Callback callback, FutureRecordMetadata future) {
             this.callback = callback;
-            this.relativeOffset = relativeOffset;
+            this.future = future;
         }
     }
 }
\ No newline at end of file
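With the RecordBatch change above, each Thunk holds the record's FutureRecordMetadata instead of a bare relative offset, so the callback is completed with exactly the metadata the future returns and the two notification paths cannot drift apart (the stale RecordSend wording in the comment is corrected as well). From the caller's point of view the effect is simply that both paths agree; an illustrative snippet, reusing the hypothetical producer and record from the earlier sketch:

    // Illustrative: callback and future now observe the same RecordMetadata.
    Future<RecordMetadata> future = producer.send(record, new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null)
                System.out.println("callback saw offset " + metadata.offset());
        }
    });
    RecordMetadata fromFuture = future.get();   // same topic, partition and offset as the callback saw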
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index e373265..541c5e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -324,6 +324,7 @@ public class Sender implements Runnable {
     private void handleDisconnects(List disconnects, long now) {
         // clear out the in-flight requests for the disconnected broker
         for (int node : disconnects) {
+            nodeStates.disconnected(node);
             for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
                 if (request.batches != null) {
                     for (RecordBatch batch : request.batches.values()) {
@@ -335,7 +336,6 @@ public class Sender implements Runnable {
                     }
                 }
             }
-            nodeStates.disconnected(request.request.destination());
         }
     }
         // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index f1e474c..678bfcc 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -299,6 +299,7 @@ public class Selector implements Selectable {
         Transmissions trans = transmissions(key);
         if (trans != null) {
             this.disconnected.add(trans.id);
+            this.keys.remove(trans.id);
             trans.clearReceive();
             trans.clearSend();
         }
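The Sender and Selector changes above tighten disconnect bookkeeping: the node is marked disconnected before, and independently of, draining its in-flight requests, so a broker with nothing in flight is still recorded as down, and the Selector forgets the key of the broken connection. A simplified sketch of the ordering, with the collaborators reduced to hypothetical stand-ins:

    // Simplified sketch (stand-in types): mark the node down first, then fail whatever was in flight.
    void handleDisconnect(int node) {
        nodeStates.disconnected(node);                        // runs even if nothing was in flight
        for (InFlightRequest request : inFlightRequests.clearAll(node)) {
            failBatches(request);                             // hypothetical helper: complete batches with a network error
        }
    }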
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index f88992a..3374bd9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -41,17 +41,15 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 public enum Errors {
     UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
     NONE(0, null),
-    OFFSET_OUT_OF_RANGE(1,
-            new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
-    CORRUPT_MESSAGE(2,
-            new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
+    OFFSET_OUT_OF_RANGE(1, new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
+    CORRUPT_MESSAGE(2, new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
     UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
-    LEADER_NOT_AVAILABLE(5,
-            new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
+    // TODO: errorCode 4 for InvalidFetchSize
+    LEADER_NOT_AVAILABLE(5, new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
     NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
     REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
-    MESSAGE_TOO_LARGE(10,
-            new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
+    // TODO: errorCode 8, 9, 11
+    MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
     OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
     NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received."));
 
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 06261b9..69293d9 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -41,7 +41,15 @@ object ProducerResponse {
     }
   }
 
-case class ProducerResponseStatus(error: Short, offset: Long)
+case class ProducerResponseStatus(var error: Short, offset: Long)
+
+case class DelayedProduceResponseStatus(var acksPending: Boolean,
+                                        status: ProducerResponseStatus,
+                                        requiredOffset: Long) {
+  override def toString =
+    "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
+      acksPending, status.error, status.offset, requiredOffset)
+}
 
 case class ProducerResponse(override val correlationId: Int,
                             status: Map[TopicAndPartition, ProducerResponseStatus])
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ae2df20..1f3b6f3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -231,7 +231,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       // create a list of (topic, partition) pairs to use as keys for this delayed request
       val producerRequestKeys = produceRequest.data.keys.map(
         topicAndPartition => new RequestKey(topicAndPartition)).toSeq
-      val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.end + 1)).toMap
+      val statuses = localProduceResults.map(r =>
+        r.key -> DelayedProduceResponseStatus(false, ProducerResponseStatus(r.errorCode, r.start), r.end + 1)).toMap
       val delayedProduce = new DelayedProduce(producerRequestKeys,
                                               request,
                                               statuses,
@@ -255,7 +256,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       produceRequest.emptyData()
     }
   }
-  
+
   case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) {
     def this(key: TopicAndPartition, throwable: Throwable) =
       this(key, -1L, -1L, Some(throwable))
@@ -762,41 +763,31 @@ class KafkaApis(val requestChannel: RequestChannel,
   class DelayedProduce(keys: Seq[RequestKey],
                        request: RequestChannel.Request,
-                       initialErrorsAndOffsets: Map[TopicAndPartition, ProducerResponseStatus],
+                       val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus],
                        val produce: ProducerRequest,
                        delayMs: Long)
     extends DelayedRequest(keys, request, delayMs) with Logging {
 
-    /**
-     * Map of (topic, partition) -> partition status
-     * The values in this map don't need to be synchronized since updates to the
-     * values are effectively synchronized by the ProducerRequestPurgatory's
-     * update method
-     */
-    private [kafka] val partitionStatus = keys.map(requestKey => {
-      val producerResponseStatus = initialErrorsAndOffsets(TopicAndPartition(requestKey.topic, requestKey.partition))
-      // if there was an error in writing to the local replica's log, then don't
-      // wait for acks on this partition
-      val (acksPending, error, nextOffset) =
-        if (producerResponseStatus.error == ErrorMapping.NoError) {
-          // Timeout error state will be cleared when requiredAcks are received
-          (true, ErrorMapping.RequestTimedOutCode, producerResponseStatus.offset)
-        }
-        else (false, producerResponseStatus.error, producerResponseStatus.offset)
+    // first update the acks pending variable according to error code
+    partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
+      if (delayedStatus.status.error == ErrorMapping.NoError) {
+        // Timeout error state will be cleared when requiredAcks are received
+        delayedStatus.acksPending = true
+        delayedStatus.status.error = ErrorMapping.RequestTimedOutCode
+      } else {
+        delayedStatus.acksPending = false
+      }
+
+      trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
+    }
 
-      val initialStatus = PartitionStatus(acksPending, error, nextOffset)
-      trace("Initial partition status for %s = %s".format(requestKey.keyLabel, initialStatus))
-      (requestKey, initialStatus)
-    }).toMap
 
     def respond() {
-      val finalErrorsAndOffsets = initialErrorsAndOffsets.map(
-        status => {
-          val pstat = partitionStatus(new RequestKey(status._1))
-          (status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset))
-        })
-
-      val response = ProducerResponse(produce.correlationId, finalErrorsAndOffsets)
+      val responseStatus = partitionStatus.map { case (topicAndPartition, delayedStatus) =>
+        topicAndPartition -> delayedStatus.status
+      }
+
+      val response = ProducerResponse(produce.correlationId, responseStatus)
 
       requestChannel.sendResponse(new RequestChannel.Response(
         request, new BoundedByteBufferSend(response)))
@@ -816,8 +807,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     def isSatisfied(followerFetchRequestKey: RequestKey) = {
       val topic = followerFetchRequestKey.topic
       val partitionId = followerFetchRequestKey.partition
-      val key = RequestKey(topic, partitionId)
-      val fetchPartitionStatus = partitionStatus(key)
+      val fetchPartitionStatus = partitionStatus(TopicAndPartition(topic, partitionId))
       trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
         .format(topic, partitionId, fetchPartitionStatus.acksPending))
       if (fetchPartitionStatus.acksPending) {
@@ -830,10 +820,10 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
         if (errorCode != ErrorMapping.NoError) {
           fetchPartitionStatus.acksPending = false
-          fetchPartitionStatus.error = errorCode
+          fetchPartitionStatus.status.error = errorCode
         } else if (hasEnough) {
           fetchPartitionStatus.acksPending = false
-          fetchPartitionStatus.error = ErrorMapping.NoError
+          fetchPartitionStatus.status.error = ErrorMapping.NoError
         }
         if (!fetchPartitionStatus.acksPending) {
           val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition)
@@ -846,20 +836,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied))
       satisfied
     }
-
-    case class PartitionStatus(var acksPending: Boolean,
-                               var error: Short,
-                               requiredOffset: Long) {
-      def setThisBrokerNotLeader() {
-        error = ErrorMapping.NotLeaderForPartitionCode
-        acksPending = false
-      }
-
-      override def toString =
-        "acksPending:%b, error: %d, requiredOffset: %d".format(
-          acksPending, error, requiredOffset
-        )
-      }
   }
 
   /**
@@ -878,7 +854,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   protected def expire(delayedProduce: DelayedProduce) {
     for (partitionStatus <- delayedProduce.partitionStatus if partitionStatus._2.acksPending)
-      delayedRequestMetrics.recordDelayedProducerKeyExpired(partitionStatus._1)
+      delayedRequestMetrics.recordDelayedProducerKeyExpired(RequestKey(partitionStatus._1.topic, partitionStatus._1.partition))
 
     delayedProduce.respond()
   }
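On the broker side, DelayedProduce now keeps one mutable DelayedProduceResponseStatus per partition: a partition whose local append succeeded starts with acksPending = true and a provisional request-timed-out error that is cleared once follower fetches reach requiredOffset, while a failed local append is answered immediately, and respond() simply reuses those status objects. A rough sketch of that per-partition state machine, written in Java for consistency with the other examples here (the class is illustrative and not part of the patch):

    // Illustrative per-partition bookkeeping, mirroring DelayedProduceResponseStatus.
    final class PendingPartition {
        boolean acksPending;
        short errorCode;             // provisional "request timed out" while waiting for acks
        final long requiredOffset;   // followers must fetch up to this offset

        PendingPartition(short localAppendError, long requiredOffset,
                         short noError, short requestTimedOut) {
            this.requiredOffset = requiredOffset;
            if (localAppendError == noError) {     // local write ok: wait for replica acks
                this.acksPending = true;
                this.errorCode = requestTimedOut;  // replaced when enough replicas catch up or expire() fires
            } else {                               // local write failed: respond immediately
                this.acksPending = false;
                this.errorCode = localAppendError;
            }
        }

        void onEnoughReplicas(short noError) {
            this.acksPending = false;
            this.errorCode = noError;
        }
    }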
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 34baa8c..5648c6e 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package kafka.test
+package kafka.api.test
 
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{ZkUtils, Utils, TestUtils, Logging}
+import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
-import kafka.admin.AdminUtils
 import kafka.consumer.SimpleConsumer
 import kafka.api.FetchRequestBuilder
 import kafka.message.Message
@@ -33,7 +32,6 @@ import org.junit.Assert._
 
 import java.util.Properties
 import java.lang.{Integer, IllegalArgumentException}
-import org.apache.log4j.Logger
 
 
 class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -110,29 +108,25 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
 
       // send a normal record
       val record0 = new ProducerRecord(topic, new Integer(0), "key".getBytes, "value".getBytes)
-      val response0 = producer.send(record0, callback)
-      assertEquals("Should have offset 0", 0L, response0.get.offset)
+      assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset)
 
       // send a record with null value should be ok
       val record1 = new ProducerRecord(topic, new Integer(0), "key".getBytes, null)
-      val response1 = producer.send(record1, callback)
-      assertEquals("Should have offset 1", 1L, response1.get.offset)
+      assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset)
 
       // send a record with null key should be ok
       val record2 = new ProducerRecord(topic, new Integer(0), null, "value".getBytes)
-      val response2 = producer.send(record2, callback)
-      assertEquals("Should have offset 2", 2L, response2.get.offset)
+      assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset)
 
       // send a record with null part id should be ok
       val record3 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
-      val response3 = producer.send(record3, callback)
-      assertEquals("Should have offset 3", 3L, response3.get.offset)
+      assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset)
 
       // send a record with null topic should fail
       try {
         val record4 = new ProducerRecord(null, new Integer(0), "key".getBytes, "value".getBytes)
-        val response4 = producer.send(record4, callback)
-        response4.wait
+        producer.send(record4, callback)
+        fail("Should not allow sending a record without topic")
       } catch {
         case iae: IllegalArgumentException => // this is ok
         case e: Throwable => fail("Only expecting IllegalArgumentException", e)
@@ -143,8 +137,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
         producer.send(record0)
 
       // check that all messages have been acked via offset
-      val response5 = producer.send(record0, callback)
-      assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, response5.get.offset)
+      assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, producer.send(record0, callback).get.offset)
 
     } finally {
       if (producer != null) {
@@ -158,6 +151,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
    * testClose checks the closing behavior
    *
    * 1. After close() returns, all messages should be sent with correct returned offset metadata
+   * 2. After close() returns, send() should throw an exception immediately
    */
   @Test
   def testClose() {
@@ -195,7 +189,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
   /**
   * testSendToPartition checks the partitioning behavior
   *
-  * 1. The specified partition-id should be respected
+  * The specified partition-id should be respected
   */
   @Test
   def testSendToPartition() {
@@ -207,40 +201,40 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
     try {
       // create topic
       val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      val partition = 1
 
       // make sure leaders exist
-      val leader1 = leaders.get(1)
+      val leader1 = leaders(partition)
       assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined)
 
-      val partition = 1
       val responses =
-        for (i <- 0 until numRecords)
+        for (i <- 1 to numRecords)
           yield producer.send(new ProducerRecord(topic, partition, null, ("value" + i).getBytes))
       val futures = responses.toList
-      futures.map(_.wait)
+      futures.map(_.get)
       for (future <- futures)
        assertTrue("Request should have completed", future.isDone)
 
      // make sure all of them end up in the same partition with increasing offset values
      for ((future, offset) <- futures zip (0 until numRecords)) {
-        assertEquals(offset, future.get.offset)
+        assertEquals(offset.toLong, future.get.offset)
        assertEquals(topic, future.get.topic)
-        assertEquals(1, future.get.partition)
+        assertEquals(partition, future.get.partition)
      }
 
      // make sure the fetched messages also respect the partitioning and ordering
      val fetchResponse1 = if(leader1.get == server1.config.brokerId) {
-        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
-      }else {
-        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
+        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
+      } else {
+        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
      }
-      val messageSet1 = fetchResponse1.messageSet(topic, 1).iterator.toBuffer
+      val messageSet1 = fetchResponse1.messageSet(topic, partition).iterator.toBuffer
      assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size)
 
      // TODO: also check topic and partition after they are added in the return messageSet
      for (i <- 0 to numRecords - 1) {
        assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message)
-        assertEquals(i, messageSet1(i).offset)
+        assertEquals(i.toLong, messageSet1(i).offset)
      }
    } finally {
      if (producer != null) {
@@ -250,6 +244,11 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
     }
   }
 
+  /**
+   * testAutoCreateTopic
+   *
+   * The topic should be created upon sending the first message
+   */
   @Test
   def testAutoCreateTopic() {
     val props = new Properties()
@@ -259,8 +258,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
     try {
       // Send a message to auto-create the topic
       val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
-      val response = producer.send(record)
-      assertEquals("Should have offset 0", 0L, response.get.offset)
+      assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
 
       // double check that the topic is created with leader elected
       assertTrue("Topic should already be created with leader", TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 0).isDefined)
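The test changes above replace response.wait / Future.wait with Future.get, which is more than a style fix: Object.wait() is a monitor wait and has nothing to do with the send completing. Called without owning the monitor it throws IllegalMonitorStateException, and even inside a synchronized block it would wait for a notify that never comes, whereas get() blocks until the record is acknowledged and rethrows any failure. The same point as an illustrative Java snippet:

    // Illustrative: how to block on a send correctly.
    Future<RecordMetadata> f = producer.send(record);
    // f.wait();                 // wrong: a monitor wait, unrelated to the send completing
    RecordMetadata md = f.get(); // right: blocks until the record is acked, throws on failure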
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1c7a450..772d214 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -23,24 +23,27 @@ import java.nio._
 import java.nio.channels._
 import java.util.Random
 import java.util.Properties
-import junit.framework.AssertionFailedError
-import junit.framework.Assert._
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.TimeUnit
+
+import collection.mutable.Map
+import collection.mutable.ListBuffer
+
+import org.I0Itec.zkclient.ZkClient
+
 import kafka.server._
 import kafka.producer._
 import kafka.message._
-import org.I0Itec.zkclient.ZkClient
+import kafka.api._
 import kafka.cluster.Broker
-import collection.mutable.ListBuffer
 import kafka.consumer.ConsumerConfig
-import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.TimeUnit
-import kafka.api._
-import collection.mutable.Map
 import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
 import kafka.common.TopicAndPartition
-import junit.framework.Assert
 import kafka.admin.AdminUtils
+import kafka.producer.ProducerConfig
+import junit.framework.AssertionFailedError
+import junit.framework.Assert._
 
 /**
  * Utility functions to help with testing
@@ -526,7 +529,7 @@ object TestUtils extends Logging {
   }
 
   def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = {
-    Assert.assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
+    assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
       TestUtils.waitUntilTrue(() =>
         servers.foldLeft(true)(_ && _.apis.metadataCache.keySet.contains(TopicAndPartition(topic, partition))), timeout))