Apache Storm / STORM-2440

Kafka outage can lead to lockup of topology

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.1, 1.0.1, 1.0.2, 1.1.0
    • Fix Version/s: 1.0.4, 1.1.1, 1.2.0
    • Component/s: storm-core, storm-kafka
    • Labels:
      None

      Description

      During two somewhat extended outages of our Kafka cluster, we experienced a problem with our Storm topologies consuming data from that Kafka cluster.

      Almost all our topologies silently stopped processing data from some of the topics/partitions, and the only way to recover was to restart those topologies.

      I tracked down one occurrence of the failure to this worker, which was running one of the KafkaSpouts:

      2017-03-18 04:06:15.389 o.a.s.k.KafkaUtils [ERROR] Error fetching data from [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}] for topic [tagging_log]: [NOT_LEADER_FOR_PARTITION]
      2017-03-18 04:06:15.389 o.a.s.k.KafkaSpout [WARN] Fetch failed
      org.apache.storm.kafka.FailedFetchException: Error fetching data from [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}] for topic [tagging_log]: [NOT_LEADER_FOR_PARTITION]
              at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:213) ~[stormjar.jar:?]
              at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) ~[stormjar.jar:?]
              at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) ~[stormjar.jar:?]
              at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [stormjar.jar:?]
              at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
              at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
              at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
              at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
      2017-03-18 04:06:15.390 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Refreshing partition manager connections
      2017-03-18 04:06:15.395 o.a.s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{topic=tagging_log, partitionMap={0=kafka-03:9092, 1=kafka-12:9092, 2=kafka-08:9092, 3=kafka-05:9092}}
      2017-03-18 04:06:15.395 o.a.s.k.KafkaUtils [INFO] Task [1/1] assigned [Partition{host=kafka-03:9092, topic=tagging_log, partition=0}, Partition{host=kafka-12:9092, topic=tagging_log, partition=1}, Partition{host=kafka-08:9092, topic=tagging_log, partition=2}, Partition{host=kafka-05:9092, topic=tagging_log, partition=3}]
      2017-03-18 04:06:15.395 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Deleted partition managers: [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}]
      2017-03-18 04:06:15.396 o.a.s.k.ZkCoordinator [INFO] Task [1/1] New partition managers: [Partition{host=kafka-12:9092, topic=tagging_log, partition=1}]
      2017-03-18 04:06:15.398 o.a.s.k.PartitionManager [INFO] Read partition information from: /log_processing/tagging/kafka-tagging-spout/partition_1  --> {"partition":1,"offset":40567174332,"topology":{"name":"tagging-aerospike-1","id":"tagging-aerospike-1-3-1489587827"},"topic":"tagging_log","broker":{"port":9092,"host":"kafka-08"}}
      2017-03-18 04:06:25.408 k.c.SimpleConsumer [INFO] Reconnect due to error:
      java.net.SocketTimeoutException
              at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_121]
              at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_121]
              at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_121]
              at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
              at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
              at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86) [stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) [stormjar.jar:?]
              at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) [stormjar.jar:?]
              at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) [stormjar.jar:?]
              at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) [stormjar.jar:?]
              at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) [stormjar.jar:?]
              at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) [stormjar.jar:?]
              at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) [stormjar.jar:?]
              at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
              at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
              at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
              at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
      2017-03-18 04:06:35.416 o.a.s.util [ERROR] Async loop died!
      java.lang.RuntimeException: java.net.SocketTimeoutException
              at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[stormjar.jar:?]
              at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) ~[stormjar.jar:?]
              at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) ~[storm-core-1.0.2.jar:1.0.2]
              at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
              at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
              at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
      Caused by: java.net.SocketTimeoutException
              at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_121]
              at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_121]
              at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_121]
              at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
              at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
              at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[stormjar.jar:?]
              at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:?]
              at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[stormjar.jar:?]
              at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[stormjar.jar:?]
              at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) ~[stormjar.jar:?]
              at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[stormjar.jar:?]
              ... 5 more
      2017-03-18 04:06:35.419 o.a.s.d.executor [ERROR] 
      java.lang.RuntimeException: java.net.SocketTimeoutException
              at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[stormjar.jar:?]
              at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) ~[stormjar.jar:?]
              at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) ~[storm-core-1.0.2.jar:1.0.2]
              at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
              at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
              at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
      Caused by: java.net.SocketTimeoutException
              at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_121]
              at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_121]
              at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_121]
              at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
              at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
              at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[stormjar.jar:?]
              at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:?]
              at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[stormjar.jar:?]
              at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[stormjar.jar:?]
              at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) ~[stormjar.jar:?]
              at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[stormjar.jar:?]
              ... 5 more
      2017-03-18 04:06:35.442 o.a.s.d.executor [INFO] Got interrupted excpetion shutting thread down...
      

      Nothing further was logged after that until the topology was manually killed.

      As you can see, the java.net.SocketTimeoutException escapes the storm-kafka code (probably a problem in and of itself), but the worker is not killed. The thread that calls the spout's nextTuple method, on the other hand, exits, so the spout stops emitting while the worker stays up.
      This is the culprit line: https://github.com/apache/storm/blob/v1.1.0/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L270

      I see that this has been fixed in the Java port of the executor code by explicitly excluding java.net.SocketTimeoutException from the condition.
      I will open a pull request with a backport tomorrow.
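
      The check described above can be illustrated with a minimal Java sketch. It is not the Storm source. It assumes the condition at the linked line treats any error whose cause chain contains InterruptedException or java.io.InterruptedIOException as a shutdown signal (log and exit the thread, do not kill the worker). Because java.net.SocketTimeoutException is a subclass of InterruptedIOException, a Kafka socket timeout would match such a condition. The class and method names below (InterruptCheck, causeIsInstanceOf, treatedAsInterruptBefore, treatedAsInterruptAfter) are hypothetical and chosen only for illustration.

```java
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;

// Hypothetical illustration; names are not taken from the Storm code base.
final class InterruptCheck {

    /** Returns true if the error or any throwable in its cause chain is an instance of type. */
    static boolean causeIsInstanceOf(Class<? extends Throwable> type, Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    /** Assumed pre-fix condition: every InterruptedIOException counts as an interrupt. */
    static boolean treatedAsInterruptBefore(Throwable error) {
        return causeIsInstanceOf(InterruptedException.class, error)
                || causeIsInstanceOf(InterruptedIOException.class, error);
    }

    /** Condition with SocketTimeoutException explicitly excluded, as the report describes. */
    static boolean treatedAsInterruptAfter(Throwable error) {
        return causeIsInstanceOf(InterruptedException.class, error)
                || (causeIsInstanceOf(InterruptedIOException.class, error)
                        && !causeIsInstanceOf(SocketTimeoutException.class, error));
    }

    public static void main(String[] args) {
        // Same shape as the exception in the log: RuntimeException caused by SocketTimeoutException.
        Throwable fromLog = new RuntimeException(new SocketTimeoutException());
        System.out.println("before: treated as interrupt = " + treatedAsInterruptBefore(fromLog));
        System.out.println("after:  treated as interrupt = " + treatedAsInterruptAfter(fromLog));
    }
}
```

      Under these assumptions, the wrapped timeout from the log is classified as an interrupt by the first condition, which would explain why the thread exits quietly. With the exclusion it falls through to the normal error path, while genuine interrupts are still recognised.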

              People

              • Assignee: Nico Meyer (nico.meyer)
              • Reporter: Nico Meyer (nico.meyer)

                  Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 1h