diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index 5bed607..b69866a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -65,7 +65,8 @@ public final class BufferPool { } /** - * Allocate a buffer of the given size + * Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool + * is configured with blocking mode. * * @param size The buffer size to allocate in bytes * @return The buffer diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java index 368e8f3..678d1c6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java @@ -20,11 +20,13 @@ import org.slf4j.LoggerFactory; public class ErrorLoggingCallback implements Callback { private static final Logger log = LoggerFactory.getLogger(ErrorLoggingCallback.class); + private String topic; private byte[] key; private byte[] value; private boolean logAsString; - public ErrorLoggingCallback(byte[] key, byte[] value, boolean logAsString) { + public ErrorLoggingCallback(String topic, byte[] key, byte[] value, boolean logAsString) { + this.topic = topic; this.key = key; this.value = value; this.logAsString = logAsString; @@ -36,8 +38,8 @@ public class ErrorLoggingCallback implements Callback { logAsString ? new String(key) : key.length + " bytes"; String valueString = (value == null) ? "null" : logAsString ? 
new String(value) : value.length + " bytes"; - log.error("Error when sending message with key: " + keyString + ", value: " + valueString + - " with error " + e.getMessage()); + log.error("Error when sending message to topic {} with key: {}, value: {} with error: {}", + topic, keyString, valueString, e.getMessage()); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index db6e3a1..33d62a4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -79,6 +79,7 @@ public final class Metadata { public synchronized Cluster fetch(String topic, long maxWaitMs) { List partitions = null; long begin = System.currentTimeMillis(); + long remainingWaitMs = maxWaitMs; do { partitions = cluster.partitionsFor(topic); if (partitions == null) { @@ -86,12 +87,13 @@ public final class Metadata { forceUpdate = true; try { log.trace("Requesting metadata update for topic {}.", topic); - wait(maxWaitMs); + wait(remainingWaitMs); } catch (InterruptedException e) { /* this is fine, just try again */ } - long ellapsed = System.currentTimeMillis() - begin; - if (ellapsed >= maxWaitMs) + long elapsed = System.currentTimeMillis() - begin; + if (elapsed >= maxWaitMs) throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); + remainingWaitMs = maxWaitMs - elapsed; } else { return cluster; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 7a03f38..673b296 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -143,9 +143,9 @@ public final class RecordAccumulator { log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); ByteBuffer buffer = free.allocate(size); synchronized (dq) { - RecordBatch first = dq.peekLast(); - if (first != null) { - FutureRecordMetadata future = first.tryAppend(key, value, compression, callback); + RecordBatch last = dq.peekLast(); + if (last != null) { + FutureRecordMetadata future = last.tryAppend(key, value, compression, callback); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen // often... 
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index c989e25..84a327e 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -65,10 +65,14 @@ public class AbstractConfig { return (Integer) get(key); } - public Long getLong(String key) { + public long getLong(String key) { return (Long) get(key); } + public double getDouble(String key) { + return (Double) get(key); + } + @SuppressWarnings("unchecked") public List getList(String key) { return (List) get(key); diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 61257d1..67b349d 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -181,7 +181,7 @@ public class ConfigDef { else if (value instanceof String) return Arrays.asList(trimmed.split("\\s*,\\s*", -1)); else - throw new ConfigException(name, value, "Expected a comma seperated list."); + throw new ConfigException(name, value, "Expected a comma separated list."); case CLASS: if (value instanceof Class) return (Class) value; diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index f83189d..9839632 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -96,7 +96,7 @@ public class Selector implements Selectable { * @param sendBufferSize The send buffer for the new connection * @param receiveBufferSize The receive buffer for the new connection * @throws IllegalStateException if there is already a connection for that id - * @throws UnresolvedAddressException if DNS resolution fails on the hostname + * @throws IOException if DNS resolution fails on the hostname or if the broker is down */ @Override public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { @@ -111,6 +111,9 @@ public class Selector implements Selectable { channel.connect(address); } catch (UnresolvedAddressException e) { channel.close(); + throw new IOException("Can't resolve address: " + address, e); + } catch (IOException e) { + channel.close(); throw e; } SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index dc03fd0..8cecba5 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -150,7 +150,9 @@ public class Struct { } /** - * Create a struct for the schema of a container type (struct or array) + * Create a struct for the schema of a container type (struct or array). + * Note that for array type, this method assumes that the type is an array of schema and creates a struct + * of that schema. Arrays of other types can't be instantiated with this method. 
* * @param field The field to create an instance of * @return The struct diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 865996c..8ccad14 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -125,7 +125,7 @@ public class SelectorTest { /** * Sending a request to a node with a bad hostname should result in an exception during connect */ - @Test(expected = UnresolvedAddressException.class) + @Test(expected = IOException.class) public void testNoRouteToHost() throws Exception { selector.connect(0, new InetSocketAddress("asdf.asdf.dsc", server.port), BUFFER_SIZE, BUFFER_SIZE); } @@ -224,7 +224,7 @@ public class SelectorTest { } /* connect and wait for the connection to complete */ - private void blockingConnect(int node) throws IOException { + private void blockingConnect(int node) throws Exception { selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE); while (!selector.connected().contains(node)) selector.poll(10000L, EMPTY); diff --git a/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala b/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala index b66c8fc..a1e1279 100644 --- a/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala +++ b/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala @@ -20,8 +20,7 @@ package kafka.common /** * This exception is thrown by the leader elector in the controller when leader election fails for a partition since - * all the leader candidate replicas for a partition are offline; the set of candidates may or may not be limited - * to just the in sync replicas depending upon whether unclean leader election is allowed to occur. 
+ * all the replicas for a partition are offline */ class NoReplicaOnlineException(message: String, cause: Throwable) extends RuntimeException(message, cause) { def this(message: String) = this(message, null) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 7dc2718..5db24a7 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -21,12 +21,10 @@ import collection.Set import com.yammer.metrics.core.Gauge import java.lang.{IllegalStateException, Object} import java.util.concurrent.TimeUnit -import kafka.admin.AdminUtils import kafka.admin.PreferredReplicaLeaderElectionCommand import kafka.api._ import kafka.cluster.Broker import kafka.common._ -import kafka.log.LogConfig import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} import kafka.server.{ZookeeperLeaderElector, KafkaConfig} import kafka.utils.ZkUtils._ @@ -166,7 +164,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // kafka server private val autoRebalanceScheduler = new KafkaScheduler(1) var deleteTopicManager: TopicDeletionManager = null - val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config) + val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext) private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext) private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext) @@ -974,19 +972,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg if (leaderAndIsr.isr.contains(replicaId)) { // if the replica to be removed from the ISR is also the leader, set the new leader value to -1 val newLeader = if(replicaId == leaderAndIsr.leader) -1 else leaderAndIsr.leader - var newIsr = leaderAndIsr.isr.filter(b => b != replicaId) - - // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election - // is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can - // eventually be restored as the leader. 
- if (newIsr.isEmpty && !LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(zkClient, - topicAndPartition.topic)).uncleanLeaderElectionEnable) { - info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition)) - newIsr = leaderAndIsr.isr - } - val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch + 1, - newIsr, leaderAndIsr.zkVersion + 1) + leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1) // update the new leadership decision in zookeeper or retry val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath( zkClient, diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index d3b25fa..fa29bbe 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -16,12 +16,9 @@ */ package kafka.controller -import kafka.admin.AdminUtils import kafka.api.LeaderAndIsr -import kafka.log.LogConfig import kafka.utils.Logging import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} -import kafka.server.KafkaConfig trait PartitionLeaderSelector { @@ -40,14 +37,12 @@ trait PartitionLeaderSelector { * Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest): * 1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live * isr as the new isr. - * 2. Else, if unclean leader election for the topic is disabled, it throws a NoReplicaOnlineException. - * 3. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr. - * 4. If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException + * 2. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr. + * 3. If no broker in the assigned replica list is alive, it throws NoReplicaOnlineException * Replicas to receive LeaderAndIsr request = live assigned replicas * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache */ -class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig) - extends PartitionLeaderSelector with Logging { +class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging { this.logIdent = "[OfflinePartitionLeaderSelector]: " def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { @@ -59,15 +54,6 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion val newLeaderAndIsr = liveBrokersInIsr.isEmpty match { case true => - // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration - // for unclean leader election. - if (!LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(controllerContext.zkClient, - topicAndPartition.topic)).uncleanLeaderElectionEnable) { - throw new NoReplicaOnlineException(("No broker in ISR for partition " + - "%s is alive. 
Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + - " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(","))) - } - debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s" .format(topicAndPartition, liveAssignedReplicas.mkString(","))) liveAssignedReplicas.isEmpty match { @@ -91,7 +77,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition)) (newLeaderAndIsr, liveAssignedReplicas) case None => - throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition)) + throw new NoReplicaOnlineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it") } } } diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 5746ad4..18c86fe 100644 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -21,23 +21,6 @@ import java.util.Properties import scala.collection._ import kafka.common._ -object Defaults { - val SegmentSize = 1024 * 1024 - val SegmentMs = Long.MaxValue - val FlushInterval = Long.MaxValue - val FlushMs = Long.MaxValue - val RetentionSize = Long.MaxValue - val RetentionMs = Long.MaxValue - val MaxMessageSize = Int.MaxValue - val MaxIndexSize = 1024 * 1024 - val IndexInterval = 4096 - val FileDeleteDelayMs = 60 * 1000L - val DeleteRetentionMs = 24 * 60 * 60 * 1000L - val MinCleanableDirtyRatio = 0.5 - val Compact = false - val UncleanLeaderElectionEnable = true -} - /** * Configuration settings for a log * @param segmentSize The soft maximum for the size of a segment file in the log @@ -52,23 +35,20 @@ object Defaults { * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted. * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned * @param compact Should old segments in this log be deleted or deduplicated? 
- * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property - * but included here for topic-specific configuration validation purposes */ -case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, - val segmentMs: Long = Defaults.SegmentMs, - val flushInterval: Long = Defaults.FlushInterval, - val flushMs: Long = Defaults.FlushMs, - val retentionSize: Long = Defaults.RetentionSize, - val retentionMs: Long = Defaults.RetentionMs, - val maxMessageSize: Int = Defaults.MaxMessageSize, - val maxIndexSize: Int = Defaults.MaxIndexSize, - val indexInterval: Int = Defaults.IndexInterval, - val fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs, - val deleteRetentionMs: Long = Defaults.DeleteRetentionMs, - val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio, - val compact: Boolean = Defaults.Compact, - val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable) { +case class LogConfig(val segmentSize: Int = 1024*1024, + val segmentMs: Long = Long.MaxValue, + val flushInterval: Long = Long.MaxValue, + val flushMs: Long = Long.MaxValue, + val retentionSize: Long = Long.MaxValue, + val retentionMs: Long = Long.MaxValue, + val maxMessageSize: Int = Int.MaxValue, + val maxIndexSize: Int = 1024*1024, + val indexInterval: Int = 4096, + val fileDeleteDelayMs: Long = 60*1000, + val deleteRetentionMs: Long = 24 * 60 * 60 * 1000L, + val minCleanableRatio: Double = 0.5, + val compact: Boolean = false) { def toProps: Properties = { val props = new Properties() @@ -86,7 +66,6 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString) props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString) props.put(CleanupPolicyProp, if(compact) "compact" else "delete") - props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString) props } @@ -106,7 +85,6 @@ object LogConfig { val FileDeleteDelayMsProp = "file.delete.delay.ms" val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio" val CleanupPolicyProp = "cleanup.policy" - val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable" val ConfigNames = Set(SegmentBytesProp, SegmentMsProp, @@ -120,31 +98,26 @@ object LogConfig { FileDeleteDelayMsProp, DeleteRetentionMsProp, MinCleanableDirtyRatioProp, - CleanupPolicyProp, - UncleanLeaderElectionEnableProp) + CleanupPolicyProp) /** * Parse the given properties instance into a LogConfig object */ def fromProps(props: Properties): LogConfig = { - new LogConfig(segmentSize = props.getProperty(SegmentBytesProp, Defaults.SegmentSize.toString).toInt, - segmentMs = props.getProperty(SegmentMsProp, Defaults.SegmentMs.toString).toLong, - maxIndexSize = props.getProperty(SegmentIndexBytesProp, Defaults.MaxIndexSize.toString).toInt, - flushInterval = props.getProperty(FlushMessagesProp, Defaults.FlushInterval.toString).toLong, - flushMs = props.getProperty(FlushMsProp, Defaults.FlushMs.toString).toLong, - retentionSize = props.getProperty(RetentionBytesProp, Defaults.RetentionSize.toString).toLong, - retentionMs = props.getProperty(RententionMsProp, Defaults.RetentionMs.toString).toLong, - maxMessageSize = props.getProperty(MaxMessageBytesProp, Defaults.MaxMessageSize.toString).toInt, - indexInterval = props.getProperty(IndexIntervalBytesProp, Defaults.IndexInterval.toString).toInt, - fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp, Defaults.FileDeleteDelayMs.toString).toInt, - deleteRetentionMs = 
props.getProperty(DeleteRetentionMsProp, Defaults.DeleteRetentionMs.toString).toLong, - minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp, - Defaults.MinCleanableDirtyRatio.toString).toDouble, - compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete") - .trim.toLowerCase != "delete", - uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp, - Defaults.UncleanLeaderElectionEnable.toString).toBoolean) + new LogConfig(segmentSize = props.getProperty(SegmentBytesProp).toInt, + segmentMs = props.getProperty(SegmentMsProp).toLong, + maxIndexSize = props.getProperty(SegmentIndexBytesProp).toInt, + flushInterval = props.getProperty(FlushMessagesProp).toLong, + flushMs = props.getProperty(FlushMsProp).toLong, + retentionSize = props.getProperty(RetentionBytesProp).toLong, + retentionMs = props.getProperty(RententionMsProp).toLong, + maxMessageSize = props.getProperty(MaxMessageBytesProp).toInt, + indexInterval = props.getProperty(IndexIntervalBytesProp).toInt, + fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt, + deleteRetentionMs = props.getProperty(DeleteRetentionMsProp).toLong, + minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble, + compact = props.getProperty(CleanupPolicyProp).trim.toLowerCase != "delete") } /** diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 08de0ef..d07796e 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -231,9 +231,6 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the frequency with which the partition rebalance check is triggered by the controller */ val leaderImbalanceCheckIntervalSeconds = props.getInt("leader.imbalance.check.interval.seconds", 300) - /* indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though - * doing so may result in data loss */ - val uncleanLeaderElectionEnable = props.getBoolean("unclean.leader.election.enable", true) /*********** Controlled shutdown configuration ***********/ diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 75ae1e1..73e605e 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -17,9 +17,7 @@ package kafka.server -import kafka.admin.AdminUtils import kafka.cluster.Broker -import kafka.log.LogConfig import kafka.message.ByteBufferMessageSet import kafka.api.{OffsetRequest, FetchResponsePartitionData} import kafka.common.{KafkaStorageException, TopicAndPartition} @@ -83,21 +81,9 @@ class ReplicaFetcherThread(name:String, */ val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId) if (leaderEndOffset < replica.logEndOffset) { - // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election. - // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise, - // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration. 
- if (!LogConfig.fromProps(brokerConfig.props.props, AdminUtils.fetchTopicConfig(replicaMgr.zkClient, - topicAndPartition.topic)).uncleanLeaderElectionEnable) { - // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur. - fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) + - " Current leader %d's latest offset %d is less than replica %d's latest offset %d" - .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset)) - Runtime.getRuntime.halt(1) - } - replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset)) - warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d" - .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderEndOffset)) + warn("Replica %d for partition %s reset its fetch offset to current leader %d's latest offset %d" + .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderEndOffset)) leaderEndOffset } else { /** @@ -108,8 +94,8 @@ class ReplicaFetcherThread(name:String, */ val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId) replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset) - warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d" - .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderStartOffset)) + warn("Replica %d for partition %s reset its fetch offset to current leader %d's start offset %d" + .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderStartOffset)) leaderStartOffset } } diff --git a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala index 6f90549..a969a22 100644 --- a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala @@ -170,11 +170,11 @@ object MirrorMaker extends Logging { trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(producerRecord.key()), producerId)) val producer = producers(producerId) producer.send(producerRecord, - new ErrorLoggingCallback(producerRecord.key(), producerRecord.value(), false)) + new ErrorLoggingCallback(producerRecord.topic(), producerRecord.key(), producerRecord.value(), false)) } else { val producerId = producerIndex.getAndSet((producerIndex.get() + 1) % producers.size) producers(producerId).send(producerRecord, - new ErrorLoggingCallback(producerRecord.key(), producerRecord.value(), false)) + new ErrorLoggingCallback(producerRecord.topic(), producerRecord.key(), producerRecord.value(), false)) trace("Sent message to producer " + producerId) } } diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala index e86ee80..b585f0e 100644 --- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala +++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package kafka.integration +package kafka.server import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness @@ -27,7 +27,6 @@ import kafka.cluster.Broker import kafka.common.ErrorMapping import kafka.api._ import kafka.admin.AdminUtils -import kafka.server.{KafkaConfig, KafkaServer} class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness { val brokerId1 = 0 diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala deleted file mode 100644 index c5f2da9..0000000 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ /dev/null @@ -1,290 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.integration - -import scala.collection.mutable.MutableList -import scala.util.Random -import org.apache.log4j.{Level, Logger} -import org.scalatest.junit.JUnit3Suite -import java.util.Properties -import junit.framework.Assert._ -import kafka.admin.AdminUtils -import kafka.common.FailedToSendMessageException -import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException} -import kafka.producer.{KeyedMessage, Producer} -import kafka.serializer.{DefaultEncoder, StringEncoder} -import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.Utils -import kafka.utils.TestUtils._ -import kafka.zk.ZooKeeperTestHarness - -class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { - val brokerId1 = 0 - val brokerId2 = 1 - - val port1 = choosePort() - val port2 = choosePort() - - // controlled shutdown is needed for these tests, but we can trim the retry count and backoff interval to - // reduce test execution time - val enableControlledShutdown = true - val configProps1 = createBrokerConfig(brokerId1, port1) - val configProps2 = createBrokerConfig(brokerId2, port2) - - for (configProps <- List(configProps1, configProps2)) { - configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown)) - configProps.put("controlled.shutdown.max.retries", String.valueOf(1)) - configProps.put("controlled.shutdown.retry.backoff.ms", String.valueOf(1000)) - } - - var configs: Seq[KafkaConfig] = Seq.empty[KafkaConfig] - var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] - - val random = new Random() - val topic = "topic" + random.nextLong - val partitionId = 0 - - val kafkaApisLogger = Logger.getLogger(classOf[kafka.server.KafkaApis]) - val networkProcessorLogger = Logger.getLogger(classOf[kafka.network.Processor]) - val syncProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer]) - val eventHandlerLogger = Logger.getLogger(classOf[kafka.producer.async.DefaultEventHandler[Object, Object]]) - - override def setUp() { - 
super.setUp() - - // temporarily set loggers to a higher level so that tests run quietly - kafkaApisLogger.setLevel(Level.FATAL) - networkProcessorLogger.setLevel(Level.FATAL) - syncProducerLogger.setLevel(Level.FATAL) - eventHandlerLogger.setLevel(Level.FATAL) - } - - override def tearDown() { - servers.map(server => shutdownServer(server)) - servers.map(server => Utils.rm(server.config.logDirs)) - - // restore log levels - kafkaApisLogger.setLevel(Level.ERROR) - networkProcessorLogger.setLevel(Level.ERROR) - syncProducerLogger.setLevel(Level.ERROR) - eventHandlerLogger.setLevel(Level.ERROR) - - super.tearDown() - } - - private def startBrokers(cluster: Seq[Properties]) { - for (props <- cluster) { - val config = new KafkaConfig(props) - val server = createServer(config) - configs ++= List(config) - servers ++= List(server) - } - } - - def testUncleanLeaderElectionEnabled { - // unclean leader election is enabled by default - startBrokers(Seq(configProps1, configProps2)) - - // create topic with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2))) - - verifyUncleanLeaderElectionEnabled - } - - def testUncleanLeaderElectionDisabled { - // disable unclean leader election - configProps1.put("unclean.leader.election.enable", String.valueOf(false)) - configProps2.put("unclean.leader.election.enable", String.valueOf(false)) - startBrokers(Seq(configProps1, configProps2)) - - // create topic with 1 partition, 2 replicas, one on each broker - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2))) - - verifyUncleanLeaderElectionDisabled - } - - def testUncleanLeaderElectionEnabledByTopicOverride { - // disable unclean leader election globally, but enable for our specific test topic - configProps1.put("unclean.leader.election.enable", String.valueOf(false)) - configProps2.put("unclean.leader.election.enable", String.valueOf(false)) - startBrokers(Seq(configProps1, configProps2)) - - // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled - val topicProps = new Properties() - topicProps.put("unclean.leader.election.enable", String.valueOf(true)) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), - topicProps) - - verifyUncleanLeaderElectionEnabled - } - - def testCleanLeaderElectionDisabledByTopicOverride { - // enable unclean leader election globally, but disable for our specific test topic - configProps1.put("unclean.leader.election.enable", String.valueOf(true)) - configProps2.put("unclean.leader.election.enable", String.valueOf(true)) - startBrokers(Seq(configProps1, configProps2)) - - // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled - val topicProps = new Properties() - topicProps.put("unclean.leader.election.enable", String.valueOf(false)) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), - topicProps) - - verifyUncleanLeaderElectionDisabled - } - - def testUncleanLeaderElectionInvalidTopicOverride { - startBrokers(Seq(configProps1)) - - // create topic with an invalid value for unclean leader election - val topicProps = new Properties() - topicProps.put("unclean.leader.election.enable", "invalid") - - intercept[IllegalArgumentException] { - 
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1)), topicProps) - } - } - - def verifyUncleanLeaderElectionEnabled { - // wait until leader is elected - val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000) - assertTrue("Leader should get elected", leaderIdOpt.isDefined) - val leaderId = leaderIdOpt.get - debug("Leader for " + topic + " is elected to be: %s".format(leaderId)) - assertTrue("Leader id is set to expected value for topic: " + topic, leaderId == brokerId1 || leaderId == brokerId2) - - // the non-leader broker is the follower - val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 - debug("Follower for " + topic + " is: %s".format(followerId)) - - produceMessage(topic, "first") - waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000) - assertEquals(List("first"), consumeAllMessages(topic)) - - // shutdown follower server - servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) - - produceMessage(topic, "second") - assertEquals(List("first", "second"), consumeAllMessages(topic)) - - // shutdown leader and then restart follower - servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) - servers.filter(server => server.config.brokerId == followerId).map(server => server.startup()) - - // wait until new leader is (uncleanly) elected - val newLeaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000, Some(leaderId)) - assertTrue("New leader should get elected", newLeaderIdOpt.isDefined) - assertEquals(followerId, newLeaderIdOpt.get) - - produceMessage(topic, "third") - - // second message was lost due to unclean election - assertEquals(List("first", "third"), consumeAllMessages(topic)) - } - - def verifyUncleanLeaderElectionDisabled { - // wait until leader is elected - val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000) - assertTrue("Leader should get elected", leaderIdOpt.isDefined) - val leaderId = leaderIdOpt.get - debug("Leader for " + topic + " is elected to be: %s".format(leaderId)) - assertTrue("Leader id is set to expected value for topic: " + topic, leaderId == brokerId1 || leaderId == brokerId2) - - // the non-leader broker is the follower - val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 - debug("Follower for " + topic + " is: %s".format(followerId)) - - produceMessage(topic, "first") - waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000) - assertEquals(List("first"), consumeAllMessages(topic)) - - // shutdown follower server - servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) - - produceMessage(topic, "second") - assertEquals(List("first", "second"), consumeAllMessages(topic)) - - // shutdown leader and then restart follower - servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) - servers.filter(server => server.config.brokerId == followerId).map(server => server.startup()) - - // verify that unclean election to non-ISR follower does not occur - val newLeaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000, Some(leaderId)) - assertTrue("Leader should be defined", newLeaderIdOpt.isDefined) - assertEquals("No leader should be elected", -1, newLeaderIdOpt.get) - - // message production and consumption should both fail while leader is down - 
intercept[FailedToSendMessageException] { - produceMessage(topic, "third") - } - assertEquals(List.empty[String], consumeAllMessages(topic)) - - // restart leader temporarily to send a successfully replicated message - servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup()) - val newLeaderIdOpt2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000, Some(-1)) - assertTrue("Leader should be defined", newLeaderIdOpt2.isDefined) - assertEquals("Original leader should be reelected", leaderId, newLeaderIdOpt2.get) - produceMessage(topic, "third") - waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000) - servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) - - // verify clean leader transition to ISR follower - val newLeaderIdOpt3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000, Some(leaderId)) - assertTrue("Leader should be defined", newLeaderIdOpt3.isDefined) - assertEquals("New leader should be elected", followerId, newLeaderIdOpt3.get) - - // verify messages can be consumed from ISR follower that was just promoted to leader - assertEquals(List("first", "second", "third"), consumeAllMessages(topic)) - } - - private def shutdownServer(server: KafkaServer) = { - server.shutdown() - server.awaitShutdown() - } - - private def produceMessage(topic: String, message: String) = { - val props = new Properties() - props.put("request.required.acks", String.valueOf(-1)) - val producer: Producer[String, Array[Byte]] = createProducer(getBrokerListStrFromConfigs(configs), - new DefaultEncoder(), new StringEncoder(), props) - producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes)) - producer.close() - } - - private def consumeAllMessages(topic: String) : List[String] = { - // use a fresh consumer group every time so that we don't need to mess with disabling auto-commit or - // resetting the ZK offset - val consumerProps = createConsumerProperties(zkConnect, "group" + random.nextLong, "id", 1000) - val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps)) - val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head - - val messages = new MutableList[String] - val iter = messageStream.iterator - try { - while(iter.hasNext()) { - messages += new String(iter.next.message) // will throw a timeout exception if the message isn't there - } - } catch { - case e: ConsumerTimeoutException => - debug("consumer timed out after receiving " + messages.length + " message(s).") - } finally { - consumerConnector.shutdown - } - messages.toList - } -} diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 6f4809d..89c207a 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -93,40 +93,5 @@ class KafkaConfigTest extends JUnit3Suite { assertEquals(serverConfig.advertisedHostName, advertisedHostName) assertEquals(serverConfig.advertisedPort, advertisedPort) } - - @Test - def testUncleanLeaderElectionDefault() { - val props = TestUtils.createBrokerConfig(0, 8181) - val serverConfig = new KafkaConfig(props) - - assertEquals(serverConfig.uncleanLeaderElectionEnable, true) - } - - @Test - def testUncleanElectionDisabled() { - val props = TestUtils.createBrokerConfig(0, 8181) - props.put("unclean.leader.election.enable", String.valueOf(false)) - val 
serverConfig = new KafkaConfig(props) - - assertEquals(serverConfig.uncleanLeaderElectionEnable, false) - } - - @Test - def testUncleanElectionEnabled() { - val props = TestUtils.createBrokerConfig(0, 8181) - props.put("unclean.leader.election.enable", String.valueOf(true)) - val serverConfig = new KafkaConfig(props) - - assertEquals(serverConfig.uncleanLeaderElectionEnable, true) - } - - @Test - def testUncleanElectionInvalid() { - val props = TestUtils.createBrokerConfig(0, 8181) - props.put("unclean.leader.election.enable", "invalid") - - intercept[IllegalArgumentException] { - new KafkaConfig(props) - } - } + } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 2054c25..772d214 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -312,8 +312,8 @@ object TestUtils extends Logging { */ def createProducer[K, V](brokerList: String, encoder: Encoder[V] = new DefaultEncoder(), - keyEncoder: Encoder[K] = new DefaultEncoder(), - props: Properties = new Properties()): Producer[K, V] = { + keyEncoder: Encoder[K] = new DefaultEncoder()): Producer[K, V] = { + val props = new Properties() props.put("metadata.broker.list", brokerList) props.put("send.buffer.bytes", "65536") props.put("connect.timeout.ms", "100000") diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index f12a45b..3df0d13 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -220,7 +220,7 @@ object ProducerPerformance extends Logging { this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes)).get() } else { this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes), - new ErrorLoggingCallback(null, bytes, if (config.seqIdMode) true else false)) + new ErrorLoggingCallback(topic, null, bytes, if (config.seqIdMode) true else false)) } }
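
The Metadata.fetch() hunk above fixes a timeout bug: the old code passed the full maxWaitMs to every wait() call, so a wakeup that did not deliver the topic's metadata could leave the caller blocked well past the intended deadline. Below is a minimal, self-contained sketch of the corrected wait-with-deadline pattern; the class and method names (DeadlineWait, markReady, awaitReady) are invented for illustration and are not part of the patch.

import java.util.concurrent.TimeoutException;

// Illustrative sketch only, not Kafka code.
public final class DeadlineWait {
    private final Object lock = new Object();
    private boolean ready = false;

    /** Mark the condition as satisfied and wake up any waiters. */
    public void markReady() {
        synchronized (lock) {
            ready = true;
            lock.notifyAll();
        }
    }

    /**
     * Block until markReady() has been called, but never longer than maxWaitMs in total
     * (assumes maxWaitMs > 0). Each wait() uses the remaining time rather than the full
     * maxWaitMs, so spurious wakeups and interrupts cannot extend the overall deadline.
     */
    public void awaitReady(long maxWaitMs) throws TimeoutException {
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        synchronized (lock) {
            while (!ready) {
                try {
                    lock.wait(remainingWaitMs);
                } catch (InterruptedException e) {
                    /* this is fine, just re-check the condition and the deadline */
                }
                long elapsed = System.currentTimeMillis() - begin;
                if (elapsed >= maxWaitMs)
                    throw new TimeoutException("Condition not met after " + maxWaitMs + " ms.");
                remainingWaitMs = maxWaitMs - elapsed;
            }
        }
    }
}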
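
The Selector.connect() hunk narrows the caller-facing contract to IOException: UnresolvedAddressException is a RuntimeException, so wrapping it means callers (and the updated SelectorTest) only need to handle one exception type, and the new catch for IOException ensures the half-open channel is closed before rethrowing. A trimmed sketch of that close-and-rethrow pattern, using only standard java.nio types; the helper class below is illustrative, not part of the patch.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

// Illustrative sketch only, not Kafka code.
public final class NonBlockingConnect {
    /** Open a non-blocking connection, surfacing DNS failures as IOException. */
    public static SocketChannel connect(InetSocketAddress address) throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        try {
            channel.connect(address);
        } catch (UnresolvedAddressException e) {
            // close the half-open socket, then report DNS failure as an IOException
            channel.close();
            throw new IOException("Can't resolve address: " + address, e);
        } catch (IOException e) {
            // also avoid leaking the channel when connect() itself fails
            channel.close();
            throw e;
        }
        return channel;
    }
}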