  1. Kafka
  2. KAFKA-5345

Some socket connections not closed after restart of Kafka Streams


    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s:,
    • Fix Version/s:,
    • Component/s: streams
    • Labels:
    • Environment:
      MacOs 10.12.5 and Ubuntu 14.04


      We ran into a problem that resulted in a "Too many open files" exception because some sockets are not closed after a restart.
      This problem only occurs with version 0.10.2.x; the two other versions I tested both work as expected.
      I used the same version for the server and client.

      I used https://github.com/kohsuke/file-leak-detector to display the open file descriptors. The culprit was:

      #146 socket channel by thread:pool-2-thread-1 on Mon May 29 11:20:47 CEST 2017
      	at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:108)
      	at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
      	at java.nio.channels.SocketChannel.open(SocketChannel.java:145)
      	at org.apache.kafka.common.network.Selector.connect(Selector.java:168)
      	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629)
      	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:186)
      	at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.ensureOneNodeIsReady(StreamsKafkaClient.java:195)
      	at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.getAnyReadyBrokerId(StreamsKafkaClient.java:233)
      	at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.checkBrokerCompatibility(StreamsKafkaClient.java:300)
      	at org.apache.kafka.streams.KafkaStreams.checkBrokerVersionCompatibility(KafkaStreams.java:401)
      	at org.apache.kafka.streams.KafkaStreams.start(KafkaStreams.java:425)
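
      To illustrate what a socket channel that is "not closed" looks like at the JDK level, here is a minimal sketch. It is not Kafka code: the class ChannelLeakSketch and its methods are hypothetical names, and it only assumes a loopback listener on an ephemeral port. One method opens a SocketChannel and returns without closing it, the other closes it with try-with-resources.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical illustration, not part of Kafka.
final class ChannelLeakSketch {

    /** Opens a client channel and returns it WITHOUT closing it (the leaky pattern). */
    static SocketChannel connectAndForget(InetSocketAddress address) throws IOException {
        return SocketChannel.open(address);
    }

    /** Opens a client channel and closes it via try-with-resources; returns it for inspection. */
    static SocketChannel connectAndClose(InetSocketAddress address) throws IOException {
        SocketChannel channel = SocketChannel.open(address);
        try (SocketChannel c = channel) {
            // Work with the channel here; it is closed when this block exits.
        }
        return channel;
    }

    /** Returns { leakedChannelIsOpen, closedChannelIsOpen } against a local loopback listener. */
    static boolean[] run() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 lets the OS pick a free ephemeral port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();

            SocketChannel leaked = connectAndForget(address);
            SocketChannel closed = connectAndClose(address);
            boolean[] result = { leaked.isOpen(), closed.isOpen() };

            leaked.close(); // clean up so the sketch itself does not leak
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("leaked.isOpen() = " + result[0]);
        System.out.println("closed.isOpen() = " + result[1]);
    }
}
```

      Every channel that stays open holds a file descriptor, so repeating the first pattern on each restart would eventually hit the process limit.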

      I could narrow the problem down to a reproducible example below (the only dependency is
      IMPORTANT: You have to run this code in the IntelliJ IDEA debugger with a special breakpoint to see it fail.
      See the comments on the socketChannels variable on how to add this breakpoint.
      When you run this code you will see the number of open SocketChannels increase (only on version 0.10.2.x).

      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.streams.KafkaStreams;
      import org.apache.kafka.streams.StreamsConfig;
      import org.apache.kafka.streams.kstream.KStream;
      import org.apache.kafka.streams.kstream.KStreamBuilder;
      import java.nio.channels.SocketChannel;
      import java.nio.channels.spi.AbstractInterruptibleChannel;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Properties;
      import java.util.concurrent.Executors;
      import java.util.concurrent.ScheduledExecutorService;
      import java.util.concurrent.TimeUnit;
      import java.util.stream.Collectors;
      public class App {
          private static KafkaStreams streams;
          private static String brokerList;
          // Fill socketChannels with entries on line 'Socket socket = socketChannel.socket();' (line number 170  on
          // of org.apache.kafka.common.network.Selector: Add breakpoint, right click on breakpoint.
          // - Uncheck 'Suspend'
          // - Check 'Evaluate and log' and fill text field with (without quotes) 'App.socketChannels.add(socketChannel)'
          private static final List<SocketChannel> socketChannels = new ArrayList<>();
          public static void main(String[] args) {
              brokerList = args[0];
              init();
              ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
              // Restart the streams instance on every run and report how many channels are still open.
              Runnable command = () -> {
                  streams.close();
                  System.out.println("Open socketChannels: " + socketChannels.stream()
                          .filter(AbstractInterruptibleChannel::isOpen)
                          .collect(Collectors.toList()).size());
                  init();
              };
              scheduledThreadPool.scheduleWithFixedDelay(command, 10000L, 2000, TimeUnit.MILLISECONDS);
          }
          // Builds a new KafkaStreams instance and starts it.
          private static void init() {
              Properties streamsConfiguration = new Properties();
              streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "JeroenApp");
              streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
              StreamsConfig config = new StreamsConfig(streamsConfiguration);
              KStreamBuilder builder = new KStreamBuilder();
              KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), "HarrieTopic");
              stream.foreach((key, value) -> System.out.println(value));
              streams = new KafkaStreams(builder, config);
              streams.start();
          }
      }
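
      If attaching a debugger is not practical, the growth in open descriptors can also be watched from inside the JVM. The sketch below is an assumption-laden alternative, not part of the original reproduction: OpenFdProbe is a hypothetical class name, and it relies on com.sun.management.UnixOperatingSystemMXBean, which is only implemented on Unix-like JVMs (such as the macOS and Ubuntu environments listed above). It counts all file descriptors of the process, not only socket channels.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Hypothetical helper, not part of Kafka or the original reproduction.
final class OpenFdProbe {

    /**
     * Returns the number of file descriptors currently open in this JVM process,
     * or -1 if the platform MXBean does not expose that figure.
     */
    static long openFileDescriptors() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount();
        }
        return -1L;
    }

    public static void main(String[] args) {
        System.out.println("Open file descriptors: " + openFileDescriptors());
    }
}
```

      Calling OpenFdProbe.openFileDescriptors() from the scheduled command after each restart should show a steadily rising number if sockets are leaking.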




            • Assignee:
              rsivaram Rajini Sivaram
            • Reporter:
              jvwilge Jeroen van Wilgenburg