Samza / SAMZA-101

Samza task leaking file descriptors on Kafka exceptions

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.0
    • Component/s: None
    • Labels: None

    Description

      Initially, my Samza task logged many UnresolvedAddressExceptions, likely because the Kafka cluster went down and the task kept retrying:

      2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [INFO] Creating new SimpleConsumer for host kafka-host-12345:10251 for system kafka
      2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Recreating simple consumer and retrying connection
      2013-12-06 12:17:23 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Stack trace for fetchMessages exception.
      java.nio.channels.UnresolvedAddressException
      at sun.nio.ch.Net.checkAddress(Net.java:30)
      at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:480)
      at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
      at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
      at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:143)
      at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
      at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
      at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
      at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:50)
      at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:43)
      at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:134)
      at org.apache.samza.system.kafka.BrokerProxy$$anon$2.run(BrokerProxy.scala:110)
      at java.lang.Thread.run(Thread.java:662)
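
The trace above fails inside SocketChannelImpl.connect, which is reached only after the channel has already been opened. If a failed channel is not closed before the next retry, its descriptor stays allocated. The following is a minimal sketch of that pattern, not the actual Samza or Kafka code: the class ConnectRetrySketch and both helper methods are hypothetical, and an unresolved InetSocketAddress is assumed as a stand-in for the unreachable broker so that no network access is needed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

public class ConnectRetrySketch {

    // Hypothetical helper: attempt a blocking connect and close the channel if it fails.
    static boolean connectOrClose(SocketChannel channel, InetSocketAddress address) {
        try {
            return channel.connect(address);
        } catch (IOException | RuntimeException e) {
            try {
                channel.close(); // release the descriptor before the caller retries
            } catch (IOException ignored) {
                // nothing more to do in this sketch
            }
            return false;
        }
    }

    // Simulates `attempts` failed connects and returns how many channels were left open.
    static int openChannelsAfterRetries(int attempts, boolean closeOnFailure) {
        // An unresolved address makes connect() throw UnresolvedAddressException
        // before any network I/O happens.
        InetSocketAddress address = InetSocketAddress.createUnresolved("kafka-host-12345", 10251);
        List<SocketChannel> created = new ArrayList<>();
        try {
            for (int i = 0; i < attempts; i++) {
                SocketChannel channel = SocketChannel.open();
                created.add(channel);
                if (closeOnFailure) {
                    connectOrClose(channel, address);
                } else {
                    try {
                        channel.connect(address);
                    } catch (RuntimeException e) {
                        // leaky variant: the failed channel is simply abandoned
                    }
                }
            }
            int stillOpen = 0;
            for (SocketChannel channel : created) {
                if (channel.isOpen()) {
                    stillOpen++;
                }
                channel.close(); // clean up so the sketch itself does not leak
            }
            return stillOpen;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("left open without close: " + openChannelsAfterRetries(5, false));
        System.out.println("left open with close:    " + openChannelsAfterRetries(5, true));
    }
}
```

Closing in the failure path keeps each retry from adding to the process's open descriptor count, whatever exception the connect attempt raises.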
      

      Eventually, I began seeing the following exception instead, from which the Samza task never recovers:

      2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Recreating simple consumer and retrying connection
      2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [INFO] Creating new SimpleConsumer for host kafka-host-12345:10251 for system kafka
      2013-12-06 12:20:49 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Stack trace for fetchMessages exception.
      java.net.SocketException: Too many open files
      at sun.nio.ch.Net.socket0(Native Method)
      at sun.nio.ch.Net.socket(Net.java:97)
      at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:84)
      at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:37)
      at java.nio.channels.SocketChannel.open(SocketChannel.java:105)
      at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
      at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
      at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:143)
      at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
      at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
      at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
      at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
      at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:50)
      at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:43)
      at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:134)
      at org.apache.samza.system.kafka.BrokerProxy$$anon$2.run(BrokerProxy.scala:110)
      at java.lang.Thread.run(Thread.java:662)
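
When "Too many open files" appears, it can help to confirm that the process's descriptor count is approaching its limit. The following is a minimal sketch, assuming a Unix-like JVM that exposes com.sun.management.UnixOperatingSystemMXBean. The class FdUsageSketch and the method fdUsage are hypothetical and not part of Samza or Kafka.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import com.sun.management.UnixOperatingSystemMXBean;

public class FdUsageSketch {

    // Returns {open, max} descriptor counts, or {-1, -1} when the JVM does not expose them.
    static long[] fdUsage() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            UnixOperatingSystemMXBean unix = (UnixOperatingSystemMXBean) os;
            return new long[] {
                unix.getOpenFileDescriptorCount(),
                unix.getMaxFileDescriptorCount()
            };
        }
        return new long[] { -1L, -1L };
    }

    public static void main(String[] args) {
        long[] usage = fdUsage();
        System.out.println("open file descriptors: " + usage[0] + " of " + usage[1]);
    }
}
```

Sampling this value periodically while the broker is unreachable would show whether each reconnect attempt leaves a descriptor behind.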
      

            People

              Assignee: Unassigned
              Reporter: Alan Li (alanwli)
              Votes: 0
              Watchers: 5
