Details
Description
We ran into a problem that resulted in a "Too many open files" exception because some sockets are not closed after a restart.
This problem only occurs with version 0.10.2.1 and 0.10.2.0.
0.10.1.1 and 0.10.1.0 both work as expected.
I used the same version for the server and client.
I used https://github.com/kohsuke/file-leak-detector to display the open file descriptors. The culprit was :
#146 socket channel by thread:pool-2-thread-1 on Mon May 29 11:20:47 CEST 2017 at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:108) at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60) at java.nio.channels.SocketChannel.open(SocketChannel.java:145) at org.apache.kafka.common.network.Selector.connect(Selector.java:168) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629) at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:186) at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.ensureOneNodeIsReady(StreamsKafkaClient.java:195) at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.getAnyReadyBrokerId(StreamsKafkaClient.java:233) at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.checkBrokerCompatibility(StreamsKafkaClient.java:300) at org.apache.kafka.streams.KafkaStreams.checkBrokerVersionCompatibility(KafkaStreams.java:401) at org.apache.kafka.streams.KafkaStreams.start(KafkaStreams.java:425)
I could narrow the problem down to a reproducable example below (the only dependency is
org.apache.kafka:kafka-streams:jar:0.10.2.1).
IMPORTANT: You have to run this code in the Intellij IDEA debugger with a special breakpoint to see it fail.
See the comments on the socketChannels variable on how to add this breakpoint.
When you run this code you will see the number of open SocketChannels increase (only on version 0.10.2.x).
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import java.nio.channels.SocketChannel; import java.nio.channels.spi.AbstractInterruptibleChannel; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class App { private static KafkaStreams streams; private static String brokerList; // Fill socketChannels with entries on line 'Socket socket = socketChannel.socket();' (line number 170 on 0.10.2.1) // of org.apache.kafka.common.network.Selector: Add breakpoint, right click on breakpoint. // - Uncheck 'Suspend' // - Check 'Evaluate and log' and fill text field with (without quotes) 'App.socketChannels.add(socketChannel)' private static final List<SocketChannel> socketChannels = new ArrayList<>(); public static void main(String[] args) { brokerList = args[0]; init(); ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1); Runnable command = () -> { streams.close(); System.out.println("Open socketChannels: " + socketChannels.stream() .filter(AbstractInterruptibleChannel::isOpen) .collect(Collectors.toList()).size()); init(); }; scheduledThreadPool.scheduleWithFixedDelay(command, 10000L, 2000, TimeUnit.MILLISECONDS); } private static void init() { Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "JeroenApp"); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); StreamsConfig config = new StreamsConfig(streamsConfiguration); KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), "HarrieTopic"); stream.foreach((key, value) -> System.out.println(value)); streams = new KafkaStreams(builder, config); streams.start(); } }