diff --git core/src/main/scala/kafka/log/LogManager.scala core/src/main/scala/kafka/log/LogManager.scala
index c1cdf86..b5f6c93 100644
--- core/src/main/scala/kafka/log/LogManager.scala
+++ core/src/main/scala/kafka/log/LogManager.scala
@@ -42,6 +42,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
   private val logFileSizeMap = config.logFileSizeMap
   private val flushInterval = config.flushInterval
   private val logCreationLock = new Object
+  private val topicNameValidator = new TopicNameValidator(config)
   private val logFlushIntervals = config.flushIntervalMap
   private val logRetentionSizeMap = config.logRetentionSizeMap
   private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
@@ -92,11 +93,9 @@ private[kafka] class LogManager(val config: KafkaConfig,
 
 
   /**
-   * Create a log for the given topic and the given partition
+   * Create a log for the given topic (should be validated already) and the given partition.
    */
   private def createLog(topic: String, partition: Int): Log = {
-    if (topic.length <= 0)
-      throw new InvalidTopicException("Topic name can't be emtpy")
     if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) {
       val error = "Wrong partition %d, valid partitions (0, %d)."
               .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
@@ -124,6 +123,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
    * Get the log if it exists
    */
   def getLog(topic: String, partition: Int): Option[Log] = {
+    topicNameValidator.validate(topic)
     val parts = logs.get(topic)
     if (parts == null) None
     else {
@@ -138,6 +138,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
    */
   def getOrCreateLog(topic: String, partition: Int): Log = {
     var hasNewTopic = false
+    topicNameValidator.validate(topic)
     var parts = logs.get(topic)
     if (parts == null) {
       val found = logs.putIfNotExists(topic, new Pool[Int, Log])
diff --git core/src/main/scala/kafka/server/KafkaConfig.scala core/src/main/scala/kafka/server/KafkaConfig.scala
index feb7f2d..7473215 100644
--- core/src/main/scala/kafka/server/KafkaConfig.scala
+++ core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -112,6 +112,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
    /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
   val topicPartitionsMap = Utils.getTopicPartitions(props.getString("topic.partition.count.map", ""))
 
+  /* the maximum length of topic name*/
+  val maxTopicNameLength = props.getIntInRange("max.topic.name.length", 255, (1, Int.MaxValue))
+
   /* enable auto creation of topic on the server */
   val autoCreateTopics = props.getBoolean("auto.create.topics", true)
 
diff --git core/src/main/scala/kafka/utils/TopicNameValidator.scala core/src/main/scala/kafka/utils/TopicNameValidator.scala
new file mode 100644
index 0000000..951350a
--- /dev/null
+++ core/src/main/scala/kafka/utils/TopicNameValidator.scala
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**/
+
+package kafka.utils
+
+import kafka.common.InvalidTopicException
+import util.matching.Regex
+import kafka.server.KafkaConfig
+
+class TopicNameValidator(config: KafkaConfig) {
+  private val illegalChars = "/" + '\u0000' + '\u0001' + "-" + '\u001F' + '\u007F' + "-" + '\u009F' +
+                          '\uD800' + "-" + '\uF8FF' + '\uFFF0' + "-" + '\uFFFF'
+  // Regex checks for illegal chars and "." and ".." filenames
+  private val rgx = new Regex("(^\\.{1,2}$)|[" + illegalChars + "]")
+
+  def validate(topic: String) {
+    if (topic.length <= 0)
+      throw new InvalidTopicException("topic name is illegal, can't be empty")
+    else if (topic.length > config.maxTopicNameLength)
+      throw new InvalidTopicException("topic name is illegal, can't be longer than " + config.maxTopicNameLength + " characters")
+
+    rgx.findFirstIn(topic) match {
+      case Some(t) => throw new InvalidTopicException("topic name " + topic + " is illegal, doesn't match expected regular expression")
+      case None =>
+    }
+  }
+}
diff --git core/src/test/scala/unit/kafka/log/LogManagerTest.scala core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 264d914..f1d7ddc 100644
--- core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -20,12 +20,13 @@ package kafka.log
 import java.io._
 import junit.framework.Assert._
 import org.junit.Test
-import kafka.common.OffsetOutOfRangeException
+import kafka.common.{InvalidTopicException, OffsetOutOfRangeException}
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import kafka.admin.CreateTopicCommand
 import kafka.server.KafkaConfig
 import kafka.utils._
+import collection.mutable.ArrayBuffer
 
 class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -82,6 +83,30 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   @Test
+  def testInvalidTopicName() {
+    val invalidTopicNames = new ArrayBuffer[String]()
+    invalidTopicNames += ("", ".", "..")
+    var longName = "ATCG"
+    for (i <- 3 to 8)
+      longName += longName
+    invalidTopicNames += longName
+    val badChars = Array('/', '\u0000', '\u0001', '\u0018', '\u001F', '\u008F', '\uD805', '\uFFFA')
+    for (weirdChar <- badChars) {
+      invalidTopicNames += "Is" + weirdChar + "funny"
+    }
+
+    for (i <- 0 until invalidTopicNames.size) {
+      try {
+        logManager.getOrCreateLog(invalidTopicNames(i), 0)
+        fail("Should throw InvalidTopicException.")
+      }
+      catch {
+        case e: InvalidTopicException => "This is good."
+      }
+    }
+  }
+
+  @Test
   def testCleanupExpiredSegments() {
     val log = logManager.getOrCreateLog(name, 0)
     var offset = 0L
