Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0, 1.0.2
    • Fix Version/s: 2.0.0, 1.0.4, 1.1.1, 1.2.0
    • Component/s: storm-core
    • Labels:
      None

      Description

      For some time I've been trying to track down why certain exceptions leave Storm workers in a zombified state. I believe I've isolated the bug to :report-error-and-die/reportErrorAndDie in the executor. Essentially:

           :report-error-and-die (fn [error]
                                   (try
                                     ((:report-error <>) error)
                                     (catch Exception e
                                       (log-message "Error while reporting error to cluster, proceeding with shutdown")))
                                   (if (or
                                          (exception-cause? InterruptedException error)
                                          (exception-cause? java.io.InterruptedIOException error))
                                     (log-message "Got interrupted excpetion shutting thread down...")
                                     ((:suicide-fn <>))))
      

      has the grouping of the if statement slightly wrong. As written, it logs and skips the suicide-fn when the cause is an InterruptedException/InterruptedIOException, and dies otherwise. It shouldn't log OR die; it should log under that condition and ALWAYS die.

      Basically:

           :report-error-and-die (fn [error]
                                   (try
                                     ((:report-error <>) error)
                                     (catch Exception e
                                       (log-message "Error while reporting error to cluster, proceeding with shutdown")))
                                   (if (or
                                          (exception-cause? InterruptedException error)
                                          (exception-cause? java.io.InterruptedIOException error))
                                     (log-message "Got interrupted excpetion shutting thread down..."))
                                   ((:suicide-fn <>)))
      

      After digging into the Java port of this code, it looks like a different bug was introduced while porting:

              if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)
                      || Utils.exceptionCauseIsInstanceOf(java.io.InterruptedIOException.class, e)) {
                  LOG.info("Got interrupted exception shutting thread down...");
                  suicideFn.run();
              }
      

      This is how it was initially ported; STORM-2142 then changed it to:

              if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)
                      || Utils.exceptionCauseIsInstanceOf(java.io.InterruptedIOException.class, e)) {
                  LOG.info("Got interrupted exception shutting thread down...");
              } else {
                  suicideFn.run();
              }
      

      However, I believe the correct port is as described above:

              if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)
                      || Utils.exceptionCauseIsInstanceOf(java.io.InterruptedIOException.class, e)) {
                  LOG.info("Got interrupted exception shutting thread down...");
              }
              suicideFn.run();
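
To compare the three variants side by side, here is a minimal stand-alone sketch. It is not Storm's code: the class name, the method names, and the `causeIsInstanceOf` helper are hypothetical stand-ins (the helper is assumed to behave like `Utils.exceptionCauseIsInstanceOf`, i.e. to walk the cause chain), and `suicideFn` is just a `Runnable` so the effect can be counted instead of killing the JVM.

```java
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone sketch; none of these names are Storm's own code.
class ReportErrorAndDieSketch {

    // Assumed stand-in for Utils.exceptionCauseIsInstanceOf: walks the cause chain.
    static boolean causeIsInstanceOf(Class<? extends Throwable> klass, Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (klass.isInstance(c)) {
                return true;
            }
        }
        return false;
    }

    static boolean interrupted(Throwable e) {
        return causeIsInstanceOf(InterruptedException.class, e)
                || causeIsInstanceOf(InterruptedIOException.class, e);
    }

    // Initial port: dies ONLY for interrupted causes, ignores everything else.
    static void initialPort(Throwable e, Runnable suicideFn) {
        if (interrupted(e)) {
            System.out.println("Got interrupted exception shutting thread down...");
            suicideFn.run();
        }
    }

    // After STORM-2142: logs and does NOT die for interrupted causes.
    static void afterStorm2142(Throwable e, Runnable suicideFn) {
        if (interrupted(e)) {
            System.out.println("Got interrupted exception shutting thread down...");
        } else {
            suicideFn.run();
        }
    }

    // Proposed: log for interrupted causes, then always die.
    static void proposed(Throwable e, Runnable suicideFn) {
        if (interrupted(e)) {
            System.out.println("Got interrupted exception shutting thread down...");
        }
        suicideFn.run();
    }

    public static void main(String[] args) {
        // SocketTimeoutException is a subclass of InterruptedIOException.
        Throwable timeout = new RuntimeException(new SocketTimeoutException("fetch timed out"));
        Throwable other = new IllegalStateException("boom");
        AtomicInteger deaths = new AtomicInteger();
        Runnable suicideFn = deaths::incrementAndGet;

        initialPort(other, suicideFn);      // ignored: not an interrupted cause
        afterStorm2142(timeout, suicideFn); // logged only: worker keeps running
        proposed(timeout, suicideFn);       // logged, then dies
        proposed(other, suicideFn);         // dies
        System.out.println("suicideFn invocations: " + deaths.get());
    }
}
```

Only the proposed variant invokes `suicideFn` for both kinds of error; each of the other two silently skips it for one of them.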
      

      I'll look into providing patches for the 1.x and 2.x branches shortly.

          Activity

          kabhwan Jungtaek Lim added a comment -

          Missed porting this back to 1.1.x-branch. Will do.

          sathyafmt Sathya F added a comment -

          Thanks, Craig! We have two problems with 1.0.2, and your patch fixes them both.

          1. When Storm workers start and cannot bind to port 56700 (the RMI port), they hang around and do not die. This is easy to reproduce: I started nc -l 56700 and then started the topology. With your patch, the worker dies and the supervisor restarts it.
          2016-12-01 04:24:41.721 STDERR [INFO] Error: Exception thrown by the agent : java.rmi.server.ExportException: Port already in use: 56700; nested exception is:
          2016-12-01 04:24:41.722 STDERR [INFO] java.net.BindException: Address already in use

          2. The Storm workers hit a SocketTimeoutException and don't die. The log:
          2016-11-30 15:21:29.004 o.a.s.util [ERROR] Async loop died!
          java.lang.RuntimeException: org.apache.storm.kafka.FailedFetchException: java.net.SocketTimeoutException
          at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
          at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
          at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.0.2.jar:1.0.2]
          at org.apache.storm.daemon.executor$fn__8058$fn__8071$fn__8124.invoke(executor.clj:850) ~[storm-core-1.0.2.jar:1.0.2]
          at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
          at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
          at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
          Caused by: org.apache.storm.kafka.FailedFetchException: java.net.SocketTimeoutException
          at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:199) ~[stormjar.jar:1.0.0]
          at org.apache.storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:141) ~[stormjar.jar:1.0.0]
          at org.apache.storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:114) ~[stormjar.jar:1.0.0]
          at org.apache.storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:68) ~[stormjar.jar:1.0.0]
          at org.apache.storm.kafka.trident.TridentKafkaEmitter.access$400(TridentKafkaEmitter.java:42) ~[stormjar.jar:1.0.0]
          at org.apache.storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:265) ~[stormjar.jar:1.0.0]
          at org.apache.storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:257) ~[stormjar.jar:1.0.0]
          at org.apache.storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:125) ~[storm-core-1.0.2.jar:1.0.2]
          at org.apache.storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:83) ~[storm-core-1.0.2.jar:1.0.2]
          at org.apache.storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:110) ~[storm-core-1.0.2.jar:1.0.2]
          at org.apache.storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:121) ~[storm-core-1.0.2.jar:1.0.2]
          at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.0.2.jar:1.0.2]
          at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.0.2.jar:1.0.2]
          at org.apache.storm.daemon.executor$fn__8058$tuple_action_fn__8060.invoke(executor.clj:731) ~[storm-core-1.0.2.jar:1.0.2]
          at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
          at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
          at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
          ... 6 more
          2016-11-30 15:21:29.028 o.a.s.d.executor [INFO] Got interrupted excpetion shutting thread down...
          2016-11-30 16:20:59.013 s.d.C.DBPoolDataSource-pool-ds [INFO] DBPoolDataSource-pool-ds: Destroyed connection
          2016-11-30 16:21:14.014 s.d.C.DBPoolDataSource-pool-ds [INFO] DBPoolDataSource-pool-ds: Destroyed connection

          chawco Craig Hawco added a comment -

          I attached a scrubbed thread dump showing the issue.

          This worker should have 24 spout threads and 24 b-0 threads. However, we can see that one of the b-0 threads has died but left the worker running:

          Sulla:Desktop chawco$ grep spout- scrubbed-thread-dump.txt | grep -v SendThread | grep -v EventThread | wc -l
                24
          Sulla:Desktop chawco$ grep b-0- scrubbed-thread-dump.txt | wc -l
                23
          Sulla:Desktop chawco$
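
The same check can be approximated from inside a JVM, without a thread dump file. The following is only a sketch: `ThreadCountSketch`, `countLiveThreads`, and the thread names it creates are hypothetical, and it counts threads in its own process rather than in a Storm worker. It matches on a substring of the thread name, like the `grep` calls above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical helper; counts threads of the current JVM, not of a remote worker.
class ThreadCountSketch {

    // Counts live threads whose name contains the marker (like `grep marker | wc -l`).
    static long countLiveThreads(String marker) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.getName().contains(marker))
                .count();
    }

    // Starts n parked threads carrying the marker in their name, counts them, then releases them.
    static long demo(String marker, int n) {
        CountDownLatch release = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                try {
                    release.await(); // stay alive until the count has been taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Thread-" + i + "-" + marker + i);
            t.start();
            threads.add(t);
        }
        long live = countLiveThreads(marker);
        release.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return live;
    }

    public static void main(String[] args) {
        System.out.println("live threads matching marker: " + demo("b-0-sketch-", 3));
    }
}
```

A count lower than the configured parallelism would point to the same condition as the `grep` output: an executor thread that ended while the process kept running.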
          
          chawco Craig Hawco added a comment -

          The dump shows 24 spout threads as expected, but only 23 b-0 threads.

          chawco Craig Hawco added a comment -

          I'm checking internally to make sure it's OK to post stacks that contain some internal information. However, the dump does confirm we have the expected 24 spout threads but only 23 bolt threads: one of them has died without shutting down the process.

          chawco Craig Hawco added a comment -

          Just managed to snag a thread dump of a worker I think was exhibiting this behaviour. Will take a look at it tomorrow, and attach it here.

          chawco Craig Hawco added a comment -

          It does appear to me that the executor's report-error-and-die is the fn passed, via executor-data, to the bolt creation multimethod mk-threads :bolt. This is what the async-loop invokes from its outer catch clause against Throwable; if the exception is an InterruptedIOException, the above logic then applies.

          I don't see how this behaviour could possibly be correct now, unless there's some other mechanism for causing workers to exit – any bolt that throws an unhandled exception that has InterruptedIOException in the chain will invoke :report-error-and-die, which then happily logs and does nothing, leaving the process otherwise running.

          I'm fairly certain there is at least SOME bug here, so now the question is:

          1. Do we just log and always suicide? STORM-772 and STORM-773 indicate that we DID actually want to ignore SOME of these exceptions.
          2. Do we just log and ignore if and only if error is ITSELF an InterruptedException/InterruptedIOException? This now seems to be the most likely intent given the discussion in those two other tickets.
          3. There's some other mechanism for making workers exit that I'm not aware of. That's entirely possible, but I'm not sure how I'd go about finding it if that's the case.
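
The failure mode described above can be sketched in isolation. This is a hedged illustration, not Storm's executor: `ZombieWorkerSketch`, `leavesZombie`, and the thread name are hypothetical, and `suicideFn` only sets a flag where real code would terminate the process. It shows that when the error handler merely logs, the failing thread ends while nothing asks the process to exit.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

// Hypothetical sketch of a thread dying while its process stays up.
class ZombieWorkerSketch {

    // Runs one failing "bolt" thread and hands its error to reportErrorAndDie.
    // Returns true if the thread ended but no process exit was requested.
    static boolean leavesZombie(BiConsumer<Throwable, Runnable> reportErrorAndDie) {
        AtomicBoolean exitRequested = new AtomicBoolean(false);
        Runnable suicideFn = () -> exitRequested.set(true); // stand-in for killing the worker

        Thread bolt = new Thread(() -> {
            try {
                // An external call timing out, wrapped as in the reported stack trace.
                throw new RuntimeException(new SocketTimeoutException("fetch timed out"));
            } catch (Throwable e) {
                reportErrorAndDie.accept(e, suicideFn);
            }
            // The loop is not re-entered: this thread is finished either way.
        }, "b-0-sketch");
        bolt.start();
        try {
            bolt.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return !bolt.isAlive() && !exitRequested.get();
    }

    public static void main(String[] args) {
        // Handler that only logs: the thread is gone, the process is not asked to exit.
        System.out.println("log only   -> zombie: "
                + leavesZombie((e, suicideFn) -> System.out.println("logged: " + e)));
        // Handler that logs and always dies.
        System.out.println("always die -> zombie: "
                + leavesZombie((e, suicideFn) -> suicideFn.run()));
    }
}
```

Under option 1 the second handler is what every error would reach; under option 2 the log-only path would remain, but only for errors that are themselves interrupts.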
          chawco Craig Hawco added a comment - - edited

          Will do. However, after looking through STORM-772 and STORM-773, it's not entirely clear what the intent was.

          Basically, my understanding is that the exception passed here is the exception from the worker – and exception-cause? walks the entire exception chain checking for InterruptedException or InterruptedIOException – which may have been due to a bolt talking to an external service (e.g. SocketTimeoutException is an InterruptedIOException).

          So, some options:

          1. My understanding is incorrect, and this error is NOT from the bolt itself
          2. exception-cause? doesn't walk the entire exception tree as I thought
          3. This behaviour is mostly intentional, but should only be applied if error itself is InterruptedException or InterruptedIOException, but not if those appear anywhere in the chain

          I don't think #2 above is the case, since exception-cause? does find a nested InterruptedException:

          user=> (def ex (new RuntimeException (new InterruptedException)))
          #'user/ex
          user=> (defn exception-cause?
                   [klass ^Throwable t]
                   (->> (iterate #(.getCause ^Throwable %) t)
                        (take-while identity)
                        (some (partial instance? klass))
                        boolean))
          #'user/exception-cause?
          user=> (exception-cause? InterruptedException ex)
          true
          user=>
          

          So one of the following holds: the exception isn't the one raised by the bolt (I'll look for some evidence to support this next); the check should only ignore errors that are themselves InterruptedException/InterruptedIOException, without looking elsewhere in the chain; or this was intended as just a logging line, and suicide-fn should still get invoked after logging.
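
The difference between checking the whole cause chain and checking only the error itself can be sketched in Java as well. The names here (`CauseChainSketch`, `anyCauseIsInstanceOf`, `isItself`) are hypothetical; the first method is assumed to mirror what exception-cause? does in the REPL session above, and the second is what option 3 would use instead.

```java
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;

// Hypothetical sketch contrasting a chain walk with a top-level-only check.
class CauseChainSketch {

    // True if t or any of its causes is an instance of klass (chain walk).
    static boolean anyCauseIsInstanceOf(Class<? extends Throwable> klass, Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (klass.isInstance(c)) {
                return true;
            }
        }
        return false;
    }

    // True only if t itself is an instance of klass; causes are not inspected.
    static boolean isItself(Class<? extends Throwable> klass, Throwable t) {
        return klass.isInstance(t);
    }

    public static void main(String[] args) {
        // A bolt's I/O timeout wrapped in a RuntimeException, as in the reported trace.
        Throwable wrapped = new RuntimeException(new SocketTimeoutException("fetch timed out"));

        System.out.println("chain walk matches wrapped timeout: "
                + anyCauseIsInstanceOf(InterruptedIOException.class, wrapped));
        System.out.println("top-level check matches wrapped timeout: "
                + isItself(InterruptedIOException.class, wrapped));
        System.out.println("top-level check matches bare InterruptedException: "
                + isItself(InterruptedException.class, new InterruptedException()));
    }
}
```

With the chain walk, a socket timeout inside a bolt is indistinguishable from a thread interrupt during shutdown; the top-level check separates the two, at the cost of missing interrupts that some layer has wrapped.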

          kabhwan Jungtaek Lim added a comment -

          Could you attach a thread dump from when you had a zombie Storm worker? That behaviour was introduced in STORM-773, but we haven't received a similar report.

          chawco Craig Hawco added a comment -

          This has PRs for 1.0.x-branch and master. I'm not sure what the procedure for 1.x-branch is. Please let me know if I should open another PR for that branch.


            People

            • Assignee:
              ppoulosk Paul Poulosky
              Reporter:
              chawco Craig Hawco
            • Votes:
              0
              Watchers:
              4

                Time Tracking

                Estimated:
                Not Specified
                Remaining:
                0h
                Logged:
                2h 40m