Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Fixed
Description
Run the following script in the hedwig-server project:
while mvn test -Dtest=TestThrottlingDelivery; do echo .; done
It may eventually stop with an assertion failure:
testServerSideThrottle[0](org.apache.hedwig.server.delivery.TestThrottlingDelivery) Time elapsed: 14.922 sec <<< FAILURE!
junit.framework.AssertionFailedError: Timed out waiting for messages 31
at junit.framework.Assert.fail(Assert.java:47)
at junit.framework.Assert.assertTrue(Assert.java:20)
at org.apache.hedwig.server.delivery.TestThrottlingDelivery.throttleX(TestThrottlingDelivery.java:159)
at org.apache.hedwig.server.delivery.TestThrottlingDelivery.testServerSideThrottle(TestThrottlingDelivery.java:206)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.internal.runners.statements.FailOnTimeout$1.run(FailOnTimeout.java:28)
This is a race condition that can cause messages to be throttled by mistake. The root cause is that ActiveSubscriberState.messageConsumed() and ActiveSubscriberState.deliverNextMessage() may be executed on different threads, by AbstractSubscriptionManager and FIFODeliveryManager respectively.
For more information, read the attached log around line 2420. Here the log is replayed against the code (Line (XX) refers to the numbered code listing below):
- Messages 1-30 are to be delivered, and the message window size in the Hub server is 10.
- Messages 1-10 are delivered to the subscriber, while messages 11-30 are throttled by the window size limit.
- The subscriber sends consumes for messages 1-10 asynchronously.
- CONSUME 1 is handled, and FIFODeliveryManager continues by delivering message 11.
- The subscriber receives message 11 and quickly acknowledges it with CONSUME 11 to the Hub.
- Now two threads operate on the same ActiveSubscriberState:
  - Thread in AbstractSubscriptionManager: calls ActiveSubscriberState.messageConsumed() for messages 2, 3 and 11 (the consumes for 4-10 are still in flight, since consuming is asynchronous). Assume this thread happens to be at Line (14) for message 11.
  - Thread in FIFODeliveryManager: coincidentally, it is at Line (36), having passed the check at Line (35) with last delivered 11 and last consumed 1, while isThrottled is still false.
- If the AbstractSubscriptionManager thread runs before the FIFODeliveryManager thread, it sees isThrottled as false at Line (14), so the consume for message 11 does nothing more. The FIFODeliveryManager thread then sets isThrottled to true at Line (38) and returns.
- CONSUME [4-10] are simply ignored by the if statement at Line (2), since lastSeqIdConsumedUtil is now 11.
- Nothing wakes the subscriber up again, so further messages such as 12 have no chance of being delivered.
(01) protected void messageConsumed(long newSeqIdConsumed) {
(02)     if (newSeqIdConsumed <= lastSeqIdConsumedUtil) {
(03)         return;
(04)     }
(05)     if (logger.isDebugEnabled()) {
(06)         logger.debug("Subscriber ({}) moved consumed ptr from {} to {}.",
(07)                      va(this, lastSeqIdConsumedUtil, newSeqIdConsumed));
(08)     }
(09)     lastSeqIdConsumedUtil = newSeqIdConsumed;
(10)     // after updated seq id check whether it still exceed msg limitation
(11)     if (msgLimitExceeded()) {
(12)         return;
(13)     }
(14)     if (isThrottled) {
(15)         isThrottled = false;
(16)         logger.info("Try to wake up subscriber ({}) to deliver messages again : last delivered {}, last consumed {}.",
(17)                     va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
(18)
(19)         enqueueWithoutFailure(new DeliveryManagerRequest() {
(20)             @Override
(21)             public void performRequest() {
(22)                 // enqueue
(23)                 clearRetryDelayForSubscriber(ActiveSubscriberState.this);
(24)             }
(25)         });
(26)     }
(27) }
(28)
(29) public void deliverNextMessage() {
(30)     if (!isConnected()) {
(31)         return;
(32)     }
(33)
(34)     // check whether we have delivered enough messages without receiving their consumes
(35)     if (msgLimitExceeded()) {
(36)         logger.info("Subscriber ({}) is throttled : last delivered {}, last consumed {}.",
(37)                     va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
(38)         isThrottled = true;
(39)         // do nothing, since the delivery process would be throttled.
(40)         // After message consumed, it would be added back to retry queue.
(41)         return;
(42)     }
(43)
(44)     localSeqIdDeliveringNow = persistenceMgr.getSeqIdAfterSkipping(topic, lastLocalSeqIdDelivered, 1);
(45)
(46)     ScanRequest scanRequest = new ScanRequest(topic, localSeqIdDeliveringNow,
(47)                                               /* callback= */this, /* ctx= */null);
(48)
(49)     persistenceMgr.scanSingleMessage(scanRequest);
(50) }
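The interleaving described above can be replayed deterministically in a single thread. The following is only a rough sketch, not Hedwig code: ThrottleRaceReplay, wakeUpRequested, deliveryCheck() and deliveryThrottle() are hypothetical names, and the body of msgLimitExceeded() is an assumption, since the real method is not shown in this report.

```java
// Hypothetical, stripped-down model of ActiveSubscriberState. Only the
// fields and checks that matter for the race are kept.
class ThrottleRaceReplay {
    static final int WINDOW = 10;        // message window size from the report
    long lastLocalSeqIdDelivered = 11;   // message 11 has just been delivered
    long lastSeqIdConsumedUtil = 1;      // only CONSUME 1 has been handled
    boolean isThrottled = false;
    boolean wakeUpRequested = false;     // stands in for enqueueWithoutFailure(...)

    // Assumed definition; the real msgLimitExceeded() is not part of the listing.
    boolean msgLimitExceeded() {
        return lastLocalSeqIdDelivered - lastSeqIdConsumedUtil >= WINDOW;
    }

    // Same logic as lines (01)-(27), with logging removed.
    void messageConsumed(long newSeqIdConsumed) {
        if (newSeqIdConsumed <= lastSeqIdConsumedUtil) {
            return;
        }
        lastSeqIdConsumedUtil = newSeqIdConsumed;
        if (msgLimitExceeded()) {
            return;
        }
        if (isThrottled) {
            isThrottled = false;
            wakeUpRequested = true;
        }
    }

    // deliverNextMessage() split at the point where the delivery thread pauses.
    boolean deliveryCheck() { return msgLimitExceeded(); }  // line (35)
    void deliveryThrottle() { isThrottled = true; }         // line (38)

    static ThrottleRaceReplay replay() {
        ThrottleRaceReplay s = new ThrottleRaceReplay();
        boolean exceeded = s.deliveryCheck();  // delivery thread passes line (35)
        s.messageConsumed(11);                 // subscription thread runs to the end
        if (exceeded) {
            s.deliveryThrottle();              // delivery thread resumes at line (38)
        }
        for (long seq = 4; seq <= 10; seq++) {
            s.messageConsumed(seq);            // late consumes, rejected at line (02)
        }
        return s;
    }

    public static void main(String[] args) {
        ThrottleRaceReplay s = replay();
        System.out.println("isThrottled=" + s.isThrottled
                + ", wakeUpRequested=" + s.wakeUpRequested);
    }
}
```

Under these assumptions the replay ends with isThrottled set and no wake-up request enqueued, which matches the stuck state seen in the test.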
Separately, we should also address thread safety in the other methods of ActiveSubscriberState, which implements several other callback interfaces.
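One possible way to close the race, shown only as a sketch and not as the patch applied for this issue, is to make the check and the update of isThrottled atomic with respect to each other by guarding both methods with the same lock. SynchronizedSubscriberState, wakeUps() and throttled() are hypothetical names, msgLimitExceeded() is again an assumed definition, and delivery is reduced to incrementing a counter instead of an asynchronous scan.

```java
// Hypothetical simplification of ActiveSubscriberState in which both entry
// points synchronize on the same monitor.
class SynchronizedSubscriberState {
    static final int WINDOW = 10;          // message window size from the report
    private long lastLocalSeqIdDelivered;  // starts at 0: nothing delivered yet
    private long lastSeqIdConsumedUtil;    // starts at 0: nothing consumed yet
    private boolean isThrottled;
    private int wakeUps;                   // counts simulated wake-up requests

    // Assumed definition; the real msgLimitExceeded() is not shown in the report.
    private boolean msgLimitExceeded() {
        return lastLocalSeqIdDelivered - lastSeqIdConsumedUtil >= WINDOW;
    }

    synchronized void messageConsumed(long newSeqIdConsumed) {
        if (newSeqIdConsumed <= lastSeqIdConsumedUtil) {
            return;
        }
        lastSeqIdConsumedUtil = newSeqIdConsumed;
        if (msgLimitExceeded()) {
            return;
        }
        if (isThrottled) {
            isThrottled = false;
            wakeUps++;                     // stands in for enqueueWithoutFailure(...)
        }
    }

    // Returns true if a message was "delivered", false if delivery was throttled.
    synchronized boolean deliverNextMessage() {
        if (msgLimitExceeded()) {
            isThrottled = true;            // check and set now happen under one lock
            return false;
        }
        lastLocalSeqIdDelivered++;
        return true;
    }

    synchronized boolean throttled() { return isThrottled; }
    synchronized int wakeUps() { return wakeUps; }
}
```

With the lock held across the limit check and the flag update, a consume can no longer slip in between Line (35) and Line (38). An alternative design would be to route messageConsumed() through the delivery manager's own request queue so that only one thread ever touches the state.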
Attachments
Issue Links
- relates to BOOKKEEPER-539: ClientNotSubscribedException & doesn't receive enough messages in TestThrottlingDelivery#testServerSideThrottle (Closed)