Uploaded image for project: 'ActiveMQ'
  1. ActiveMQ
  2. AMQ-6417

Timeout when consuming from expired messages route

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 5.13.1, 5.13.2, 5.13.3, 5.13.4, 5.14.0
    • Fix Version/s: None
    • Component/s: None
    • Regression:
      Regression

      Description

      Starting from ActiveMQ 5.13.1, we get a timeout when consuming expired messages. I'm at loss at how to resolve this, so any help is appreciated. Here's my test:

      package no.foo.integration.module
      
      import no.foo.integration.FooIntegrationApplication
      import no.foo.test_helpers.PortSelector
      import org.apache.camel.EndpointInject
      import org.apache.camel.builder.AdviceWithRouteBuilder
      import org.apache.camel.component.mock.MockEndpoint
      import org.apache.camel.management.JmxSystemPropertyKeys
      import org.apache.camel.model.BeanDefinition
      import org.apache.camel.model.ModelCamelContext
      import org.apache.camel.test.spring.CamelSpringJUnit4ClassRunner
      import org.apache.camel.test.spring.UseAdviceWith
      import org.junit.*
      import org.junit.runner.RunWith
      import org.springframework.beans.factory.annotation.Autowired
      import org.springframework.jms.core.JmsTemplate
      import org.springframework.test.context.ContextConfiguration
      import org.springframework.test.context.support.AnnotationConfigContextLoader
      
      import static no.foo.integration.constants.Route.INCOMING
      import static no.foo.integration.constants.Route.INTEGRATIONS_TOPIC_EXPIRED_MESSAGES_ADVISORY
      import static no.foo.test_helpers.SystemPropertyUtil.resetProperty
      import static no.foo.test_helpers.SystemPropertyUtil.writeProperty
      
      @RunWith(CamelSpringJUnit4ClassRunner.class)
      @UseAdviceWith
      @ContextConfiguration(classes = [FooIntegrationApplication.class, MockConfig.class], loader = AnnotationConfigContextLoader.class)
      public class ExpiredMessagesRouteModuleTest {
      
          private static final String CONSUMER_DESTINATION_ONE = "activemq:Consumer.monster.VirtualTopic.Foo.Integrations"
          private static final String CONSUMER_DESTINATION_TWO = "activemq:Consumer.monstermagnet.VirtualTopic.Foo.Integrations"
          private static int freePortBroker
      
          @Autowired
          private JmsTemplate jmsTemplate
      
          @Autowired
          private ModelCamelContext context
      
          @EndpointInject(uri = "mock:expiredMessages")
          private MockEndpoint expiredMessagesEndpointMock
      
      
          @BeforeClass
          public static void enableCamelJMX() {
             System.clearProperty(JmxSystemPropertyKeys.DISABLED)
             freePortBroker = PortSelector.random().select()
             writeProperty("foo-broker.port", "" + freePortBroker)
          }
      
          @AfterClass
          public static void tearDown() {
             resetProperty("foo-broker.port")
          }
      
          @Before
          public void replaceBeanWithMockEndpoint() throws Exception {
              context.getRouteDefinition(INTEGRATIONS_TOPIC_EXPIRED_MESSAGES_ADVISORY.name()).adviceWith(context, new AdviceWithRouteBuilder() {
                  @Override
                  public void configure() throws Exception {
                     weaveByType(BeanDefinition).replace().to(expiredMessagesEndpointMock)
                  }
              });
      
              context.start()
          }
      
          @Test(timeout = 10000L)
          public void consume_from_expired_messages_advisory_topic() throws Exception {
              createTwoTopicConsumers()
              setUpMessagesToExpireFast()
      
              jmsTemplate.convertAndSend(INCOMING.destination, [SEARCH_ID_KEY: "1"])
      
              waitForMessageToExpire()
              expiredMessagesEndpointMock.expectedMessageCount(2)
              expiredMessagesEndpointMock.assertIsSatisfied()
          }
      
          private void setUpMessagesToExpireFast() {
              jmsTemplate.setExplicitQosEnabled(true)
              jmsTemplate.setTimeToLive(500L)
          }
      
          // Need to create topic consumer, otherwise the messages sent to the topic will be discarded before they expire.
          private void createTwoTopicConsumers() {
              context.createConsumerTemplate().receiveNoWait(CONSUMER_DESTINATION_ONE)
              context.createConsumerTemplate().receiveNoWait(CONSUMER_DESTINATION_TWO)
          }
      
          def waitForMessageToExpire() {
              Thread.sleep(1000)
          }
      }
      

      and here's the exception

       no.foo.integration.module.ExpiredMessagesRouteModuleTest > consume_from_expired_messages_advisory_topic FAILED
           org.junit.runners.model.TestTimedOutException: test timed out after 10000 milliseconds
               at sun.misc.Unsafe.park(Native Method)
               at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
               at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
               at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
               at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
               at org.apache.camel.component.mock.MockEndpoint.waitForCompleteLatch(MockEndpoint.java:1425)
               at org.apache.camel.component.mock.MockEndpoint.waitForCompleteLatch(MockEndpoint.java:1409)
               at org.apache.camel.component.mock.MockEndpoint.doAssertIsSatisfied(MockEndpoint.java:405)
               at org.apache.camel.component.mock.MockEndpoint.assertIsSatisfied(MockEndpoint.java:386)
               at org.apache.camel.component.mock.MockEndpoint.assertIsSatisfied(MockEndpoint.java:374)
               at org.apache.camel.component.mock.MockEndpoint$assertIsSatisfied$0.call(Unknown Source)
               at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)
               at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)
               at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:117)
               at no.foo.integration.module.ExpiredMessagesRouteModuleTest.consume_from_expired_messages_advisory_topic(ExpiredMessagesRouteModuleTest.groovy:77)
               at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
               at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
               at java.lang.reflect.Method.invoke(Method.java:498)
               at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
               at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
               at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
               at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
               at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
               at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
               at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
               at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
               at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
               at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
               at java.util.concurrent.FutureTask.run(FutureTask.java:266)
               at java.lang.Thread.run(Thread.java:745)
      

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              monti Henrik
            • Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: