Apache Storm / STORM-342

Contention in Disruptor Queue which may cause message loss or out-of-order delivery

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.2-incubating
    • Component/s: storm-core
    • Labels: None

      Description

      STORM-342: Message loss, executor hang, or message disorder

      The Disruptor helper class contains a potential contention bug between consumer and producer. It can cause the consumer queue to hang, lose messages, or deliver messages out of order.

      Disruptor.java
      class Disruptor {
      ...
          public void publish(Object obj, boolean block) throws InsufficientCapacityException {
              if(consumerStartedFlag) {
                  // consumer already started: publish directly to the RingBuffer
                  final long id;
                  if(block) {
                      id = _buffer.next();
                  } else {
                      id = _buffer.tryNext(1);
                  }
                  final MutableObject m = _buffer.get(id);
                  m.setObject(obj);
                  _buffer.publish(id);
              } else {
                  // consumer not started yet: park the message in the cache; the cache is
                  // drained only when a FLUSH_CACHE marker is later published to the RingBuffer
                  _cache.add(obj);
                  if(consumerStartedFlag) flushCache();
              }
          }
      
          public void consumerStarted() {
              if(!consumerStartedFlag) {
                  consumerStartedFlag = true;
                  // Race window: producers already see consumerStartedFlag == true and publish
                  // directly to the RingBuffer before flushCache() publishes the FLUSH_CACHE
                  // marker, so cached messages can end up behind newer ones (disorder) or the
                  // blocking flush can hang on a full buffer (see the scenarios below).
                  flushCache();
              }
          }
      }
      

      Consumer

      Task Executor Thread
        (disruptor/consumer-started! receive-queue)
        (fn []            
           (disruptor/consume-batch-when-available receive-queue event-handler)
      

      Howto: Executor Hang, message loss:

      1. [Consumer Thread] The consumer is not started.
      2. [Producer A Thread] Publishes message "1"; as "consumerStartedFlag" == false, it is added to the cache.
      3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
      4. As "consumerStartedFlag" is now true, newly produced messages are published directly to the RingBuffer.
      5. [Producer B Thread] Generates enough messages to fill the RingBuffer.
      6. [Consumer Thread] flushCache() is called from consumerStarted().
      7. [Consumer Thread] The FLUSH_CACHE object is published to the RingBuffer in a blocking way. As the RingBuffer is now full, the consumer thread blocks.
      8. [Consumer Thread] consumeBatch() is never called, so the RingBuffer stays full and the consumer thread stays blocked forever.

      Howto: Message Disorder

      1. [Consumer Thread] The consumer is not started.
      2. [Producer A Thread] Publishes message "1"; as "consumerStartedFlag" == false, it is added to the cache.
      3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
      4. As "consumerStartedFlag" is now true, newly produced messages are published directly to the RingBuffer.
      5. [Producer A Thread] Publishes a new message "2"; it is published directly to the RingBuffer.
      6. [Consumer Thread] flushCache() is called from consumerStarted().
      7. [Consumer Thread] The FLUSH_CACHE message is published to the RingBuffer, after message "2".
      8. [Consumer Thread] consumeBatch() is called; it first picks up "2", then FLUSH_CACHE, which represents "1".
      9. Producer A Thread produced "1" then "2", but the consumer thread received "2" then "1".
      10. The message order is wrong.

      I found this after troubleshooting a tricky random failure (roughly 1 in 100 runs). It usually happens when the producer and consumer are colocated in the same process, for example when the task send-queue thread acts as the producer and publishes messages to a local task receive queue in the same worker.
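
      For reference, the fix adopted in pull request #136 (the diff is quoted in the activity below) pre-publishes the FLUSH_CACHE marker from the constructor when a multi-threaded claim strategy is in use, so cached messages always drain ahead of anything published directly. A condensed sketch of that constructor, assuming the existing fields and helpers of DisruptorQueue.java (publishDirect, FLUSH_CACHE, etc.):

      DisruptorQueue.java (sketch)
          public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
              this._queueName = PREFIX + queueName;
              _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
              _consumer = new Sequence();
              _barrier = _buffer.newBarrier();
              _buffer.setGatingSequences(_consumer);
              if(claim instanceof SingleThreadedClaimStrategy) {
                  // single-threaded producer: the cache is never needed
                  consumerStartedFlag = true;
              } else {
                  // multi-threaded producers: reserve the FLUSH_CACHE slot up front so
                  // cached messages are always consumed before any directly published message
                  try {
                      publishDirect(FLUSH_CACHE, true);
                  } catch (InsufficientCapacityException e) {
                      throw new RuntimeException("This code should be unreachable!");
                  }
              }
          }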

        Activity

        clockfly Sean Zhong added a comment - - edited

        Why is the cache introduced in the Disruptor queue? The logic for SingleThreadedClaimStrategy is different from that of MultiThreadedClaimStrategy.

        For SingleThreadedClaimStrategy, consumerStartedFlag is set directly in the constructor, and the cache is never used.

        In DisruptorQueue:

        DisruptorQueue.java
            public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
                 this._queueName = PREFIX + queueName;
                 
                 LOG.info("========Disruptor queueName" +  _queueName + ", " + claim.getClass().toString() + ", " + wait.getClass().toString());
                 
                _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
                _consumer = new Sequence();
                _barrier = _buffer.newBarrier();
                _buffer.setGatingSequences(_consumer);
                if(claim instanceof SingleThreadedClaimStrategy) {
                    consumerStartedFlag = true;
                }
            }
        
        clockfly Sean Zhong added a comment -

        This has a more severe impact.

        It can cause a hang and message loss.
        Suppose that during this contention window the publishers have published enough messages to fill the buffer.
        Then, when the consumer calls flushCache() to publish the FLUSH_CACHE message into the RingBuffer, it will block forever, as no free slots are available.
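
        To make the hang concrete: in the blocking path the publisher claims a slot with _buffer.next(), which waits until the consumer (the gating sequence) frees capacity. An annotated excerpt of that path from the snippet in the description, not new code:

        // blocking publish path (block == true)
        final long id = _buffer.next();          // waits until the consumer frees a slot
        final MutableObject m = _buffer.get(id);
        m.setObject(obj);
        _buffer.publish(id);
        // If the consumer thread itself is stuck in _buffer.next() while publishing
        // FLUSH_CACHE into a full buffer, consumeBatch() never runs, no slot is ever
        // freed, and the wait never ends.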

        revans2 Robert Joseph Evans added a comment -

        Wow great work finding this, it looks like it was a beast to debug. Do you have a fix in mind for this?

        githubbot ASF GitHub Bot added a comment -

        GitHub user clockfly opened a pull request:

        https://github.com/apache/incubator-storm/pull/136

        STORM-342: Message loss, executor hang, or message disorder due to contention in Disruptor queue under multi-thread mode.

        STORM-342: Message loss, executor hang, or message disorder
        -------------------------

        Disruptor helper class contains a potential contention bug between consumer and producer. It can cause consume queue hang, message loss, or message disorder.

        ```java
        Disruptor.java
        class Disruptor {
        ...
            public void publish(Object obj, boolean block) throws InsufficientCapacityException {
                if(consumerStartedFlag) {
                    final long id;
                    if(block) {
                        id = _buffer.next();
                    } else {
                        id = _buffer.tryNext(1);
                    }
                    final MutableObject m = _buffer.get(id);
                    m.setObject(obj);
                    _buffer.publish(id);
                } else {
                    _cache.add(obj);
                    if(consumerStartedFlag) flushCache();
                }
            }

            public void consumerStarted() {
                if(!consumerStartedFlag) {
                    consumerStartedFlag = true;
                    flushCache();
                }
            }
        }
        ```

        Consumer
        ```lisp
        ;;Executor thread
        (disruptor/consumer-started! receive-queue)
        (fn []
        (disruptor/consume-batch-when-available receive-queue event-handler)
        ```

        Howto: Executor Hang, message loss:
        ------------------------
        1. [Consumer Thread] The consumer is not started.
        2. [Producer A Thread] Publishes message "1"; as "consumerStartedFlag" == false, it is added to the cache.
        3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
        4. As "consumerStartedFlag" is now true, newly produced messages are published directly to the RingBuffer.
        5. [Producer B Thread] Generates enough messages to fill the RingBuffer.
        6. [Consumer Thread] flushCache() is called from consumerStarted().
        7. [Consumer Thread] The FLUSH_CACHE object is published to the RingBuffer in a blocking way. As the RingBuffer is now full, the consumer thread blocks.
        8. [Consumer Thread] consumeBatch() is never called, so the RingBuffer stays full and the consumer thread stays blocked forever.

        Howto: Message Disorder
        -----------------------------------
        1. [Consumer Thread] The consumer is not started.
        2. [Producer A Thread] Publishes message "1"; as "consumerStartedFlag" == false, it is added to the cache.
        3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
        4. As "consumerStartedFlag" is now true, newly produced messages are published directly to the RingBuffer.
        5. [Producer A Thread] Publishes a new message "2"; it is published directly to the RingBuffer.
        6. [Consumer Thread] flushCache() is called from consumerStarted().
        7. [Consumer Thread] The FLUSH_CACHE message is published to the RingBuffer, after message "2".
        8. [Consumer Thread] consumeBatch() is called; it first picks up "2", then FLUSH_CACHE, which represents "1".
        9. Producer A Thread produced "1" then "2", but the consumer thread received "2" then "1".
        10. The message order is wrong.

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/clockfly/incubator-storm disruptor_message_loss_hang_or_disorder

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/incubator-storm/pull/136.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #136


        commit 72b1f592885abc8c02c6902aa0eb6499bacae7f2
        Author: Sean Zhong <clockfly@gmail.com>
        Date: 2014-06-10T11:54:11Z

        STORM-342: Message loss, executor hang, or message disorder due to contention in Disruptor queue under multi-thread mode.


        githubbot ASF GitHub Bot added a comment -

        Github user clockfly commented on the pull request:

        https://github.com/apache/incubator-storm/pull/136#issuecomment-45607328

        This bug was found during a scenario test; it happened about 1 in 100 runs.
        With this fix, I reran the same test 500 times and did not see the error again.

        clockfly Sean Zhong added a comment -

        Robert Joseph Evans, patch ready: https://github.com/apache/incubator-storm/pull/136
        githubbot ASF GitHub Bot added a comment -

        Github user gna-phetsarath commented on the pull request:

        https://github.com/apache/incubator-storm/pull/136#issuecomment-45762704

        Is this patch compatible with storm-0.8.1, or do we have to upgrade and patch?

        githubbot ASF GitHub Bot added a comment -

        Github user revans2 commented on a diff in the pull request:

        https://github.com/apache/incubator-storm/pull/136#discussion_r13663702

        — Diff: storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java —
        @@ -62,6 +69,13 @@ public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait)
                 _buffer.setGatingSequences(_consumer);
                 if(claim instanceof SingleThreadedClaimStrategy) {
                     consumerStartedFlag = true;
        +        } else {
        +            // make sure we flush the pending messages in cache first
        +            try {
        +                publishDirect(FLUSH_CACHE, true);
        +            } catch (InsufficientCapacityException e) {
        +                throw new RuntimeException("This code should be unreachable!");
        — End diff –

        Can we include e as the cause of the RuntimeException? There have been times when I thought code should be unreachable, and when we did reach it in production I had no way to debug what happened.
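
        A minimal sketch of the suggested change (not the exact committed code): chain the caught exception as the cause so its stack trace survives if the "unreachable" path is ever hit in production.

        try {
            publishDirect(FLUSH_CACHE, true);
        } catch (InsufficientCapacityException e) {
            // include e so it shows up as "Caused by:" in the logs
            throw new RuntimeException("This code should be unreachable!", e);
        }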

        githubbot ASF GitHub Bot added a comment -

        Github user clockfly commented on the pull request:

        https://github.com/apache/incubator-storm/pull/136#issuecomment-45818844

        @gna-phetsarath, it should be compatible with 0.8.1.

        githubbot ASF GitHub Bot added a comment -

        Github user clockfly commented on a diff in the pull request:

        https://github.com/apache/incubator-storm/pull/136#discussion_r13683241

        — Diff: storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java —
        @@ -62,6 +69,13 @@ public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait)
                 _buffer.setGatingSequences(_consumer);
                 if(claim instanceof SingleThreadedClaimStrategy) {
                     consumerStartedFlag = true;
        +        } else {
        +            // make sure we flush the pending messages in cache first
        +            try {
        +                publishDirect(FLUSH_CACHE, true);
        +            } catch (InsufficientCapacityException e) {
        +                throw new RuntimeException("This code should be unreachable!");
        — End diff –

        fixed.

        githubbot ASF GitHub Bot added a comment -

        Github user revans2 commented on the pull request:

        https://github.com/apache/incubator-storm/pull/136#issuecomment-45918625

        Sorry it took me so long to finish the review. I wanted to be sure I truly understood what was happening. +1 looks like a great fix. If someone else could take a look at this that would be awesome.

        githubbot ASF GitHub Bot added a comment -

        Github user gna-phetsarath commented on the pull request:

        https://github.com/apache/incubator-storm/pull/136#issuecomment-45921906

        I had to fork and backport it to 0.8.1; I could not take the patch directly. I'll create a pull request once we test it on our side. So far, we are seeing positive results. Thanks for the fix.

        https://github.com/gna-phetsarath/incubator-storm/tree/0.8.1-disruptor_queue_fix

        githubbot ASF GitHub Bot added a comment -

        Github user ptgoetz commented on the pull request:

        https://github.com/apache/incubator-storm/pull/136#issuecomment-45942819

        +1 looks good.

        githubbot ASF GitHub Bot added a comment -

        Github user asfgit closed the pull request at:

        https://github.com/apache/incubator-storm/pull/136


          People

          • Assignee:
            clockfly Sean Zhong
            Reporter:
            clockfly Sean Zhong
          • Votes:
            0
            Watchers:
            3

            Dates

            • Created:
              Updated:
              Resolved:

              Development