Apache Storm / STORM-2440

Kafka outage can lead to lockup of topology

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.1, 1.0.1, 1.0.2, 1.1.0
    • Fix Version/s: 1.0.4, 1.1.1
    • Component/s: storm-core, storm-kafka
    • Labels:
      None

      Description

      During two somewhat extended outages of our Kafka cluster, we experienced a problem with our Storm topologies consuming data from that Kafka cluster.

      Almost all our topologies just silently stopped processing data from some of the topics/partitions, and the only way to fix this situation was to restart those topologies.

      I tracked down one occurrence of the failure to this worker, which was running one of the KafkaSpouts:

      2017-03-18 04:06:15.389 o.a.s.k.KafkaUtils [ERROR] Error fetching data from [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}] for topic [tagging_log]: [NOT_LEADER_FOR_PARTITION]
      2017-03-18 04:06:15.389 o.a.s.k.KafkaSpout [WARN] Fetch failed
      org.apache.storm.kafka.FailedFetchException: Error fetching data from [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}] for topic [tagging_log]: [NOT_LEADER_FOR_PARTITION]
              at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:213) ~[stormjar.jar:?]
              at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) ~[stormjar.jar:?]
              at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) ~[stormjar.jar:?]
              at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [stormjar.jar:?]
              at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
              at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
              at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
              at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
      2017-03-18 04:06:15.390 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Refreshing partition manager connections
      2017-03-18 04:06:15.395 o.a.s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{topic=tagging_log, partitionMap={0=kafka-03:9092, 1=kafka-12:9092, 2=kafka-08:9092, 3=kafka-05:9092}}
      2017-03-18 04:06:15.395 o.a.s.k.KafkaUtils [INFO] Task [1/1] assigned [Partition{host=kafka-03:9092, topic=tagging_log, partition=0}, Partition{host=kafka-12:9092, topic=tagging_log, partition=1}, Partition{host=kafka-08:9092, topic=tagging_log, partition=2}, Partition{host=kafka-05:9092, topic=tagging_log, partition=3}]
      2017-03-18 04:06:15.395 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Deleted partition managers: [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}]
      2017-03-18 04:06:15.396 o.a.s.k.ZkCoordinator [INFO] Task [1/1] New partition managers: [Partition{host=kafka-12:9092, topic=tagging_log, partition=1}]
      2017-03-18 04:06:15.398 o.a.s.k.PartitionManager [INFO] Read partition information from: /log_processing/tagging/kafka-tagging-spout/partition_1  --> {"partition":1,"offset":40567174332,"topology":{"name":"tagging-aerospike-1","id":"tagging-aerospike-1-3-1489587827"},"topic":"tagging_log","broker":{"port":9092,"host":"kafka-08"}}
      2017-03-18 04:06:25.408 k.c.SimpleConsumer [INFO] Reconnect due to error:
      java.net.SocketTimeoutException
              at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_121]
              at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_121]
              at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_121]
              at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
              at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
              at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86) [stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) [stormjar.jar:?]
              at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) [stormjar.jar:?]
              at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) [stormjar.jar:?]
              at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) [stormjar.jar:?]
              at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) [stormjar.jar:?]
              at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) [stormjar.jar:?]
              at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) [stormjar.jar:?]
              at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
              at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
              at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
              at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
      2017-03-18 04:06:35.416 o.a.s.util [ERROR] Async loop died!
      java.lang.RuntimeException: java.net.SocketTimeoutException
              at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[stormjar.jar:?]
              at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) ~[stormjar.jar:?]
              at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) ~[storm-core-1.0.2.jar:1.0.2]
              at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
              at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
              at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
      Caused by: java.net.SocketTimeoutException
              at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_121]
              at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_121]
              at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_121]
              at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
              at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
              at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[stormjar.jar:?]
              at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:?]
              at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[stormjar.jar:?]
              at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[stormjar.jar:?]
              at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) ~[stormjar.jar:?]
              at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[stormjar.jar:?]
              ... 5 more
      2017-03-18 04:06:35.419 o.a.s.d.executor [ERROR] 
      java.lang.RuntimeException: java.net.SocketTimeoutException
              at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[stormjar.jar:?]
              at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) ~[stormjar.jar:?]
              at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) ~[storm-core-1.0.2.jar:1.0.2]
              at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
              at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
              at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
      Caused by: java.net.SocketTimeoutException
              at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_121]
              at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_121]
              at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_121]
              at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
              at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
              at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
              at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[stormjar.jar:?]
              at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:?]
              at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[stormjar.jar:?]
              at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[stormjar.jar:?]
              at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) ~[stormjar.jar:?]
              at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[stormjar.jar:?]
              ... 5 more
      2017-03-18 04:06:35.442 o.a.s.d.executor [INFO] Got interrupted excpetion shutting thread down...
      

      There was no more output in the log after that until the topology was manually killed.

      As you can see, the java.net.SocketTimeoutException escapes the storm-kafka code (probably a problem in and of itself), but the worker is not killed. The thread that calls the spout's .nextTuple method, however, exits.
      This is the culprit line: https://github.com/apache/storm/blob/v1.1.0/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L270

      I see that this has been fixed in the Java port of the executor code by explicitly excluding java.net.SocketTimeoutException from the condition.
      I will open a pull request with a backport tomorrow.
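
      To illustrate the failure mode, here is a minimal, hypothetical sketch in plain Java (not Storm's actual code; the names isCause, handleSpoutError and suicide are made up for illustration) of the kind of check involved. An exception whose cause chain contains an InterruptedIOException is treated as a benign shutdown and the suicide callback is skipped; the fix in the Java port adds an exclusion for SocketTimeoutException, which is a subclass of InterruptedIOException.

      import java.io.InterruptedIOException;
      import java.net.SocketTimeoutException;

      // Hypothetical illustration only; names do not correspond to real Storm APIs.
      public class ExecutorErrorHandlingSketch {

          // Walk the cause chain and check whether any throwable is of the given type.
          static boolean isCause(Class<? extends Throwable> type, Throwable t) {
              for (Throwable cur = t; cur != null; cur = cur.getCause()) {
                  if (type.isInstance(cur)) {
                      return true;
                  }
              }
              return false;
          }

          static void handleSpoutError(Throwable t, Runnable suicide) {
              boolean interrupted = isCause(InterruptedException.class, t)
                      || isCause(InterruptedIOException.class, t);
              // java.net.SocketTimeoutException extends InterruptedIOException, so the
              // RuntimeException(SocketTimeoutException) from the log above matches the
              // "interrupted" check. Without the exclusion below, the exception is treated
              // as a normal shutdown and the spout thread dies silently.
              boolean socketTimeout = isCause(SocketTimeoutException.class, t);
              if (interrupted && !socketTimeout) {
                  return; // assumed to be a deliberate shutdown of the executor
              }
              suicide.run(); // kill the worker so the supervisor restarts it
          }

          public static void main(String[] args) {
              Throwable fromLog = new RuntimeException(new SocketTimeoutException());
              handleSpoutError(fromLog, () -> System.out.println("worker killed, will be restarted"));
          }
      }

      Without that exclusion, the RuntimeException wrapping the SocketTimeoutException shown in the log matches the InterruptedIOException check, so the spout thread exits quietly instead of killing the worker.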

        Issue Links

          Activity

          kristopherkane Kristopher Kane added a comment -

          I just discovered this today as well on Storm 1.0.1. We recently upgraded from 0.10.

          nico.meyer Nico Meyer added a comment -

          As far as I can tell the issue is already present in 0.10.0

          nico.meyer Nico Meyer added a comment - edited

          https://github.com/apache/storm/pull/2036 against branch 1.x-branch
          kristopherkane Kristopher Kane added a comment -

          I don't have the immediate luxury to swap out storm-core and am looking to fix this in storm-kafka. What I don't get is that this is reported at the top as a RuntimeException. Why wouldn't that cause the worker to die and restart?

          nico.meyer Nico Meyer added a comment - edited

          The reason is in the line I linked: https://github.com/apache/storm/blob/v1.1.0/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L270 and the next two lines.

          If the exception is derived from java.io.InterruptedIOException, the 'suicide-fn' will not be called (java.net.SocketTimeoutException is a subclass of InterruptedIOException, which is why it is caught by this check). I think this was introduced to prevent some test case from killing the process (see STORM-773). I don't think the code should be changed to fix a test, but this special-case handling is there now, and in the Java port of executor.clj it was fixed by adding another special case, which I copied in the pull request.

          I think I will also fix it in storm-kafka on Monday, but I just wanted it to be fixed in core for the sake of being thorough.

          kristopherkane Kristopher Kane added a comment -

          Good catch on the storm-core part. I don't know how you found that.

          For storm-kafka (https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java#L75), I was thinking of either retrying with backoff or raising a RuntimeException, which is done in other places when communicating with Kafka (https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java#L191). I am wondering what your thoughts are.
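
          (For illustration, a minimal sketch of the first option follows: retry with backoff around the offset lookup. OffsetFetch and fetchOffsetWithBackoff are hypothetical names, not part of the storm-kafka API.)

          import java.net.SocketTimeoutException;

          // Hypothetical sketch of the "retry with backoff" option discussed above.
          public class OffsetRetrySketch {

              @FunctionalInterface
              interface OffsetFetch {
                  long fetch() throws SocketTimeoutException;
              }

              static long fetchOffsetWithBackoff(OffsetFetch fetch, int maxRetries, long initialDelayMs)
                      throws SocketTimeoutException, InterruptedException {
                  long delayMs = initialDelayMs;
                  for (int attempt = 0; ; attempt++) {
                      try {
                          return fetch.fetch();
                      } catch (SocketTimeoutException e) {
                          if (attempt >= maxRetries) {
                              // Give up; the caller can wrap this in a RuntimeException so the
                              // worker dies and gets restarted instead of hanging silently.
                              throw e;
                          }
                          Thread.sleep(delayMs);                   // back off before retrying
                          delayMs = Math.min(delayMs * 2, 30_000); // exponential backoff, capped
                      }
                  }
              }
          }

          (The SimpleConsumer getOffsetsBefore call would presumably go inside fetch(); once retries are exhausted, rethrowing or wrapping in a RuntimeException keeps the worker-restart behaviour.)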

          Srdo Stig Rohde Døssing added a comment -

          This is caused by STORM-2194 not being fixed on 1.x

          nico.meyer Nico Meyer added a comment -

          Not fixing it on 1.x would mean there is no stable version of Storm that could be recommended for production use with a good conscience, at least when using KafkaSpout. Right?

          Is there a plan to release a version based on the master anytime soon?

          kabhwan Jungtaek Lim added a comment -

          We could get it fixed for 1.1.1 and release it soon. There are some pull requests for storm-kafka-client that we postponed addressing for 1.1.0, so releasing 1.1.1 soon is a viable option.

          nico.meyer Nico Meyer added a comment -

          I think it should be fixed in storm-core also. Who knows where else there is a looming SocketTimeoutException in some spout implementation.

          kabhwan Jungtaek Lim added a comment -

          Thanks Nico Meyer, I merged into 1.x and 1.0.x branches.


            People

            • Assignee:
              nico.meyer Nico Meyer
              Reporter:
              nico.meyer Nico Meyer
            • Votes:
              0
              Watchers:
              5

              Dates

              • Created:
                Updated:
                Resolved:

                Time Tracking

                Estimated: Not Specified
                Remaining: 0h
                Logged: 1h

                  Development