diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala new file mode 100644 index 0000000..ab99a59 --- /dev/null +++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer + +import kafka.api.ApiUtils._ +import kafka.common.TopicAndPartition +import kafka.utils.Logging + +object OffsetCommitRequest extends Logging { + val CurrentVersion: Short = 1 + val DefaultClientId = "" + + def readFrom(buffer: ByteBuffer): OffsetCommitRequest = { + // Read values from the envelope + val versionId = buffer.getShort + val correlationId = buffer.getInt + val clientId = readShortString(buffer) + + // Read the OffsetCommitRequest + val consumerGroupId = readShortString(buffer) + val topicCount = buffer.getInt + val pairs = (1 to topicCount).flatMap(_ => { + val topic = readShortString(buffer) + val partitionCount = buffer.getInt + (1 to partitionCount).map(_ => { + val partitionId = buffer.getInt + val offset = buffer.getLong + (TopicAndPartition(topic, partitionId), offset) + }) + }) + OffsetCommitRequest(consumerGroupId, Map(pairs:_*), versionId, correlationId, clientId) + } +} + +case class OffsetCommitRequest(groupId: String, + requestInfo: Map[TopicAndPartition, Long], + versionId: Short = OffsetCommitRequest.CurrentVersion, + correlationId: Int = 0, + clientId: String = OffsetCommitRequest.DefaultClientId) + extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) { + + lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) + + def writeTo(buffer: ByteBuffer) { + // Write envelope + buffer.putShort(versionId) + buffer.putInt(correlationId) + writeShortString(buffer, clientId) + + // Write OffsetCommitRequest + writeShortString(buffer, groupId) // consumer group + buffer.putInt(requestInfoGroupedByTopic.size) // number of topics + requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, Long] + writeShortString(buffer, t1._1) // topic + buffer.putInt(t1._2.size) // number of partitions for this topic + t1._2.foreach( t2 => { + buffer.putInt(t2._1.partition) // partition + buffer.putLong(t2._2) // offset + }) + }) + } + + override def sizeInBytes = + 2 + /* versionId */ + 4 + /* correlationId */ + shortStringLength(clientId) + + shortStringLength(groupId) + + 4 + /* topic count */ + requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => { + val (topic, offsets) = topicAndOffsets + count + + shortStringLength(topic) + /* topic */ + 4 + /* number of partitions */ + offsets.size * ( + 4 + /* partition */ + 8 /* offset */ + ) + }) +} diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala new file mode 100644 index 0000000..035c21f --- /dev/null +++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer + +import kafka.api.ApiUtils._ +import kafka.common.TopicAndPartition +import kafka.utils.Logging + +object OffsetCommitResponse extends Logging { + val CurrentVersion: Short = 1 + val DefaultClientId = "" + + def readFrom(buffer: ByteBuffer): OffsetCommitResponse = { + // Read values from the envelope + val versionId = buffer.getShort + val correlationId = buffer.getInt + val clientId = readShortString(buffer) + + // Read the OffsetCommitResponse + val topicCount = buffer.getInt + val pairs = (1 to topicCount).flatMap(_ => { + val topic = readShortString(buffer) + val partitionCount = buffer.getInt + (1 to partitionCount).map(_ => { + val partitionId = buffer.getInt + val error = buffer.getShort + (TopicAndPartition(topic, partitionId), error) + }) + }) + OffsetCommitResponse(Map(pairs:_*), versionId, correlationId, clientId) + } +} + +case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short], + versionId: Short = OffsetCommitResponse.CurrentVersion, + correlationId: Int = 0, + clientId: String = OffsetCommitResponse.DefaultClientId) + extends RequestOrResponse { + + lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) + + def writeTo(buffer: ByteBuffer) { + // Write envelope + buffer.putShort(versionId) + buffer.putInt(correlationId) + writeShortString(buffer, clientId) + + // Write OffsetCommitResponse + buffer.putInt(requestInfoGroupedByTopic.size) // number of topics + requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, Short] + writeShortString(buffer, t1._1) // topic + buffer.putInt(t1._2.size) // number of partitions for this topic + t1._2.foreach( t2 => { // TopicAndPartition -> Short + buffer.putInt(t2._1.partition) + buffer.putShort(t2._2) // error + }) + }) + } + + override def sizeInBytes = + 2 + /* versionId */ + 4 + /* correlationId */ + shortStringLength(clientId) + + 4 + /* topic count */ + requestInfoGroupedByTopic.foldLeft(0)((count, topicAndErrors) => { + val (topic, errors) = topicAndErrors + count + + shortStringLength(topic) + /* topic */ + 4 + /* number of partitions */ + errors.size * ( + 4 + /* partition */ + 2 /* error */ + ) + }) +} + diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala new file mode 100644 index 0000000..b58780f --- /dev/null +++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more +
* contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer + +import kafka.api.ApiUtils._ +import kafka.common.TopicAndPartition +import kafka.utils.Logging + +object OffsetFetchRequest extends Logging { + val CurrentVersion: Short = 1 + val DefaultClientId = "" + + def readFrom(buffer: ByteBuffer): OffsetFetchRequest = { + // Read values from the envelope + val versionId = buffer.getShort + val correlationId = buffer.getInt + val clientId = readShortString(buffer) + + // Read the OffsetFetchRequest + val consumerGroupId = readShortString(buffer) + val topicCount = buffer.getInt + val pairs = (1 to topicCount).flatMap(_ => { + val topic = readShortString(buffer) + val partitionCount = buffer.getInt + (1 to partitionCount).map(_ => { + val partitionId = buffer.getInt + TopicAndPartition(topic, partitionId) + }) + }) + OffsetFetchRequest(consumerGroupId, pairs, versionId, correlationId, clientId) + } +} + +case class OffsetFetchRequest(groupId: String, + requestInfo: Seq[TopicAndPartition], + versionId: Short = OffsetFetchRequest.CurrentVersion, + correlationId: Int = 0, + clientId: String = OffsetFetchRequest.DefaultClientId) + extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey)) { + + lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic) + + def writeTo(buffer: ByteBuffer) { + // Write envelope + buffer.putShort(versionId) + buffer.putInt(correlationId) + writeShortString(buffer, clientId) + + // Write OffsetFetchRequest + writeShortString(buffer, groupId) // consumer group + buffer.putInt(requestInfoGroupedByTopic.size) // number of topics + requestInfoGroupedByTopic.foreach( t1 => { // (topic, Seq[TopicAndPartition]) + writeShortString(buffer, t1._1) // topic + buffer.putInt(t1._2.size) // number of partitions for this topic + t1._2.foreach( t2 => { + buffer.putInt(t2.partition) + }) + }) + } + + override def sizeInBytes = + 2 + /* versionId */ + 4 + /* correlationId */ + shortStringLength(clientId) + + shortStringLength(groupId) + + 4 + /* topic count */ + requestInfoGroupedByTopic.foldLeft(0)((count, t) => { + count + shortStringLength(t._1) + /* topic */ + 4 + /* number of partitions */ + t._2.size * 4 /* partition */ + }) +} diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala new file mode 100644 index 0000000..4366bc1 --- /dev/null +++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer + +import kafka.api.ApiUtils._ +import kafka.common.TopicAndPartition +import kafka.utils.Logging + +object OffsetFetchResponse extends Logging { + val CurrentVersion: Short = 1 + val DefaultClientId = "" + + def readFrom(buffer: ByteBuffer): OffsetFetchResponse = { + // Read values from the envelope + val versionId = buffer.getShort + val correlationId = buffer.getInt + val clientId = readShortString(buffer) + + // Read the OffsetFetchResponse + val topicCount = buffer.getInt + val pairs = (1 to topicCount).flatMap(_ => { + val topic = readShortString(buffer) + val partitionCount = buffer.getInt + (1 to partitionCount).map(_ => { + val partitionId = buffer.getInt + val offset = buffer.getLong + val error = buffer.getShort + (TopicAndPartition(topic, partitionId), (offset, error)) + }) + }) + OffsetFetchResponse(Map(pairs:_*), versionId, correlationId, clientId) + } +} + +case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, Tuple2[Long, Short]], + versionId: Short = OffsetFetchResponse.CurrentVersion, + correlationId: Int = 0, + clientId: String = OffsetFetchResponse.DefaultClientId) + extends RequestOrResponse { + + lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) + + def writeTo(buffer: ByteBuffer) { + // Write envelope + buffer.putShort(versionId) + buffer.putInt(correlationId) + writeShortString(buffer, clientId) + + // Write OffsetFetchResponse + buffer.putInt(requestInfoGroupedByTopic.size) // number of topics + requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, Tuple2[Long, Short]] + writeShortString(buffer, t1._1) // topic + buffer.putInt(t1._2.size) // number of partitions for this topic + t1._2.foreach( t2 => { // TopicAndPartition -> Tuple2[Long, Short] + buffer.putInt(t2._1.partition) + buffer.putLong(t2._2._1) + buffer.putShort(t2._2._2) + }) + }) + } + + override def sizeInBytes = + 2 + /* versionId */ + 4 + /* correlationId */ + shortStringLength(clientId) + + 4 + /* topic count */ + requestInfoGroupedByTopic.foldLeft(0)((count, t) => { + count + + shortStringLength(t._1) + /* topic */ + 4 + /* number of partitions */ + t._2.size * ( + 4 + /* partition */ + 8 + /* offset */ + 2 /* error */ + ) + }) +} + diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index b000eb7..89ce92a 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -27,6 +27,8 @@ object RequestKeys { val MetadataKey: Short = 3 val LeaderAndIsrKey: Short = 4 val StopReplicaKey: Short = 5 + val OffsetCommitKey: Short = 6 + val OffsetFetchKey: Short = 7 val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]= Map(ProduceKey -> ("Produce", ProducerRequest.readFrom), @@ -34,7 +36,9 @@ object RequestKeys { OffsetsKey -> ("Offsets", OffsetRequest.readFrom), MetadataKey ->
("Metadata", TopicMetadataRequest.readFrom), LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom), - StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom)) + StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom), + OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom), + OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom)) def nameForKey(key: Short): String = { keyToNameAndDeserializerMap.get(key) match { diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 6b83deb..cd8ef0b 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -175,6 +175,20 @@ class SimpleConsumer(val host: String, */ def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer) + /** + * Commit offsets for a topic + * @param request a [[kafka.api.OffsetCommitRequest]] object. + * @return a [[kafka.api.OffsetCommitResponse]] object. + */ + def commitOffsets(request: OffsetCommitRequest) = OffsetCommitResponse.readFrom(sendRequest(request).buffer) + + /** + * Fetch offsets for a topic + * @param request a [[kafka.api.OffsetFetchRequest]] object. + * @return a [[kafka.api.OffsetFetchResponse]] object. + */ + def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).buffer) + private def getOrMakeConnection() { if(!blockingChannel.isConnected) { connect() diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala index cf82b38..66ab821 100644 --- a/core/src/main/scala/kafka/javaapi/Implicits.scala +++ b/core/src/main/scala/kafka/javaapi/Implicits.scala @@ -34,6 +34,12 @@ private[javaapi] object Implicits extends Logging { implicit def toJavaOffsetResponse(response: kafka.api.OffsetResponse): kafka.javaapi.OffsetResponse = new kafka.javaapi.OffsetResponse(response) + implicit def toJavaOffsetFetchResponse(response: kafka.api.OffsetFetchResponse): kafka.javaapi.OffsetFetchResponse = + new kafka.javaapi.OffsetFetchResponse(response) + + implicit def toJavaOffsetCommitResponse(response: kafka.api.OffsetCommitResponse): kafka.javaapi.OffsetCommitResponse = + new kafka.javaapi.OffsetCommitResponse(response) + implicit def optionToJavaRef[T](opt: Option[T]): T = { opt match { case Some(obj) => obj diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala new file mode 100644 index 0000000..4a1c53d --- /dev/null +++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.javaapi + +import kafka.common.TopicAndPartition +import collection.JavaConversions +import java.nio.ByteBuffer + +class OffsetCommitRequest(groupId: String, + requestInfo: java.util.Map[TopicAndPartition, Long], + versionId: Short, + correlationId: Int, + clientId: String) { + val underlying = { + val scalaMap = JavaConversions.asMap(requestInfo).toMap + kafka.api.OffsetCommitRequest( + groupId = groupId, + requestInfo = scalaMap, + versionId = versionId, + correlationId = correlationId, + clientId = clientId + ) + } + + def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) } + + + def sizeInBytes = underlying.sizeInBytes + + + override def toString = underlying.toString + + + override def equals(other: Any) = canEqual(other) && { + val otherOffsetRequest = other.asInstanceOf[kafka.javaapi.OffsetCommitRequest] + this.underlying.equals(otherOffsetRequest.underlying) + } + + + def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetCommitRequest] + + + override def hashCode = underlying.hashCode + +} diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala new file mode 100644 index 0000000..f7083be --- /dev/null +++ b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.javaapi + +import kafka.common.TopicAndPartition +import collection.JavaConversions +import java.nio.ByteBuffer + +class OffsetCommitResponse(private val underlying: kafka.api.OffsetCommitResponse) { + + def errors: java.util.Map[TopicAndPartition, Short] = { + JavaConversions.asMap(underlying.requestInfo) + } + + def sizeInBytes = underlying.sizeInBytes +} + diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala new file mode 100644 index 0000000..88e92c7 --- /dev/null +++ b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.javaapi + +import kafka.common.TopicAndPartition +import collection.JavaConversions +import java.nio.ByteBuffer + +class OffsetFetchRequest(groupId: String, + requestInfo: java.util.List[TopicAndPartition], + versionId: Short, + correlationId: Int, + clientId: String) { + + val underlying = { + val scalaSeq = JavaConversions.asBuffer(requestInfo) + kafka.api.OffsetFetchRequest( + groupId = groupId, + requestInfo = scalaSeq, + versionId = versionId, + correlationId = correlationId, + clientId = clientId + ) + } + + def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) } + + + def sizeInBytes = underlying.sizeInBytes + + + override def toString = underlying.toString + + + override def equals(other: Any) = canEqual(other) && { + val otherOffsetRequest = other.asInstanceOf[kafka.javaapi.OffsetFetchRequest] + this.underlying.equals(otherOffsetRequest.underlying) + } + + + def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetFetchRequest] + + + override def hashCode = underlying.hashCode + +} + + diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala new file mode 100644 index 0000000..98e5047 --- /dev/null +++ b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.javaapi + +import kafka.common.TopicAndPartition +import collection.JavaConversions +import java.nio.ByteBuffer + +class OffsetFetchResponse(private val underlying: kafka.api.OffsetFetchResponse) { + + def offsets: java.util.Map[TopicAndPartition, Tuple2[Long, Short]] = { + JavaConversions.asMap(underlying.requestInfo) + } + + def sizeInBytes = underlying.sizeInBytes + +} + diff --git a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala index 58c7081..0ab0195 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala @@ -21,7 +21,6 @@ import kafka.utils.threadsafe import kafka.javaapi.FetchResponse import kafka.javaapi.OffsetRequest - /** * A consumer of kafka messages */ @@ -80,6 +79,26 @@ class SimpleConsumer(val host: String, underlying.getOffsetsBefore(request.underlying) } + /** + * Commit offsets for a topic + * @param request a [[kafka.javaapi.OffsetCommitRequest]] object. + * @return a [[kafka.javaapi.OffsetCommitResponse]] object. 
+ */ + def commitOffsets(request: kafka.javaapi.OffsetCommitRequest): kafka.javaapi.OffsetCommitResponse = { + import kafka.javaapi.Implicits._ + underlying.commitOffsets(request.underlying) + } + + /** + * Fetch offsets for a topic + * @param request a [[kafka.javaapi.OffsetFetchRequest]] object. + * @return a [[kafka.javaapi.OffsetFetchResponse]] object. + */ + def fetchOffsets(request: kafka.javaapi.OffsetFetchRequest): kafka.javaapi.OffsetFetchResponse = { + import kafka.javaapi.Implicits._ + underlying.fetchOffsets(request.underlying) + } + def close() { underlying.close } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ef3b66e..8d16920 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -22,7 +22,7 @@ import kafka.api._ import kafka.message._ import kafka.network._ import kafka.log._ -import kafka.utils.{Pool, SystemTime, Logging} +import kafka.utils.{Pool, SystemTime, Logging, ZkUtils, ZKGroupTopicDirs} import org.apache.log4j.Logger import scala.collection._ import kafka.network.RequestChannel.Response @@ -62,6 +62,8 @@ class KafkaApis(val requestChannel: RequestChannel, case RequestKeys.MetadataKey => handleTopicMetadataRequest(request) case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request) + case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request) + case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { @@ -118,6 +120,22 @@ class KafkaApis(val requestChannel: RequestChannel, error("error when handling request %s".format(apiRequest), e) val errorResponse = StopReplicaResponse(apiRequest.correlationId, responseMap) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + case RequestKeys.OffsetCommitKey => + val apiRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] + val responseMap = apiRequest.requestInfo.map { + case (topicAndPartition, offset) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + }.toMap + error("error when handling request %s".format(apiRequest), e) + val errorResponse = OffsetCommitResponse(requestInfo=responseMap, correlationId=apiRequest.correlationId) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + case RequestKeys.OffsetFetchKey => + val apiRequest = request.requestObj.asInstanceOf[OffsetFetchRequest] + val responseMap = apiRequest.requestInfo.map { + case (topicAndPartition) => (topicAndPartition, Tuple2(-1L, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))) + }.toMap + error("error when handling request %s".format(apiRequest), e) + val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=apiRequest.correlationId) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) } } finally request.apiLocalCompleteTimeMs = SystemTime.milliseconds @@ -525,6 +543,57 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } + /* + * Service the Offset commit API + */ + def handleOffsetCommitRequest(request: RequestChannel.Request) { + val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] + 
if(requestLogger.isTraceEnabled) + requestLogger.trace("Handling offset commit request " + offsetCommitRequest.toString) + trace("Handling offset commit request " + offsetCommitRequest.toString) + val responseInfo = offsetCommitRequest.requestInfo.map( t => { + val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, t._1.topic) + try { + ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + + t._1.partition, t._2.toString) + (t._1, ErrorMapping.NoError) + } catch { + case e => + (t._1, ErrorMapping.UnknownCode) + } + }) + val response = new OffsetCommitResponse(responseInfo, + offsetCommitRequest.versionId, + offsetCommitRequest.correlationId, + offsetCommitRequest.clientId) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + } + + /* + * Service the Offset fetch API + */ + def handleOffsetFetchRequest(request: RequestChannel.Request) { + val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest] + if(requestLogger.isTraceEnabled) + requestLogger.trace("Handling offset fetch request " + offsetFetchRequest.toString) + trace("Handling offset fetch request " + offsetFetchRequest.toString) + val responseInfo = offsetFetchRequest.requestInfo.map( t => { + val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic) + try { + val offsetStr = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/" + t.partition) + (t, (offsetStr._1.toLong, ErrorMapping.NoError)) + } catch { + case e => + (t, (-1L, ErrorMapping.UnknownCode)) + } + }) + val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), + offsetFetchRequest.versionId, + offsetFetchRequest.correlationId, + offsetFetchRequest.clientId) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + } + def close() { debug("Shutting down.") fetchRequestPurgatory.shutdown() diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index 509b020..079a734 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -144,6 +144,35 @@ object SerializationTestUtils{ def createTestTopicMetadataResponse: TopicMetadataResponse = { new TopicMetadataResponse(Seq(topicmetaData1, topicmetaData2), 1) } + + def createTestOffsetCommitRequest: OffsetCommitRequest = { + new OffsetCommitRequest("group 1", collection.immutable.Map( + TopicAndPartition(topic1, 0) -> 42L, + TopicAndPartition(topic1, 1) -> 100L + )) + } + + def createTestOffsetCommitResponse: OffsetCommitResponse = { + new OffsetCommitResponse(collection.immutable.Map( + TopicAndPartition(topic1, 0) -> ErrorMapping.NoError, + TopicAndPartition(topic1, 1) -> ErrorMapping.UnknownTopicOrPartitionCode + )) + } + + def createTestOffsetFetchRequest: OffsetFetchRequest = { + new OffsetFetchRequest("group 1", Seq( + TopicAndPartition(topic1, 0), + TopicAndPartition(topic1, 1) + )) + } + + def createTestOffsetFetchResponse: OffsetFetchResponse = { + new OffsetFetchResponse(collection.immutable.Map( + TopicAndPartition(topic1, 0) -> (42L, ErrorMapping.NoError), + TopicAndPartition(topic1, 1) -> (100L, ErrorMapping.UnknownTopicOrPartitionCode) + )) + } + } class RequestResponseSerializationTest extends JUnitSuite { @@ -158,6 +187,10 @@ class RequestResponseSerializationTest extends JUnitSuite { private val 
offsetResponse = SerializationTestUtils.createTestOffsetResponse private val topicMetadataRequest = SerializationTestUtils.createTestTopicMetadataRequest private val topicMetadataResponse = SerializationTestUtils.createTestTopicMetadataResponse + private val offsetCommitRequest = SerializationTestUtils.createTestOffsetCommitRequest + private val offsetCommitResponse = SerializationTestUtils.createTestOffsetCommitResponse + private val offsetFetchRequest = SerializationTestUtils.createTestOffsetFetchRequest + private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse @Test @@ -238,5 +271,34 @@ class RequestResponseSerializationTest extends JUnitSuite { val deserializedTopicMetadataResponse = TopicMetadataResponse.readFrom(buffer) assertEquals("The original and deserialzed topicMetadataResponse should be the same", topicMetadataResponse, deserializedTopicMetadataResponse) + + buffer = ByteBuffer.allocate(offsetCommitRequest.sizeInBytes) + offsetCommitRequest.writeTo(buffer) + buffer.rewind() + val deserializedOffsetCommitRequest = OffsetCommitRequest.readFrom(buffer) + assertEquals("The original and deserialized offsetCommitRequest should be the same", offsetCommitRequest, + deserializedOffsetCommitRequest) + + buffer = ByteBuffer.allocate(offsetCommitResponse.sizeInBytes) + offsetCommitResponse.writeTo(buffer) + buffer.rewind() + val deserializedOffsetCommitResponse = OffsetCommitResponse.readFrom(buffer) + assertEquals("The original and deserialized offsetCommitResponse should be the same", offsetCommitResponse, + deserializedOffsetCommitResponse) + + buffer = ByteBuffer.allocate(offsetFetchRequest.sizeInBytes) + offsetFetchRequest.writeTo(buffer) + buffer.rewind() + val deserializedOffsetFetchRequest = OffsetFetchRequest.readFrom(buffer) + assertEquals("The original and deserialized offsetFetchRequest should be the same", offsetFetchRequest, + deserializedOffsetFetchRequest) + + buffer = ByteBuffer.allocate(offsetFetchResponse.sizeInBytes) + offsetFetchResponse.writeTo(buffer) + buffer.rewind() + val deserializedOffsetFetchResponse = OffsetFetchResponse.readFrom(buffer) + assertEquals("The original and deserialized offsetFetchResponse should be the same", offsetFetchResponse, + deserializedOffsetFetchResponse) + } } diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala new file mode 100644 index 0000000..b67fc5d --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package kafka.server + +import java.io.File +import kafka.utils._ +import junit.framework.Assert._ +import java.util.{Random, Properties} +import kafka.consumer.SimpleConsumer +import org.junit.{After, Before, Test} +import kafka.zk.ZooKeeperTestHarness +import org.scalatest.junit.JUnit3Suite +import kafka.admin.CreateTopicCommand +import kafka.api.{OffsetCommitRequest, OffsetFetchRequest} +import kafka.utils.TestUtils._ +import kafka.common.{ErrorMapping, TopicAndPartition} + +class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { + val random = new Random() + var logDir: File = null + var topicLogDir: File = null + var server: KafkaServer = null + var logSize: Int = 100 + val brokerPort: Int = 9099 + var simpleConsumer: SimpleConsumer = null + var time: Time = new MockTime() + + @Before + override def setUp() { + super.setUp() + val config: Properties = createBrokerConfig(1, brokerPort) + val logDirPath = config.getProperty("log.dir") + logDir = new File(logDirPath) + time = new MockTime() + server = TestUtils.createServer(new KafkaConfig(config), time) + simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "test-client") + } + + @After + override def tearDown() { + simpleConsumer.close + server.shutdown + Utils.rm(logDir) + super.tearDown() + } + + @Test + def testCommitOffsetsForUnknownTopic() { + val topicAndPartition = TopicAndPartition("offsets-unknown-topic", 0) + val request = OffsetCommitRequest("test-group", Map(topicAndPartition -> 42L)) + val response = simpleConsumer.commitOffsets(request) + assertEquals(ErrorMapping.NoError, + response.requestInfo.get(topicAndPartition).get) + } + + @Test + def testCommitOffsetsSimple() { + val topic = "offsets-simple" + CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1") + waitUntilTrue(() => isLeaderLocalOnBroker(topic, 0, server), 1000) + + val topicAndPartition = TopicAndPartition(topic, 0) + val request = OffsetCommitRequest("test-group", Map(topicAndPartition -> 42L)) + val response = simpleConsumer.commitOffsets(request) + assertEquals(ErrorMapping.NoError, + response.requestInfo.get(topicAndPartition).get) + } + + @Test + def testCommitOffsetsMulti() { + val topic = "offsets-multi" + CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1") + waitUntilTrue(() => isLeaderLocalOnBroker(topic, 0, server), 1000) + + val request = OffsetCommitRequest("test-group", Map( + TopicAndPartition(topic, 0) -> 42L, + TopicAndPartition(topic, 1) -> 43L, + TopicAndPartition("foo", 0) -> 44L + )) + val response = simpleConsumer.commitOffsets(request) + assertEquals(ErrorMapping.NoError, + response.requestInfo.get(TopicAndPartition(topic, 0)).get) + assertEquals(ErrorMapping.NoError, + response.requestInfo.get(TopicAndPartition(topic, 1)).get) + assertEquals(ErrorMapping.NoError, + response.requestInfo.get(TopicAndPartition("foo", 0)).get) + } + + @Test + def testCommitOffsetsForUnknownPartition() { + val topic = "offsets-unknown-part" + CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1") + waitUntilTrue(() => isLeaderLocalOnBroker(topic, 0, server), 1000) + + val request = OffsetCommitRequest("test-group", Map( + TopicAndPartition(topic, 0) -> 42L, + TopicAndPartition(topic, 1) -> 43L + )) + val response = simpleConsumer.commitOffsets(request) +
assertEquals(ErrorMapping.NoError, + response.requestInfo.get(TopicAndPartition(topic, 0)).get) + assertEquals(ErrorMapping.NoError, + response.requestInfo.get(TopicAndPartition(topic, 1)).get) + } + + @Test + def testCommitAndFetchOffsetsSimple() { + val topic = "offsets-simple" + CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1") + waitUntilTrue(() => isLeaderLocalOnBroker(topic, 0, server), 1000) + + val topicAndPartition = TopicAndPartition(topic, 0) + val request = OffsetCommitRequest("test-group", Map(topicAndPartition -> 42L)) + val response = simpleConsumer.commitOffsets(request) + assertEquals(ErrorMapping.NoError, + response.requestInfo.get(topicAndPartition).get) + + val request1 = OffsetFetchRequest("test-group", Seq(topicAndPartition)) + val response1 = simpleConsumer.fetchOffsets(request1) + assertEquals(ErrorMapping.NoError, response1.requestInfo.get(topicAndPartition).get._2) + assertEquals(42L, response1.requestInfo.get(topicAndPartition).get._1) + } + + @Test + def testFetchUnknownTopic() { + val topicAndPartition = TopicAndPartition("foo", 0) + val request = OffsetFetchRequest("test-group", Seq(topicAndPartition)) + val response = simpleConsumer.fetchOffsets(request) + assertEquals(ErrorMapping.UnknownCode, response.requestInfo.get(topicAndPartition).get._2) + } + + @Test + def testFetchUnknownPartition() { + val topic = "offsets-simple" + CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1") + waitUntilTrue(() => isLeaderLocalOnBroker(topic, 0, server), 1000) + + val topicAndPartition = TopicAndPartition(topic, 0) + val request = OffsetCommitRequest("test-group", Map(topicAndPartition -> 42L)) + val response = simpleConsumer.commitOffsets(request) + assertEquals(ErrorMapping.NoError, response.requestInfo.get(topicAndPartition).get) + + val request1 = OffsetFetchRequest("test-group", Seq( + TopicAndPartition(topic, 0), + TopicAndPartition(topic, 1) + )) + val response1 = simpleConsumer.fetchOffsets(request1) + assertEquals(ErrorMapping.NoError, response1.requestInfo.get(TopicAndPartition(topic, 0)).get._2) + assertEquals(ErrorMapping.UnknownCode, response1.requestInfo.get(TopicAndPartition(topic, 1)).get._2) + assertEquals(42L, response1.requestInfo.get(TopicAndPartition(topic, 0)).get._1) + assertEquals(-1L, response1.requestInfo.get(TopicAndPartition(topic, 1)).get._1) + } + + @Test + def testCommitAndFetchOffsetsMulti() { + val topic1 = "offsets-multi-1" + val topic2 = "offsets-multi-2" + val topic3 = "not-a-topic" + CreateTopicCommand.createTopic(zkClient, topic1, 1, 1, "1") + waitUntilTrue(() => isLeaderLocalOnBroker(topic1, 0, server), 1000) + + CreateTopicCommand.createTopic(zkClient, topic2, 1, 1, "1") + waitUntilTrue(() => isLeaderLocalOnBroker(topic2, 0, server), 1000) + + val commitRequest = OffsetCommitRequest("test-group", Map( + TopicAndPartition(topic1, 0) -> 42L, // existing topic+partition + TopicAndPartition(topic2, 0) -> 43L, // existing topic+partition + TopicAndPartition(topic3, 0) -> 44L, // non-existent topic + TopicAndPartition(topic2, 1) -> 45L // non-existent partition + )) + val commitResponse = simpleConsumer.commitOffsets(commitRequest) + assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get) + assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get) + assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get) + assertEquals(ErrorMapping.NoError,
commitResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get) + + val fetchRequest = OffsetFetchRequest("test-group", Seq( + TopicAndPartition(topic1, 0), + TopicAndPartition(topic2, 0), + TopicAndPartition(topic3, 0), + TopicAndPartition(topic2, 1) + )) + val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest) + assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get._2) + assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get._2) + assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get._2) + assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get._2) + + assertEquals(42L, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get._1) + assertEquals(43L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get._1) + assertEquals(44L, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get._1) + assertEquals(45L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get._1) + } + +}
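
Reviewer note (illustrative, not part of the patch): the sketch below shows how a client might exercise the new commit/fetch API end to end. It assumes a broker reachable on localhost:9092 and an existing topic "my-topic" with partition 0; the group, topic, object, and client-id names are made up for the example. As the handlers in KafkaApis show, offsets are persisted under the consumer group's ZooKeeper directory (via ZKGroupTopicDirs, i.e. /consumers/&lt;group&gt;/offsets/&lt;topic&gt;/&lt;partition&gt;), so offsets committed through this API are visible to the existing ZooKeeper-based consumers.

import kafka.api.{OffsetCommitRequest, OffsetFetchRequest}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

object OffsetApiExample {
  def main(args: Array[String]) {
    // Same constructor the tests use: host, port, socket timeout, buffer size, client id
    val consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "offset-api-example")
    val tp = TopicAndPartition("my-topic", 0)
    try {
      // Commit offset 42 for this group/partition; the response maps each partition to an error code
      val commitResponse = consumer.commitOffsets(OffsetCommitRequest("example-group", Map(tp -> 42L)))
      println("commit error code: " + commitResponse.requestInfo(tp))

      // Fetch it back; the response maps each partition to (offset, error code)
      val (offset, error) = consumer.fetchOffsets(OffsetFetchRequest("example-group", Seq(tp))).requestInfo(tp)
      println("fetched offset " + offset + " (error code " + error + ")")
    } finally {
      consumer.close()
    }
  }
}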