diff --git core/src/main/scala/kafka/api/OffsetCommitRequest.scala core/src/main/scala/kafka/api/OffsetCommitRequest.scala
new file mode 100644
index 0000000..96a8ea5
--- /dev/null
+++ core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -0,0 +1,83 @@
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.TopicAndPartition
+import kafka.utils.Logging
+
+object OffsetCommitRequest extends Logging {
+  val CurrentVersion = 1.toShort
+  val DefaultClientId = ""
+
+  def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
+    // Read values from the envelope
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = readShortString(buffer)
+    val replicaId = buffer.getInt
+
+    // Read the OffsetCommitRequest body
+    val consumerGroupId = readShortString(buffer)
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topic = readShortString(buffer)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partitionId = buffer.getInt
+        val offset = buffer.getLong
+        (TopicAndPartition(topic, partitionId), offset)
+      })
+    })
+    OffsetCommitRequest(consumerGroupId, Map(pairs: _*), versionId, correlationId, clientId, replicaId)
+  }
+}
+
+case class OffsetCommitRequest(groupId: String,
+                               requestInfo: Map[TopicAndPartition, Long],
+                               versionId: Short = OffsetCommitRequest.CurrentVersion,
+                               correlationId: Int = 0,
+                               clientId: String = OffsetCommitRequest.DefaultClientId,
+                               replicaId: Int = Request.OrdinaryConsumerId)
+    extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
+
+  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+
+  def writeTo(buffer: ByteBuffer) {
+    // Write envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+    buffer.putInt(replicaId)
+
+    // Write OffsetCommitRequest body
+    writeShortString(buffer, groupId)             // consumer group
+    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
+    requestInfoGroupedByTopic.foreach { case (topic, offsets) =>
+      writeShortString(buffer, topic)             // topic
+      buffer.putInt(offsets.size)                 // number of partitions for this topic
+      offsets.foreach { case (topicAndPartition, offset) =>
+        buffer.putInt(topicAndPartition.partition) // partition
+        buffer.putLong(offset)                     // offset
+      }
+    }
+  }
+
+  override def sizeInBytes =
+    2 + /* versionId */
+    4 + /* correlationId */
+    shortStringLength(clientId) +
+    4 + /* replicaId */
+    shortStringLength(groupId) +
+    4 + /* topic count */
+    requestInfoGroupedByTopic.foldLeft(0)((count, currTopic) => {
+      val (topic, offsets) = currTopic
+      count +
+      shortStringLength(topic) + /* topic */
+      4 + /* number of partitions */
+      offsets.size * (
+        4 + /* partition */
+        8   /* offset */
+      )
+    })
+}
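For review convenience, this is the wire layout that writeTo and readFrom above agree on (each "string" is a length-prefixed short string written via writeShortString and read via readShortString):

    versionId      : int16
    correlationId  : int32
    clientId       : string
    replicaId      : int32
    groupId        : string
    topicCount     : int32
    per topic:
      topic          : string
      partitionCount : int32
      per partition:
        partition : int32
        offset    : int64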
diff --git core/src/main/scala/kafka/api/RequestKeys.scala core/src/main/scala/kafka/api/RequestKeys.scala
index b000eb7..71049bc 100644
--- core/src/main/scala/kafka/api/RequestKeys.scala
+++ core/src/main/scala/kafka/api/RequestKeys.scala
@@ -27,6 +27,7 @@ object RequestKeys {
   val MetadataKey: Short = 3
   val LeaderAndIsrKey: Short = 4
   val StopReplicaKey: Short = 5
+  val OffsetCommitKey: Short = 6
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -34,7 +35,8 @@
         OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
         MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
         LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
-        StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom))
+        StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
+        OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom))
 
   def nameForKey(key: Short): String = {
     keyToNameAndDeserializerMap.get(key) match {
diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala
index eff627c..6d1bd38 100644
--- core/src/main/scala/kafka/server/KafkaApis.scala
+++ core/src/main/scala/kafka/server/KafkaApis.scala
@@ -22,7 +22,7 @@ import kafka.api._
 import kafka.message._
 import kafka.network._
 import kafka.log._
-import kafka.utils.{Pool, SystemTime, Logging}
+import kafka.utils.{Pool, SystemTime, Logging, ZkUtils, ZKGroupTopicDirs}
 import org.apache.log4j.Logger
 import scala.collection._
 import kafka.network.RequestChannel.Response
@@ -60,6 +60,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
       case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
       case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
+      case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
       case requestId => throw new KafkaException("Unknown api code " + requestId)
     }
   } catch {
@@ -520,6 +521,18 @@
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
+  def handleOffsetCommitRequest(request: RequestChannel.Request) {
+    val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+    debug("Committing offsets for group " + offsetCommitRequest.groupId + ", " +
+      offsetCommitRequest.requestInfo.size + " partitions")
+    offsetCommitRequest.requestInfo.foreach { case (topicAndPartition, offset) =>
+      // Persist each committed offset under the group's per-topic offsets path in ZooKeeper
+      val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
+      ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
+        topicAndPartition.partition, offset.toString)
+    }
+  }
+
   def close() {
     debug("Shutting down.")
     fetchRequestPurgatory.shutdown()
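To make the ZooKeeper layout concrete: ZKGroupTopicDirs builds the standard consumer offsets path, so a commit through this API lands where the existing ZK-based consumer already keeps its offsets. A small sketch (group and topic names are made up, and the exact path is from my reading of ZkUtils, so treat it as an assumption):

    import kafka.utils.ZKGroupTopicDirs

    val dirs = new ZKGroupTopicDirs("group 1", "topic1")
    // consumerOffsetDir should resolve to /consumers/group 1/offsets/topic1,
    // so committing offset 42 for partition 0 writes the string "42" at:
    println(dirs.consumerOffsetDir + "/" + 0)   // /consumers/group 1/offsets/topic1/0

Since the handler reuses these paths, offsets committed this way should be visible to existing offset-inspection tooling without changes.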
diff --git core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 531f32e..4923ad4 100644
--- core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -143,6 +143,13 @@
   def createTestTopicMetadataResponse: TopicMetadataResponse = {
     new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2))
   }
+
+  def createTestOffsetCommitRequest: OffsetCommitRequest = {
+    new OffsetCommitRequest("group 1", collection.immutable.Map(
+      TopicAndPartition(topic1, 0) -> 42L,
+      TopicAndPartition(topic1, 1) -> 100L
+    ))
+  }
 }
 
 class RequestResponseSerializationTest extends JUnitSuite {
@@ -157,6 +164,7 @@
   private val offsetResponse = SerializationTestUtils.createTestOffsetResponse
   private val topicMetadataRequest = SerializationTestUtils.createTestTopicMetadataRequest
   private val topicMetadataResponse = SerializationTestUtils.createTestTopicMetadataResponse
+  private val offsetCommitRequest = SerializationTestUtils.createTestOffsetCommitRequest
 
 
   @Test
@@ -237,5 +245,13 @@
     val deserializedTopicMetadataResponse = TopicMetadataResponse.readFrom(buffer)
     assertEquals("The original and deserialzed topicMetadataResponse should be the same", topicMetadataResponse,
                  deserializedTopicMetadataResponse)
+
+    buffer = ByteBuffer.allocate(offsetCommitRequest.sizeInBytes)
+    offsetCommitRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedOffsetCommitRequest = OffsetCommitRequest.readFrom(buffer)
+    assertEquals("The original and deserialized offsetCommitRequest should be the same", offsetCommitRequest,
+      deserializedOffsetCommitRequest)
+
   }
 }
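Beyond the unit test, the new request type can be exercised from a REPL. A minimal round-trip sketch (group, topics, and offsets are invented; assumes kafka core is on the classpath):

    import java.nio.ByteBuffer
    import kafka.api.OffsetCommitRequest
    import kafka.common.TopicAndPartition

    // Commit offsets for two topics, so both the per-topic grouping in writeTo
    // and the accumulator in sizeInBytes get exercised.
    val request = OffsetCommitRequest("test-group",
      Map(TopicAndPartition("events", 0) -> 1500L,
          TopicAndPartition("events", 1) -> 2300L,
          TopicAndPartition("clicks", 0) -> 700L))

    // Allocate exactly sizeInBytes; writeTo must fill the buffer completely.
    val buffer = ByteBuffer.allocate(request.sizeInBytes)
    request.writeTo(buffer)
    buffer.rewind()
    val roundTripped = OffsetCommitRequest.readFrom(buffer)
    assert(roundTripped == request)   // case-class equality compares all fields

If writeTo, readFrom, and sizeInBytes ever disagree, the usual first symptom is a BufferUnderflowException in readFrom or leftover bytes after writeTo.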