CAMEL-1201

Messages dropped with ActiveMQ/JMS component and Resequencer pattern

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.5.0
    • Fix Version/s: 2.0-M1
    • Component/s: camel-activemq, camel-jms, eip
    • Labels: None

    Description

      I've created a simple class to test the resequencer as listed below:

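      import java.util.ArrayList;
      import java.util.Collections;
      import java.util.List;

      import org.apache.activemq.camel.component.ActiveMQComponent;
      import org.apache.camel.CamelContext;
      import org.apache.camel.ProducerTemplate;
      import org.apache.camel.builder.RouteBuilder;
      import org.apache.camel.impl.DefaultCamelContext;
      import org.apache.camel.processor.interceptor.Tracer;
      import org.apache.log4j.Logger; // assuming log4j 1.x, which matches Logger.getLogger(Class)

      public final class MyCamelResequencer {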
          private static final Logger LOG = Logger.getLogger(MyCamelResequencer.class);
      
          private MyCamelResequencer() {}
      
          public static void main(String[] args) throws Exception {
              CamelContext context = new DefaultCamelContext();
              Tracer tracer = new Tracer();
              tracer.getFormatter().setShowBreadCrumb(false);
              tracer.getFormatter().setShowNode(true);
              context.addInterceptStrategy(tracer);
             
              String brokerUrl = "vm://localhost?broker.persistent=false";
      //        String brokerUrl = "tcp://localhost:61616";
              context.addComponent("activemq", ActiveMQComponent.activeMQComponent(brokerUrl));
              context.addRoutes(new RouteBuilder() {
                  public void configure() {
      //                from("seda:FOO").to("log:PLANETS");
      //                from("seda:FOO").resequencer(header("seqnum")).to("log:PLANETS");
      //                from("activemq:TEST.IN").to("log:PLANETS");
                      from("activemq:TEST.IN").resequencer(header("seqnum")).to("log:PLANETS");
      //                from("activemq:TEST.IN").resequencer(header("seq")).to("activemq:TEST.OUT");
                  }
              });
      
              ProducerTemplate template = context.createProducerTemplate();
      
              context.start();
      
              List<String> planets = new ArrayList<String>(8);
              planets.add("Mercury");
              planets.add("Venus");
              planets.add("Earth");
              planets.add("Mars");
              planets.add("Jupiter");
              planets.add("Saturn");
              planets.add("Uranus");
              planets.add("Neptune");
              
              List<Integer> numbers = new ArrayList<Integer>(8);
              numbers.add(1);
              numbers.add(2);
              numbers.add(3);
              numbers.add(4);
              numbers.add(5);
              numbers.add(6);
              numbers.add(7);
              numbers.add(8);
              
              for (String planet: planets) {
                  Collections.shuffle(numbers);
                  int seqnum = numbers.remove(0);
                  String message = "I am the planet " + planet + " with seqnum: " + seqnum;
                  LOG.info("Sending message: " + message);
                  template.sendBodyAndHeader("activemq:TEST.IN", message, "seqnum", seqnum);
      //            template.sendBodyAndHeader("seda:FOO", message, "seqnum", seqnum);
              }
      
              Thread.sleep(10000);
              LOG.info("Shutting down the Camel context");
              context.stop();
              System.exit(0); // normal shutdown, so exit with a success status
          }
      }

      The following are my observations:

      • SEDA -> log receives all messages
      • SEDA -> resequencer -> log receives all messages
      • ActiveMQ -> log receives all messages
      • ActiveMQ -> resequencer -> log does not receive all messages
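For reference, the behaviour expected from a batch resequencer can be sketched in plain Java without any Camel dependency. This is only an illustration of the contract (every message that goes in comes out, ordered by `seqnum`), not Camel's implementation; `BatchResequencerSketch` and `Msg` are hypothetical names, and the input order is the one from the "Sending message" log lines of the failing run.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch only: models the expected result of resequencing one batch.
final class BatchResequencerSketch {

    /** Hypothetical stand-in for a message carrying a "seqnum" header; not a Camel class. */
    static final class Msg {
        final int seqnum;
        final String body;

        Msg(int seqnum, String body) {
            this.seqnum = seqnum;
            this.body = body;
        }
    }

    /** Returns a new list ordered by seqnum; the input is untouched and nothing is dropped. */
    static List<Msg> resequence(List<Msg> batch) {
        List<Msg> sorted = new ArrayList<>(batch);
        sorted.sort(Comparator.comparingInt(m -> m.seqnum));
        return sorted;
    }

    public static void main(String[] args) {
        // Planet/seqnum pairs in the order they were sent in the failing run.
        String[] planets = {"Mercury", "Venus", "Earth", "Mars", "Jupiter", "Saturn", "Uranus", "Neptune"};
        int[] seqnums = {1, 3, 6, 2, 4, 8, 7, 5};

        List<Msg> batch = new ArrayList<>();
        for (int i = 0; i < planets.length; i++) {
            batch.add(new Msg(seqnums[i], planets[i]));
        }

        // Prints eight lines, seqnum 1 through 8.
        for (Msg m : resequence(batch)) {
            System.out.println(m.seqnum + " " + m.body);
        }
    }
}
```

With this input, a correct resequencer would deliver all eight messages in the order Mercury, Mars, Venus, Jupiter, Neptune, Earth, Uranus, Saturn.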

      Upon enabling the Tracer interceptor, I can see that the resequencer doesn't even receive all messages from the ActiveMQ component:

      [ main] DefaultCamelContext INFO JMX enabled. Using InstrumentationLifecycleStrategy.
      [ main] BrokerService INFO Using Persistence Adapter: MemoryPersistenceAdapter
      [ main] BrokerService INFO ActiveMQ null JMS Message Broker (localhost) is starting
      [ main] BrokerService INFO For help or more information please see: http://activemq.apache.org/
      [ JMX connector] ManagementContext WARN Failed to start jmx connector: Cannot bind to URL [rmi://localhost:1099/jmxrmi]: javax.naming.NameAlreadyBoundException: jmxrmi [Root exception is java.rmi.AlreadyBoundException: jmxrmi]
      [ main] BrokerService INFO ActiveMQ JMS Message Broker (localhost, ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) started
      [ main] TransportConnector INFO Connector vm://localhost Started
      [ main] MyCamelResequencer INFO Sending message: I am the planet Mercury with seqnum: 1
      [ main] MyCamelResequencer INFO Sending message: I am the planet Venus with seqnum: 3
      [ main] MyCamelResequencer INFO Sending message: I am the planet Earth with seqnum: 6
      [ main] MyCamelResequencer INFO Sending message: I am the planet Mars with seqnum: 2
      [ main] MyCamelResequencer INFO Sending message: I am the planet Jupiter with seqnum: 4
      [ main] MyCamelResequencer INFO Sending message: I am the planet Saturn with seqnum: 8
      [ main] MyCamelResequencer INFO Sending message: I am the planet Uranus with seqnum: 7
      [ main] MyCamelResequencer INFO Sending message: I am the planet Neptune with seqnum: 5
      [aultMessageListenerContainer-1] TraceInterceptor INFO -> interceptor1 Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ [] -> [To[log:PLANETS]]]]), RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} Headers:{JMSDestination=queue} BodyType:String Body:I am the planet Mercury with seqnum: 1
      [aultMessageListenerContainer-1] TraceInterceptor INFO -> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly Properties:{CamelCauseException=null} Headers:{JMSDestination=queue} BodyType:String Body:I am the planet Mercury with seqnum: 1
      [aultMessageListenerContainer-2] TraceInterceptor INFO -> interceptor1 Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ [] -> [To[log:PLANETS]]]]), RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} Headers:{JMSDestination=queue} BodyType:String Body:I am the planet Earth with seqnum: 6
      [aultMessageListenerContainer-2] TraceInterceptor INFO -> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly Properties: Headers:{JMSDestination=queue} BodyType:String Body:I am the planet Earth with seqnum: 6
      [aultMessageListenerContainer-3] TraceInterceptor INFO -> interceptor1 Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ [] -> [To[log:PLANETS]]]]), RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} Headers:{JMSDestination=queue} BodyType:String Body:I am the planet Mars with seqnum: 2
      [aultMessageListenerContainer-3] TraceInterceptor INFO -> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly Properties:{CamelCauseException=null} Headers:{JMSDestination=queue} BodyType:String Body:I am the planet Mars with seqnum: 2
      [aultMessageListenerContainer-4] TraceInterceptor INFO -> interceptor1 Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ [] -> [To[log:PLANETS]]]]), RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} Headers:{JMSDestination=queue} BodyType:String Body:I am the planet Jupiter with seqnum: 4
      [aultMessageListenerContainer-4] TraceInterceptor INFO -> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly Properties: Headers:{JMSDestination=queue} BodyType:String Body:I am the planet Jupiter with seqnum: 4
      [aultMessageListenerContainer-5] TraceInterceptor INFO -> interceptor1 Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ [] -> [To[log:PLANETS]]]]), RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} Headers:{JMSDestination=queue} BodyType:String Body:I am the planet Uranus with seqnum: 7
      [aultMessageListenerContainer-5] TraceInterceptor INFO -> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly Properties:{CamelCauseException=null} Headers:{JMSDestination=queue} BodyType:String Body:I am the planet Uranus with seqnum: 7
      [aultMessageListenerContainer-6] TraceInterceptor INFO -> interceptor1 Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ [] -> [To[log:PLANETS]]]]), RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} Headers:{JMSDestination=queue} BodyType:String Body:I am the planet Neptune with seqnum: 5
      [aultMessageListenerContainer-6] TraceInterceptor INFO -> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly Properties: Headers:{JMSDestination=queue} BodyType:String Body:I am the planet Neptune with seqnum: 5
      [[log:PLANETS]]] Polling Thread] TraceInterceptor INFO -> to1 To[log:PLANETS] InOnly Properties:{} Headers:{JMSDestination=queue} BodyType:String Body:I am the planet Venus with seqnum: 3
      [[log:PLANETS]]] Polling Thread] PLANETS INFO Exchange[BodyType:String, Body:I am the planet Venus with seqnum: 3]
      [[log:PLANETS]]] Polling Thread] TraceInterceptor INFO -> to1 To[log:PLANETS] InOnly Properties:{} Headers:{JMSDestination=queue} BodyType:String Body:I am the planet Saturn with seqnum: 8
      [[log:PLANETS]]] Polling Thread] PLANETS INFO Exchange[BodyType:String, Body:I am the planet Saturn with seqnum: 8]
      [ main] MyCamelResequencer INFO Shutting down the Camel context
      [ ActiveMQ ShutdownHook] BrokerService INFO ActiveMQ Message Broker (localhost, ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) is shutting down
      [ ActiveMQ ShutdownHook] TransportConnector INFO Connector vm://localhost Stopped
      [ ActiveMQ ShutdownHook] BrokerService INFO ActiveMQ JMS Message Broker (localhost, ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) stopped
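The gap in the trace above can be tallied mechanically. The following is a minimal sketch that compares the sequence numbers from the "Sending message" lines with those that appear in the `PLANETS` log lines; `DeliveryTally` and `missing` are hypothetical names introduced only for this illustration, and the two lists are transcribed by hand from the log.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Illustrative sketch only: reports which sent seqnums never reached the endpoint.
final class DeliveryTally {

    /** Returns the seqnums that were sent but never delivered, in ascending order. */
    static Set<Integer> missing(List<Integer> sent, List<Integer> delivered) {
        Set<Integer> result = new TreeSet<>(sent);
        result.removeAll(delivered);
        return result;
    }

    public static void main(String[] args) {
        // Seqnums from the "Sending message" lines, in send order.
        List<Integer> sent = Arrays.asList(1, 3, 6, 2, 4, 8, 7, 5);
        // Seqnums from the two PLANETS log lines (Venus and Saturn).
        List<Integer> logged = Arrays.asList(3, 8);

        // prints "missing at log:PLANETS: [1, 2, 4, 5, 6, 7]"
        System.out.println("missing at log:PLANETS: " + missing(sent, logged));
    }
}
```

In this run only two of the eight messages were written by `log:PLANETS` before the context was stopped.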

          People

            Assignee: Hadrian Zbarcea
            Reporter: Bruce Snyder