diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3d180e8..ccb5068 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -1,14 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.kafka.clients.producer; @@ -33,6 +37,7 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; @@ -45,6 +50,7 @@ import org.apache.kafka.common.record.Records; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.SystemTime; + /** * A Kafka client that publishes records to the Kafka cluster. *

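Since this patch reworks how the new producer surfaces errors, a minimal usage sketch for orientation; the broker address and topic name are placeholders, and only configs that appear elsewhere in this patch are used:

    import java.util.Properties;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class ProducerUsageSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BROKER_LIST_CONFIG, "localhost:9092"); // placeholder broker
            KafkaProducer producer = new KafkaProducer(props);
            // partition may be null to let the partitioner choose; key/value are raw bytes
            ProducerRecord record = new ProducerRecord("my-topic", null, "key".getBytes(), "value".getBytes());
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata metadata = future.get(); // blocks until the broker acks or the future fails
            System.out.println("Appended at offset " + metadata.offset());
            producer.close();
        }
    }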
@@ -89,8 +95,7 @@ public class KafkaProducer implements Producer { new SystemTime()); this.partitioner = new Partitioner(); this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); - this.metadata = new Metadata(config.getLong(ProducerConfig.METADATA_FETCH_BACKOFF_CONFIG), - config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG)); + this.metadata = new Metadata(); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG); this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG), @@ -215,9 +220,17 @@ public class KafkaProducer implements Producer { this.sender.wakeup(); return future; } catch (Exception e) { - if (callback != null) - callback.onCompletion(null, e); - return new FutureFailure(e); + // For API exceptions return them in the future; + // for other exceptions throw directly + if (e instanceof ApiException) { + if (callback != null) + callback.onCompletion(null, e); + return new FutureFailure(e); + } else if (e instanceof KafkaException) { + throw (KafkaException) e; + } else { + throw new KafkaException(e); + } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index dca9802..502af5c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -1,14 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ package org.apache.kafka.clients.producer; @@ -21,6 +25,7 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Type; + /** * The producer configuration keys */ @@ -43,17 +48,6 @@ public class ProducerConfig extends AbstractConfig { public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; /** - * The minimum amount of time between metadata fetches. This prevents polling for metadata too quickly. - */ - public static final String METADATA_FETCH_BACKOFF_CONFIG = "metadata.fetch.backoff.ms"; - - /** - * The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any - * leadership changes. - */ - public static final String METADATA_EXPIRY_CONFIG = "metadata.expiry.ms"; - - /** * The buffer size allocated for a partition. When records are received which are smaller than this size the * producer will attempt to optimistically group them together until this size is reached. */ @@ -131,8 +125,6 @@ public class ProducerConfig extends AbstractConfig { /* TODO: add docs */ config = new ConfigDef().define(BROKER_LIST_CONFIG, Type.LIST, "blah blah") .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), "blah blah") - .define(METADATA_FETCH_BACKOFF_CONFIG, Type.LONG, 50, atLeast(0), "blah blah") - .define(METADATA_EXPIRY_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), "blah blah") .define(MAX_PARTITION_SIZE_CONFIG, Type.INT, 16384, atLeast(0), "blah blah") .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), "blah blah") /* TODO: should be a string to handle acks=in-sync */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java index 8c77698..e752997 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java @@ -32,6 +32,12 @@ public final class RecordMetadata { this.topicPartition = topicPartition; } + public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) { + // ignore the relativeOffset if the base offset is -1, + // since this indicates the offset is unknown + this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset); + } + /** * The offset of the record in the topic/partition. 
*/ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java index 22d4c79..f7a33fb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java @@ -31,10 +31,12 @@ public final class FutureRecordMetadata implements Future { private final ProduceRequestResult result; private final long relativeOffset; + private RecordMetadata value; public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset) { this.result = result; this.relativeOffset = relativeOffset; + this.value = null; } @Override @@ -57,10 +59,12 @@ public final class FutureRecordMetadata implements Future { } private RecordMetadata valueOrError() throws ExecutionException { - if (this.result.error() != null) + if (this.result.error() != null) { throw new ExecutionException(this.result.error()); - else - return new RecordMetadata(result.topicPartition(), this.result.baseOffset() + this.relativeOffset); + } else { + if (value == null) value = new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset); + return value; + } } public long relativeOffset() { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 7a440a3..839e3bc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -59,7 +59,7 @@ public final class RecordBatch { this.records.append(0L, key, value, compression); FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); if (callback != null) - thunks.add(new Thunk(callback, this.recordCount)); + thunks.add(new Thunk(callback, future)); this.recordCount++; return future; } @@ -78,7 +78,8 @@ public final class RecordBatch { try { Thunk thunk = this.thunks.get(i); if (exception == null) - thunk.callback.onCompletion(new RecordMetadata(topicPartition, this.produceFuture.baseOffset() + thunk.relativeOffset), + // get() should not throw any exception here + thunk.callback.onCompletion(thunk.future.get(), null); else thunk.callback.onCompletion(null, exception); @@ -89,15 +90,15 @@ public final class RecordBatch { } /** - * A callback and the associated RecordSend argument to pass to it. + * A callback and the associated FutureRecordMetadata argument to pass to it. 
*/ final private static class Thunk { final Callback callback; - final long relativeOffset; + final FutureRecordMetadata future; - public Thunk(Callback callback, long relativeOffset) { + public Thunk(Callback callback, FutureRecordMetadata future) { this.callback = callback; - this.relativeOffset = relativeOffset; + this.future = future; } } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java index ce95ca0..94099f9 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.errors; -public class RecordTooLargeException extends ApiException { +public class RecordTooLargeException extends FatalException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index f88992a..3374bd9 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -41,17 +41,15 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; public enum Errors { UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")), NONE(0, null), - OFFSET_OUT_OF_RANGE(1, - new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")), - CORRUPT_MESSAGE(2, - new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")), + OFFSET_OUT_OF_RANGE(1, new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")), + CORRUPT_MESSAGE(2, new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")), UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")), - LEADER_NOT_AVAILABLE(5, - new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")), + // TODO: errorCode 4 for InvalidFetchSize + LEADER_NOT_AVAILABLE(5, new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")), NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")), REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")), - MESSAGE_TOO_LARGE(10, - new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), + // TODO: errorCode 8, 9, 11 + MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")), NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")); diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 36cfc0f..f62ed9a 100644 --- 
a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -108,5 +108,4 @@ public class TestUtils { b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length()))); return b.toString(); } - } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 882b6da..1087a2e 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -28,10 +28,8 @@ import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController import org.apache.log4j.Logger import kafka.message.ByteBufferMessageSet -import kafka.common.{NotAssignedReplicaException, NotLeaderForPartitionException, ErrorMapping} +import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping} import java.io.IOException -import scala.Some -import kafka.common.TopicAndPartition /** @@ -192,11 +190,10 @@ class Partition(val topic: String, /** * Make the local replica the follower by setting the new leader and ISR to empty - * If the leader replica id does not change, return false to indicate the replica manager */ def makeFollower(controllerId: Int, partitionStateInfo: PartitionStateInfo, - correlationId: Int): Boolean = { + leaders: Set[Broker], correlationId: Int): Boolean = { leaderIsrUpdateLock synchronized { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch @@ -205,18 +202,23 @@ class Partition(val topic: String, // record the epoch of the controller that made the leadership decision. This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch - // add replicas that are new - allReplicas.foreach(r => getOrCreateReplica(r)) - // remove assigned replicas that have been removed by the controller - (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_)) - inSyncReplicas = Set.empty[Replica] - leaderEpoch = leaderAndIsr.leaderEpoch - zkVersion = leaderAndIsr.zkVersion - - if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) - return false; - - leaderReplicaIdOpt = Some(newLeaderBrokerId) + // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1 + leaders.find(_.id == newLeaderBrokerId) match { + case Some(leaderBroker) => + // add replicas that are new + allReplicas.foreach(r => getOrCreateReplica(r)) + // remove assigned replicas that have been removed by the controller + (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_)) + inSyncReplicas = Set.empty[Replica] + leaderEpoch = leaderAndIsr.leaderEpoch + zkVersion = leaderAndIsr.zkVersion + leaderReplicaIdOpt = Some(newLeaderBrokerId) + case None => // we should not come here + stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " + + "controller %d epoch %d for partition [%s,%d] new leader %d") + .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, + topic, partitionId, newLeaderBrokerId)) + } true } } diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 37a4800..5e016d5 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ 
b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -231,9 +231,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { case Some(currLeaderIsrAndControllerEpoch) => controller.removeReplicaFromIsr(topic, partition, replicaId) match { case Some(updatedLeaderIsrAndControllerEpoch) => - // send the shrunk ISR state change request to all the remaining alive replicas of the partition. - val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId), + // send the shrunk ISR state change request only to the leader + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment) replicaState.put(partitionAndReplica, OfflineReplica) stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 3dd562c..21bba48 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -16,21 +16,21 @@ */ package kafka.server +import kafka.cluster.{Broker, Partition, Replica} import collection._ import mutable.HashMap -import kafka.cluster.{Broker, Partition, Replica} +import org.I0Itec.zkclient.ZkClient +import java.io.{File, IOException} +import java.util.concurrent.atomic.AtomicBoolean import kafka.utils._ import kafka.log.LogManager import kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge +import java.util.concurrent.TimeUnit import kafka.common._ import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest} import kafka.controller.KafkaController import org.apache.log4j.Logger -import org.I0Itec.zkclient.ZkClient -import com.yammer.metrics.core.Gauge -import java.util.concurrent.atomic.AtomicBoolean -import java.io.{IOException, File} -import java.util.concurrent.TimeUnit object ReplicaManager { @@ -215,9 +215,9 @@ class ReplicaManager(val config: KafkaConfig, val responseMap = new collection.mutable.HashMap[(String, Int), Short] if(leaderAndISRRequest.controllerEpoch < controllerEpoch) { leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => - stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " + - "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId, - leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch)) + stateChangeLogger.warn(("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d." 
+ + " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.correlationId, + leaderAndISRRequest.controllerEpoch, controllerEpoch)) } (responseMap, ErrorMapping.StaleControllerEpochCode) } else { @@ -236,17 +236,17 @@ class ReplicaManager(val config: KafkaConfig, if(partitionStateInfo.allReplicas.contains(config.brokerId)) partitionState.put(partition, partitionStateInfo) else { - stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " + - "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s") - .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, - topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(","))) + stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request with correlation id %d from " + + "controller %d epoch %d as broker is not in assigned replica list %s for partition [%s,%d]") + .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch, + partitionStateInfo.allReplicas.mkString(","), topic, partition.partitionId)) } } else { // Otherwise record the error code in response - stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " + - "epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d") - .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, - topic, partition.partitionId, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, partitionLeaderEpoch)) + stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " + + "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d") + .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch, + partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition.partitionId, partitionLeaderEpoch)) responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode) } } @@ -345,11 +345,10 @@ class ReplicaManager(val config: KafkaConfig, */ private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) { - partitionState.foreach { state => + partitionState.foreach(state => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-follower transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) - } + .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))) for (partition <- partitionState.keys) responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError) @@ -359,63 +358,47 @@ class ReplicaManager(val config: KafkaConfig, leaderPartitions --= partitionState.keySet } - var partitionsToMakeFollower: Set[Partition] = Set() + partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) => + partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)} - // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1 - partitionState.foreach{ case (partition, partitionStateInfo) => - val leaderIsrAndControllerEpoch = 
partitionStateInfo.leaderIsrAndControllerEpoch - val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader - leaders.find(_.id == newLeaderBrokerId) match { - case Some(leaderBroker) => - if (partition.makeFollower(controllerId, partitionStateInfo, correlationId)) - partitionsToMakeFollower += partition - else - stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " + - "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader") - .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, - partition.topic, partition.partitionId, newLeaderBrokerId)) - case None => - // The leader broker should always be present in the leaderAndIsrRequest. - // If not, we should record the error message and abort the transition process for this partition - stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available") - .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, - partition.topic, partition.partitionId, newLeaderBrokerId)) - } - } - - replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_))) - partitionsToMakeFollower.foreach { partition => + replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))) + partitionState.foreach { state => stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " + "%d epoch %d with correlation id %d for partition %s") - .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId))) + .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId))) } - logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark)).toMap) - - partitionsToMakeFollower.foreach { partition => - stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " + + logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) => + new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark + }) + partitionState.foreach { state => + stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " + "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId, - partition.topic, partition.partitionId, correlationId, controllerId, epoch)) + TopicAndPartition(state._1.topic, state._1.partitionId), correlationId, controllerId, epoch)) } - - if (isShuttingDown.get()) { - partitionsToMakeFollower.foreach { partition => - stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition [%s,%d] since it is shutting down").format(localBrokerId, correlationId, - controllerId, epoch, partition.topic, partition.partitionId)) + if (!isShuttingDown.get()) { + val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]() + partitionState.foreach { + case (partition, partitionStateInfo) => + val 
leader = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader + leaders.find(_.id == leader) match { + case Some(leaderBroker) => + partitionAndOffsets.put(new TopicAndPartition(partition), + BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset)) + case None => + stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d " + + "controller %d epoch %d for partition %s since the designated leader %d " + + "cannot be found in live or shutting down brokers %s").format(localBrokerId, + correlationId, controllerId, epoch, partition, leader, leaders.mkString(","))) + } } + replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets) } else { - // we do not need to check if the leader exists again since this has been done at the beginning of this process - val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => - new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, partition.getReplica().get.logEndOffset)).toMap - replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) - - partitionsToMakeFollower.foreach { partition => - stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " + - "%d epoch %d with correlation id %d for partition [%s,%d]") - .format(localBrokerId, controllerId, epoch, correlationId, partition.topic, partition.partitionId)) + partitionState.foreach { state => + stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + + "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId, + controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) } } } catch { diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala new file mode 100644 index 0000000..b8a4c05 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.test + +import org.scalatest.junit.JUnit3Suite +import org.junit.Test +import org.junit.Assert._ + +import java.util.Properties +import java.lang.Integer +import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException} + +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.{Utils, TestUtils} +import kafka.zk.ZooKeeperTestHarness +import kafka.consumer.SimpleConsumer + +import org.apache.kafka.common.KafkaException +import org.apache.kafka.clients.producer._ + +class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness { + private val brokerId1 = 0 + private val brokerId2 = 1 + private val ports = TestUtils.choosePorts(2) + private val (port1, port2) = (ports(0), ports(1)) + private var server1: KafkaServer = null + private var server2: KafkaServer = null + private var servers = List.empty[KafkaServer] + + private var producer1: KafkaProducer = null + private var producer2: KafkaProducer = null + private var producer3: KafkaProducer = null + + private var consumer1: SimpleConsumer = null + private var consumer2: SimpleConsumer = null + + private val props1 = TestUtils.createBrokerConfig(brokerId1, port1) + private val props2 = TestUtils.createBrokerConfig(brokerId2, port2) + props1.put("auto.create.topics.enable", "false") + props2.put("auto.create.topics.enable", "false") + private val config1 = new KafkaConfig(props1) + private val config2 = new KafkaConfig(props2) + + private val bufferSize = 2 * config1.messageMaxBytes + + private val topic1 = "topic-1" + private val topic2 = "topic-2" + private val numRecords = 100 + + // TODO: create a CreateProducer function in TestUtil after it has been migrated to clients + // TODO: expose producer configs after creating them + private val brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)) + private val producerProps1 = new Properties() + private val producerProps2 = new Properties() + private val producerProps3 = new Properties() + producerProps1.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList) + producerProps2.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList) + producerProps3.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList) + producerProps1.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, (3000).toString) + producerProps2.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, (3000).toString) + producerProps3.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, (3000).toString) + producerProps1.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, (bufferSize).toString) + producerProps2.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, (bufferSize).toString) + producerProps3.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, (bufferSize).toString) + producerProps1.put(ProducerConfig.BLOCK_ON_BUFFER_FULL, "false") + producerProps2.put(ProducerConfig.BLOCK_ON_BUFFER_FULL, "false") + producerProps3.put(ProducerConfig.BLOCK_ON_BUFFER_FULL, "false") + producerProps1.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "0") + producerProps2.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "1") + producerProps3.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "-1") + + override def setUp() { + super.setUp() + server1 = TestUtils.createServer(config1) + server2 = TestUtils.createServer(config2) + servers = List(server1,server2) + + producer1 = new KafkaProducer(producerProps1) + producer2 = new KafkaProducer(producerProps2) + producer3 = new KafkaProducer(producerProps3) + + // TODO: we need to migrate to new consumers when 0.9 is final + consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, 
"") + consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "") + } + + override def tearDown() { + server1.shutdown + server2.shutdown + Utils.rm(server1.config.logDirs) + Utils.rm(server2.config.logDirs) + producer1.close + producer2.close + producer3.close + consumer1.close + consumer2.close + super.tearDown() + } + + /** + * testErrorInResponse + * + * 1. With ack == 0 the future metadata will have no exceptions with offset -1 + * 2. With ack != 0 the future metadata will throw ExecutionException caused by RecordTooLargeException + * + * TODO: exceptions that can throw in ExecutionException: + * UnknownTopicOrPartitionException + * NotLeaderForPartitionException + * LeaderNotAvailableException + * CorruptRecordException + * TimeoutException + */ + @Test + def testErrorInResponse() { + // create topic + TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + + // create a too-large record + val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1)) + + try { + assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L) + } catch { + case e: Throwable => fail("Returned metadata should not contain errors", e) + } + + try { + producer2.send(record).get + fail("ExecutionException should be thrown on getting metadata") + } catch { + case ee: ExecutionException => // this is OK + case e: Throwable => fail("Returned metadata only expect ExecutionException", e) + } + } + + /** + * testNoResponse + * + * 1. With ack=0, the future metadata should not be blocked. + * 2. With ack=1, the future metadata should block, + * and subsequent calls will eventually cause buffer full + */ + @Test + def testNoResponse() { + // create topic + TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + + // first send a message to make sure the metadata is refreshed + val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) + + try { + producer1.send(record).get + producer2.send(record).get + } catch { + case e: Throwable => fail("Should not throw any exceptions", e) + } + + // stop IO threads and request handling, but leave networking operational + // any requests should be accepted and queue up, but not handled + server1.requestHandlerPool.shutdown() + server2.requestHandlerPool.shutdown() + + try { + producer1.send(record).get(5000, TimeUnit.MILLISECONDS) + } catch { + case e: Throwable => fail("Should not throw any exceptions", e) + } + + try { + producer2.send(record).get(5000, TimeUnit.MILLISECONDS) + fail("TimeoutException should be thrown") + } catch { + case ee: TimeoutException => // this is OK + case e: Throwable => fail("Only expect TimeoutException", e) + } + + // send enough messages to get buffer full + val tooManyRecords = bufferSize / ("key".getBytes.length + "value".getBytes.length) + + try { + for (i <- 1 to tooManyRecords) + producer2.send(record) + fail("An exception should be thrown") + } catch { + case ke: KafkaException => assertTrue("Only expect BufferExhaustedException", ke.isInstanceOf[BufferExhaustedException]) + case e: Throwable => fail("Only expect BufferExhaustedException", e) + } + } + + /** + * testSendException + * + * 1. The send call to not-exist-topic should throw KafkaException cased by TimeoutException + * 2. The send call with invalid partition id should throw KafkaException cased by IllegalArgumentException + * 3. The send call with too large message will throw BufferExhaustedException + * 4. 
 + + /** + * testSendException + * + * 1. The send call to a nonexistent topic should throw KafkaException caused by TimeoutException + * 2. The send call with an invalid partition id should throw KafkaException caused by IllegalArgumentException + * 3. The send call with a message large enough to exhaust the buffer should throw BufferExhaustedException + * 4. The send call with an incorrect broker list should throw KafkaException caused by TimeoutException + * 5. The send call after the producer is closed should throw KafkaException caused by IllegalStateException + */ + @Test + def testSendException() { + // do not create topic and send should fail + val record1 = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) + + try { + producer1.send(record1) + fail("Send to a nonexistent topic should throw an exception") + } catch { + case ke: KafkaException => // this is OK + case e: Throwable => fail("Only expecting KafkaException", e) + } + + // create topic + TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + + // create a record with incorrect partition id, send should fail + val record2 = new ProducerRecord(topic1, new Integer(1), "key".getBytes, "value".getBytes) + + try { + producer1.send(record2) + fail("Send with an invalid partition id should throw an exception") + } catch { + case ke: KafkaException => // this is OK + case e: Throwable => fail("Only expecting KafkaException", e) + } + + // create a very large record which will exhaust the buffer, send should fail + val record3 = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](bufferSize)) + + try { + producer1.send(record3) + fail("Send of a buffer-exhausting record should throw an exception") + } catch { + case ke: KafkaException => assertTrue("Only expecting BufferExhaustedException", ke.isInstanceOf[BufferExhaustedException]) + case e: Throwable => fail("Only expecting BufferExhaustedException", e) + } + + // create another producer with an incorrect broker list, its send call should throw an exception + val producerProps = new Properties() + producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, "localhost:8686,localhost:4242") + producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, (3000).toString) + val producer = new KafkaProducer(producerProps) + + try { + producer.send(record1) + fail("Send with an incorrect broker list should throw an exception") + } catch { + case ke: KafkaException => // this is OK + case e: Throwable => fail("Only expecting KafkaException", e) + } + + // close the producer, its send call should throw an exception due to IllegalStateException + producer.close() + + try { + producer.send(record1) + fail("Send after close should throw an exception directly") + } catch { + case ke: KafkaException => { + val cause = ke.getCause + assertTrue("Only expecting IllegalStateException", cause != null) + assertTrue("Only expecting IllegalStateException", cause.isInstanceOf[IllegalStateException]) + } + case e: Throwable => fail("Only expecting KafkaException", e) + } + }
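testSendException above exercises the split introduced in KafkaProducer.send() earlier in this patch: ApiExceptions are handed back through the future, while everything else escapes send() itself as a KafkaException. A caller-side sketch of handling both channels, assuming the obvious imports:

    static void sendHandlingBothChannels(KafkaProducer producer, ProducerRecord record) {
        Future<RecordMetadata> future;
        try {
            future = producer.send(record);
        } catch (KafkaException e) {
            // synchronous failure: e.g. BufferExhaustedException when
            // block.on.buffer.full=false, or a KafkaException wrapping
            // IllegalStateException after close()
            return;
        }
        try {
            future.get();
        } catch (ExecutionException e) {
            // asynchronous failure: the ApiException recorded in the future
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }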
 + + /** + * testDeadBroker + * + * Sends should still be acked after the leader of the partition is shut down, + * since the replica on the surviving broker takes over as leader + */ + // TODO: this test needs to be modified with retry + @Test + def testDeadBroker() { + // create topic + val leaders = TestUtils.createTopic(zkClient, topic1, 1, 2, servers) + val leader = leaders.get(0) + assertTrue("Leader of partition 0 of the topic should exist", leader.isDefined) + + val record1 = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) + val response1 = producer2.send(record1) + + // shutdown broker + val serverToShutdown = if(leader.get == server1.config.brokerId) server1 else server2 + serverToShutdown.shutdown() + serverToShutdown.awaitShutdown() + + assertEquals("Message should be appended at offset 0", 0L, response1.get.offset) + + // send to another topic to enforce metadata refresh + TestUtils.createTopic(zkClient, topic2, 1, 2, servers) + val record2 = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes) + assertEquals("Message to the new topic should be appended at offset 0", 0L, producer2.send(record2).get.offset) + + // re-send to the first topic, this time it should succeed + assertEquals("Re-sent message should be appended at offset 0", 0L, producer2.send(record1).get.offset) + } +} \ No newline at end of file diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 34baa8c..9a8815e 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -18,9 +18,8 @@ package kafka.test import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.{ZkUtils, Utils, TestUtils, Logging} +import kafka.utils.{Utils, TestUtils} import kafka.zk.ZooKeeperTestHarness -import kafka.admin.AdminUtils import kafka.consumer.SimpleConsumer import kafka.api.FetchRequestBuilder import kafka.message.Message @@ -33,7 +32,6 @@ import org.junit.Assert._ import java.util.Properties import java.lang.{Integer, IllegalArgumentException} -import org.apache.log4j.Logger class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -110,29 +108,25 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { // send a normal record val record0 = new ProducerRecord(topic, new Integer(0), "key".getBytes, "value".getBytes) - val response0 = producer.send(record0, callback) - assertEquals("Should have offset 0", 0L, response0.get.offset) + assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset) // send a record with null value should be ok val record1 = new ProducerRecord(topic, new Integer(0), "key".getBytes, null) - val response1 = producer.send(record1, callback) - assertEquals("Should have offset 1", 1L, response1.get.offset) + assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset) // send a record with null key should be ok val record2 = new ProducerRecord(topic, new Integer(0), null, "value".getBytes) - val response2 = producer.send(record2, callback) - assertEquals("Should have offset 2", 2L, response2.get.offset) + assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset) // send a record with null part id should be ok val record3 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) - val response3 = producer.send(record3, callback) - assertEquals("Should have offset 3", 3L, response3.get.offset) + assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset) // send a record with null topic should fail try { val record4 = new ProducerRecord(null, new Integer(0), "key".getBytes, "value".getBytes) - val response4 = producer.send(record4, callback) - response4.wait + producer.send(record4, callback) + fail("Should not allow sending a record without topic") } catch { case iae: IllegalArgumentException => // this is ok case e: Throwable => fail("Only expecting IllegalArgumentException", e) @@ -143,8 +137,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { producer.send(record0) // check that all messages have been acked via offset - val response5 = producer.send(record0, callback) - assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, response5.get.offset) + assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, producer.send(record0, callback).get.offset) } finally { if (producer != null) { @@ -158,6 +151,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { * testClose checks the closing behavior * * 1.
After close() returns, all messages should be sent with correct returned offset metadata + * 2. After close() returns, send() should throw an exception immediately */ @Test def testClose() { @@ -195,7 +189,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { /** * testSendToPartition checks the partitioning behavior * - * 1. The specified partition-id should be respected + * The specified partition-id should be respected */ @Test def testSendToPartition() { @@ -217,7 +211,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { for (i <- 0 until numRecords) yield producer.send(new ProducerRecord(topic, partition, null, ("value" + i).getBytes)) val futures = responses.toList - futures.map(_.wait) + futures.map(_.get) for (future <- futures) assertTrue("Request should have completed", future.isDone) @@ -250,6 +244,11 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { } } + /** + * testAutoCreateTopic + * + * The topic should be created upon sending the first message + */ @Test def testAutoCreateTopic() { val props = new Properties() @@ -259,8 +258,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { try { // Send a message to auto-create the topic val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) - val response = producer.send(record) - assertEquals("Should have offset 0", 0L, response.get.offset) + assertEquals("Should have offset 0", 0L, producer.send(record).get.offset) // double check that the topic is created with leader elected assertTrue("Topic should already be created with leader", TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 0).isDefined) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 1c7a450..772d214 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -23,24 +23,27 @@ import java.nio._ import java.nio.channels._ import java.util.Random import java.util.Properties -import junit.framework.AssertionFailedError -import junit.framework.Assert._ +import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.TimeUnit + +import collection.mutable.Map +import collection.mutable.ListBuffer + +import org.I0Itec.zkclient.ZkClient + import kafka.server._ import kafka.producer._ import kafka.message._ -import org.I0Itec.zkclient.ZkClient +import kafka.api._ import kafka.cluster.Broker -import collection.mutable.ListBuffer import kafka.consumer.ConsumerConfig -import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.TimeUnit -import kafka.api._ -import collection.mutable.Map import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} import kafka.common.TopicAndPartition -import junit.framework.Assert import kafka.admin.AdminUtils +import kafka.producer.ProducerConfig +import junit.framework.AssertionFailedError +import junit.framework.Assert._ /** * Utility functions to help with testing @@ -526,7 +529,7 @@ object TestUtils extends Logging { } def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = { - Assert.assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition), + assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition), TestUtils.waitUntilTrue(() => servers.foldLeft(true)(_ && _.apis.metadataCache.keySet.contains(TopicAndPartition(topic, 
partition))), timeout)) }
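One client-side detail worth spelling out from the RecordMetadata/FutureRecordMetadata changes above: the final offset is now derived lazily from the batch's base offset plus the record's relative offset, with -1 propagated untouched when the offset is unknown (e.g. acks == 0), and the constructed RecordMetadata is cached so callbacks and repeated get() calls observe the same instance. The offset rule, in essence:

    // sketch of the offset derivation the new RecordMetadata constructor applies
    static long recordOffset(long baseOffset, long relativeOffset) {
        // a base offset of -1 means the offset is unknown, so the
        // relative offset is ignored rather than added
        return baseOffset == -1 ? -1 : baseOffset + relativeOffset;
    }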