diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3d180e8..ccb5068 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1,14 +1,18 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.producer;
@@ -33,6 +37,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -45,6 +50,7 @@ import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.SystemTime;
+
/**
* A Kafka client that publishes records to the Kafka cluster.
*
@@ -89,8 +95,7 @@ public class KafkaProducer implements Producer {
new SystemTime());
this.partitioner = new Partitioner();
this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
- this.metadata = new Metadata(config.getLong(ProducerConfig.METADATA_FETCH_BACKOFF_CONFIG),
- config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG));
+ this.metadata = new Metadata();
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
@@ -215,9 +220,17 @@ public class KafkaProducer implements Producer {
this.sender.wakeup();
return future;
} catch (Exception e) {
- if (callback != null)
- callback.onCompletion(null, e);
- return new FutureFailure(e);
+ // For API exceptions return them in the future;
+ // for other exceptions throw directly
+ if (e instanceof ApiException) {
+ if (callback != null)
+ callback.onCompletion(null, e);
+ return new FutureFailure(e);
+ } else if (e instanceof KafkaException) {
+ throw (KafkaException) e;
+ } else {
+ throw new KafkaException(e);
+ }
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index dca9802..502af5c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -1,14 +1,18 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.kafka.clients.producer;
@@ -21,6 +25,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
+
/**
* The producer configuration keys
*/
@@ -43,17 +48,6 @@ public class ProducerConfig extends AbstractConfig {
public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
/**
- * The minimum amount of time between metadata fetches. This prevents polling for metadata too quickly.
- */
- public static final String METADATA_FETCH_BACKOFF_CONFIG = "metadata.fetch.backoff.ms";
-
- /**
- * The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any
- * leadership changes.
- */
- public static final String METADATA_EXPIRY_CONFIG = "metadata.expiry.ms";
-
- /**
* The buffer size allocated for a partition. When records are received which are smaller than this size the
* producer will attempt to optimistically group them together until this size is reached.
*/
@@ -131,8 +125,6 @@ public class ProducerConfig extends AbstractConfig {
/* TODO: add docs */
config = new ConfigDef().define(BROKER_LIST_CONFIG, Type.LIST, "blah blah")
.define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), "blah blah")
- .define(METADATA_FETCH_BACKOFF_CONFIG, Type.LONG, 50, atLeast(0), "blah blah")
- .define(METADATA_EXPIRY_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), "blah blah")
.define(MAX_PARTITION_SIZE_CONFIG, Type.INT, 16384, atLeast(0), "blah blah")
.define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), "blah blah")
/* TODO: should be a string to handle acks=in-sync */
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
index 8c77698..e752997 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -32,6 +32,12 @@ public final class RecordMetadata {
this.topicPartition = topicPartition;
}
+ public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) {
+ // ignore the relativeOffset if the base offset is -1,
+ // since this indicates the offset is unknown
+ this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset);
+ }
+
/**
* The offset of the record in the topic/partition.
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index 22d4c79..f7a33fb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -31,10 +31,12 @@ public final class FutureRecordMetadata implements Future {
private final ProduceRequestResult result;
private final long relativeOffset;
+ private RecordMetadata value;
public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset) {
this.result = result;
this.relativeOffset = relativeOffset;
+ this.value = null;
}
@Override
@@ -57,10 +59,12 @@ public final class FutureRecordMetadata implements Future {
}
private RecordMetadata valueOrError() throws ExecutionException {
- if (this.result.error() != null)
+ if (this.result.error() != null) {
throw new ExecutionException(this.result.error());
- else
- return new RecordMetadata(result.topicPartition(), this.result.baseOffset() + this.relativeOffset);
+ } else {
+ if (value == null) value = new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
+ return value;
+ }
}
public long relativeOffset() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 7a440a3..839e3bc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -59,7 +59,7 @@ public final class RecordBatch {
this.records.append(0L, key, value, compression);
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
if (callback != null)
- thunks.add(new Thunk(callback, this.recordCount));
+ thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
@@ -78,7 +78,8 @@ public final class RecordBatch {
try {
Thunk thunk = this.thunks.get(i);
if (exception == null)
- thunk.callback.onCompletion(new RecordMetadata(topicPartition, this.produceFuture.baseOffset() + thunk.relativeOffset),
+ // get() should not throw any exception here
+ thunk.callback.onCompletion(thunk.future.get(),
null);
else
thunk.callback.onCompletion(null, exception);
@@ -89,15 +90,15 @@ public final class RecordBatch {
}
/**
- * A callback and the associated RecordSend argument to pass to it.
+ * A callback and the associated FutureRecordMetadata argument to pass to it.
*/
final private static class Thunk {
final Callback callback;
- final long relativeOffset;
+ final FutureRecordMetadata future;
- public Thunk(Callback callback, long relativeOffset) {
+ public Thunk(Callback callback, FutureRecordMetadata future) {
this.callback = callback;
- this.relativeOffset = relativeOffset;
+ this.future = future;
}
}
}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
index ce95ca0..94099f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.errors;
-public class RecordTooLargeException extends ApiException {
+public class RecordTooLargeException extends FatalException {
private static final long serialVersionUID = 1L;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index f88992a..3374bd9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -41,17 +41,15 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
public enum Errors {
UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
NONE(0, null),
- OFFSET_OUT_OF_RANGE(1,
- new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
- CORRUPT_MESSAGE(2,
- new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
+ OFFSET_OUT_OF_RANGE(1, new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
+ CORRUPT_MESSAGE(2, new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
- LEADER_NOT_AVAILABLE(5,
- new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
+ // TODO: errorCode 4 for InvalidFetchSize
+ LEADER_NOT_AVAILABLE(5, new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
- MESSAGE_TOO_LARGE(10,
- new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
+ // TODO: errorCode 8, 9, 11
+ MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received."));
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 36cfc0f..f62ed9a 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -108,5 +108,4 @@ public class TestUtils {
b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length())));
return b.toString();
}
-
}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 882b6da..1087a2e 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -28,10 +28,8 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
import org.apache.log4j.Logger
import kafka.message.ByteBufferMessageSet
-import kafka.common.{NotAssignedReplicaException, NotLeaderForPartitionException, ErrorMapping}
+import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping}
import java.io.IOException
-import scala.Some
-import kafka.common.TopicAndPartition
/**
@@ -192,11 +190,10 @@ class Partition(val topic: String,
/**
* Make the local replica the follower by setting the new leader and ISR to empty
- * If the leader replica id does not change, return false to indicate the replica manager
*/
def makeFollower(controllerId: Int,
partitionStateInfo: PartitionStateInfo,
- correlationId: Int): Boolean = {
+ leaders: Set[Broker], correlationId: Int): Boolean = {
leaderIsrUpdateLock synchronized {
val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
@@ -205,18 +202,23 @@ class Partition(val topic: String,
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
- // add replicas that are new
- allReplicas.foreach(r => getOrCreateReplica(r))
- // remove assigned replicas that have been removed by the controller
- (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
- inSyncReplicas = Set.empty[Replica]
- leaderEpoch = leaderAndIsr.leaderEpoch
- zkVersion = leaderAndIsr.zkVersion
-
- if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId)
- return false;
-
- leaderReplicaIdOpt = Some(newLeaderBrokerId)
+ // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
+ leaders.find(_.id == newLeaderBrokerId) match {
+ case Some(leaderBroker) =>
+ // add replicas that are new
+ allReplicas.foreach(r => getOrCreateReplica(r))
+ // remove assigned replicas that have been removed by the controller
+ (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
+ inSyncReplicas = Set.empty[Replica]
+ leaderEpoch = leaderAndIsr.leaderEpoch
+ zkVersion = leaderAndIsr.zkVersion
+ leaderReplicaIdOpt = Some(newLeaderBrokerId)
+ case None => // this should not happen since the leader broker should always be present in the LeaderAndIsrRequest
+ stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " +
+ "controller %d epoch %d for partition [%s,%d] new leader %d")
+ .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
+ topic, partitionId, newLeaderBrokerId))
+ }
true
}
}
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 37a4800..5e016d5 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -231,9 +231,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
case Some(currLeaderIsrAndControllerEpoch) =>
controller.removeReplicaFromIsr(topic, partition, replicaId) match {
case Some(updatedLeaderIsrAndControllerEpoch) =>
- // send the shrunk ISR state change request to all the remaining alive replicas of the partition.
- val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
- brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
+ // send the shrunk ISR state change request only to the leader
+ brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
replicaState.put(partitionAndReplica, OfflineReplica)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3dd562c..21bba48 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -16,21 +16,21 @@
*/
package kafka.server
+import kafka.cluster.{Broker, Partition, Replica}
import collection._
import mutable.HashMap
-import kafka.cluster.{Broker, Partition, Replica}
+import org.I0Itec.zkclient.ZkClient
+import java.io.{File, IOException}
+import java.util.concurrent.atomic.AtomicBoolean
import kafka.utils._
import kafka.log.LogManager
import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
+import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
import kafka.controller.KafkaController
import org.apache.log4j.Logger
-import org.I0Itec.zkclient.ZkClient
-import com.yammer.metrics.core.Gauge
-import java.util.concurrent.atomic.AtomicBoolean
-import java.io.{IOException, File}
-import java.util.concurrent.TimeUnit
object ReplicaManager {
@@ -215,9 +215,9 @@ class ReplicaManager(val config: KafkaConfig,
val responseMap = new collection.mutable.HashMap[(String, Int), Short]
if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
- stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
- "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
- leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
+ stateChangeLogger.warn(("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d." +
+ " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.correlationId,
+ leaderAndISRRequest.controllerEpoch, controllerEpoch))
}
(responseMap, ErrorMapping.StaleControllerEpochCode)
} else {
@@ -236,17 +236,17 @@ class ReplicaManager(val config: KafkaConfig,
if(partitionStateInfo.allReplicas.contains(config.brokerId))
partitionState.put(partition, partitionStateInfo)
else {
- stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
- "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
- .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
- topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(",")))
+ stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request with correlation id %d from " +
+ "controller %d epoch %d as broker is not in assigned replica list %s for partition [%s,%d]")
+ .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch,
+ partitionStateInfo.allReplicas.mkString(","), topic, partition.partitionId))
}
} else {
// Otherwise record the error code in response
- stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
- "epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d")
- .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
- topic, partition.partitionId, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, partitionLeaderEpoch))
+ stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " +
+ "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d")
+ .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch,
+ partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition.partitionId, partitionLeaderEpoch))
responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
}
}
@@ -345,11 +345,10 @@ class ReplicaManager(val config: KafkaConfig,
*/
private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) {
- partitionState.foreach { state =>
+ partitionState.foreach(state =>
stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-follower transition for partition %s")
- .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
- }
+ .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))))
for (partition <- partitionState.keys)
responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
@@ -359,63 +358,47 @@ class ReplicaManager(val config: KafkaConfig,
leaderPartitions --= partitionState.keySet
}
- var partitionsToMakeFollower: Set[Partition] = Set()
+ partitionState.foreach{ case (partition, partitionStateInfo) =>
+ partition.makeFollower(controllerId, partitionStateInfo, leaders, correlationId)}
- // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
- partitionState.foreach{ case (partition, partitionStateInfo) =>
- val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
- val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
- leaders.find(_.id == newLeaderBrokerId) match {
- case Some(leaderBroker) =>
- if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
- partitionsToMakeFollower += partition
- else
- stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
- "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader")
- .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
- partition.topic, partition.partitionId, newLeaderBrokerId))
- case None =>
- // The leader broker should always be present in the leaderAndIsrRequest.
- // If not, we should record the error message and abort the transition process for this partition
- stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " +
- "controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available")
- .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
- partition.topic, partition.partitionId, newLeaderBrokerId))
- }
- }
-
- replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))
- partitionsToMakeFollower.foreach { partition =>
+ replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
+ partitionState.foreach { state =>
stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " +
"%d epoch %d with correlation id %d for partition %s")
- .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId)))
+ .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId)))
}
- logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark)).toMap)
-
- partitionsToMakeFollower.foreach { partition =>
- stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " +
+ logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) =>
+ new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark
+ })
+ partitionState.foreach { state =>
+ stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " +
"become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,
- partition.topic, partition.partitionId, correlationId, controllerId, epoch))
+ TopicAndPartition(state._1.topic, state._1.partitionId), correlationId, controllerId, epoch))
}
-
- if (isShuttingDown.get()) {
- partitionsToMakeFollower.foreach { partition =>
- stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
- "controller %d epoch %d for partition [%s,%d] since it is shutting down").format(localBrokerId, correlationId,
- controllerId, epoch, partition.topic, partition.partitionId))
+ if (!isShuttingDown.get()) {
+ val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]()
+ partitionState.foreach {
+ case (partition, partitionStateInfo) =>
+ val leader = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+ leaders.find(_.id == leader) match {
+ case Some(leaderBroker) =>
+ partitionAndOffsets.put(new TopicAndPartition(partition),
+ BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset))
+ case None =>
+ stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d " +
+ "controller %d epoch %d for partition %s since the designated leader %d " +
+ "cannot be found in live or shutting down brokers %s").format(localBrokerId,
+ correlationId, controllerId, epoch, partition, leader, leaders.mkString(",")))
+ }
}
+ replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
}
else {
- // we do not need to check if the leader exists again since this has been done at the beginning of this process
- val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
- new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, partition.getReplica().get.logEndOffset)).toMap
- replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
-
- partitionsToMakeFollower.foreach { partition =>
- stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " +
- "%d epoch %d with correlation id %d for partition [%s,%d]")
- .format(localBrokerId, controllerId, epoch, correlationId, partition.topic, partition.partitionId))
+ partitionState.foreach { state =>
+ stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +
+ "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,
+ controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
}
}
} catch {
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
new file mode 100644
index 0000000..b8a4c05
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Test
+import org.junit.Assert._
+
+import java.util.Properties
+import java.lang.Integer
+import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
+
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{Utils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.consumer.SimpleConsumer
+
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.clients.producer._
+
+class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness {
+ private val brokerId1 = 0
+ private val brokerId2 = 1
+ private val ports = TestUtils.choosePorts(2)
+ private val (port1, port2) = (ports(0), ports(1))
+ private var server1: KafkaServer = null
+ private var server2: KafkaServer = null
+ private var servers = List.empty[KafkaServer]
+
+ private var producer1: KafkaProducer = null
+ private var producer2: KafkaProducer = null
+ private var producer3: KafkaProducer = null
+
+ private var consumer1: SimpleConsumer = null
+ private var consumer2: SimpleConsumer = null
+
+ private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+ private val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+ props1.put("auto.create.topics.enable", "false")
+ props2.put("auto.create.topics.enable", "false")
+ private val config1 = new KafkaConfig(props1)
+ private val config2 = new KafkaConfig(props2)
+
+ private val bufferSize = 2 * config1.messageMaxBytes
+
+ private val topic1 = "topic-1"
+ private val topic2 = "topic-2"
+ private val numRecords = 100
+
+ // TODO: create a createProducer function in TestUtils after it has been migrated to clients
+ // TODO: expose producer configs after creating them
+ private val brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))
+ private val producerProps1 = new Properties()
+ private val producerProps2 = new Properties()
+ private val producerProps3 = new Properties()
+ producerProps1.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList)
+ producerProps2.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList)
+ producerProps3.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList)
+ producerProps1.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, (3000).toString)
+ producerProps2.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, (3000).toString)
+ producerProps3.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, (3000).toString)
+ producerProps1.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, (bufferSize).toString)
+ producerProps2.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, (bufferSize).toString)
+ producerProps3.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, (bufferSize).toString)
+ producerProps1.put(ProducerConfig.BLOCK_ON_BUFFER_FULL, "false")
+ producerProps2.put(ProducerConfig.BLOCK_ON_BUFFER_FULL, "false")
+ producerProps3.put(ProducerConfig.BLOCK_ON_BUFFER_FULL, "false")
+ producerProps1.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "0")
+ producerProps2.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "1")
+ producerProps3.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "-1")
+
+ override def setUp() {
+ super.setUp()
+ server1 = TestUtils.createServer(config1)
+ server2 = TestUtils.createServer(config2)
+ servers = List(server1,server2)
+
+ producer1 = new KafkaProducer(producerProps1)
+ producer2 = new KafkaProducer(producerProps2)
+ producer3 = new KafkaProducer(producerProps3)
+
+ // TODO: we need to migrate to new consumers when 0.9 is final
+ consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "")
+ consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "")
+ }
+
+ override def tearDown() {
+ server1.shutdown
+ server2.shutdown
+ Utils.rm(server1.config.logDirs)
+ Utils.rm(server2.config.logDirs)
+ producer1.close
+ producer2.close
+ producer3.close
+ consumer1.close
+ consumer2.close
+ super.tearDown()
+ }
+
+ /**
+ * testErrorInResponse
+ *
+ * 1. With ack == 0 the returned future metadata should contain no exception and offset -1
+ * 2. With ack != 0 getting the future metadata should throw an ExecutionException caused by RecordTooLargeException
+ *
+ * TODO: other exceptions that can be wrapped in the ExecutionException:
+ * UnknownTopicOrPartitionException
+ * NotLeaderForPartitionException
+ * LeaderNotAvailableException
+ * CorruptRecordException
+ * TimeoutException
+ */
+ @Test
+ def testErrorInResponse() {
+ // create topic
+ TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+
+ // create a too-large record
+ val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1))
+
+ try {
+ assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L)
+ } catch {
+ case e: Throwable => fail("Returned metadata should not contain errors", e)
+ }
+
+ try {
+ producer2.send(record).get
+ fail("ExecutionException should be thrown on getting metadata")
+ } catch {
+ case ee: ExecutionException => // this is OK
+ case e: Throwable => fail("Returned metadata only expect ExecutionException", e)
+ }
+ }
+
+ /**
+ * testNoResponse
+ *
+ * 1. With ack=0, getting the future metadata should not block.
+ * 2. With ack=1, getting the future metadata should block,
+ * and subsequent send calls will eventually fill up the buffer
+ */
+ @Test
+ def testNoResponse() {
+ // create topic
+ TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+
+ // first send a message to make sure the metadata is refreshed
+ val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+
+ try {
+ producer1.send(record).get
+ producer2.send(record).get
+ } catch {
+ case e: Throwable => fail("Should not throw any exceptions", e)
+ }
+
+ // stop IO threads and request handling, but leave networking operational
+ // any requests should be accepted and queue up, but not handled
+ server1.requestHandlerPool.shutdown()
+ server2.requestHandlerPool.shutdown()
+
+ try {
+ producer1.send(record).get(5000, TimeUnit.MILLISECONDS)
+ } catch {
+ case e: Throwable => fail("Should not throw any exceptions", e)
+ }
+
+ try {
+ producer2.send(record).get(5000, TimeUnit.MILLISECONDS)
+ fail("TimeoutException should be thrown")
+ } catch {
+ case ee: TimeoutException => // this is OK
+ case e: Throwable => fail("Only expect TimeoutException", e)
+ }
+
+ // send enough messages to fill up the buffer
+ val tooManyRecords = bufferSize / ("key".getBytes.length + "value".getBytes.length)
+
+ try {
+ for (i <- 1 to tooManyRecords)
+ producer2.send(record)
+ fail("An exception should be thrown")
+ } catch {
+ case ke: KafkaException => assertTrue("Only expect BufferExhaustedException", ke.isInstanceOf[BufferExhaustedException])
+ case e: Throwable => fail("Only expect BufferExhaustedException", e)
+ }
+ }
+
+ /**
+ * testSendException
+ *
+ * 1. The send call to a non-existent topic should throw a KafkaException caused by TimeoutException
+ * 2. The send call with an invalid partition id should throw a KafkaException caused by IllegalArgumentException
+ * 3. The send call with a message too large for the buffer should throw a BufferExhaustedException
+ * 4. The send call with an incorrect broker list should throw a KafkaException caused by TimeoutException
+ * 5. The send call after the producer is closed should throw a KafkaException caused by IllegalStateException
+ */
+ @Test
+ def testSendException() {
+ // do not create topic and send should fail
+ val record1 = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+
+ try {
+ producer1.send(record1)
+ fail("This message response should throw an exception")
+ } catch {
+ case ke: KafkaException => // this is OK
+ case e: Throwable => fail("Only expect KafkaException", e)
+ }
+
+ // create topic
+ TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+
+ // create a record with incorrect partition id, send should fail
+ val record2 = new ProducerRecord(topic1, new Integer(1), "key".getBytes, "value".getBytes)
+
+ try {
+ producer1.send(record2)
+ fail("This message response should throw an exception")
+ } catch {
+ case ke: KafkaException => // this is OK
+ case e: Throwable => fail("Only expect KafkaException", e)
+ }
+
+ // create a very large record which will exhaust the buffer, send should fail
+ val record3 = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](bufferSize))
+
+ try {
+ producer1.send(record3)
+ fail("This message response should throw an exception")
+ } catch {
+ case ke: KafkaException => assertTrue("Only expect BufferExhaustedException", ke.isInstanceOf[BufferExhaustedException])
+ case e: Throwable => fail("Only expect BufferExhaustedException", e)
+ }
+
+ // create another producer with an incorrect broker list; its send call should throw an exception
+ val producerProps = new Properties()
+ producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, "localhost:8686,localhost:4242")
+ producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, (3000).toString)
+ val producer = new KafkaProducer(producerProps)
+
+ try {
+ producer.send(record1)
+ fail("This message response should throw an exception")
+ } catch {
+ case ke: KafkaException => // this is OK
+ case e: Throwable => fail("Only expect KafkaException", e)
+ }
+
+ // close the producer, its send call should throw an exception due to IllegalStateException
+ producer.close()
+
+ try {
+ producer.send(record1)
+ fail("Send after closed should throw an exception directly")
+ } catch {
+ case ke: KafkaException => {
+ val cause = ke.getCause
+ assertTrue("Only expecting IllegalStateException", cause != null)
+ assertTrue("Only expecting IllegalStateException", cause.isInstanceOf[IllegalStateException])
+ }
+ case e: Throwable => fail("Only expecting KafkaException", e)
+ }
+ }
+
+ /**
+ * testDeadBroker
+ *
+ * Sends to a replicated topic should still succeed after the leader of the partition is shut down
+ */
+ // TODO: this test needs to be modified with retry
+ @Test
+ def testDeadBroker() {
+ // create topic
+ val leaders = TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+ val leader = leaders.get(0)
+ assertTrue("Leader of partition 0 of the topic should exist", leader.isDefined)
+
+ val record1 = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+ val response1 = producer2.send(record1)
+
+ // shutdown broker
+ val serverToShutdown = if(leader.get == server1.config.brokerId) server1 else server2
+ serverToShutdown.shutdown()
+ serverToShutdown.awaitShutdown()
+
+ assertEquals("yah", response1.get.offset, 0L)
+
+ // send to another topic to enforce metadata refresh
+ TestUtils.createTopic(zkClient, topic2, 1, 2, servers)
+ val record2 = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes)
+ assertEquals("yah", producer2.send(record2).get.offset, 0L)
+
+ // re-send to the first topic, this time it should succeed
+ assertEquals("yah", producer2.send(record1).get.offset, 0L)
+ }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 34baa8c..9a8815e 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -18,9 +18,8 @@
package kafka.test
import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{ZkUtils, Utils, TestUtils, Logging}
+import kafka.utils.{Utils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
-import kafka.admin.AdminUtils
import kafka.consumer.SimpleConsumer
import kafka.api.FetchRequestBuilder
import kafka.message.Message
@@ -33,7 +32,6 @@ import org.junit.Assert._
import java.util.Properties
import java.lang.{Integer, IllegalArgumentException}
-import org.apache.log4j.Logger
class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -110,29 +108,25 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
// send a normal record
val record0 = new ProducerRecord(topic, new Integer(0), "key".getBytes, "value".getBytes)
- val response0 = producer.send(record0, callback)
- assertEquals("Should have offset 0", 0L, response0.get.offset)
+ assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset)
// send a record with null value should be ok
val record1 = new ProducerRecord(topic, new Integer(0), "key".getBytes, null)
- val response1 = producer.send(record1, callback)
- assertEquals("Should have offset 1", 1L, response1.get.offset)
+ assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset)
// send a record with null key should be ok
val record2 = new ProducerRecord(topic, new Integer(0), null, "value".getBytes)
- val response2 = producer.send(record2, callback)
- assertEquals("Should have offset 2", 2L, response2.get.offset)
+ assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset)
// send a record with null part id should be ok
val record3 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
- val response3 = producer.send(record3, callback)
- assertEquals("Should have offset 3", 3L, response3.get.offset)
+ assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset)
// send a record with null topic should fail
try {
val record4 = new ProducerRecord(null, new Integer(0), "key".getBytes, "value".getBytes)
- val response4 = producer.send(record4, callback)
- response4.wait
+ producer.send(record4, callback)
+ fail("Should not allow sending a record without topic")
} catch {
case iae: IllegalArgumentException => // this is ok
case e: Throwable => fail("Only expecting IllegalArgumentException", e)
@@ -143,8 +137,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
producer.send(record0)
// check that all messages have been acked via offset
- val response5 = producer.send(record0, callback)
- assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, response5.get.offset)
+ assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, producer.send(record0, callback).get.offset)
} finally {
if (producer != null) {
@@ -158,6 +151,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
* testClose checks the closing behavior
*
* 1. After close() returns, all messages should be sent with correct returned offset metadata
+ * 2. After close() returns, send() should throw an exception immediately
*/
@Test
def testClose() {
@@ -195,7 +189,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
/**
* testSendToPartition checks the partitioning behavior
*
- * 1. The specified partition-id should be respected
+ * The specified partition-id should be respected
*/
@Test
def testSendToPartition() {
@@ -217,7 +211,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
for (i <- 0 until numRecords)
yield producer.send(new ProducerRecord(topic, partition, null, ("value" + i).getBytes))
val futures = responses.toList
- futures.map(_.wait)
+ futures.map(_.get)
for (future <- futures)
assertTrue("Request should have completed", future.isDone)
@@ -250,6 +244,11 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
}
}
+ /**
+ * testAutoCreateTopic
+ *
+ * The topic should be created upon sending the first message
+ */
@Test
def testAutoCreateTopic() {
val props = new Properties()
@@ -259,8 +258,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
try {
// Send a message to auto-create the topic
val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
- val response = producer.send(record)
- assertEquals("Should have offset 0", 0L, response.get.offset)
+ assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
// double check that the topic is created with leader elected
assertTrue("Topic should already be created with leader", TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 0).isDefined)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1c7a450..772d214 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -23,24 +23,27 @@ import java.nio._
import java.nio.channels._
import java.util.Random
import java.util.Properties
-import junit.framework.AssertionFailedError
-import junit.framework.Assert._
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.TimeUnit
+
+import collection.mutable.Map
+import collection.mutable.ListBuffer
+
+import org.I0Itec.zkclient.ZkClient
+
import kafka.server._
import kafka.producer._
import kafka.message._
-import org.I0Itec.zkclient.ZkClient
+import kafka.api._
import kafka.cluster.Broker
-import collection.mutable.ListBuffer
import kafka.consumer.ConsumerConfig
-import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.TimeUnit
-import kafka.api._
-import collection.mutable.Map
import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
import kafka.common.TopicAndPartition
-import junit.framework.Assert
import kafka.admin.AdminUtils
+import kafka.producer.ProducerConfig
+import junit.framework.AssertionFailedError
+import junit.framework.Assert._
/**
* Utility functions to help with testing
@@ -526,7 +529,7 @@ object TestUtils extends Logging {
}
def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = {
- Assert.assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
+ assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
TestUtils.waitUntilTrue(() =>
servers.foldLeft(true)(_ && _.apis.metadataCache.keySet.contains(TopicAndPartition(topic, partition))), timeout))
}