Index: core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(revision 1383199)
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(working copy)
@@ -101,6 +101,7 @@
     // topic metadata request only requires 1 call from the replica manager
     val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
     EasyMock.expect(replicaManager.config).andReturn(configs.head)
+    EasyMock.expectLastCall().times(2)
     EasyMock.replay(replicaManager)
 
     // create a topic metadata request
Index: core/src/test/scala/unit/kafka/utils/TopicTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TopicTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/utils/TopicTest.scala	(working copy)
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import junit.framework.Assert._
+import collection.mutable.ArrayBuffer
+import kafka.common.InvalidTopicException
+import org.junit.Test
+
+class TopicTest {
+
+  @Test
+  def testInvalidTopicNames() {
+    val invalidTopicNames = new ArrayBuffer[String]()
+    invalidTopicNames += ("", ".", "..")
+    var longName = "ATCG"
+    for (i <- 1 to 6)
+      longName += longName
+    invalidTopicNames += longName
+    val badChars = Array('/', '\u0000', '\u0001', '\u0018', '\u001F', '\u008F', '\uD805', '\uFFFA')
+    for (weirdChar <- badChars) {
+      invalidTopicNames += "Is" + weirdChar + "funny"
+    }
+
+    val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
+
+    for (i <- 0 until invalidTopicNames.size) {
+      try {
+        topicNameValidator.validate(invalidTopicNames(i))
+        fail("Should throw InvalidTopicException.")
+      }
+      catch {
+        case e: InvalidTopicException => "This is good."
+      }
+    }
+  }
+}
Index: core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala	(revision 1383199)
+++ core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala	(working copy)
@@ -22,7 +22,7 @@
 import kafka.log.Log
 import kafka.message.{ByteBufferMessageSet, Message}
 import kafka.network.{BoundedByteBufferReceive, RequestChannel}
-import kafka.utils.{Time, TestUtils, MockTime}
+import kafka.utils.{Topic, Time, TestUtils, MockTime}
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
@@ -78,6 +78,7 @@
     partition.getReplica(configs(1).brokerId).get.logEndOffset = leo - 5L
 
     EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.config).andReturn(configs.head)
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
     EasyMock.replay(replicaManager)
 
@@ -151,6 +152,7 @@
     partition.getReplica(followerReplicaId).get.logEndOffset = followerLEO.asInstanceOf[Long]
 
     EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.config).andReturn(configs.head)
     EasyMock.expect(replicaManager.recordFollowerPosition(topic, partitionId, followerReplicaId, followerLEO))
     EasyMock.expect(replicaManager.getReplica(topic, partitionId, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId))
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1383199)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -21,7 +21,7 @@
 import kafka.message.Message
 import kafka.consumer.ConsumerConfig
 import java.net.InetAddress
-import kafka.utils.{Utils, VerifiableProperties, ZKConfig}
+import kafka.utils.{Topic, Utils, VerifiableProperties, ZKConfig}
 
 /**
  * Configuration settings for the kafka server
@@ -115,6 +115,9 @@
   /* enable auto creation of topic on the server */
   val autoCreateTopics = props.getBoolean("auto.create.topics", true)
 
+  /* the maximum length of topic name*/
+  val maxTopicNameLength = props.getIntInRange("max.topic.name.length", Topic.maxNameLength, (1, Int.MaxValue))
+
   /**
    * Following properties are relevant to Kafka replication
    */
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1383199)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -23,7 +23,7 @@
 import kafka.common._
 import kafka.message._
 import kafka.network._
-import kafka.utils.{Pool, SystemTime, Logging}
+import kafka.utils.{TopicNameValidator, Pool, SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
 import mutable.HashMap
@@ -45,6 +45,7 @@
   private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId)
   private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
   private val delayedRequestMetrics = new DelayedRequestMetrics
+  private val topicNameValidator = new TopicNameValidator(replicaManager.config.maxTopicNameLength)
 
   private val requestLogger = Logger.getLogger("kafka.request.logger")
   this.logIdent = "[KafkaApi on Broker " + brokerId + "], "
@@ -419,7 +420,8 @@
             case ErrorMapping.UnknownTopicOrPartitionCode =>
               /* check if auto creation of topics is turned on */
               if(config.autoCreateTopics) {
-                CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+                CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor,
+                                               topicNameValidator = topicNameValidator)
                 info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
                              .format(topic, config.numPartitions, config.defaultReplicationFactor))
                 val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala	(revision 1383199)
+++ core/src/main/scala/kafka/log/LogManager.scala	(working copy)
@@ -23,7 +23,7 @@
 import kafka.server.KafkaConfig
 import kafka.api.OffsetRequest
 import kafka.log.Log._
-import kafka.common.{KafkaException, InvalidTopicException, UnknownTopicOrPartitionException}
+import kafka.common.{KafkaException, UnknownTopicOrPartitionException}
 
 /**
  * The guy who creates and hands out logs
@@ -95,8 +95,6 @@
    * Create a log for the given topic and the given partition
    */
   private def createLog(topic: String, partition: Int): Log = {
-    if (topic.length <= 0)
-      throw new InvalidTopicException("Topic name can't be emtpy")
     if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) {
       val error = "Wrong partition %d, valid partitions (0, %d)."
               .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
Index: core/src/main/scala/kafka/admin/CreateTopicCommand.scala
===================================================================
--- core/src/main/scala/kafka/admin/CreateTopicCommand.scala	(revision 1383199)
+++ core/src/main/scala/kafka/admin/CreateTopicCommand.scala	(working copy)
@@ -18,7 +18,7 @@
 package kafka.admin
 
 import joptsimple.OptionParser
-import kafka.utils.{Logging, Utils, ZKStringSerializer, ZkUtils}
+import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import scala.collection.mutable
 
@@ -51,6 +51,11 @@
                                         "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
                            .ofType(classOf[String])
                            .defaultsTo("")
+    val maxTopicNameLenOpt = parser.accepts("max-name-len", "maximum length of the topic name")
+                           .withRequiredArg
+                           .describedAs("max topic name length")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(Topic.maxNameLength)
 
     val options = parser.parse(args : _*)
 
@@ -63,6 +68,7 @@
     }
 
     val topic = options.valueOf(topicOpt)
+    val maxTopicNameLength = options.valueOf(maxTopicNameLenOpt).intValue
     val zkConnect = options.valueOf(zkConnectOpt)
     val nPartitions = options.valueOf(nPartitionsOpt).intValue
     val replicationFactor = options.valueOf(replicationFactorOpt).intValue
@@ -70,7 +76,8 @@
     var zkClient: ZkClient = null
     try {
       zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
+      val topicNameValidator = new TopicNameValidator(maxTopicNameLength)
+      createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr, topicNameValidator)
       println("creation succeeded!")
     } catch {
       case e =>
@@ -82,7 +89,10 @@
     }
   }
 
-  def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
+  def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "",
+                  topicNameValidator: TopicNameValidator = new TopicNameValidator(Topic.maxNameLength)) {
+    topicNameValidator.validate(topic)
+
     val brokerList = ZkUtils.getSortedBrokerList(zkClient)
 
     val partitionReplicaAssignment = if (replicaAssignmentStr == "")
Index: core/src/main/scala/kafka/utils/Topic.scala
===================================================================
--- core/src/main/scala/kafka/utils/Topic.scala	(revision 0)
+++ core/src/main/scala/kafka/utils/Topic.scala	(working copy)
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import kafka.common.InvalidTopicException
+import util.matching.Regex
+
+object Topic {
+  val maxNameLength = 255
+  val illegalChars = "/" + '\u0000' + '\u0001' + "-" + '\u001F' + '\u007F' + "-" + '\u009F' +
+                     '\uD800' + "-" + '\uF8FF' + '\uFFF0' + "-" + '\uFFFF'
+}
+
+class TopicNameValidator(maxLen: Int) {
+  // Regex checks for illegal chars and "." and ".." filenames
+  private val rgx = new Regex("(^\\.{1,2}$)|[" + Topic.illegalChars + "]")
+
+  def validate(topic: String) {
+    if (topic.length <= 0)
+      throw new InvalidTopicException("topic name is illegal, can't be empty")
+    else if (topic.length > maxLen)
+      throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxLen + " characters")
+
+    rgx.findFirstIn(topic) match {
+      case Some(t) => throw new InvalidTopicException("topic name " + topic + " is illegal, doesn't match expected regular expression")
+      case None =>
+    }
+  }
+}
