diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
index cb33e34..5c5c201 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
@@ -15,30 +15,35 @@ package org.apache.kafka.common.utils;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigException;
 
 public class ClientUtils {
+
+    private static final Pattern HOST_AND_PORT = Pattern.compile("(.+?):(\\d+)"); // reluctant host group lets hosts containing ':' (e.g. bracketed IPv6 literals) match; the port is the trailing digit run
+
     public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
         List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
         for (String url : urls) {
             if (url != null && url.length() > 0) {
-                String[] pieces = url.split(":");
-                if (pieces.length != 2)
+                Matcher hostAndPort = HOST_AND_PORT.matcher(url);
+                if (!hostAndPort.matches()) // the whole string must be host:port; a bare host with no port is rejected here
                     throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
                 try {
-                    InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
+                    InetSocketAddress address = new InetSocketAddress(hostAndPort.group(1), Integer.parseInt(hostAndPort.group(2))); // group(2) is all digits, so parseInt can only fail on int overflow
                     if (address.isUnresolved())
-                        throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url);
+                        throw new ConfigException("DNS resolution failed for bootstrap url: " + url);
                     addresses.add(address);
                 } catch (NumberFormatException e) {
-                    throw new ConfigException("Invalid port in metadata.broker.list: " + url);
+                    throw new ConfigException("Invalid port in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
                 }
             }
         }
         if (addresses.size() < 1)
-            throw new ConfigException("No bootstrap urls given in metadata.broker.list.");
+            throw new ConfigException("No bootstrap urls given in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
         return addresses;
     }
 }
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
new file mode 100644
index 0000000..c52ca92
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class ClientUtilsTest {
+
+    @Test
+    public void testParseAndValidateAddresses() {
+        check("127.0.0.1:8000");
+        check("mydomain.com:8080");
+        check("[::1]:8000"); // bracketed IPv6 literal must be accepted
+        check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234");
+        check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "mydomain.com:10000"); // mixed IPv6/hostname list
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testNoPort() {
+        check("127.0.0.1"); // a host without a port must be rejected
+    }
+
+    private void check(String... url) { // passes urls through the parser; any ConfigException propagates
+        ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
+    }
+}
\ No newline at end of file
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
index f3fb3fd..d5e00aa 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
@@ -27,9 +27,9 @@ import kafka.etl.KafkaETLKey;
 import kafka.etl.KafkaETLRequest;
 import kafka.etl.Props;
 import kafka.javaapi.producer.Producer;
-import kafka.message.Message;
 import kafka.producer.ProducerConfig;
 import kafka.producer.KeyedMessage;
+import kafka.utils.Utils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -70,7 +70,7 @@ public class DataGenerator {
 		
 		System.out.println("server uri:" + _uri.toString());
         Properties producerProps = new Properties();
-        producerProps.put("metadata.broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
+        producerProps.put("metadata.broker.list", Utils.addressString(_uri.getHost(), _uri.getPort()));
         producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE));
         producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
         producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
@@ -108,7 +108,7 @@ public class DataGenerator {
         if (fs.exists(outPath)) fs.delete(outPath);
         
         KafkaETLRequest request =
-            new KafkaETLRequest(_topic, "tcp://" + _uri.getHost() + ":" + _uri.getPort(), 0);
+            new KafkaETLRequest(_topic, "tcp://" + Utils.addressString(_uri.getHost(), _uri.getPort()), 0);
 
         System.out.println("Dump " + request.toString() + " to " + outPath.toUri().toString());
         byte[] bytes = request.toString().getBytes("UTF-8");
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 003a09c..bddbabb 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -174,7 +174,7 @@ object TopicCommand {
     }
   }
   
-  def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
+  def formatBroker(broker: Broker) = broker.id + " (" + Utils.addressString(broker.host, broker.port) + ")"
   
   def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
     val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 51380a6..ba62634 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -20,8 +20,7 @@ package kafka.api
 import kafka.cluster.Broker
 import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
-import kafka.utils.Logging
-import collection.mutable.ArrayBuffer
+import kafka.utils.{Logging, Utils}
 import kafka.common._
 
 object TopicMetadata {
@@ -149,7 +148,7 @@ case class PartitionMetadata(partitionId: Int,
     partitionMetadataString.toString()
   }
 
-  private def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
+  private def formatBroker(broker: Broker) = broker.id + " (" + Utils.addressString(broker.host, broker.port) + ")"
 }
 
 
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index ce7ede3..640057a 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -99,12 +99,9 @@ object ClientUtils extends Logging{
     val brokersStr = Utils.parseCsvList(brokerListStr)
 
     brokersStr.zipWithIndex.map(b =>{
-      val brokerStr = b._1
       val brokerId = b._2
-      val brokerInfos = brokerStr.split(":")
-      val hostName = brokerInfos(0)
-      val port = brokerInfos(1).toInt
-      new Broker(brokerId, hostName, port)
+      val (host, port) = Utils.parseHostPort(b._1)
+      new Broker(brokerId, host, port)
     })
   }
 
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 9407ed2..b45ac77 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -20,6 +20,7 @@ package kafka.cluster
 import kafka.utils.Utils._
 import kafka.utils.Json
 import kafka.api.ApiUtils._
+import kafka.utils.Utils
 import java.nio.ByteBuffer
 import kafka.common.{KafkaException, BrokerNotAvailableException}
 
@@ -54,11 +55,11 @@ private[kafka] object Broker {
   }
 }
 
-private[kafka] case class Broker(val id: Int, val host: String, val port: Int) {
+private[kafka] case class Broker(id: Int, host: String, port: Int) {
   
   override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port)
 
-  def getConnectionString(): String = host + ":" + port
+  def getConnectionString(): String = Utils.addressString(host, port)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(id)
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 8db9203..b9d033c 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -46,7 +46,7 @@ class SimpleConsumer(val host: String,
   }
 
   private def disconnect() = {
-    debug("Disconnecting from " + host + ":" + port)
+    debug("Disconnecting from " + Utils.addressString(host, port))
     blockingChannel.disconnect()
   }
 
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 489f007..e9e5f28 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -126,7 +126,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
    */
   private def disconnect() {
     try {
-      info("Disconnecting from " + config.host + ":" + config.port)
+      info("Disconnecting from " + Utils.addressString(config.host, config.port))
       blockingChannel.disconnect()
     } catch {
       case e: Exception => error("Error on disconnect: ", e)
@@ -137,11 +137,11 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
     if (!blockingChannel.isConnected && !shutdown) {
       try {
         blockingChannel.connect()
-        info("Connected to " + config.host + ":" + config.port + " for producing")
+        info("Connected to " + Utils.addressString(config.host, config.port) + " for producing")
       } catch {
         case e: Exception => {
           disconnect()
-          error("Producer connection to " +  config.host + ":" + config.port + " unsuccessful", e)
+          error("Producer connection to " +  Utils.addressString(config.host, config.port) + " unsuccessful", e)
           throw e
         }
       }
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index da52b42..3562e8d 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -395,21 +395,31 @@ object Utils extends Logging {
   }
 
   /**
-   * Parse a host and port out of a string
+   * Parse a host and port from a string; brackets around an IPv6 host are stripped
    */
-  def parseHostPort(hostport: String) : (String, Int) = {
-    val splits = hostport.split(":")
-    (splits(0), splits(1).toInt)
+  def parseHostPort(hostport: String): (String, Int) = {
+    val matched = """\[?(.+?)\]?:(\d+)""".r.unapplySeq(hostport).get // NOTE(review): .get throws a bare NoSuchElementException on malformed input -- confirm callers expect that over a descriptive error
+    (matched(0), matched(1).toInt)
+  }
+
+  /**
+   * Convert host and port to a string, surrounding IPv6 addresses with brackets '[', ']'
+   */
+  def addressString(host: String, port: Int): String = {
+    if (host.contains(":")) // a ':' in the host implies an IPv6 literal, which must be bracketed
+      "[" + host + "]:" + port
+    else
+      host + ":" + port
   }
 
   /**
    * Get the stack trace from an exception as a string
    */
   def stackTrace(e: Throwable): String = {
-    val sw = new StringWriter;
-    val pw = new PrintWriter(sw);
-    e.printStackTrace(pw);
-    sw.toString();
+    val sw = new StringWriter
+    val pw = new PrintWriter(sw)
+    e.printStackTrace(pw)
+    sw.toString()
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c4e13c5..e45e45c 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -142,7 +142,7 @@ object TestUtils extends Logging {
   }
 
   def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
-    configs.map(c => c.hostName + ":" + c.port).mkString(",")
+    configs.map(c => Utils.addressString(c.hostName, c.port)).mkString(",")
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index a502349..17a2d35 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -73,7 +73,7 @@ class UtilsTest extends JUnitSuite {
     assertEquals(1, Utils.abs(1))
     assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE))
   }
-  
+
   @Test
   def testReplaceSuffix() {
     assertEquals("blah.foo.text", Utils.replaceSuffix("blah.foo.txt", ".txt", ".text"))
@@ -81,7 +81,7 @@ class UtilsTest extends JUnitSuite {
     assertEquals("txt.txt", Utils.replaceSuffix("txt.txt.txt", ".txt", ""))
     assertEquals("foo.txt", Utils.replaceSuffix("foo", "", ".txt"))
   }
-  
+
   @Test
   def testReadInt() {
     val values = Array(0, 1, -1, Byte.MaxValue, Short.MaxValue, 2 * Short.MaxValue, Int.MaxValue/2, Int.MinValue/2, Int.MaxValue, Int.MinValue, Int.MaxValue)
@@ -90,7 +90,6 @@ class UtilsTest extends JUnitSuite {
       buffer.putInt(i*4, values(i))
       assertEquals("Written value should match read value.", values(i), Utils.readInt(buffer.array, i*4))
     }
-
   }
 
   @Test
@@ -105,7 +104,7 @@ class UtilsTest extends JUnitSuite {
     assertTrue(emptyStringList.equals(emptyListFromNullString))
     assertTrue(emptyStringList.equals(emptyList))
   }
-  
+
   @Test
   def testInLock() {
     val lock = new ReentrantLock()
@@ -115,6 +114,21 @@ class UtilsTest extends JUnitSuite {
     }
     assertEquals(2, result)
     assertFalse("Should be unlocked", lock.isLocked)
-    
+  }
+
+  @Test
+  def testParseHostPort() {
+    assertEquals(("127.0.0.1", 8000), Utils.parseHostPort("127.0.0.1:8000"))
+    assertEquals(("mydomain.com", 8080), Utils.parseHostPort("mydomain.com:8080"))
+    assertEquals(("::1", 1234), Utils.parseHostPort("[::1]:1234")) // brackets are stripped from the parsed IPv6 host
+    assertEquals(("2001:db8:85a3:8d3:1319:8a2e:370:7348", 5678), Utils.parseHostPort("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"))
+  }
+
+  @Test
+  def testAddressString() {
+    assertEquals("127.0.0.1:8000", Utils.addressString("127.0.0.1", 8000))
+    assertEquals("mydomain.com:8080", Utils.addressString("mydomain.com", 8080))
+    assertEquals("[::1]:1234", Utils.addressString("::1", 1234)) // IPv6 hosts are re-bracketed on formatting
+    assertEquals("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678", Utils.addressString("2001:db8:85a3:8d3:1319:8a2e:370:7348", 5678))
   }
 }
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index d883bde..22dc287 100644
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -28,7 +28,7 @@ class EmbeddedZookeeper(val connectString: String) {
   val logDir = TestUtils.tempDir()
   val tickTime = 500
   val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
-  val port = connectString.split(":")(1).toInt
+  val (_, port) = Utils.parseHostPort(connectString)
   val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
   factory.startup(zookeeper)
 
-- 
1.7.10.4

