  1. Bookkeeper
  2. BOOKKEEPER-503

The test case TestThrottlingDelivery#testServerSideThrottle fails intermittently

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 4.2.0
    • Component/s: hedwig-server
    • Labels: None

      Description

      Run the following script in the hedwig-server project:

      while mvn test -Dtest=TestThrottlingDelivery; do echo .; done
      

      We may get an assertion failure:

      testServerSideThrottle[0](org.apache.hedwig.server.delivery.TestThrottlingDelivery)  Time elapsed: 14.922 sec  <<< FAILURE!
      junit.framework.AssertionFailedError: Timed out waiting for messages 31
      	at junit.framework.Assert.fail(Assert.java:47)
      	at junit.framework.Assert.assertTrue(Assert.java:20)
      	at org.apache.hedwig.server.delivery.TestThrottlingDelivery.throttleX(TestThrottlingDelivery.java:159)
      	at org.apache.hedwig.server.delivery.TestThrottlingDelivery.testServerSideThrottle(TestThrottlingDelivery.java:206)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:601)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
      	at org.junit.internal.runners.statements.FailOnTimeout$1.run(FailOnTimeout.java:28)
      

      This is a race condition that can cause messages to be throttled by mistake. The root cause is that ActiveSubscriberState.messageConsumed() and ActiveSubscriberState.deliverNextMessage() may be executed in different threads, by AbstractSubscriptionManager and FIFODeliveryManager respectively.

      For more detail, see the log in the attachment around line 2420. Here I replay the log against the code (Line (XX) refers to the code listed below):

      1. Messages 1-30 are to be delivered, and the message window size in the Hub server is 10.
      2. Messages 1-10 are delivered to the subscriber, while messages 11-30 are throttled by the window size limit.
      3. The subscriber sends CONSUME for messages 1-10 asynchronously.
      4. CONSUME 1 is handled, and FIFODeliveryManager continues by delivering message 11.
      5. The subscriber receives message 11 and quickly sends CONSUME 11 to the Hub.
      6. Now two threads operate on the same ActiveSubscriberState:
        • Thread in AbstractSubscriptionManager: calls ActiveSubscriberState.messageConsumed() for messages 2, 3 and 11 (CONSUME 4-10 are still on the way, since consumes are asynchronous). Assume this thread is at Line (14) for message 11.
        • Thread in FIFODeliveryManager: coincidentally, it is at Line (36) (with last delivered 11, last consumed 1, and isThrottled still false).
      7. If the AbstractSubscriptionManager thread runs before the FIFODeliveryManager thread, it sees isThrottled == false at Line (14), so the consume for 11 does nothing more. The FIFODeliveryManager thread then sets isThrottled to true at Line (38).
      8. CONSUME 4-10 are then ignored by the if statement at Line (02), since lastSeqIdConsumedUtil is now 11.
      9. Further messages, such as 12, never get a chance to be delivered.
      (01) protected void messageConsumed(long newSeqIdConsumed) {
      (02)     if (newSeqIdConsumed <= lastSeqIdConsumedUtil) {
      (03)         return;
      (04)     }
      (05)     if (logger.isDebugEnabled()) {
      (06)         logger.debug("Subscriber ({}) moved consumed ptr from {} to {}.",
      (07)                      va(this, lastSeqIdConsumedUtil, newSeqIdConsumed));
      (08)     }
      (09)     lastSeqIdConsumedUtil = newSeqIdConsumed;
      (10)     // after updated seq id check whether it still exceed msg limitation
      (11)     if (msgLimitExceeded()) {
      (12)         return;
      (13)     }
      (14)     if (isThrottled) {
      (15)         isThrottled = false;
      (16)         logger.info("Try to wake up subscriber ({}) to deliver messages again : last delivered {}, last consumed {}.",
      (17)                     va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
      (18) 
      (19)         enqueueWithoutFailure(new DeliveryManagerRequest() {
      (20)             @Override
      (21)             public void performRequest() {
      (22)                 // enqueue 
      (23)                 clearRetryDelayForSubscriber(ActiveSubscriberState.this);            
      (24)             }
      (25)         });
      (26)     }
      (27) }
      (28) 
      (29) public void deliverNextMessage() {
      (30)     if (!isConnected()) {
      (31)         return;
      (32)     }
      (33) 
      (34)     // check whether we have delivered enough messages without receiving their consumes
      (35)     if (msgLimitExceeded()) {
      (36)         logger.info("Subscriber ({}) is throttled : last delivered {}, last consumed {}.",
      (37)                     va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
      (38)         isThrottled = true;
      (39)         // do nothing, since the delivery process would be throttled.
      (40)         // After message consumed, it would be added back to retry queue.
      (41)         return;
      (42)     }
      (43) 
      (44)     localSeqIdDeliveringNow = persistenceMgr.getSeqIdAfterSkipping(topic, lastLocalSeqIdDelivered, 1);
      (45) 
      (46)     ScanRequest scanRequest = new ScanRequest(topic, localSeqIdDeliveringNow,
      (47)             /* callback= */this, /* ctx= */null);
      (48) 
      (49)     persistenceMgr.scanSingleMessage(scanRequest);
      (50) }
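
The interleaving in steps 6-9 can be replayed deterministically on a single thread. The sketch below is not the Hedwig class: ThrottleState, wakeUpEnqueued, replayRace() and the formula inside msgLimitExceeded() are assumptions made only for illustration. The delivery thread is modelled by evaluating the limit check first and applying its result later, which mimics a thread paused at Line (36).

```java
/** Hypothetical, simplified model of the throttle state; not the real ActiveSubscriberState. */
class ThrottleState {
    static final int WINDOW = 10;   // message window size from step 1

    long lastDelivered;
    long lastConsumed;
    boolean isThrottled;
    boolean wakeUpEnqueued;         // stands in for enqueueWithoutFailure(...)

    // Assumed form of the limit check; the real method is not shown in the report.
    boolean msgLimitExceeded() {
        return lastDelivered - lastConsumed >= WINDOW;
    }

    // Same order of operations as Lines (02)-(26) of the listing.
    void messageConsumed(long seqId) {
        if (seqId <= lastConsumed) {
            return;
        }
        lastConsumed = seqId;
        if (msgLimitExceeded()) {
            return;
        }
        if (isThrottled) {
            isThrottled = false;
            wakeUpEnqueued = true;
        }
    }

    /** Replays steps 6-9 and returns true if the subscriber ends up stuck. */
    static boolean replayRace() {
        ThrottleState s = new ThrottleState();
        s.lastDelivered = 11;
        s.lastConsumed = 1;

        // Delivery thread: the check at Line (35) is true, then the thread pauses at Line (36).
        boolean deliveryThreadSawLimit = s.msgLimitExceeded();

        // Subscription thread: CONSUME 11 runs to completion while isThrottled is still false.
        s.messageConsumed(11);

        // Delivery thread resumes and executes Line (38).
        if (deliveryThreadSawLimit) {
            s.isThrottled = true;
        }

        // Late CONSUME 4-10 are dropped by the check at Line (02).
        for (long id = 4; id <= 10; id++) {
            s.messageConsumed(id);
        }

        // Throttled, and nothing was enqueued to wake the subscriber up again.
        return s.isThrottled && !s.wakeUpEnqueued;
    }
}
```

Calling ThrottleState.replayRace() returns true under this model: the state is left throttled and no wake-up request was ever enqueued, which matches the timeout seen in the test.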
      

      In addition, we should make the other methods of ActiveSubscriberState thread-safe, since the class also implements other callback interfaces.
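
One possible way to rule out this class of race is to confine all state changes to a single thread, so that callbacks arriving from other threads only enqueue work. The sketch below illustrates that idea and is not necessarily what the attached patches do. SerializedSubscriberState, totalMessages, deliveredSoFar() and the delivery loop are hypothetical, and the limit check is an assumed formula.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical model, not the Hedwig class: all state is confined to one queue thread. */
class SerializedSubscriberState {
    private static final int WINDOW = 10;   // message window size from step 1
    private final long totalMessages;       // messages available for delivery

    // Daemon thread so that an instance left open cannot keep the JVM alive.
    private final ExecutorService queue = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "subscriber-state");
        t.setDaemon(true);
        return t;
    });

    // The fields below are read and written only by the queue thread.
    private long lastDelivered;
    private long lastConsumed;
    private boolean isThrottled;

    SerializedSubscriberState(long totalMessages) {
        this.totalMessages = totalMessages;
    }

    /** Callable from any thread; the state change itself runs on the queue thread. */
    void messageConsumed(long seqId) {
        queue.execute(() -> {
            if (seqId <= lastConsumed) {
                return;
            }
            lastConsumed = seqId;
            if (isThrottled && !msgLimitExceeded()) {
                isThrottled = false;
                deliver();
            }
        });
    }

    /** Callable from any thread. */
    void deliverNextMessage() {
        queue.execute(this::deliver);
    }

    /** Reads the counter on the queue thread, after all previously submitted work. */
    long deliveredSoFar() throws Exception {
        return queue.submit(() -> lastDelivered).get();
    }

    // Assumed form of the limit check.
    private boolean msgLimitExceeded() {
        return lastDelivered - lastConsumed >= WINDOW;
    }

    // Delivers until the window is full or nothing is left; runs on the queue thread only.
    private void deliver() {
        while (lastDelivered < totalMessages) {
            if (msgLimitExceeded()) {
                isThrottled = true;
                return;
            }
            lastDelivered++;
        }
    }
}
```

Because the limit check and the isThrottled update always run inside the same queued task, a consume can no longer slip in between them. The trade-off is that every callback pays for a hand-off to the queue thread.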

        Attachments

        1. logs.tar
          100 kB
          Jiannan Wang
        2. 0001-BOOKKEEPER-503-Test-case.patch
          9 kB
          Ivan Kelly
        3. BOOKKEEPER-503.patch
          17 kB
          Jiannan Wang
        4. BOOKKEEPER-503.patch
          16 kB
          Sijie Guo

              People

              • Assignee: Jiannan Wang (jiannan)
              • Reporter: Jiannan Wang (jiannan)