Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-9567

SjmsConsumer does not suspend (on Camel context) stop

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Unknown

    Description

      When using SjmsConsumer that consumes JMS messages from broker (e.g. ActiveMQ) and stop Camel Context, the consumer still reads new messages from broker even if it should only finish processing of already fetched messages. It causes that Context is never stopped if there are still new and new messages in the broker (or before timeout for stop operation occurs).

      When I investigated code, it seems that suspend operation is not implemented (or code does not check isSuspended flag). What I would expect is that consumer unregisters JMS listener on context stop (consumer suspend).

      Here is the sample code I used for testing:

      public class Test {
      
          public static void main(String[] args) throws Exception {
             // pre-fill JMS Broker with many many messages, e.g. 10.000
      
              RouteBuilder rb = new RouteBuilder() {
                  @Override
                  public void configure() throws Exception {
                      from("sjms:queue:test?consumerCount=5")
                              .process(new Processor() {
                                  @Override
                                  public void process(Exchange exchange) throws Exception {
                                      Thread.sleep(1000); // not to consume all messages instantly
                                      System.out.println("Processed message " + exchange.getExchangeId());
                                  }
                              });
                  }
              };
      
              CamelContext context = new DefaultCamelContext();
              context.getShutdownStrategy().setTimeout(1000); // 1000s = big enough timeout so I can be sure it is consumer problem
              addJmsComponent(context);
              context.addRoutes(rb);
      
              System.out.println("=====> Starting context");
              context.start();
              Thread.sleep(5 * 1000); // Consume few messages at the beginning
      
              System.out.println("=====> Stopping context");
              context.stop();
              System.out.println("=====> Context stopped"); // Will not get here as long as there are any messages left in the queue
          }
      
          private static void addJmsComponent(CamelContext context) {
              ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // ActiveMQ is easiest for testing this
              ConnectionFactoryResource connResource = new ConnectionFactoryResource(5, factory);
              SjmsComponent comp = new SjmsComponent();
              comp.setConnectionResource(connResource);
              context.addComponent("sjms", comp);
          }
      }
      

      The original mailing list thread:
      http://camel.465427.n5.nabble.com/CamelContext-stop-with-SJMS-consumer-does-not-stop-consuming-messages-from-broker-td5777207.html

      Attachments

        Issue Links

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            davsclaus Claus Ibsen
            d1x Zdeněk Obst
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment