Index: core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(revision 1383199)
+++ core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(working copy)
@@ -45,6 +45,7 @@
   val brokerPort: Int = 9099
   var simpleConsumer: SimpleConsumer = null
   var time: Time = new MockTime()
+  val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
 
   @Before
   override def setUp() {
@@ -82,7 +83,7 @@
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+    CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 1, 1, "1")
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topic, part)
@@ -119,7 +120,7 @@
     val topic = topicPartition.split("-").head
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+    CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 1, 1, "1")
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
 
     var offsetChanged = false
@@ -141,7 +142,7 @@
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
+    CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 3, 1, "1,1,1")
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topic, part)
@@ -170,7 +171,7 @@
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
+    CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 3, 1, "1,1,1")
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topic, part)
Index: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(revision 1383199)
+++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(working copy)
@@ -39,6 +39,7 @@
   val name = "kafka"
   val veryLargeLogFlushInterval = 10000000L
   val scheduler = new KafkaScheduler(2)
+  val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
 
   override def setUp() {
     super.setUp()
@@ -55,7 +56,7 @@
     TestUtils.createBrokersInZk(zkClient, List(config.brokerId))
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zkClient, name, 3, 1, "0,0,0")
+    CreateTopicCommand.createTopic(zkClient, name, topicNameValidator, 3, 1, "0,0,0")
   }
 
   override def tearDown() {
Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(revision 1383199)
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(working copy)
@@ -25,7 +25,7 @@
 import kafka.integration.KafkaServerTestHarness
 import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, Message, ByteBufferMessageSet}
 import kafka.server.KafkaConfig
-import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
+import kafka.utils._
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
 import kafka.api.TopicData
@@ -34,6 +34,7 @@
   private var messageBytes =  new Array[Byte](2);
   val configs = List(new KafkaConfig(TestUtils.createBrokerConfigs(1).head))
   val zookeeperConnect = TestZKUtils.zookeeperConnect
+  val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
 
   @Test
   def testReachableServer() {
@@ -158,9 +159,9 @@
     response.offsets.foreach(Assert.assertEquals(-1L, _))
 
     // #2 - test that we get correct offsets when partition is owned by broker
-    CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
+    CreateTopicCommand.createTopic(zkClient, "topic1", topicNameValidator, 1, 1)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic1", 0, 500)
-    CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
+    CreateTopicCommand.createTopic(zkClient, "topic3", topicNameValidator, 1, 1)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic3", 0, 500)
 
     val response2 = producer.send(request)
Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(revision 1383199)
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(working copy)
@@ -43,6 +43,7 @@
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
   private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
+  val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
 
   private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
   private val config1 = new KafkaConfig(props1) {
@@ -86,7 +87,7 @@
 
 
   def testUpdateBrokerPartitionInfo() {
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
+    CreateTopicCommand.createTopic(zkClient, "new-topic", topicNameValidator, 1, 2)
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
         zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
@@ -152,7 +153,7 @@
     val producerConfig2 = new ProducerConfig(props2)
 
     // create topic with 1 partition and await leadership
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
+    CreateTopicCommand.createTopic(zkClient, "new-topic", topicNameValidator, 1, 2)
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
         zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
@@ -209,7 +210,7 @@
     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
 
     // create topic
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
+    CreateTopicCommand.createTopic(zkClient, "new-topic", topicNameValidator, 4, 2, "0,0,0,0")
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
         zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
@@ -270,7 +271,7 @@
     val producer = new Producer[String, String](config)
 
     // create topics in ZK
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
+    CreateTopicCommand.createTopic(zkClient, "new-topic", topicNameValidator, 4, 2, "0:1,0:1,0:1,0:1")
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
         zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(revision 1383199)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(working copy)
@@ -25,7 +25,7 @@
 import kafka.producer.{ProducerData, Producer, ProducerConfig}
 import kafka.serializer.StringDecoder
 import kafka.message.Message
-import kafka.utils.TestUtils
+import kafka.utils.{Topic, TopicNameValidator, TestUtils}
 import org.apache.log4j.{Level, Logger}
 import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
@@ -46,6 +46,7 @@
   }
   val configs = List(config)
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
+  val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
 
   override def setUp() {
     super.setUp
@@ -332,7 +333,7 @@
 
   def testConsumerEmptyTopic() {
     val newTopic = "new-topic"
-    CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
+    CreateTopicCommand.createTopic(zkClient, newTopic, topicNameValidator, 1, 1, config.brokerId.toString)
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.getTopicMetaDataFromZK(List(newTopic),
         zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
@@ -347,7 +348,7 @@
    */
   def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) {
     for( topic <- topics ) {
-      CreateTopicCommand.createTopic(zkClient, topic, 1, 1, brokerId.toString)
+      CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 1, 1, brokerId.toString)
       TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     }
   }
Index: core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(revision 1383199)
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(working copy)
@@ -25,7 +25,7 @@
 import org.easymock.EasyMock
 import kafka.network._
 import kafka.cluster.Broker
-import kafka.utils.TestUtils
+import kafka.utils.{Topic, TopicNameValidator, TestUtils}
 import kafka.utils.TestUtils._
 import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig}
 import kafka.common.ErrorMapping
@@ -36,6 +36,7 @@
   val props = createBrokerConfigs(1)
   val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
   var brokers: Seq[Broker] = null
+  val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
 
   override def setUp() {
     super.setUp()
@@ -49,7 +50,7 @@
   def testTopicMetadataRequest {
     // create topic
     val topic = "test"
-    CreateTopicCommand.createTopic(zkClient, topic, 1)
+    CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 1)
 
     // create a topic metadata request
     val topicMetadataRequest = new TopicMetadataRequest(List(topic))
@@ -65,7 +66,7 @@
   def testBasicTopicMetadata {
     // create topic
     val topic = "test"
-    CreateTopicCommand.createTopic(zkClient, topic, 1)
+    CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 1)
     // set up leader for topic partition 0
     val leaderForPartitionMap = Map(
       0 -> configs.head.brokerId
@@ -112,7 +113,7 @@
 
     // create the kafka request handler
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1, 255)
 
     // mock the receive API to return the request buffer as created above
     val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
Index: core/src/test/scala/unit/kafka/integration/FetcherTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(revision 1383199)
+++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(working copy)
@@ -29,7 +29,7 @@
 import kafka.integration.KafkaServerTestHarness
 import kafka.producer.{ProducerData, Producer}
 import kafka.utils.TestUtils._
-import kafka.utils.TestUtils
+import kafka.utils.{Topic, TopicNameValidator, TestUtils}
 import kafka.admin.CreateTopicCommand
 
 class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -40,6 +40,7 @@
     yield new KafkaConfig(props)
   val messages = new mutable.HashMap[Int, Seq[Message]]
   val topic = "topic"
+  val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
   val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
   val shutdown = ZookeeperConsumerConnector.shutdownCommand
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
@@ -55,7 +56,7 @@
 
   override def setUp() {
     super.setUp
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
+    CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 1, 1, configs.head.brokerId.toString)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
     fetcher.stopAllConnections()
Index: core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala	(revision 1383199)
+++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala	(working copy)
@@ -41,6 +41,7 @@
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
+  val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
   val configs =
     for(props <- TestUtils.createBrokerConfigs(numNodes))
     yield new KafkaConfig(props) {
@@ -324,7 +325,7 @@
     val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer)
 
     // create topic topic1 with 1 partition on broker 0
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+    CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 1, 1, "0")
 
     // send some messages to each broker
     val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)
Index: core/src/test/scala/unit/kafka/utils/TopicTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TopicTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/utils/TopicTest.scala	(working copy)
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import junit.framework.Assert._
+import collection.mutable.ArrayBuffer
+import kafka.common.InvalidTopicException
+import org.junit.Test
+
+class TopicTest {
+
+  @Test
+  def testInvalidTopicNames() {
+    val invalidTopicNames = new ArrayBuffer[String]()
+    invalidTopicNames += ("", ".", "..")
+    var longName = "ATCG"
+    for (i <- 3 to 8)
+      longName += longName
+    invalidTopicNames += longName
+    val badChars = Array('/', '\u0000', '\u0001', '\u0018', '\u001F', '\u008F', '\uD805', '\uFFFA')
+    for (weirdChar <- badChars) {
+      invalidTopicNames += "Is" + weirdChar + "funny"
+    }
+
+    val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
+
+    for (i <- 0 until invalidTopicNames.size) {
+      try {
+        topicNameValidator.validate(invalidTopicNames(i))
+        fail("Should throw InvalidTopicException.")
+      }
+      catch {
+        case e: InvalidTopicException => // expected: invalid name was rejected
+      }
+    }
+  }
+}
Index: core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala	(revision 1383199)
+++ core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala	(working copy)
@@ -4,7 +4,7 @@
 import org.junit.Assert._
 import kafka.admin.CreateTopicCommand
 import kafka.utils.TestUtils._
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.{Topic, TopicNameValidator, Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.message.Message
 import kafka.producer.{ProducerConfig, ProducerData, Producer}
@@ -18,6 +18,7 @@
     override val replicaMinBytes = 20
   })
   val topic = "new-topic"
+  val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
   val partitionId = 0
 
   val brokerId1 = 0
@@ -52,7 +53,7 @@
     producer = new Producer[Int, Message](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+    CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 1, 2, configs.map(_.brokerId).mkString(":"))
 
     // wait until leader is elected
     var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@ -87,7 +88,7 @@
     producer = new Producer[Int, Message](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+    CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 1, 2, configs.map(_.brokerId).mkString(":"))
 
     // wait until leader is elected
     var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@ -157,7 +158,7 @@
     producer = new Producer[Int, Message](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+    CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 1, 2, configs.map(_.brokerId).mkString(":"))
 
     // wait until leader is elected
     var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
@@ -201,7 +202,7 @@
     producer = new Producer[Int, Message](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+    CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 1, 2, configs.map(_.brokerId).mkString(":"))
 
     // wait until leader is elected
     var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
Index: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(revision 1383199)
+++ core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(working copy)
@@ -27,10 +27,11 @@
 import kafka.utils.TestUtils._
 import kafka.admin.CreateTopicCommand
 import kafka.api.FetchRequestBuilder
-import kafka.utils.{TestUtils, Utils}
+import kafka.utils.{Topic, TopicNameValidator, TestUtils, Utils}
 
 class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port = TestUtils.choosePort
+  val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
 
   @Test
   def testCleanShutdown() {
@@ -47,7 +48,7 @@
       server.startup()
 
       // create topic
-      CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+      CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 1, 1, "0")
 
       val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
 
Index: core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala	(revision 1383199)
+++ core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala	(working copy)
@@ -84,7 +84,7 @@
     // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary)
     // don't provide replica or leader callbacks since they will not be tested here
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, 255)
 
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
@@ -157,7 +157,7 @@
     EasyMock.replay(replicaManager)
 
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, 255)
 
     /**
      * This fetch, coming from a replica, requests all data at offset "15".  Because the request is coming
Index: core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(revision 1383199)
+++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(working copy)
@@ -22,7 +22,7 @@
 import kafka.admin.CreateTopicCommand
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
-import kafka.utils.{ZkUtils, Utils, TestUtils}
+import kafka.utils._
 
 class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   val brokerId1 = 0
@@ -34,6 +34,7 @@
   val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
   val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+  val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
 
   override def setUp() {
     super.setUp()
@@ -55,7 +56,7 @@
     val partitionId = 0
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+    CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 1, 2, "0:1")
 
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
Index: core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala	(revision 1383199)
+++ core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala	(working copy)
@@ -23,7 +23,7 @@
 import kafka.producer.ProducerData
 import kafka.serializer.StringEncoder
 import kafka.admin.CreateTopicCommand
-import kafka.utils.TestUtils
+import kafka.utils.{Topic, TopicNameValidator, TestUtils}
 import junit.framework.Assert._
 
 class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
@@ -32,6 +32,7 @@
   var brokers: Seq[KafkaServer] = null
   val topic1 = "foo"
   val topic2 = "bar"
+  val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
 
   override def setUp() {
     super.setUp()
@@ -50,7 +51,7 @@
 
     // create a topic and partition and await leadership
     for (topic <- List(topic1,topic2)) {
-      CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
+      CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, 1, 2, configs.map(c => c.brokerId).mkString(":"))
       TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     }
 
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1383199)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -21,7 +21,7 @@
 import kafka.message.Message
 import kafka.consumer.ConsumerConfig
 import java.net.InetAddress
-import kafka.utils.{Utils, VerifiableProperties, ZKConfig}
+import kafka.utils.{Topic, Utils, VerifiableProperties, ZKConfig}
 
 /**
  * Configuration settings for the kafka server
@@ -115,6 +115,9 @@
   /* enable auto creation of topic on the server */
   val autoCreateTopics = props.getBoolean("auto.create.topics", true)
 
+  /* the maximum length of a topic name */
+  val maxTopicNameLength = props.getIntInRange("max.topic.name.length", Topic.maxNameLength, (1, Int.MaxValue))
+
   /**
    * Following properties are relevant to Kafka replication
    */
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1383199)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -94,7 +94,7 @@
     replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager)
 
     kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
-    apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId)
+    apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId, config.maxTopicNameLength)
     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
     Mx4jLoader.maybeLoad
 
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1383199)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -23,7 +23,7 @@
 import kafka.common._
 import kafka.message._
 import kafka.network._
-import kafka.utils.{Pool, SystemTime, Logging}
+import kafka.utils.{TopicNameValidator, Pool, SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
 import mutable.HashMap
@@ -39,12 +39,14 @@
 class KafkaApis(val requestChannel: RequestChannel,
                 val replicaManager: ReplicaManager,
                 val zkClient: ZkClient,
-                brokerId: Int) extends Logging {
+                brokerId: Int,
+                maxTopicNameLength: Int) extends Logging {
 
   private val metricsGroup = brokerId.toString
   private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId)
   private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
   private val delayedRequestMetrics = new DelayedRequestMetrics
+  private val topicNameValidator = new TopicNameValidator(maxTopicNameLength)
 
   private val requestLogger = Logger.getLogger("kafka.request.logger")
   this.logIdent = "[KafkaApi on Broker " + brokerId + "], "
@@ -419,7 +421,8 @@
             case ErrorMapping.UnknownTopicOrPartitionCode =>
               /* check if auto creation of topics is turned on */
               if(config.autoCreateTopics) {
-                CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+                CreateTopicCommand.createTopic(zkClient, topic, topicNameValidator, config.numPartitions,
+                                               config.defaultReplicationFactor)
                 info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
                              .format(topic, config.numPartitions, config.defaultReplicationFactor))
                 val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala	(revision 1383199)
+++ core/src/main/scala/kafka/log/LogManager.scala	(working copy)
@@ -23,7 +23,7 @@
 import kafka.server.KafkaConfig
 import kafka.api.OffsetRequest
 import kafka.log.Log._
-import kafka.common.{KafkaException, InvalidTopicException, UnknownTopicOrPartitionException}
+import kafka.common.{KafkaException, UnknownTopicOrPartitionException}
 
 /**
  * The guy who creates and hands out logs
@@ -95,8 +95,6 @@
    * Create a log for the given topic and the given partition
    */
   private def createLog(topic: String, partition: Int): Log = {
-    if (topic.length <= 0)
-      throw new InvalidTopicException("Topic name can't be emtpy")
     if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) {
       val error = "Wrong partition %d, valid partitions (0, %d)."
               .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
Index: core/src/main/scala/kafka/admin/CreateTopicCommand.scala
===================================================================
--- core/src/main/scala/kafka/admin/CreateTopicCommand.scala	(revision 1383199)
+++ core/src/main/scala/kafka/admin/CreateTopicCommand.scala	(working copy)
@@ -18,7 +18,7 @@
 package kafka.admin
 
 import joptsimple.OptionParser
-import kafka.utils.{Logging, Utils, ZKStringSerializer, ZkUtils}
+import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import scala.collection.mutable
 
@@ -51,6 +51,11 @@
                                         "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
                            .ofType(classOf[String])
                            .defaultsTo("")
+    val maxTopicNameLenOpt = parser.accepts("max-name-len", "maximum length of the topic name")
+      .withRequiredArg
+      .describedAs("max topic name length")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(Topic.maxNameLength)
 
     val options = parser.parse(args : _*)
 
@@ -63,6 +68,7 @@
     }
 
     val topic = options.valueOf(topicOpt)
+    val maxTopicNameLength = options.valueOf(maxTopicNameLenOpt).intValue
     val zkConnect = options.valueOf(zkConnectOpt)
     val nPartitions = options.valueOf(nPartitionsOpt).intValue
     val replicationFactor = options.valueOf(replicationFactorOpt).intValue
@@ -70,7 +76,8 @@
     var zkClient: ZkClient = null
     try {
       zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
+      val topicNameValidator = new TopicNameValidator(maxTopicNameLength)
+      createTopic(zkClient, topic, topicNameValidator, nPartitions, replicationFactor, replicaAssignmentStr)
       println("creation succeeded!")
     } catch {
       case e =>
@@ -82,7 +89,10 @@
     }
   }
 
-  def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
+  def createTopic(zkClient: ZkClient, topic: String, topicNameValidator: TopicNameValidator, numPartitions: Int = 1,
+                  replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
+    topicNameValidator.validate(topic)
+
     val brokerList = ZkUtils.getSortedBrokerList(zkClient)
 
     val partitionReplicaAssignment = if (replicaAssignmentStr == "")
Index: core/src/main/scala/kafka/utils/Topic.scala
===================================================================
--- core/src/main/scala/kafka/utils/Topic.scala	(revision 0)
+++ core/src/main/scala/kafka/utils/Topic.scala	(working copy)
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import kafka.common.InvalidTopicException
+import util.matching.Regex
+
+object Topic {
+  val maxNameLength = 255
+  val illegalChars = "/" + '\u0000' + '\u0001' + "-" + '\u001F' + '\u007F' + "-" + '\u009F' +
+                     '\uD800' + "-" + '\uF8FF' + '\uFFF0' + "-" + '\uFFFF'
+}
+
+class TopicNameValidator(maxLen: Int) {
+  // Regex checks for illegal chars and "." and ".." filenames
+  private val rgx = new Regex("(^\\.{1,2}$)|[" + Topic.illegalChars + "]")
+
+  def validate(topic: String) {
+    if (topic.length <= 0)
+      throw new InvalidTopicException("topic name is illegal, can't be empty")
+    else if (topic.length > maxLen)
+      throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxLen + " characters")
+
+    rgx.findFirstIn(topic) match {
+      case Some(t) => throw new InvalidTopicException("topic name " + topic + " is illegal, doesn't match expected regular expression")
+      case None =>
+    }
+  }
+}
