  1. ActiveMQ
  2. AMQ-7369

Session receives only one page of messages in CLIENT_ACKNOWLEDGE


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 5.15.11
    • Fix Version/s: None
    • Component/s: JMS client
    • Labels:
      None
    • Environment:

      Windows 10, Oracle JDK 11.0.1

      Description

      A queue contains 1000 messages. A consumer then connects and starts a session in CLIENT_ACKNOWLEDGE or transacted mode. The consumer receives only 200 messages. The next call to receive() blocks indefinitely, ignoring the remaining 800 messages in the queue. Note that the client has not acknowledged or committed anything yet.

      When one more message is sent to the queue from another connection, the consumer receives another 200 messages. Sending yet another message causes the consumer to receive all remaining messages, 602 this time (600 pre-existing ones plus the 2 added while the consumer was running).

      The number 200 corresponds to the maxPageSize destination policy option (200 by default). Setting it above 1000 causes all pending messages to be delivered immediately. Setting it to 10, however, causes only 10 messages to be delivered after each newly produced message, and delivery never recovers on its own.
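      The batch sizes reported above for maxPageSize = 200 can be checked with simple arithmetic. The sketch below only tallies the numbers from this description; the class and method names are made up for illustration, and it does not talk to a broker.

```java
// Illustrative tally of the delivery batches reported in this issue.
// DeliveryTally and its members are hypothetical names, not ActiveMQ API.
class DeliveryTally {

    // Batch sizes observed by the consumer with maxPageSize = 200:
    // on connect, after the 1st extra message, after the 2nd extra message.
    static final int[] REPORTED_BATCHES = {200, 200, 602};

    static int total(int[] batches) {
        int sum = 0;
        for (int b : batches) {
            sum += b;
        }
        return sum;
    }

    public static void main(String[] args) {
        int preExisting = 1000;        // messages in the queue before the consumer connects
        int addedWhileConsuming = 2;   // messages produced while the consumer was blocked
        int delivered = total(REPORTED_BATCHES);
        System.out.println("delivered=" + delivered
                + ", expected=" + (preExisting + addedWhileConsuming));
        // prints: delivered=1002, expected=1002
    }
}
```

      So nothing is lost in the 200 case; the messages are only delivered late, and only when a new message arrives.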

      Code to reproduce:
       

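      // The try-with-resources blocks below need the JMS 2.0 API on the classpath,
      // where Connection, Session, MessageProducer and MessageConsumer are AutoCloseable.
      import javax.jms.Connection;
      import javax.jms.MessageConsumer;
      import javax.jms.MessageProducer;
      import javax.jms.Session;

      import org.apache.activemq.ActiveMQConnectionFactory;
      import org.apache.activemq.broker.BrokerService;
      import org.apache.activemq.broker.region.policy.PolicyEntry;
      import org.apache.activemq.broker.region.policy.PolicyMap;

      import static javax.jms.Session.CLIENT_ACKNOWLEDGE;
      import static javax.jms.Session.DUPS_OK_ACKNOWLEDGE;

      public class JmsTest3 {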
      
          public static final String BROKER_URL = "tcp://localhost:12354";
      
          public static void main(String[] args) throws Exception {
              BrokerService broker = new BrokerService();
              broker.setPersistent(false);
              broker.addConnector(BROKER_URL);
              // uncomment to configure the maxPageSize
      //        broker.setDestinationPolicy(new PolicyMap());
      //        PolicyMap destinationPolicy = broker.getDestinationPolicy();
      //        PolicyEntry defaultEntry = new PolicyEntry();
      //        destinationPolicy.setDefaultEntry(defaultEntry);
      //        defaultEntry.setMaxPageSize(10);
              broker.start();
      
              ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(BROKER_URL);
      
              int initialMessages = 1000;
              int additionalMessages = 10;
      
              // pre-insert initial messages
              try (
                      Connection connection = cf.createConnection();
                      Session session = connection.createSession(true, 0);
                      MessageProducer producer = session.createProducer(session.createQueue("queue"));
              ) {
                  for (int i = 0; i < initialMessages; i++) {
                      producer.send(session.createTextMessage("msg-" + i));
                  }
                  session.commit();
              }
      
              // start the consumer in a thread
              Thread consumerThread = new Thread(() -> {
                  try (
                          Connection connection = cf.createConnection();
                          Session session = connection.createSession(false, CLIENT_ACKNOWLEDGE);
                          MessageConsumer consumer = session.createConsumer(session.createQueue("queue"))
                  ) {
                      connection.start();
                      for (long count = 0; count < initialMessages + additionalMessages; count++) {
                          consumer.receive();
                          // count is zero-based, so add 1 to report the number received
                          System.out.println("received so far: " + (count + 1));
                      }
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              });
              consumerThread.start();
      
              // start the producer in a thread
              Thread producerThread = new Thread(() -> {
                  try (
                          Connection connection = cf.createConnection();
                          Session session = connection.createSession(false, DUPS_OK_ACKNOWLEDGE);
                          MessageProducer producer = session.createProducer(session.createQueue("queue"));
                  ) {
                      for (int i = 0; i < additionalMessages; i++) {
                          Thread.sleep(1000);
                          producer.send(session.createTextMessage("msg-" + i));
                      }
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              });
              producerThread.start();
      
              producerThread.join();
              consumerThread.join();
              broker.stop();
          }
      }
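      Because receive() blocks forever once the first page is exhausted, the reproduction above hangs rather than reporting where delivery stopped. A possible variant is to poll with a timeout and count messages until nothing arrives. The sketch below shows that counting loop on its own; StallProbe, TimedReceiver and drain are hypothetical names, and main uses an in-memory stand-in instead of a broker. With a real consumer the call would presumably be drain(consumer::receive, 2000), relying on MessageConsumer.receive(long), which returns null when the timeout expires.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative helper, not part of ActiveMQ or JMS.
class StallProbe {

    // Mirrors the shape of MessageConsumer.receive(long timeout):
    // returns the next message, or null if none arrived within the timeout.
    interface TimedReceiver<T> {
        T receive(long timeoutMillis) throws Exception;
    }

    // Counts how many messages arrive before a receive call times out.
    static int drain(TimedReceiver<?> receiver, long timeoutMillis) throws Exception {
        int received = 0;
        while (receiver.receive(timeoutMillis) != null) {
            received++;
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a consumer that only ever sees one page of 200 messages.
        // It ignores the timeout and returns null immediately once the page is empty.
        Deque<String> page = new ArrayDeque<>();
        for (int i = 0; i < 200; i++) {
            page.add("msg-" + i);
        }
        int received = drain(timeout -> page.poll(), 100);
        System.out.println("received before stall: " + received);
        // prints: received before stall: 200
    }
}
```

      Against the broker setup in this report, such a loop would be expected to stop at 200 (or at whatever maxPageSize is set to) instead of blocking, which makes the stall visible in the output.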
      

       

            People

            • Assignee:
              Unassigned
              Reporter:
              vilo Viliam Durina
