From 3257e07bc83157ab682e7114686b99301340813d Mon Sep 17 00:00:00 2001
From: Jiankang Liu <jiankliu@ebay.com>
Date: Wed, 17 Jun 2015 21:13:34 +0800
Subject: [PATCH] KAFKA-2282 ConsumerConnector enhance

---
 core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java  | 7 +++++++
 .../scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala  | 5 +++--
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
index ca74ca8..c73a3e1 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
+++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
@@ -76,6 +76,13 @@ public interface ConsumerConnector {
   public void commitOffsets(Map<TopicAndPartition, OffsetAndMetadata> offsetsToCommit, boolean retryOnFailure);
 
   /**
+   *  set a consumer rebalance listener to be executed when consumer rebalance occurs.
+   *
+   *  @param listener The consumer rebalance listener to set
+   */
+  public void setConsumerRebalanceListener(ConsumerRebalanceListener listener);
+
+    /**
    *  Shut down the connector
    */
   public void shutdown();
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index bfd8d37..b653c18 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -19,7 +19,7 @@ package kafka.javaapi.consumer
 import kafka.serializer._
 import kafka.consumer._
 import kafka.common.{OffsetAndMetadata, TopicAndPartition, MessageStreamsExistException}
-import scala.collection.{immutable, mutable, JavaConversions}
+import scala.collection.{mutable, JavaConversions}
 import java.util.concurrent.atomic.AtomicBoolean
 
 /**
@@ -115,7 +115,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
 
   def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean) {
-    underlying.commitOffsets(offsetsToCommit.asInstanceOf[immutable.Map[TopicAndPartition, OffsetAndMetadata]], retryOnFailure)
+    import JavaConversions._
+    underlying.commitOffsets(offsetsToCommit.toMap, retryOnFailure)
   }
 
   def setConsumerRebalanceListener(consumerRebalanceListener: ConsumerRebalanceListener) {
-- 
1.7.12.4

