The basic architecture of the consumer is a java daemon that spawns a configurable number of single threaded consumers that implement MessageListener- each opens its own connection and transacted session. In the consumer onMessage() method session.commit() is being called upon successful processing of the message- and I've verified that it is actually executed. The problem is that despite the message being successfully processed and session.commit() executed the messages remain as pending in the queue. If the consumer daemon is stopped and re-started these messages are consumed again (definitely not good). Note that session.rollback() was NOT called in this scenario, all the messages were processed successfully. Also note that these are persistent messages and we are using the default AMQ message store.
- The problem only occurs when the number of consumer threads is greater than 1.
- The problem occurs whether or not a correlation ID is used on the messages. I wasn't sure if this mattered or not, but it doesn't.
Fortunately, as a result of working with this test case I discovered a work around to the problem, which is to not only call session.commit() on successful message processing but to then call message.acknowledge() as well. This works like a charm, but it was my understanding that calling message.acknowledge() was not necessary when using transacted sessions.