From 5fef324082d90fc8e523bb1ec71f793b839113d7 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 19 Nov 2014 10:56:49 -0800 Subject: [PATCH] KAFKA-1580 v1 --- core/src/main/scala/kafka/admin/AdminUtils.scala | 21 ++++-- core/src/main/scala/kafka/server/KafkaApis.scala | 5 ++ .../main/scala/kafka/server/OffsetManager.scala | 13 ++-- .../main/scala/kafka/server/ReplicaManager.scala | 88 ++++++++++++---------- .../kafka/api/ProducerFailureHandlingTest.scala | 23 +++--- 5 files changed, 87 insertions(+), 63 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 94c5332..28b12c7 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -17,25 +17,30 @@ package kafka.admin -import java.util.Random -import java.util.Properties -import kafka.api.{TopicMetadata, PartitionMetadata} +import kafka.common._ import kafka.cluster.Broker import kafka.log.LogConfig import kafka.utils.{Logging, ZkUtils, Json} -import org.I0Itec.zkclient.ZkClient -import org.I0Itec.zkclient.exception.ZkNodeExistsException +import kafka.api.{TopicMetadata, PartitionMetadata} + +import java.util.Random +import java.util.Properties +import scala.Some +import scala.Predef._ import scala.collection._ import mutable.ListBuffer import scala.collection.mutable -import kafka.common._ -import scala.Predef._ import collection.Map -import scala.Some import collection.Set +import org.I0Itec.zkclient.ZkClient +import org.I0Itec.zkclient.exception.ZkNodeExistsException + object AdminUtils extends Logging { val rand = new Random + + val AdminClientId = "__admin_client" + val TopicConfigChangeZnodePrefix = "config_change_" /** diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 968b0c4..2a1c032 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -188,10 +188,15 @@ class KafkaApis(val requestChannel: RequestChannel, } } + // only allow appending to internal topic partitions + // if the client is not from admin + val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId + // call the replica manager to append messages to the replicas replicaManager.appendMessages( produceRequest.ackTimeoutMs.toLong, produceRequest.requiredAcks, + internalTopicsAllowed, produceRequest.data, sendResponseCallback) diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 2957bc4..3c79428 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -29,6 +29,7 @@ import kafka.message._ import kafka.metrics.KafkaMetricsGroup import kafka.common.TopicAndPartition import kafka.tools.MessageFormatter +import kafka.api.ProducerResponseStatus import scala.Some import scala.collection._ @@ -40,7 +41,6 @@ import java.util.concurrent.TimeUnit import com.yammer.metrics.core.Gauge import org.I0Itec.zkclient.ZkClient -import kafka.api.ProducerResponseStatus /** @@ -206,7 +206,7 @@ class OffsetManager(val config: OffsetManagerConfig, * Store offsets by appending it to the replicated log and then inserting to cache */ // TODO: generation id and consumer id is needed by coordinator to do consumer checking in the future - def storeOffsets(groupName: String, + def storeOffsets(groupId: String, consumerId: String, generationId: Int, offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], @@ -221,12 +221,12 @@ class OffsetManager(val config: OffsetManagerConfig, // construct the message set to append val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => new Message( - key = OffsetManager.offsetCommitKey(groupName, topicAndPartition.topic, topicAndPartition.partition), + key = OffsetManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition), bytes = OffsetManager.offsetCommitValue(offsetAndMetadata) ) }.toSeq - val offsetTopicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, partitionFor(groupName)) + val offsetTopicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, partitionFor(groupId)) val offsetsAndMetadataMessageSet = Map(offsetTopicPartition -> new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*)) @@ -245,12 +245,12 @@ class OffsetManager(val config: OffsetManagerConfig, val responseCode = if (status.error == ErrorMapping.NoError) { filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) => - putOffset(GroupTopicPartition(groupName, topicAndPartition), offsetAndMetadata) + putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata) } ErrorMapping.NoError } else { debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s" - .format(filteredOffsetMetadata, groupName, consumerId, generationId, ErrorMapping.exceptionNameFor(status.error))) + .format(filteredOffsetMetadata, groupId, consumerId, generationId, ErrorMapping.exceptionNameFor(status.error))) // transform the log append error code to the corresponding the commit status error code if (status.error == ErrorMapping.UnknownTopicOrPartitionCode) @@ -278,6 +278,7 @@ class OffsetManager(val config: OffsetManagerConfig, replicaManager.appendMessages( config.offsetCommitTimeoutMs.toLong, config.offsetCommitRequiredAcks, + true, // allow appending to internal offset topic offsetsAndMetadataMessageSet, putCacheCallback) } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 3007a6d..47b727e 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -250,11 +250,12 @@ class ReplicaManager(val config: KafkaConfig, */ def appendMessages(timeout: Long, requiredAcks: Short, + internalTopicsAllowed: Boolean, messagesPerPartition: Map[TopicAndPartition, MessageSet], responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) { val sTime = SystemTime.milliseconds - val localProduceResults = appendToLocalLog(messagesPerPartition, requiredAcks) + val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks) debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) val produceStatus = localProduceResults.map{ case (topicAndPartition, result) => @@ -293,50 +294,59 @@ class ReplicaManager(val config: KafkaConfig, /** * Append the messages to the local replica logs */ - private def appendToLocalLog(messagesPerPartition: Map[TopicAndPartition, MessageSet], + private def appendToLocalLog(internalTopicsAllowed: Boolean, + messagesPerPartition: Map[TopicAndPartition, MessageSet], requiredAcks: Short): Map[TopicAndPartition, LogAppendResult] = { trace("Append [%s] to local log ".format(messagesPerPartition)) messagesPerPartition.map { case (topicAndPartition, messages) => - try { - val partitionOpt = getPartition(topicAndPartition.topic, topicAndPartition.partition) - val info = partitionOpt match { - case Some(partition) => - partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], requiredAcks) - case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" - .format(topicAndPartition, localBrokerId)) - } + // reject appending to internal topics if it is not allowed + if (Topic.InternalTopics.contains(topicAndPartition.topic) && !internalTopicsAllowed) { - val numAppendedMessages = - if (info.firstOffset == -1L || info.lastOffset == -1L) - 0 - else - info.lastOffset - info.firstOffset + 1 + (topicAndPartition, LogAppendResult( + LogAppendInfo.UnknownLogAppendInfo, + Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic))))) + } else { + try { + val partitionOpt = getPartition(topicAndPartition.topic, topicAndPartition.partition) + val info = partitionOpt match { + case Some(partition) => + partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], requiredAcks) + case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" + .format(topicAndPartition, localBrokerId)) + } + + val numAppendedMessages = + if (info.firstOffset == -1L || info.lastOffset == -1L) + 0 + else + info.lastOffset - info.firstOffset + 1 - // update stats for successfully appended bytes and messages as bytesInRate and messageInRate - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages) - BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages) + // update stats for successfully appended bytes and messages as bytesInRate and messageInRate + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages) + BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages) - trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" - .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset)) - (topicAndPartition, LogAppendResult(info)) - } catch { - // NOTE: Failed produce requests metric is not incremented for known exceptions - // it is supposed to indicate un-expected failures of a broker in handling a produce request - case e: KafkaStorageException => - fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) - Runtime.getRuntime.halt(1) - (topicAndPartition, null) - case utpe: UnknownTopicOrPartitionException => - (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe))) - case nle: NotLeaderForPartitionException => - (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle))) - case e: Throwable => - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() - BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() - error("Error processing append operation on partition %s".format(topicAndPartition), e) - (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) + trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" + .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset)) + (topicAndPartition, LogAppendResult(info)) + } catch { + // NOTE: Failed produce requests metric is not incremented for known exceptions + // it is supposed to indicate un-expected failures of a broker in handling a produce request + case e: KafkaStorageException => + fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) + Runtime.getRuntime.halt(1) + (topicAndPartition, null) + case utpe: UnknownTopicOrPartitionException => + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe))) + case nle: NotLeaderForPartitionException => + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle))) + case e: Throwable => + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() + BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() + error("Error processing append operation on partition %s".format(topicAndPartition), e) + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) + } } } } diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 8531f53..a913fe5 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -15,27 +15,27 @@ * limitations under the License. */ -package kafka.api +package kafka.api.test -import kafka.common.Topic -import org.apache.kafka.common.errors.{InvalidTopicException,NotEnoughReplicasException} -import org.scalatest.junit.JUnit3Suite import org.junit.Test import org.junit.Assert._ -import java.util.{Properties, Random} import java.lang.Integer +import java.util.{Properties, Random} import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException} +import kafka.api.FetchRequestBuilder +import kafka.common.Topic +import kafka.consumer.SimpleConsumer import kafka.server.KafkaConfig -import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils} import kafka.integration.KafkaServerTestHarness -import kafka.consumer.SimpleConsumer +import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils} import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException} import org.apache.kafka.clients.producer._ -class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarness { +class ProducerFailureHandlingTest extends KafkaServerTestHarness { private val producerBufferSize = 30000 private val serverMessageMaxBytes = producerBufferSize/2 @@ -297,9 +297,12 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize) } - @Test(expected = classOf[InvalidTopicException]) + @Test def testCannotSendToInternalTopic() { - producer1.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get + val thrown = intercept[ExecutionException] { + producer2.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get + } + assertTrue(thrown.getCause.isInstanceOf[InvalidTopicException]) } @Test -- 1.7.12.4