diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 89ff382..0f0eabe 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -120,10 +120,14 @@ object ConsoleConsumer extends Logging {
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
             "set, the csv metrics will be outputed here")
-      .withRequiredArg
-      .describedAs("metrics dictory")
-      .ofType(classOf[java.lang.String])
-
+            .withRequiredArg
+            .describedAs("metrics dictory")
+            .ofType(classOf[java.lang.String])
+    val zkSessionTimeoutOpt = parser.accepts("zk-session-timeout-ms", "Zookeeper session timeout")
+            .withRequiredArg
+            .describedAs("ms")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(ZkConfig.ZkSessionTimeoutMs)
 
     val options: OptionSet = tryParse(parser, args)
     CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
@@ -166,6 +170,7 @@ object ConsoleConsumer extends Logging {
     props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
     props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString)
     props.put("refresh.leader.backoff.ms", options.valueOf(refreshMetadataBackoffMsOpt).toString)
+    props.put("zk.session.timeout.ms", options.valueOf(zkSessionTimeoutOpt).toString)
 
     val config = new ConsumerConfig(props)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index d53d511..bb6d8d4 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -762,17 +762,24 @@ class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group)
   def consumerOwnerDir = consumerGroupDir + "/owners/" + topic
 }
 
+object ZkConfig {
+  val ZkSessionTimeoutMs = 6000
+  val ZkSyncTimeMs = 2000
+}
 
 class ZKConfig(props: VerifiableProperties) {
+  import ZkConfig._
+
   /** ZK host string */
   val zkConnect = props.getString("zookeeper.connect")
 
   /** zookeeper session timeout */
-  val zkSessionTimeoutMs = props.getInt("zookeeper.session.timeout.ms", 6000)
+  val zkSessionTimeoutMs = props.getInt("zookeeper.session.timeout.ms", ZkSessionTimeoutMs)
 
   /** the max time that the client waits to establish a connection to zookeeper */
   val zkConnectionTimeoutMs = props.getInt("zookeeper.connection.timeout.ms",zkSessionTimeoutMs)
 
   /** how far a ZK follower can be behind a ZK leader */
-  val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000)
+  val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", ZkSyncTimeMs)
 }
+
