diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index a807c1f..fb2a230 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -25,6 +25,7 @@ import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.consumer.ConsumerConfig import java.util.concurrent.atomic.AtomicInteger import kafka.network.RequestChannel +import kafka.message.MessageSet case class PartitionFetchInfo(offset: Long, fetchSize: Int) @@ -155,7 +156,7 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { val fetchResponsePartitionData = requestInfo.map { case (topicAndPartition, data) => - (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, null)) + (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty)) } val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))