If a producer forcefully disconnects from a broker while it has staged receives, that connection enters a limbo state in which it is no longer processed by the SocketServer.Processor. This leaks the file descriptor for the socket and the memory used by the staged receive queue for that connection.
We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after the rolling restart to upgrade, open file descriptors on the brokers started climbing uncontrollably. In a few cases brokers reached our configured max open files limit of 100k and crashed before we rolled back.
We tracked this down to a buildup of muted connections in the Selector.closingChannels list. If a client disconnects from the broker with multiple pending produce requests, then when the broker attempts to send an ack to the client it receives an IOException because the TCP socket has been closed. This triggers the Selector to close the channel, but because the channel still has pending requests, the Selector adds it to Selector.closingChannels so that those requests can still be processed. However, because the exception was triggered by trying to send a response, the SocketServer.Processor has marked the channel as muted and will no longer process it at all, so the channel is never drained and never closed.
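The mechanism described above can be modelled in a few lines. This is only a toy sketch, not the real Selector code: the class ClosingChannelLeakSketch, its Channel type, and the close/poll methods are hypothetical stand-ins, and the assumption is that a closing channel is released only once its staged receives have been drained, while muted channels are skipped when draining.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model of the leak; a hypothetical stand-in, not Kafka's Selector. */
final class ClosingChannelLeakSketch {

    /** Hypothetical channel: an ID, a muted flag, and a queue of staged receives. */
    static final class Channel {
        final String id;
        final boolean muted;
        final Deque<String> stagedReceives = new ArrayDeque<>();

        Channel(String id, boolean muted) {
            this.id = id;
            this.muted = muted;
        }
    }

    /** Channels whose socket has failed but which still have staged receives. */
    final Map<String, Channel> closingChannels = new HashMap<>();

    /** Builds a channel with the given number of pending (staged) requests. */
    static Channel channel(String id, boolean muted, int pending) {
        Channel ch = new Channel(id, muted);
        for (int i = 0; i < pending; i++) {
            ch.stagedReceives.add("request-" + i);
        }
        return ch;
    }

    /** Close path: a channel with staged receives is parked instead of released. */
    void close(Channel ch) {
        if (!ch.stagedReceives.isEmpty()) {
            closingChannels.put(ch.id, ch);
        }
    }

    /** One poll pass: drain one receive per unmuted channel, release drained channels. */
    void poll() {
        for (Channel ch : closingChannels.values()) {
            if (!ch.muted) {
                ch.stagedReceives.poll();
            }
        }
        closingChannels.values().removeIf(ch -> ch.stagedReceives.isEmpty());
    }

    public static void main(String[] args) {
        ClosingChannelLeakSketch sketch = new ClosingChannelLeakSketch();
        sketch.close(channel("healthy", false, 3));
        sketch.close(channel("muted", true, 3));
        for (int i = 0; i < 10; i++) {
            sketch.poll();
        }
        // Only the muted channel is still parked, along with its staged receives.
        System.out.println("still parked: " + sketch.closingChannels.keySet());
    }
}
```

In this model the unmuted channel is drained and released after three polls, while the muted one stays in closingChannels indefinitely, which is the shape of the file descriptor and memory leak we observed.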
Steps to reproduce
1. Start a Kafka broker/cluster.
2. A client produces several messages and then disconnects abruptly (e.g. ./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic).
3. The broker then leaks the file descriptor previously used for the TCP socket and the memory for the unprocessed messages.
Proposed solution (which we've implemented internally)
Whenever an exception is encountered while writing to a socket in Selector.pollSelectionKeys(...), record that the connection failed a send by adding the KafkaChannel ID to Selector.failedSends. Then re-raise the exception so that it still triggers the socket disconnection logic. Since every exception raised in this function triggers a disconnect, we also treat any exception while writing to the socket as a failed send.
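The shape of that change can be sketched as follows. This is a simplified illustration rather than our actual patch: FailedSendSketch and its Channel class are hypothetical, the write is faked with a flag, and only the names failedSends and pollSelectionKeys are borrowed from the description above. The collection type used for failedSends here is an assumption.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified illustration of the proposed fix; not the real Kafka Selector. */
final class FailedSendSketch {

    /** Hypothetical channel whose write fails if the peer has closed the socket. */
    static final class Channel {
        final String id;
        final boolean peerClosed;

        Channel(String id, boolean peerClosed) {
            this.id = id;
            this.peerClosed = peerClosed;
        }

        void write() throws IOException {
            if (peerClosed) {
                throw new IOException("Broken pipe on " + id);
            }
        }
    }

    /** IDs of channels whose send failed (assumed here to be a list of IDs). */
    final List<String> failedSends = new ArrayList<>();

    /** IDs of channels that went through the disconnect path. */
    final List<String> disconnected = new ArrayList<>();

    /** Attempts a write on every channel; any exception leads to a disconnect. */
    void pollSelectionKeys(List<Channel> channels) {
        for (Channel channel : channels) {
            try {
                attemptWrite(channel);
            } catch (IOException e) {
                // Unchanged behaviour in this model: an exception disconnects the channel.
                disconnected.add(channel.id);
            }
        }
    }

    /** The proposed change: record the failed send, then re-raise the exception. */
    private void attemptWrite(Channel channel) throws IOException {
        try {
            channel.write();
        } catch (IOException e) {
            failedSends.add(channel.id);
            throw e;
        }
    }

    public static void main(String[] args) {
        FailedSendSketch selector = new FailedSendSketch();
        selector.pollSelectionKeys(Arrays.asList(
                new Channel("conn-1", false),
                new Channel("conn-2", true)));
        System.out.println("failedSends=" + selector.failedSends);
        System.out.println("disconnected=" + selector.disconnected);
    }
}
```

The point of recording the ID before re-raising is that the caller can later tell a failed send apart from other disconnects, instead of leaving the channel muted with nothing scheduled to drain it.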