diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index 1135f5d..1dfc75c 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -38,7 +38,8 @@ class ConsumerFetcherThread(name: String, fetchSize = config.fetchMessageMaxBytes, fetcherBrokerId = Request.OrdinaryConsumerId, maxWait = config.fetchWaitMaxMs, - minBytes = config.fetchMinBytes) { + minBytes = config.fetchMinBytes, + isInterruptible = true) { // process fetched data def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) { diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 1ccf578..a7d39b1 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -22,7 +22,6 @@ import kafka.common.{ClientIdAndBroker, TopicAndPartition, ErrorMapping} import collection.mutable import kafka.message.ByteBufferMessageSet import kafka.message.MessageAndOffset -import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import java.util.concurrent.atomic.AtomicLong @@ -30,14 +29,16 @@ import kafka.utils.{Pool, ShutdownableThread} import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import kafka.consumer.{PartitionTopicInfo, SimpleConsumer} +import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder} /** * Abstract class for fetching data from multiple partitions from the same broker. */ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int, - fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1) - extends ShutdownableThread(name) { + fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, + isInterruptible: Boolean = true) + extends ShutdownableThread(name, isInterruptible) { private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map private val partitionMapLock = new ReentrantLock private val partitionMapCond = partitionMapLock.newCondition() @@ -72,8 +73,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke override def doWork() { partitionMapLock.lock() try { - while (partitionMap.isEmpty) - partitionMapCond.await() + if (partitionMap.isEmpty) + partitionMapCond.await(200L, TimeUnit.MILLISECONDS) partitionMap.foreach { case((topicAndPartition, offset)) => fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, @@ -84,6 +85,11 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } val fetchRequest = fetchRequestBuilder.build() + if (!fetchRequest.requestInfo.isEmpty) + processFetchRequest(fetchRequest) + } + + private def processFetchRequest(fetchRequest: FetchRequest) { val partitionsWithError = new mutable.HashSet[TopicAndPartition] var response: FetchResponse = null try { diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index c03f758..37b71be 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -19,8 +19,8 @@ package kafka.server import kafka.cluster.Broker import kafka.message.ByteBufferMessageSet -import kafka.common.{TopicAndPartition, ErrorMapping} -import kafka.api.{FetchRequest, PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData} +import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData} +import kafka.common.{KafkaStorageException, TopicAndPartition, ErrorMapping} class ReplicaFetcherThread(name:String, sourceBroker: Broker, @@ -34,26 +34,33 @@ class ReplicaFetcherThread(name:String, fetchSize = brokerConfig.replicaFetchMaxBytes, fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaFetchWaitMaxMs, - minBytes = brokerConfig.replicaFetchMinBytes) { + minBytes = brokerConfig.replicaFetchMinBytes, + isInterruptible = false) { // process fetched data def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) { - val topic = topicAndPartition.topic - val partitionId = topicAndPartition.partition - val replica = replicaMgr.getReplica(topic, partitionId).get - val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet] + try { + val topic = topicAndPartition.topic + val partitionId = topicAndPartition.partition + val replica = replicaMgr.getReplica(topic, partitionId).get + val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet] - if (fetchOffset != replica.logEndOffset) - throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset)) - trace("Follower %d has replica log end offset %d. Received %d messages and leader hw %d" - .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes, partitionData.hw)) - replica.log.get.append(messageSet, assignOffsets = false) - trace("Follower %d has replica log end offset %d after appending %d bytes of messages" - .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes)) - val followerHighWatermark = replica.logEndOffset.min(partitionData.hw) - replica.highWatermark = followerHighWatermark - trace("Follower %d set replica highwatermark for topic %s partition %d to %d" - .format(replica.brokerId, topic, partitionId, followerHighWatermark)) + if (fetchOffset != replica.logEndOffset) + throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset)) + trace("Follower %d has replica log end offset %d. Received %d messages and leader hw %d" + .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes, partitionData.hw)) + replica.log.get.append(messageSet, assignOffsets = false) + trace("Follower %d has replica log end offset %d after appending %d bytes of messages" + .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes)) + val followerHighWatermark = replica.logEndOffset.min(partitionData.hw) + replica.highWatermark = followerHighWatermark + trace("Follower %d set replica highwatermark for topic %s partition %d to %d" + .format(replica.brokerId, topic, partitionId, followerHighWatermark)) + } catch { + case e: KafkaStorageException => + fatal("Disk error while replicating data.", e) + Runtime.getRuntime.halt(1) + } } // handle a partition whose offset is out of range and return a new fetch offset diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index 3aaf38e..afe9e22 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -85,7 +85,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge /* background thread expiring requests that have been waiting too long */ private val expiredRequestReaper = new ExpiredRequestReaper - private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper) + private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false) expirationThread.start() /** @@ -241,7 +241,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge def shutdown() { debug("Shutting down.") running.set(false) - expirationThread.interrupt() shutdownLatch.await() debug("Shut down complete.") } diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala index 4cca338..cf8adc9 100644 --- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala +++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala @@ -20,7 +20,7 @@ package kafka.utils import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.CountDownLatch -abstract class ShutdownableThread(val name: String) +abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true) extends Thread(name) with Logging { this.setDaemon(false) this.logIdent = "[" + name + "], " @@ -31,7 +31,8 @@ abstract class ShutdownableThread(val name: String) def shutdown(): Unit = { info("Shutting down") isRunning.set(false) - interrupt() + if (isInterruptible) + interrupt() shutdownLatch.await() info("Shutdown completed") }