Index: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(revision 1380747)
+++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(working copy)
@@ -23,7 +23,7 @@
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.utils.{Utils, MockTime, TestUtils}
-import kafka.common.OffsetOutOfRangeException
+import kafka.common.{InvalidTopicException, OffsetOutOfRangeException}
 
 class LogManagerTest extends JUnitSuite {
 
@@ -71,6 +71,38 @@
   }
 
   @Test
+  def testInvalidTopicName() {
+    try {
+      logManager.getOrCreateLog("", 0)
+      fail("Should throw InvalidTopicException.")
+    }
+    catch {
+      case e: InvalidTopicException => "This is good."
+    }
+
+    try {
+      var longName = "ATCG"
+      for (i <- 3 to 8)
+        longName += longName
+      logManager.getOrCreateLog(longName, 0)
+      fail("Should throw InvalidTopicException.")
+    }
+    catch {
+      case e: InvalidTopicException => "This is good."
+    }
+
+    for (weirdChar <- config.blacklistedChars) {
+      try {
+        logManager.getOrCreateLog("Always" + weirdChar + "Never", 0)
+        fail("Should throw InvalidTopicException.")
+      }
+      catch {
+        case e: InvalidTopicException => "This is good."
+      }
+    }
+  }
+
+  @Test
   def testCleanupExpiredSegments() {
     val log = logManager.getOrCreateLog("cleanup", 0)
     var offset = 0L
Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala	(revision 1380747)
+++ core/src/main/scala/kafka/log/LogManager.scala	(working copy)
@@ -157,7 +157,7 @@
       new Log(d, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, false)
     }
   }
-  
+
   /**
    * Return the Pool (partitions) for a specific log
    */
@@ -165,6 +165,12 @@
     awaitStartup
     if (topic.length <= 0)
       throw new InvalidTopicException("topic name can't be empty")
+    else if (topic.length > config.maxTopicNameLen)
+      throw new InvalidTopicException("topic name can't be longer than " + config.maxTopicNameLen + " characters")
+    for (weirdChar <- config.blacklistedChars) {
+       if (topic.contains(weirdChar))
+         throw new InvalidTopicException("topic name " + topic + " cannot contain " + weirdChar)
+    }
     if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
       warn("Wrong partition " + partition + " valid partitions (0," +
               (topicPartitionsMap.getOrElse(topic, numPartitions) - 1) + ")")
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1380747)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -102,4 +102,11 @@
 
    /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
   val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))
+
+  /* characters blacklisted for topic names, these exclude all alphanumeric characters, '_', '-', '[', '{', ']', '}', '(', ')'
+   * from the American English keyboard. */
+  val blacklistedChars = Utils.getString(props, "blacklisted.chars", "`~!@#$%^&*+=\\|;:'\",<.>/? \t\r\n")
+
+  /* the maximum length of topic name*/
+  val maxTopicNameLen = Utils.getIntInRange(props, "max.topic.len", 255, (1, Int.MaxValue))
 }
