From 50e9a7b7a008f92fe1eeab06bbb93757b5947022 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Fri, 16 Jan 2015 11:48:57 -0800 Subject: [PATCH 1/2] make javaapi.SimpleConsumer.offsetCommits() only commit to ZK --- core/src/main/scala/kafka/consumer/SimpleConsumer.scala | 1 + core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala | 1 + core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index e53ee51..ecf24c1 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -128,6 +128,7 @@ class SimpleConsumer(val host: String, /** * Commit offsets for a topic + * Version 0 of the request will commit offsets to Zookeeper and version 1 of the request will commit offsets to Kafka. * @param request a [[kafka.api.OffsetCommitRequest]] object. * @return a [[kafka.api.OffsetCommitResponse]] object. */ diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala index 27fc1eb..873f575 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala @@ -32,6 +32,7 @@ class OffsetCommitRequest(groupId: String, kafka.api.OffsetCommitRequest( groupId = groupId, requestInfo = scalaMap, + versionId = 0, // binds to version 0 so that it commits to Zookeeper correlationId = correlationId, clientId = clientId ) diff --git a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala index 0ab0195..d90f3ca 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala @@ -80,7 +80,7 @@ class SimpleConsumer(val host: String, } /** - * Commit offsets for a topic + * Commit offsets for a topic to Zookeeper * @param request a [[kafka.javaapi.OffsetCommitRequest]] object. * @return a [[kafka.javaapi.OffsetCommitResponse]] object. */ -- 1.8.5.2 (Apple Git-48) From 2bdb060febb0469b0f5406a87a701ed751cde01f Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Fri, 16 Jan 2015 12:07:16 -0800 Subject: [PATCH 2/2] making the fetcOffset() api consistent --- core/src/main/scala/kafka/consumer/SimpleConsumer.scala | 1 + core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala | 2 +- core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index ecf24c1..7bd03f2 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -140,6 +140,7 @@ class SimpleConsumer(val host: String, /** * Fetch offsets for a topic + * Version 0 of the request will fetch offsets from Zookeeper and version 1 of the request will fetch offsets from Kafka. * @param request a [[kafka.api.OffsetFetchRequest]] object. * @return a [[kafka.api.OffsetFetchResponse]] object. */ diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala index 5b4f4bb..1c25aa3 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala @@ -36,7 +36,7 @@ class OffsetFetchRequest(groupId: String, kafka.api.OffsetFetchRequest( groupId = groupId, requestInfo = scalaSeq, - versionId = versionId, + versionId = 0, // binds to version 0 so that it commits to Zookeeper correlationId = correlationId, clientId = clientId ) diff --git a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala index d90f3ca..abf6069 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala @@ -90,7 +90,7 @@ class SimpleConsumer(val host: String, } /** - * Fetch offsets for a topic + * Fetch offsets for a topic from Zookeeper * @param request a [[kafka.javaapi.OffsetFetchRequest]] object. * @return a [[kafka.javaapi.OffsetFetchResponse]] object. */ -- 1.8.5.2 (Apple Git-48)