Apache Storm / STORM-342

Contention in Disruptor Queue which may cause message loss or out-of-order delivery


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.2-incubating
    • Component/s: storm-core
    • Labels: None

    Description

      STORM-342: Message loss, executor hang, or message disorder

      The Disruptor helper class contains a potential contention bug between the consumer and producers. It can cause the consumer queue to hang, messages to be lost, or messages to be delivered out of order.

      Disruptor.java
      class Disruptor {
      ...
          public void publish(Object obj, boolean block) throws InsufficientCapacityException {
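              // Unsynchronized read of consumerStartedFlag: consumerStarted() may flip it
              // concurrently, so a later message can go straight to the RingBuffer while an
              // earlier one is still sitting in _cache.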
              if(consumerStartedFlag) {
                  final long id;
                  if(block) {
                      id = _buffer.next();
                  } else {
                      id = _buffer.tryNext(1);
                  }
                  final MutableObject m = _buffer.get(id);
                  m.setObject(obj);
                  _buffer.publish(id);
              } else {
                  _cache.add(obj);
                  if(consumerStartedFlag) flushCache();
              }
          }
      
          public void consumerStarted() {
              if(!consumerStartedFlag) {
                  consumerStartedFlag = true;
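                  // Race window: from this point producers publish directly to the
                  // RingBuffer, but the cached messages below have not been flushed yet.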
                  flushCache();
              }
          }
      }
      

      Consumer

      Task Executor Thread
        (disruptor/consumer-started! receive-queue)
        (fn []
           (disruptor/consume-batch-when-available receive-queue event-handler))
      

      Howto: Executor hang / message loss (a simplified sketch of this interleaving follows the steps):

      1. [Consumer Thread] The consumer has not started yet.
      2. [Producer A Thread] Publishes message "1"; because consumerStartedFlag == false, it is added to the cache.
      3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
      4. Because consumerStartedFlag is now true, newly produced messages are published directly to the RingBuffer.
      5. [Producer B Thread] Produces enough messages to fill the RingBuffer.
      6. [Consumer Thread] flushCache() is called inside consumerStarted().
      7. [Consumer Thread] The FLUSH_CACHE object is published to the RingBuffer in blocking mode; since the RingBuffer is full, the consumer thread blocks.
      8. [Consumer Thread] consumeBatch() is never called, so the RingBuffer stays full and the consumer thread stays blocked forever; the cached message "1" is never delivered.
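
      The interleaving above can be modeled with a few lines of plain Java. This is an illustrative sketch only: ArrayBlockingQueue stands in for the RingBuffer, the class and field names are made up, and a single thread plays all the roles in the order listed, which is enough to show the consumer thread blocking itself.

      HangSketch.java
      import java.util.concurrent.ArrayBlockingQueue;
      import java.util.concurrent.ConcurrentLinkedQueue;

      // Simplified model of the hang: a bounded queue stands in for the RingBuffer,
      // an unsynchronized flag plus a side cache stand in for the Disruptor wrapper.
      public class HangSketch {
          static final ArrayBlockingQueue<Object> ring = new ArrayBlockingQueue<>(4); // tiny "RingBuffer"
          static final ConcurrentLinkedQueue<Object> cache = new ConcurrentLinkedQueue<>();
          static volatile boolean consumerStartedFlag = false;
          static final Object FLUSH_CACHE = new Object();

          static void publish(Object o) throws InterruptedException {
              if (consumerStartedFlag) {
                  ring.put(o);   // direct, blocking publish
              } else {
                  cache.add(o);  // consumer not started yet: park the message in the cache
              }
          }

          public static void main(String[] args) throws Exception {
              publish("1");                 // step 2: Producer A; flag is false, "1" goes to the cache

              consumerStartedFlag = true;   // step 3: flag flips, flushCache() has not run yet

              for (int i = 0; i < 4; i++) { // step 5: Producer B fills the RingBuffer
                  publish("B" + i);
              }

              // steps 6-8: consumerStarted() now flushes the cache by publishing FLUSH_CACHE
              // in blocking mode; the buffer is full and nothing has consumed yet, so put()
              // blocks forever -- the consumer hangs and the cached "1" is never delivered.
              System.out.println("flushing cache into a full buffer; this blocks forever...");
              ring.put(FLUSH_CACHE);
              System.out.println("never reached");
          }
      }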

      Howto: Message disorder (a simplified sketch follows the steps)

      1. [Consumer Thread] The consumer has not started yet.
      2. [Producer A Thread] Publishes message "1"; because consumerStartedFlag == false, it is added to the cache.
      3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
      4. Because consumerStartedFlag is now true, newly produced messages are published directly to the RingBuffer.
      5. [Producer A Thread] Publishes a new message "2"; it goes directly into the RingBuffer.
      6. [Consumer Thread] flushCache() is called inside consumerStarted().
      7. [Consumer Thread] The FLUSH_CACHE message is published to the RingBuffer; it lands after message "2".
      8. [Consumer Thread] consumeBatch() is called; it first picks up "2", then FLUSH_CACHE, which represents the cached "1".
      9. Producer A produced "1" then "2", but the consumer received "2" then "1".
      10. The message order is wrong.
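
      The disorder can be shown the same way. Again an illustrative sketch with made-up names, not Storm's classes: the RingBuffer is modeled as a plain FIFO queue and FLUSH_CACHE is a marker the consumer expands into the cached messages; running it prints "consumed 2" before "consumed 1".

      DisorderSketch.java
      import java.util.ArrayDeque;
      import java.util.Queue;

      public class DisorderSketch {
          public static void main(String[] args) {
              Queue<Object> ring = new ArrayDeque<>();   // stands in for the RingBuffer
              Queue<Object> cache = new ArrayDeque<>();
              final Object FLUSH_CACHE = new Object();

              cache.add("1");          // step 2: "1" is cached because the flag is still false
              // step 3: consumerStartedFlag = true, flushCache() not called yet
              ring.add("2");           // step 5: Producer A publishes "2" directly to the RingBuffer
              ring.add(FLUSH_CACHE);   // step 7: flushCache() publishes the marker after "2"

              // step 8: consumeBatch() drains the RingBuffer; FLUSH_CACHE expands to the cache
              while (!ring.isEmpty()) {
                  Object o = ring.poll();
                  if (o == FLUSH_CACHE) {
                      while (!cache.isEmpty()) System.out.println("consumed " + cache.poll());
                  } else {
                      System.out.println("consumed " + o);
                  }
              }
              // Output: "consumed 2" then "consumed 1" -- the order is inverted.
          }
      }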

      I found this after troubleshooting a tricky random failure (roughly 1 in 100 runs). It usually happens when the producer and consumer are colocated in the same process, for example when the task send-queue thread acts as the producer and publishes messages to a local task receive queue in the same worker.
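
      For reference, one way this window could be closed is to flush the cache before flipping the flag and to serialize the pre-start cache adds with that flush on a single lock, so no producer can publish directly ahead of a message that is still cached. The sketch below is illustrative only (hypothetical names, the RingBuffer publish reduced to a placeholder) and is not necessarily the patch that was actually committed.

      SafeHandoffSketch.java
      import java.util.concurrent.ConcurrentLinkedQueue;

      class SafeHandoffSketch {
          private final ConcurrentLinkedQueue<Object> cache = new ConcurrentLinkedQueue<>();
          private final Object cacheLock = new Object();
          private volatile boolean consumerStartedFlag = false;

          public void publish(Object obj) {
              if (!consumerStartedFlag) {
                  synchronized (cacheLock) {
                      // Re-check under the lock: either the consumer still has not started and
                      // the message belongs in the cache, or the flag flipped while we waited
                      // and we fall through to a direct publish.
                      if (!consumerStartedFlag) {
                          cache.add(obj);
                          return;
                      }
                  }
              }
              publishToRingBuffer(obj);            // direct publish; the cache is already empty here
          }

          public void consumerStarted() {
              synchronized (cacheLock) {
                  Object cached;
                  while ((cached = cache.poll()) != null) {
                      publishToRingBuffer(cached); // drain cached messages first, in order
                  }
                  consumerStartedFlag = true;      // only then let producers bypass the cache
              }
          }

          private void publishToRingBuffer(Object obj) {
              // Placeholder for the real RingBuffer publish (next()/get()/publish()).
              System.out.println("published " + obj);
          }

          public static void main(String[] args) {
              SafeHandoffSketch q = new SafeHandoffSketch();
              q.publish("1");        // cached: consumer not started yet
              q.consumerStarted();   // drains "1" into the buffer, then opens the fast path
              q.publish("2");        // direct publish; "2" cannot overtake "1"
          }
      }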

          People

            Assignee: Sean Zhong (clockfly)
            Reporter: Sean Zhong (clockfly)
            Votes: 0
            Watchers: 3
