Index: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(revision 1381442)
+++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(working copy)
@@ -23,7 +23,8 @@
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.utils.{Utils, MockTime, TestUtils}
-import kafka.common.OffsetOutOfRangeException
+import kafka.common.{InvalidTopicException, OffsetOutOfRangeException}
+import collection.mutable.ArrayBuffer
 
 class LogManagerTest extends JUnitSuite {
 
@@ -71,6 +72,30 @@
   }
 
   @Test
+  def testInvalidTopicName() {
+    val invalidTopicNames = new ArrayBuffer[String]()
+    invalidTopicNames += ("", ".", "..")
+    var longName = "ATCG"
+    for (i <- 3 to 8)
+      longName += longName
+    invalidTopicNames += longName
+    val badChars = Array('/', '\u0000', '\u0001', '\u0018', '\u001F', '\u008F', '\uD805', '\uFFFA')
+    for (weirdChar <- badChars) {
+      invalidTopicNames += "Is" + weirdChar + "funny"
+    }
+
+    for (i <- 0 until invalidTopicNames.size) {
+      try {
+        logManager.getOrCreateLog(invalidTopicNames(i), 0)
+        fail("Should throw InvalidTopicException.")
+      }
+      catch {
+        case e: InvalidTopicException => "This is good."
+      }
+    }
+  }
+
+  @Test
   def testCleanupExpiredSegments() {
     val log = logManager.getOrCreateLog("cleanup", 0)
     var offset = 0L
Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala	(revision 1381442)
+++ core/src/main/scala/kafka/log/LogManager.scala	(working copy)
@@ -49,6 +49,7 @@
   private var zkActor: Actor = null
   private val startupLatch: CountDownLatch = if (config.enableZookeeper) new CountDownLatch(1) else null
   private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
+  private val topicNameValidator = new TopicNameValidator(config)
   private val logFlushIntervalMap = config.flushIntervalMap
   private val logRetentionSizeMap = config.logRetentionSizeMap
   private val logRetentionMsMap = getMsMap(config.logRetentionHoursMap)
@@ -157,14 +158,13 @@
       new Log(d, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, false)
     }
   }
-  
+
   /**
    * Return the Pool (partitions) for a specific log
    */
   private def getLogPool(topic: String, partition: Int): Pool[Int, Log] = {
     awaitStartup
-    if (topic.length <= 0)
-      throw new InvalidTopicException("topic name can't be empty")
+    topicNameValidator.validate(topic)
     if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
       warn("Wrong partition " + partition + " valid partitions (0," +
               (topicPartitionsMap.getOrElse(topic, numPartitions) - 1) + ")")
Index: core/src/main/scala/kafka/utils/TopicNameValidator.scala
===================================================================
--- core/src/main/scala/kafka/utils/TopicNameValidator.scala	(revision 0)
+++ core/src/main/scala/kafka/utils/TopicNameValidator.scala	(working copy)
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**/
+
+package kafka.utils
+
+import kafka.common.InvalidTopicException
+import util.matching.Regex
+import kafka.server.KafkaConfig
+
+class TopicNameValidator(config: KafkaConfig) {
+  private val illegalChars = "/" + '\u0000' + '\u0001' + "-" + '\u001F' + '\u007F' + "-" + '\u009F' +
+                          '\uD800' + "-" + '\uF8FF' + '\uFFF0' + "-" + '\uFFFF'
+  // Regex checks for illegal chars and "." and ".." filenames
+  private val rgx = new Regex("(^\\.{1,2}$)|[" + illegalChars + "]")
+
+  def validate(topic: String) {
+    if (topic.length <= 0)
+      throw new InvalidTopicException("topic name is illegal, can't be empty")
+    else if (topic.length > config.maxTopicNameLen)
+      throw new InvalidTopicException("topic name is illegal, can't be longer than " + config.maxTopicNameLen + " characters")
+
+    rgx.findFirstIn(topic) match {
+      case Some(t) => throw new InvalidTopicException("topic name " + topic + " is illegal, doesn't match expected regular expression")
+      case None =>
+    }
+  }
+}
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1381442)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -102,4 +102,7 @@
 
    /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
   val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))
+
+  /* the maximum length of topic name*/
+  val maxTopicNameLen = Utils.getIntInRange(props, "max.topic.len", 255, (1, Int.MaxValue))
 }
