Uploaded image for project: 'ActiveMQ Classic'
  1. ActiveMQ Classic
  2. AMQ-4589

Race condition in AjaxListener causes lost messages

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 5.8.0
    • 5.9.0
    • None
    • None
    • Jetty 9.0.3, Java 7, Windows

    • Patch Available

    Description

      Running activemq-web under a container that supports Servlet 3, there is a race condition when resuming continuations that leads to lost messages because the same continuation is resumed multiple times and the second time around the undelivered_message attribute is overwritten.

      Here's an example debug log of the issue under Jetty:

      2013-06-19 10:46:51,340 DEBUG o.a.a.web.MessageListenerServlet   - doMessage timeout=25000
      2013-06-19 10:46:51,350 DEBUG o.a.a.web.MessageListenerServlet   - received null from ActiveMQMessageConsumer { value=ID:deapp0313-52345-1371228095770-3:126:1:1, started=true }
      2013-06-19 10:46:51,350 DEBUG o.a.a.web.MessageListenerServlet   - Suspending continuation org.eclipse.jetty.continuation.Servlet3Continuation@73d4a75a
      2013-06-19 10:46:53,053 DEBUG org.apache.activemq.web.AjaxListener   - message is ActiveMQTextMessage {commandId = 4419, responseRequired = true, messageId = ID:deapp0313-52345-1371228095770-1:1:1:1:4415, originalDestination = null, originalTransactionId = null, producerId = ID:deapp0313-52345-1371228095770-1:1:1:1, destination = topic://W2ETopic, transactionId = null, expiration = 0, timestamp = 1371631612606, arrival = 0, brokerInTime = 1371631612628, brokerOutTime = 1371631612629, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@1e5bc429, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {instance=1796}, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = {"instance":{"instanceId":1796,"changeNo":217...nput":true}}}
      2013-06-19 10:46:53,062 DEBUG org.apache.activemq.web.AjaxListener   - Resuming suspended continuation org.eclipse.jetty.continuation.Servlet3Continuation@73d4a75a
      2013-06-19 10:46:53,063 DEBUG org.apache.activemq.web.AjaxListener   - message for ActiveMQMessageConsumer { value=ID:deapp0313-52345-1371228095770-3:126:1:1, started=true } continuation=org.eclipse.jetty.continuation.Servlet3Continuation@73d4a75a
      2013-06-19 10:46:53,064 DEBUG org.apache.activemq.web.AjaxListener   - message is ActiveMQTextMessage {commandId = 4420, responseRequired = true, messageId = ID:deapp0313-52345-1371228095770-1:1:1:1:4416, originalDestination = null, originalTransactionId = null, producerId = ID:deapp0313-52345-1371228095770-1:1:1:1, destination = topic://W2ETopic, transactionId = null, expiration = 0, timestamp = 1371631612644, arrival = 0, brokerInTime = 1371631612644, brokerOutTime = 1371631613022, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@1aab6f51, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {instance=1796}, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = {"instance":{"instanceId":1796,"changeNo":217...nput":true}}}
      2013-06-19 10:46:53,064 DEBUG org.apache.activemq.web.AjaxListener   - Resuming suspended continuation org.eclipse.jetty.continuation.Servlet3Continuation@73d4a75a
      2013-06-19 10:46:53,065 DEBUG o.a.a.web.MessageListenerServlet   - GET client=org.apache.activemq.web.AjaxWebClient@7c00281 session=c5qseeifcc9l1n49e7ivdx3at clientId=w2e-gui-0.18586184200830758 uri=/amq query=timeout=120000&d=1371631611528&r=0.4084213834721595&clientId=w2e-gui-0.18586184200830758
      2013-06-19 10:46:53,065 DEBUG o.a.a.web.MessageListenerServlet   - doMessage timeout=25000
      2013-06-19 10:46:53,065 DEBUG o.a.a.web.MessageListenerServlet   - sending pre-existing message
      2013-06-19 10:46:53,066 DEBUG o.a.a.web.MessageListenerServlet   - Send 0 unconsumed messages
      2013-06-19 10:46:53,065 ERROR org.apache.activemq.web.AjaxListener   - Error receiving message java.lang.IllegalStateException: REDISPATCHED,resumed
      java.lang.IllegalStateException: REDISPATCHED,resumed
      	at org.eclipse.jetty.server.HttpChannelState.dispatch(HttpChannelState.java:335) ~[jetty-server-9.0.3.v20130506.jar:9.0.3.v20130506]
      	at org.eclipse.jetty.server.AsyncContextState.dispatch(AsyncContextState.java:109) ~[jetty-server-9.0.3.v20130506.jar:9.0.3.v20130506]
      	at org.eclipse.jetty.continuation.Servlet3Continuation.resume(Servlet3Continuation.java:186) ~[jetty-continuation-9.0.3.v20130506.jar:9.0.3.v20130506]
      	at org.apache.activemq.web.AjaxListener.onMessageAvailable(AjaxListener.java:74) ~[activemq-web-5.8.0.jar:5.8.0]
      	at org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:1343) [activemq-client-5.8.0.jar:5.8.0]
      	at org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:131) [activemq-client-5.8.0.jar:5.8.0]
      	at org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:202) [activemq-client-5.8.0.jar:5.8.0]
      	at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:129) [activemq-client-5.8.0.jar:5.8.0]
      	at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:47) [activemq-client-5.8.0.jar:5.8.0]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) [na:1.7.0_07]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) [na:1.7.0_07]
      	at java.lang.Thread.run(Thread.java:722) [na:1.7.0_07]
      2013-06-19 10:46:53,068 DEBUG o.a.a.web.MessageListenerServlet   - Continuation org.eclipse.jetty.continuation.Servlet3Continuation@73d4a75a completed.
      

      The root of the race is that the code uses isSuspended() to check whether a continuation has been resumed, but resume() only resets isSuspended() after the resume request has been processed in the servlet engine. The race-free way to check whether is has been resumed is to check isResumed(), see attached patch.

      Attachments

        1. AjaxListener-race.diff
          0.8 kB
          Thomas Themel

        Activity

          People

            Unassigned Unassigned
            themel Thomas Themel
            Votes:
            1 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: