diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index fda3e39..c606351 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -135,12 +135,17 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, } override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val producerResponseStatus = data.map { - case (topicAndPartition, data) => - (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l)) + if(request.requestObj.asInstanceOf[ProducerRequest].requiredAcks == 0) { + requestChannel.closeConnection(request.processor, request) + } + else { + val producerResponseStatus = data.map { + case (topicAndPartition, data) => + (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l)) + } + val errorResponse = ProducerResponse(correlationId, producerResponseStatus) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) } - val errorResponse = ProducerResponse(correlationId, producerResponseStatus) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) } def emptyData(){