diff --git core/src/main/scala/kafka/api/OffsetCommitRequest.scala core/src/main/scala/kafka/api/OffsetCommitRequest.scala
new file mode 100644
index 0000000..0f82e31
--- /dev/null
+++ core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -0,0 +1,79 @@
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.TopicAndPartition
+import kafka.utils.Logging
+
+object OffsetCommitRequest extends Logging {
+  val CurrentVersion = 1.shortValue()
+  val DefaultClientId = "default"
+
+  def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
+    // Read values from the envelope
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = readShortString(buffer)
+
+    // Read the OffsetCommitRequest
+    val consumerGroupId = readShortString(buffer)
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topic = readShortString(buffer)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partitionId = buffer.getInt
+        val offset = buffer.getLong
+        (TopicAndPartition(topic, partitionId), offset)
+      })
+    })
+    OffsetCommitRequest(consumerGroupId, Map(pairs:_*), versionId, correlationId, clientId)
+  }
+}
+
+case class OffsetCommitRequest(groupId: String,
+                               requestInfo: Map[TopicAndPartition, Long],
+                               versionId: Short = OffsetCommitRequest.CurrentVersion,
+                               correlationId: Int = 0,
+                               clientId: String = OffsetCommitRequest.DefaultClientId)
+    extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
+
+  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+
+  def writeTo(buffer: ByteBuffer) {
+    // Write envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+
+    // Write OffsetCommitRequest
+    writeShortString(buffer, groupId)             // consumer group
+    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
+    requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, Long]
+      writeShortString(buffer, t1._1) // topic
+      buffer.putInt(t1._2.size)       // number of partitions for this topic
+      t1._2.foreach( t2 => {
+        buffer.putInt(t2._1.partition) // partition
+        buffer.putLong(t2._2)          // offset
+      })
+    })
+  }
+
+  override def sizeInBytes =
+    2 + /* versionId */
+    4 + /* correlationId */
+    shortStringLength(clientId) +
+    shortStringLength(groupId) +
+    4 + /* topic count */
+    requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
+      val (topic, offsets) = topicAndOffsets
+      count +
+      shortStringLength(topic) + /* topic */
+      4 + /* number of partitions */
+      offsets.size * (
+        4 + /* partition */
+        8 /* offset */
+      )
+    })
+}
diff --git core/src/main/scala/kafka/api/OffsetCommitResponse.scala core/src/main/scala/kafka/api/OffsetCommitResponse.scala
new file mode 100644
index 0000000..abdbf5e
--- /dev/null
+++ core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -0,0 +1,76 @@
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.TopicAndPartition
+import kafka.utils.Logging
+
+object OffsetCommitResponse extends Logging {
+  val CurrentVersion = 1.shortValue()
+  val DefaultClientId = ""
+
+  def readFrom(buffer: ByteBuffer): OffsetCommitResponse = {
+    // Read values from the envelope
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = readShortString(buffer)
+
+    // Read the OffsetCommitResponse
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topic = readShortString(buffer)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partitionId = buffer.getInt
+        val error = buffer.getShort
+        (TopicAndPartition(topic, partitionId), error)
+      })
+    })
+    OffsetCommitResponse(Map(pairs:_*), versionId, correlationId, clientId)
+  }
+}
+
+case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
+                                versionId: Short = OffsetCommitResponse.CurrentVersion,
+                                correlationId: Int = 0,
+                                clientId: String = OffsetCommitResponse.DefaultClientId)
+    extends RequestOrResponse {
+
+  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+
+  def writeTo(buffer: ByteBuffer) {
+    // Write envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+
+    // Write OffsetCommitResponse
+    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
+    requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, Short]
+      writeShortString(buffer, t1._1) // topic
+      buffer.putInt(t1._2.size)       // number of partitions for this topic
+      t1._2.foreach( t2 => { // TopicAndPartition -> Short
+        buffer.putInt(t2._1.partition)
+        buffer.putShort(t2._2) // error
+      })
+    })
+  }
+
+  override def sizeInBytes =
+    2 + /* versionId */
+    4 + /* correlationId */
+    shortStringLength(clientId) +
+    4 + /* topic count */
+    requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
+      val (topic, offsets) = topicAndOffsets
+      count +
+      shortStringLength(topic) + /* topic */
+      4 + /* number of partitions */
+      offsets.size * (
+        4 + /* partition */
+        2 /* error */
+      )
+    })
+}
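A quick sanity check on the sizeInBytes arithmetic above, assuming ApiUtils.shortStringLength(s) is a 2-byte length prefix plus the string's UTF-8 bytes; the group and topic names below are placeholders, not values from this patch:

import kafka.api.OffsetCommitRequest
import kafka.common.TopicAndPartition

val request = OffsetCommitRequest("group 1", Map(
  TopicAndPartition("topicA", 0) -> 42L,
  TopicAndPartition("topicA", 1) -> 100L))

// envelope: 2 (versionId) + 4 (correlationId) + 9 ("default" clientId)       = 15
// body:     9 ("group 1") + 4 (topic count) + 8 ("topicA")
//           + 4 (partition count) + 2 * (4 /* partition */ + 8 /* offset */) = 49
assert(request.sizeInBytes == 64)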
diff --git core/src/main/scala/kafka/api/OffsetFetchRequest.scala core/src/main/scala/kafka/api/OffsetFetchRequest.scala
new file mode 100644
index 0000000..56a4c49
--- /dev/null
+++ core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -0,0 +1,72 @@
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.TopicAndPartition
+import kafka.utils.Logging
+
+object OffsetFetchRequest extends Logging {
+  val CurrentVersion = 1.shortValue()
+  val DefaultClientId = "default"
+
+  def readFrom(buffer: ByteBuffer): OffsetFetchRequest = {
+    // Read values from the envelope
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = readShortString(buffer)
+
+    // Read the OffsetFetchRequest
+    val consumerGroupId = readShortString(buffer)
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topic = readShortString(buffer)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partitionId = buffer.getInt
+        TopicAndPartition(topic, partitionId)
+      })
+    })
+    OffsetFetchRequest(consumerGroupId, pairs, versionId, correlationId, clientId)
+  }
+}
+
+case class OffsetFetchRequest(groupId: String,
+                              requestInfo: Seq[TopicAndPartition],
+                              versionId: Short = OffsetFetchRequest.CurrentVersion,
+                              correlationId: Int = 0,
+                              clientId: String = OffsetFetchRequest.DefaultClientId)
+    extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey)) {
+
+  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)
+
+  def writeTo(buffer: ByteBuffer) {
+    // Write envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+
+    // Write OffsetFetchRequest
+    writeShortString(buffer, groupId)             // consumer group
+    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
+    requestInfoGroupedByTopic.foreach( t1 => { // (topic, Seq[TopicAndPartition])
+      writeShortString(buffer, t1._1) // topic
+      buffer.putInt(t1._2.size)       // number of partitions for this topic
+      t1._2.foreach( t2 => {
+        buffer.putInt(t2.partition)
+      })
+    })
+  }
+
+  override def sizeInBytes =
+    2 + /* versionId */
+    4 + /* correlationId */
+    shortStringLength(clientId) +
+    shortStringLength(groupId) +
+    4 + /* topic count */
+    requestInfoGroupedByTopic.foldLeft(0)((count, t) => {
+      count + shortStringLength(t._1) + /* topic */
+      4 + /* number of partitions */
+      t._2.size * 4 /* partition */
+    })
+}
diff --git core/src/main/scala/kafka/api/OffsetFetchResponse.scala core/src/main/scala/kafka/api/OffsetFetchResponse.scala
new file mode 100644
index 0000000..484e47a
--- /dev/null
+++ core/src/main/scala/kafka/api/OffsetFetchResponse.scala
@@ -0,0 +1,78 @@
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.TopicAndPartition
+import kafka.utils.Logging
+
+object OffsetFetchResponse extends Logging {
+  val CurrentVersion = 1.shortValue()
+  val DefaultClientId = ""
+
+  def readFrom(buffer: ByteBuffer): OffsetFetchResponse = {
+    // Read values from the envelope
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = readShortString(buffer)
+
+    // Read the OffsetFetchResponse
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topic = readShortString(buffer)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partitionId = buffer.getInt
+        val offset = buffer.getLong
+        val error = buffer.getShort
+        (TopicAndPartition(topic, partitionId), (offset, error))
+      })
+    })
+    OffsetFetchResponse(Map(pairs:_*), versionId, correlationId, clientId)
+  }
+}
+
+case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, Tuple2[Long, Short]],
+                               versionId: Short = OffsetFetchResponse.CurrentVersion,
+                               correlationId: Int = 0,
+                               clientId: String = OffsetFetchResponse.DefaultClientId)
+    extends RequestOrResponse {
+
+  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+
+  def writeTo(buffer: ByteBuffer) {
+    // Write envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+
+    // Write OffsetFetchResponse
+    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
+    requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, Tuple2[Long, Short]]
+      writeShortString(buffer, t1._1) // topic
+      buffer.putInt(t1._2.size)       // number of partitions for this topic
+      t1._2.foreach( t2 => { // TopicAndPartition -> Tuple2[Long, Short]
+        buffer.putInt(t2._1.partition)
+        buffer.putLong(t2._2._1)  // offset
+        buffer.putShort(t2._2._2) // error
+      })
+    })
+  }
+
+  override def sizeInBytes =
+    2 + /* versionId */
+    4 + /* correlationId */
+    shortStringLength(clientId) +
+    4 + /* topic count */
+    requestInfoGroupedByTopic.foldLeft(0)((count, t) => {
+      count +
+      shortStringLength(t._1) + /* topic */
+      4 + /* number of partitions */
+      t._2.size * (
+        4 + /* partition */
+        8 + /* offset */
+        2 /* error */
+      )
+    })
+}
diff --git core/src/main/scala/kafka/api/RequestKeys.scala core/src/main/scala/kafka/api/RequestKeys.scala
index b000eb7..89ce92a 100644
--- core/src/main/scala/kafka/api/RequestKeys.scala
+++ core/src/main/scala/kafka/api/RequestKeys.scala
@@ -27,6 +27,8 @@ object RequestKeys {
   val MetadataKey: Short = 3
   val LeaderAndIsrKey: Short = 4
   val StopReplicaKey: Short = 5
+  val OffsetCommitKey: Short = 6
+  val OffsetFetchKey: Short = 7
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -34,7 +36,9 @@ object RequestKeys {
       OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
       MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
       LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
-      StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom))
+      StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
+      OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
+      OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom))
 
   def nameForKey(key: Short): String = {
     keyToNameAndDeserializerMap.get(key) match {
diff --git core/src/main/scala/kafka/consumer/SimpleConsumer.scala core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index d642a67..56b807c 100644
--- core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -159,6 +159,20 @@ class SimpleConsumer(val host: String,
    */
   def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)
 
+  /**
+   * Commit offsets for a topic
+   * @param request a [[kafka.api.OffsetCommitRequest]] object.
+   * @return a [[kafka.api.OffsetCommitResponse]] object.
+   */
+  def commitOffsets(request: OffsetCommitRequest) = OffsetCommitResponse.readFrom(sendRequest(request).buffer)
+
+  /**
+   * Fetch offsets for a topic
+   * @param request a [[kafka.api.OffsetFetchRequest]] object.
+   * @return a [[kafka.api.OffsetFetchResponse]] object.
+   */
+  def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).buffer)
+
   private def getOrMakeConnection() {
     if(!blockingChannel.isConnected) {
       connect()
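For reference, a minimal sketch of how a client might drive the two new SimpleConsumer methods; the host, port, group, and topic below are placeholders, not values from this patch:

import kafka.api.{OffsetCommitRequest, OffsetFetchRequest}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

val consumer = new SimpleConsumer("localhost", 9092, 1000000, 64*1024)
val tp = TopicAndPartition("my-topic", 0)

// Commit offset 42 for group "my-group"; requestInfo maps each partition to an error code.
val commitResponse = consumer.commitOffsets(OffsetCommitRequest("my-group", Map(tp -> 42L)))

// Read the offset back; each partition maps to an (offset, error) pair.
val fetchResponse = consumer.fetchOffsets(OffsetFetchRequest("my-group", Seq(tp)))
val (offset, error) = fetchResponse.requestInfo(tp)

consumer.close()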
diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala
index eff627c..549d03f 100644
--- core/src/main/scala/kafka/server/KafkaApis.scala
+++ core/src/main/scala/kafka/server/KafkaApis.scala
@@ -22,7 +22,7 @@ import kafka.api._
 import kafka.message._
 import kafka.network._
 import kafka.log._
-import kafka.utils.{Pool, SystemTime, Logging}
+import kafka.utils.{Pool, SystemTime, Logging, ZkUtils, ZKGroupTopicDirs}
 import org.apache.log4j.Logger
 import scala.collection._
 import kafka.network.RequestChannel.Response
@@ -60,6 +60,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
         case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
         case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
+        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
+        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -520,6 +522,56 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
+  /*
+   * Service the Offset commit API
+   */
+  def handleOffsetCommitRequest(request: RequestChannel.Request) {
+    val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+    info("Committing offsets: " + offsetCommitRequest)
+    val responseInfo = offsetCommitRequest.requestInfo.map( t => {
+      // Check if topic and partition exist
+      if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(t._1.topic)) &&
+         ZkUtils.pathExists(zkClient, ZkUtils.getTopicPartitionPath(t._1.topic, t._1.partition))) {
+        val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, t._1.topic)
+        ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
+          t._1.partition, t._2.toString)
+        (t._1, ErrorMapping.NoError)
+      } else {
+        // TODO what do we do with unknown topics/partitions?
+        (t._1, ErrorMapping.UnknownTopicOrPartitionCode)
+      }
+    })
+    val response = new OffsetCommitResponse(responseInfo,
+                                            offsetCommitRequest.versionId,
+                                            offsetCommitRequest.correlationId,
+                                            offsetCommitRequest.clientId)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+  }
+
+  /*
+   * Service the Offset fetch API
+   */
+  def handleOffsetFetchRequest(request: RequestChannel.Request) {
+    val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
+    info("Fetching offsets: " + offsetFetchRequest)
+    val responseInfo = offsetFetchRequest.requestInfo.map( t => {
+      // Check if topic and partition exist
+      if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(t.topic)) &&
+         ZkUtils.pathExists(zkClient, ZkUtils.getTopicPartitionPath(t.topic, t.partition))) {
+        val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic)
+        val offsetStr = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/" + t.partition)
+        (t, (offsetStr._1.toLong, ErrorMapping.NoError))
+      } else {
+        (t, (-1L, ErrorMapping.UnknownTopicOrPartitionCode))
+      }
+    })
+    val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*),
+                                           offsetFetchRequest.versionId,
+                                           offsetFetchRequest.correlationId,
+                                           offsetFetchRequest.clientId)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+  }
+
   def close() {
     debug("Shutting down.")
     fetchRequestPurgatory.shutdown()
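Both handlers keep offsets in ZooKeeper under the consumer group's offsets directory. Assuming ZKGroupTopicDirs resolves consumerOffsetDir to the usual /consumers/&lt;group&gt;/offsets/&lt;topic&gt; layout, a committed offset could be inspected directly; zkClient, group, and topic here are hypothetical:

import kafka.utils.{ZkUtils, ZKGroupTopicDirs}

// handleOffsetCommitRequest above writes the offset as a string to
// /consumers/my-group/offsets/my-topic/<partition> via updatePersistentPath.
val dirs = new ZKGroupTopicDirs("my-group", "my-topic")
val path = dirs.consumerOffsetDir + "/" + 0
// ZkUtils.readData returns a (data, stat) pair, which is why the fetch handler uses ._1.
val committed = ZkUtils.readData(zkClient, path)._1.toLong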
diff --git core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 531f32e..8303286 100644
--- core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -143,6 +143,35 @@ object SerializationTestUtils{
   def createTestTopicMetadataResponse: TopicMetadataResponse = {
     new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2))
   }
+
+  def createTestOffsetCommitRequest: OffsetCommitRequest = {
+    new OffsetCommitRequest("group 1", collection.immutable.Map(
+      TopicAndPartition(topic1, 0) -> 42L,
+      TopicAndPartition(topic1, 1) -> 100L
+    ))
+  }
+
+  def createTestOffsetCommitResponse: OffsetCommitResponse = {
+    new OffsetCommitResponse(collection.immutable.Map(
+      TopicAndPartition(topic1, 0) -> ErrorMapping.NoError,
+      TopicAndPartition(topic1, 1) -> ErrorMapping.UnknownTopicOrPartitionCode
+    ))
+  }
+
+  def createTestOffsetFetchRequest: OffsetFetchRequest = {
+    new OffsetFetchRequest("group 1", Seq(
+      TopicAndPartition(topic1, 0),
+      TopicAndPartition(topic1, 1)
+    ))
+  }
+
+  def createTestOffsetFetchResponse: OffsetFetchResponse = {
+    new OffsetFetchResponse(collection.immutable.Map(
+      TopicAndPartition(topic1, 0) -> (42L, ErrorMapping.NoError),
+      TopicAndPartition(topic1, 1) -> (100L, ErrorMapping.UnknownTopicOrPartitionCode)
+    ))
+  }
+
 }
 
 class RequestResponseSerializationTest extends JUnitSuite {
@@ -157,6 +186,10 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val offsetResponse = SerializationTestUtils.createTestOffsetResponse
   private val topicMetadataRequest = SerializationTestUtils.createTestTopicMetadataRequest
   private val topicMetadataResponse = SerializationTestUtils.createTestTopicMetadataResponse
+  private val offsetCommitRequest = SerializationTestUtils.createTestOffsetCommitRequest
+  private val offsetCommitResponse = SerializationTestUtils.createTestOffsetCommitResponse
+  private val offsetFetchRequest = SerializationTestUtils.createTestOffsetFetchRequest
+  private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
 
 
   @Test
@@ -237,5 +270,34 @@
     val deserializedTopicMetadataResponse = TopicMetadataResponse.readFrom(buffer)
     assertEquals("The original and deserialzed topicMetadataResponse should be the same", topicMetadataResponse,
       deserializedTopicMetadataResponse)
+
+    buffer = ByteBuffer.allocate(offsetCommitRequest.sizeInBytes)
+    offsetCommitRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedOffsetCommitRequest = OffsetCommitRequest.readFrom(buffer)
+    assertEquals("The original and deserialized offsetCommitRequest should be the same", offsetCommitRequest,
+      deserializedOffsetCommitRequest)
+
+    buffer = ByteBuffer.allocate(offsetCommitResponse.sizeInBytes)
+    offsetCommitResponse.writeTo(buffer)
+    buffer.rewind()
+    val deserializedOffsetCommitResponse = OffsetCommitResponse.readFrom(buffer)
+    assertEquals("The original and deserialized offsetCommitResponse should be the same", offsetCommitResponse,
+      deserializedOffsetCommitResponse)
+
+    buffer = ByteBuffer.allocate(offsetFetchRequest.sizeInBytes)
+    offsetFetchRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedOffsetFetchRequest = OffsetFetchRequest.readFrom(buffer)
+    assertEquals("The original and deserialized offsetFetchRequest should be the same", offsetFetchRequest,
+      deserializedOffsetFetchRequest)
+
+    buffer = ByteBuffer.allocate(offsetFetchResponse.sizeInBytes)
+    offsetFetchResponse.writeTo(buffer)
+    buffer.rewind()
+    val deserializedOffsetFetchResponse = OffsetFetchResponse.readFrom(buffer)
+    assertEquals("The original and deserialized offsetFetchResponse should be the same", offsetFetchResponse,
+      deserializedOffsetFetchResponse)
+
   }
 }
diff --git core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
new file mode 100644
index 0000000..44f58bf
--- /dev/null
+++ core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -0,0 +1,200 @@
+package kafka.server
+
+import java.io.File
+import kafka.utils._
+import junit.framework.Assert._
+import java.util.{Random, Properties}
+import kafka.consumer.SimpleConsumer
+import org.junit.{After, Before, Test}
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.zk.ZooKeeperTestHarness
+import org.scalatest.junit.JUnit3Suite
+import kafka.admin.CreateTopicCommand
+import kafka.api.{OffsetCommitRequest, OffsetFetchRequest}
+import kafka.utils.TestUtils._
+import kafka.common.{ErrorMapping, TopicAndPartition}
+
+class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val random = new Random()
+  var logDir: File = null
+  var topicLogDir: File = null
+  var server: KafkaServer = null
+  var logSize: Int = 100
+  val brokerPort: Int = 9099
+  var simpleConsumer: SimpleConsumer = null
+  var time: Time = new MockTime()
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    val config: Properties = createBrokerConfig(1, brokerPort)
+    val logDirPath = config.getProperty("log.dir")
+    logDir = new File(logDirPath)
+    time = new MockTime()
+    server = TestUtils.createServer(new KafkaConfig(config), time)
+    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
+  }
+
+  @After
+  override def tearDown() {
+    simpleConsumer.close
+    server.shutdown
+    Utils.rm(logDir)
+    super.tearDown()
+  }
+
+  @Test
+  def testCommitOffsetsForUnknownTopic() {
+    val topicAndPartition = TopicAndPartition("offsets-unknown-topic", 0)
+    val request = OffsetCommitRequest("test-group", Map(topicAndPartition -> 42L))
+    val response = simpleConsumer.commitOffsets(request)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+                 response.requestInfo.get(topicAndPartition).get)
+  }
+
+  @Test
+  def testCommitOffsetsSimple() {
+    val topic = "offsets-simple"
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, 0, server), 1000)
+
+    val topicAndPartition = TopicAndPartition(topic, 0)
+    val request = OffsetCommitRequest("test-group", Map(topicAndPartition -> 42L))
+    val response = simpleConsumer.commitOffsets(request)
+    assertEquals(ErrorMapping.NoError,
+                 response.requestInfo.get(topicAndPartition).get)
+  }
+
+  @Test
+  def testCommitOffsetsMulti() {
+    val topic = "offsets-multi"
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, 0, server), 1000)
+
+    val request = OffsetCommitRequest("test-group", Map(
+      TopicAndPartition(topic, 0) -> 42L,
+      TopicAndPartition(topic, 1) -> 43L,
+      TopicAndPartition("foo", 0) -> 44L
+    ))
+    val response = simpleConsumer.commitOffsets(request)
+    assertEquals(ErrorMapping.NoError,
+                 response.requestInfo.get(TopicAndPartition(topic, 0)).get)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+                 response.requestInfo.get(TopicAndPartition(topic, 1)).get)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+                 response.requestInfo.get(TopicAndPartition("foo", 0)).get)
+  }
+
+  @Test
+  def testCommitOffsetsForUnknownPartition() {
+    val topic = "offsets-unknown-part"
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, 0, server), 1000)
+
+    val request = OffsetCommitRequest("test-group", Map(
+      TopicAndPartition(topic, 0) -> 42L,
+      TopicAndPartition(topic, 1) -> 43L
+    ))
+    val response = simpleConsumer.commitOffsets(request)
+    assertEquals(ErrorMapping.NoError,
+                 response.requestInfo.get(TopicAndPartition(topic, 0)).get)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+                 response.requestInfo.get(TopicAndPartition(topic, 1)).get)
+  }
+
+  @Test
+  def testCommitAndFetchOffsetsSimple() {
+    val topic = "offsets-simple"
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, 0, server), 1000)
+
+    val topicAndPartition = TopicAndPartition(topic, 0)
+    val request = OffsetCommitRequest("test-group", Map(topicAndPartition -> 42L))
+    val response = simpleConsumer.commitOffsets(request)
+    assertEquals(ErrorMapping.NoError,
+                 response.requestInfo.get(topicAndPartition).get)
+
+    val request1 = OffsetFetchRequest("test-group", Seq(topicAndPartition))
+    val response1 = simpleConsumer.fetchOffsets(request1)
+    assertEquals(ErrorMapping.NoError, response1.requestInfo.get(topicAndPartition).get._2)
+    assertEquals(42L, response1.requestInfo.get(topicAndPartition).get._1)
+  }
+
+  @Test
+  def testFetchUnknownTopic() {
+    val topicAndPartition = TopicAndPartition("foo", 0)
+    val request = OffsetFetchRequest("test-group", Seq(topicAndPartition))
+    val response = simpleConsumer.fetchOffsets(request)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+                 response.requestInfo.get(topicAndPartition).get._2)
+  }
+
+  @Test
+  def testFetchUnknownPartition() {
+    val topic = "offsets-simple"
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, 0, server), 1000)
+
+    val topicAndPartition = TopicAndPartition(topic, 0)
+    val request = OffsetCommitRequest("test-group", Map(topicAndPartition -> 42L))
+    val response = simpleConsumer.commitOffsets(request)
+    assertEquals(ErrorMapping.NoError,
+                 response.requestInfo.get(topicAndPartition).get)
+
+    val request1 = OffsetFetchRequest("test-group", Seq(
+      TopicAndPartition(topic, 0),
+      TopicAndPartition(topic, 1)
+    ))
+    val response1 = simpleConsumer.fetchOffsets(request1)
+    assertEquals(ErrorMapping.NoError, response1.requestInfo.get(TopicAndPartition(topic, 0)).get._2)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, response1.requestInfo.get(TopicAndPartition(topic, 1)).get._2)
+    assertEquals(42L, response1.requestInfo.get(TopicAndPartition(topic, 0)).get._1)
+    assertEquals(-1L, response1.requestInfo.get(TopicAndPartition(topic, 1)).get._1)
+  }
+
+  @Test
+  def testCommitAndFetchOffsetsMulti() {
+    val topic1 = "offsets-multi-1"
+    val topic2 = "offsets-multi-2"
+    val topic3 = "not-a-topic"
+    CreateTopicCommand.createTopic(zkClient, topic1, 1, 1, "1")
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic1, 0, server), 1000)
+
+    CreateTopicCommand.createTopic(zkClient, topic2, 1, 1, "1")
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic2, 0, server), 1000)
+
+    val commitRequest = OffsetCommitRequest("test-group", Map(
+      TopicAndPartition(topic1, 0) -> 42L, // existing topic+partition
+      TopicAndPartition(topic2, 0) -> 43L, // existing topic+partition
+      TopicAndPartition(topic3, 0) -> 44L, // non-existent topic
+      TopicAndPartition(topic2, 1) -> 45L  // non-existent partition
+    ))
+    val commitResponse = simpleConsumer.commitOffsets(commitRequest)
+    assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get)
+    assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+                 commitResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+                 commitResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get)
+
+    val fetchRequest = OffsetFetchRequest("test-group", Seq(
+      TopicAndPartition(topic1, 0),
+      TopicAndPartition(topic2, 0),
+      TopicAndPartition(topic3, 0),
+      TopicAndPartition(topic2, 1)
+    ))
+    val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
+    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get._2)
+    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get._2)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+                 fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get._2)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+                 fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get._2)
+
+    assertEquals(42L, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get._1)
+    assertEquals(43L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get._1)
+    assertEquals(-1L, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get._1)
+    assertEquals(-1L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get._1)
+  }
+
+}