From bd6915d1a9891ffdc998c242a751a5bffe405325 Mon Sep 17 00:00:00 2001 From: Jon Natkins Date: Tue, 12 Aug 2014 16:42:09 -0700 Subject: [PATCH] KAFKA-1580 Reject producer requests to internal topics --- .../kafka/common/errors/InvalidTopicException.java | 38 ++++++++++++++++++++++ .../org/apache/kafka/common/protocol/Errors.java | 16 +++------ .../src/main/scala/kafka/common/ErrorMapping.scala | 4 ++- core/src/main/scala/kafka/server/KafkaApis.scala | 11 +++++-- .../kafka/api/ProducerFailureHandlingTest.scala | 12 +++++++ 5 files changed, 66 insertions(+), 15 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java new file mode 100644 index 0000000..1d90b59 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * The client has attempted to perform an operation on an invalid topic. + */ +public class InvalidTopicException extends ApiException { + + private static final long serialVersionUID = 1L; + + public InvalidTopicException() { + super(); + } + + public InvalidTopicException(String message, Throwable cause) { + super(message, cause); + } + + public InvalidTopicException(String message) { + super(message); + } + + public InvalidTopicException(Throwable cause) { + super(cause); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 3374bd9..d434f42 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -19,17 +19,7 @@ package org.apache.kafka.common.protocol; import java.util.HashMap; import java.util.Map; -import org.apache.kafka.common.errors.ApiException; -import org.apache.kafka.common.errors.CorruptRecordException; -import org.apache.kafka.common.errors.LeaderNotAvailableException; -import org.apache.kafka.common.errors.NetworkException; -import org.apache.kafka.common.errors.NotLeaderForPartitionException; -import org.apache.kafka.common.errors.OffsetMetadataTooLarge; -import org.apache.kafka.common.errors.OffsetOutOfRangeException; -import org.apache.kafka.common.errors.RecordTooLargeException; -import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.errors.UnknownServerException; -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.errors.*; /** @@ -51,7 +41,9 @@ public enum Errors { // TODO: errorCode 8, 9, 11 MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")), - NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")); + NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")), + // TODO: errorCode 14, 15, 16 + INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")); private static Map, Errors> classToError = new HashMap, Errors>(); private static Map codeToError = new HashMap(); diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index 5559d26..3fae791 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -46,6 +46,7 @@ object ErrorMapping { val OffsetsLoadInProgressCode: Short = 14 val ConsumerCoordinatorNotAvailableCode: Short = 15 val NotCoordinatorForConsumerCode: Short = 16 + val InvalidTopicCode : Short = 17 private val exceptionToCode = Map[Class[Throwable], Short]( @@ -63,7 +64,8 @@ object ErrorMapping { classOf[OffsetMetadataTooLargeException].asInstanceOf[Class[Throwable]] -> OffsetMetadataTooLargeCode, classOf[OffsetsLoadInProgressException].asInstanceOf[Class[Throwable]] -> OffsetsLoadInProgressCode, classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode, - classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode + classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode, + classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode ).withDefaultValue(UnknownCode) /* invert the mapping */ diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bb94673..3b31a99 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -160,7 +160,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val sTime = SystemTime.milliseconds - val localProduceResults = appendToLocalLog(produceRequest) + val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError) @@ -236,11 +236,14 @@ class KafkaApis(val requestChannel: RequestChannel, /** * Helper method for handling a parsed producer request */ - private def appendToLocalLog(producerRequest: ProducerRequest): Iterable[ProduceResult] = { + private def appendToLocalLog(producerRequest: ProducerRequest, isOffsetCommit: Boolean): Iterable[ProduceResult] = { val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data trace("Append [%s] to local log ".format(partitionAndData.toString)) partitionAndData.map {case (topicAndPartition, messages) => try { + if (Topic.InternalTopics.contains(topicAndPartition.topic) && !isOffsetCommit) { + throw new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic)) + } val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) val info = partitionOpt match { case Some(partition) => @@ -268,6 +271,10 @@ class KafkaApis(val requestChannel: RequestChannel, fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) Runtime.getRuntime.halt(1) null + case ite: InvalidTopicException => + warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( + producerRequest.correlationId, producerRequest.clientId, topicAndPartition, ite.getMessage)) + new ProduceResult(topicAndPartition, ite) case utpe: UnknownTopicOrPartitionException => warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( producerRequest.correlationId, producerRequest.clientId, topicAndPartition, utpe.getMessage)) diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 789e74c..2f1182c 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -17,6 +17,8 @@ package kafka.api +import kafka.common.Topic +import org.apache.kafka.common.errors.InvalidTopicException import org.scalatest.junit.JUnit3Suite import org.junit.Test import org.junit.Assert._ @@ -295,6 +297,16 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize) } + @Test + def testCannotSendToInternalTopic() { + val producer = TestUtils.createNewProducer(brokerList) + + val thrown = intercept[ExecutionException] { + producer.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get + } + assert(thrown.getCause.isInstanceOf[InvalidTopicException]) + } + private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) { val numRecords = 1000 -- 1.8.5.2 (Apple Git-48)