From adc0fc849feaa50273131f6c74e15bb16f2ee65d Mon Sep 17 00:00:00 2001 From: Jon Natkins Date: Tue, 12 Aug 2014 16:42:09 -0700 Subject: [PATCH] KAFKA-1580 Reject producer requests to internal topics --- core/src/main/scala/kafka/server/KafkaApis.scala | 11 ++++++-- .../scala/unit/kafka/producer/ProducerTest.scala | 29 +++++++++++++++++++++- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bb94673..3b31a99 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -160,7 +160,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val sTime = SystemTime.milliseconds - val localProduceResults = appendToLocalLog(produceRequest) + val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError) @@ -236,11 +236,14 @@ class KafkaApis(val requestChannel: RequestChannel, /** * Helper method for handling a parsed producer request */ - private def appendToLocalLog(producerRequest: ProducerRequest): Iterable[ProduceResult] = { + private def appendToLocalLog(producerRequest: ProducerRequest, isOffsetCommit: Boolean): Iterable[ProduceResult] = { val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data trace("Append [%s] to local log ".format(partitionAndData.toString)) partitionAndData.map {case (topicAndPartition, messages) => try { + if (Topic.InternalTopics.contains(topicAndPartition.topic) && !isOffsetCommit) { + throw new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic)) + } val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) val info = partitionOpt match { case Some(partition) => @@ -268,6 +271,10 @@ class KafkaApis(val requestChannel: RequestChannel, fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) Runtime.getRuntime.halt(1) null + case ite: InvalidTopicException => + warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( + producerRequest.correlationId, producerRequest.clientId, topicAndPartition, ite.getMessage)) + new ProduceResult(topicAndPartition, ite) case utpe: UnknownTopicOrPartitionException => warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( producerRequest.correlationId, producerRequest.clientId, topicAndPartition, utpe.getMessage)) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index dd71d81..1d326d8 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -33,7 +33,7 @@ import kafka.api.FetchRequestBuilder import org.junit.Assert.assertTrue import org.junit.Assert.assertFalse import org.junit.Assert.assertEquals -import kafka.common.{ErrorMapping, FailedToSendMessageException} +import kafka.common.{InvalidTopicException, Topic, ErrorMapping, FailedToSendMessageException} import kafka.serializer.StringEncoder class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ @@ -202,6 +202,33 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ } } + @Test + def testCannotSendToInternalTopic() { + val props = new util.Properties() + props.put("request.required.acks", "2") + + val topic = Topic.InternalTopics.head + + val producer = TestUtils.createProducer[String, String]( + brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)), + encoder = classOf[StringEncoder].getName, + keyEncoder = classOf[StringEncoder].getName, + partitioner = classOf[StaticPartitioner].getName, + producerProps = props) + + try { + producer.send(new KeyedMessage[String, String](topic, "test", "test2")) + fail("Should not be able to send messages to an internal topic.") + } + catch { + case se: FailedToSendMessageException => + // this is expected + case e: Throwable => fail("Not expected", e) + } + finally { + producer.close() + } + } @Test def testSendWithDeadBroker() { -- 1.8.5.2 (Apple Git-48)