Description
In NetworkClient.handleTimedOutRequests, we disconnect the broker connection:
    private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
        List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);
        for (String nodeId : nodeIds) {
            // close connection to the node
            this.selector.close(nodeId);
            log.info("Disconnecting from node {} due to request timeout.", nodeId);
            processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
        }
    }
This calls processDisconnection, which in turn calls cancelInFlightRequests:
    for (InFlightRequest request : inFlightRequests) {
        if (log.isDebugEnabled()) {
            log.debug("Cancelled in-flight {} request with correlation id {} due to node {} being disconnected " +
                    "(elapsed time since creation: {}ms, elapsed time since send: {}ms, request timeout: {}ms): {}",
                    request.header.apiKey(), request.header.correlationId(), nodeId,
                    request.timeElapsedSinceCreateMs(now), request.timeElapsedSinceSendMs(now),
                    request.requestTimeoutMs, request.request);
        } else {
            log.info("Cancelled in-flight {} request with correlation id {} due to node {} being disconnected " +
                    "(elapsed time since creation: {}ms, elapsed time since send: {}ms, request timeout: {}ms)",
                    request.header.apiKey(), request.header.correlationId(), nodeId,
                    request.timeElapsedSinceCreateMs(now), request.timeElapsedSinceSendMs(now),
                    request.requestTimeoutMs);
        }
        if (!request.isInternalRequest) {
            if (responses != null)
                responses.add(request.disconnected(now, null));
        } else if (request.header.apiKey() == ApiKeys.METADATA) {
            metadataUpdater.handleFailedRequest(now, Optional.empty());
        }
    }
For each cancelled non-internal request, we create a new ClientResponse in which the disconnected flag is set.
We then complete the record batch in Sender.handleProduceResponse with:
    if (response.wasDisconnected()) {
        log.trace("Cancelled request with header {} due to node {} being disconnected",
                requestHeader, response.destination());
        for (ProducerBatch batch : batches.values())
            completeBatch(batch,
                    new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION,
                            String.format("Disconnected from node %s", response.destination())),
                    correlationId, now);
    }
This could be confusing for customers: when a request times out, they see a network exception rather than a timeout error.
One implication of completing the batch with a network exception is that the producer will try to refresh metadata after a request timeout. I can see arguments for why this is necessary.
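To make the flow above concrete, here is a minimal, self-contained sketch of how the timeout cause gets lost. It is not the Kafka code: TimeoutFlowSketch, InFlight, Response and the Error enum are hypothetical stand-ins, and the timeout check (time since send greater than the request timeout) is an assumption made for illustration. The response only records that the node was disconnected, so the completion step has nothing to go on except that flag.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified model of the flow described in this issue.
// All types here are hypothetical stand-ins, not the real Kafka classes.
class TimeoutFlowSketch {

    // REQUEST_TIMED_OUT is listed only to show what a user might expect;
    // nothing in this path ever produces it.
    enum Error { NONE, NETWORK_EXCEPTION, REQUEST_TIMED_OUT }

    // Stand-in for an in-flight request.
    static final class InFlight {
        final String nodeId;
        final long sendTimeMs;
        final long requestTimeoutMs;

        InFlight(String nodeId, long sendTimeMs, long requestTimeoutMs) {
            this.nodeId = nodeId;
            this.sendTimeMs = sendTimeMs;
            this.requestTimeoutMs = requestTimeoutMs;
        }

        // Assumed timeout rule for this sketch.
        boolean timedOut(long now) {
            return now - sendTimeMs > requestTimeoutMs;
        }
    }

    // Stand-in for ClientResponse: a disconnected flag, but no record of why.
    static final class Response {
        final String nodeId;
        final boolean disconnected;

        Response(String nodeId, boolean disconnected) {
            this.nodeId = nodeId;
            this.disconnected = disconnected;
        }
    }

    // Models handleTimedOutRequests plus cancelInFlightRequests: every request
    // to a node that has a timed-out request is cancelled as "disconnected".
    static List<Response> handleTimedOutRequests(List<InFlight> inFlight, long now) {
        Set<String> timedOutNodes = new LinkedHashSet<>();
        for (InFlight request : inFlight) {
            if (request.timedOut(now))
                timedOutNodes.add(request.nodeId);
        }
        List<Response> responses = new ArrayList<>();
        for (InFlight request : inFlight) {
            if (timedOutNodes.contains(request.nodeId))
                responses.add(new Response(request.nodeId, true));
        }
        return responses;
    }

    // Models the wasDisconnected() branch: the only available signal is the
    // flag, so a timeout is reported as a network exception.
    static Error errorFor(Response response) {
        return response.disconnected ? Error.NETWORK_EXCEPTION : Error.NONE;
    }

    public static void main(String[] args) {
        List<InFlight> inFlight = new ArrayList<>();
        inFlight.add(new InFlight("1", 0L, 30_000L));
        for (Response response : handleTimedOutRequests(inFlight, 30_001L))
            System.out.println("node " + response.nodeId + " -> " + errorFor(response));
    }
}
```

Running main prints "node 1 -> NETWORK_EXCEPTION" even though the only thing that happened was a request timeout, which is the behavior this issue describes.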
Issue Links
- duplicates KAFKA-10228 producer: NETWORK_EXCEPTION is thrown instead of a request timeout (Resolved)