Description
Attempting to create a consumer on a named temporary queue ("temp-queue://<name>") via AMQP results in a NullPointerException. The following minimal test client, which uses the Qpid AMQP client (qpid-amqp-1-0-client-jms), demonstrates the problem (imports elided for brevity):
public class MinimalAmqpTest {
    public static void main(String[] args) throws Exception {
        try (InputStream propertiesStream = MinimalAmqpTest.class.getResourceAsStream("amqp.properties")) {
            Properties properties = new Properties();
            properties.load(propertiesStream);
            Context context = new InitialContext(properties);
            ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("localhost");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Queue queue = (Queue) context.lookup("test");
            Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            consumerSession.createConsumer(queue);
        }
    }
}
The amqp.properties file looks something like this:
java.naming.factory.initial = org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory
connectionfactory.localhost = amqp://172.16.133.147:5672?clientid=test-client
queue.test = temp-queue://test
Running this test against an AMQP-enabled ActiveMQ 5.11.1, 5.10.2 or 5.10.1 broker results in an NPE:
java.lang.NullPointerException
    at org.apache.activemq.broker.region.TempQueue.addSubscription(TempQueue.java:72)
    at org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:319)
    at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:427)
    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)
    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)
    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)
    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)
    at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:107)
    at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:663)
    at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:348)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)
    at org.apache.activemq.transport.amqp.AmqpTransportFilter.sendToActiveMQ(AmqpTransportFilter.java:114)
    at org.apache.activemq.transport.amqp.AmqpProtocolConverter.sendToActiveMQ(AmqpProtocolConverter.java:1497)
    at org.apache.activemq.transport.amqp.AmqpProtocolConverter.onSenderOpen(AmqpProtocolConverter.java:1421)
    at org.apache.activemq.transport.amqp.AmqpProtocolConverter.onLinkOpen(AmqpProtocolConverter.java:577)
    at org.apache.activemq.transport.amqp.AmqpProtocolConverter.processLinkEvent(AmqpProtocolConverter.java:389)
    at org.apache.activemq.transport.amqp.AmqpProtocolConverter.onFrame(AmqpProtocolConverter.java:334)
    at org.apache.activemq.transport.amqp.AmqpProtocolConverter.onAMQPData(AmqpProtocolConverter.java:275)
    at org.apache.activemq.transport.amqp.AmqpTransportFilter.onCommand(AmqpTransportFilter.java:98)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.amqp.AmqpFrameParser$1.onFrame(AmqpFrameParser.java:57)
    at org.apache.activemq.transport.amqp.AmqpFrameParser$5.parse(AmqpFrameParser.java:205)
    at org.apache.activemq.transport.amqp.AmqpFrameParser$4.parse(AmqpFrameParser.java:178)
    at org.apache.activemq.transport.amqp.AmqpFrameParser.parse(AmqpFrameParser.java:73)
    at org.apache.activemq.transport.amqp.AmqpNioTransport.serviceRead(AmqpNioTransport.java:118)
    at org.apache.activemq.transport.amqp.AmqpNioTransport.access$000(AmqpNioTransport.java:44)
    at org.apache.activemq.transport.amqp.AmqpNioTransport$1.onSelect(AmqpNioTransport.java:75)
    at org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:97)
    at org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
The reason for the NPE is that in the following excerpt from TempQueue.java, tempDest.getConnectionId() returns null:
if (!context.isFaultTolerant()
        && (!context.isNetworkConnection()
        && !tempDest.getConnectionId().equals(
                sub.getConsumerInfo().getConsumerId().getConnectionId()))) {
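To illustrate the failure mode in isolation, here is a minimal sketch that uses plain strings in place of the broker types. The class and method names (TempQueueCheckSketch, isForeignConsumer, isForeignConsumerNullSafe) and the connection ID value are hypothetical and not part of ActiveMQ; the sketch only assumes that the two connection IDs are compared with equals(), as in the excerpt above.

```java
import java.util.Objects;

// Hypothetical stand-in for the ownership check; this is not ActiveMQ code.
final class TempQueueCheckSketch {

    // Same shape as the excerpt: equals() is invoked on the owner's
    // connection ID, so a null owner ID throws NullPointerException.
    static boolean isForeignConsumer(String ownerConnectionId, String consumerConnectionId) {
        return !ownerConnectionId.equals(consumerConnectionId);
    }

    // Null-tolerant comparison: a missing owner ID does not match a non-null
    // consumer ID, so the consumer is reported as foreign instead of
    // triggering an NPE.
    static boolean isForeignConsumerNullSafe(String ownerConnectionId, String consumerConnectionId) {
        return !Objects.equals(ownerConnectionId, consumerConnectionId);
    }

    public static void main(String[] args) {
        String consumerConnectionId = "ID:example-connection-1"; // made-up ID

        System.out.println("null-safe check reports foreign consumer: "
                + isForeignConsumerNullSafe(null, consumerConnectionId));
        try {
            isForeignConsumer(null, consumerConnectionId);
        } catch (NullPointerException e) {
            System.out.println("unguarded check throws NullPointerException");
        }
    }
}
```

In this sketch the null-tolerant comparison only avoids the exception. The consumer is still treated as belonging to another connection, so a guard of this kind would not by itself make the subscription succeed.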
The ActiveMQTempDestination tempDest is created by ActiveMQDestination.createDestination(String, byte):
ActiveMQTempQueue(ActiveMQTempDestination).<init>(String) line: 39
ActiveMQTempQueue.<init>(String) line: 36
ActiveMQDestination.createDestination(String, byte) line: 95
AmqpProtocolConverter.createDestination(Object) line: 951
AmqpProtocolConverter.onSenderOpen(Sender, AmqpProtocolConverter$AmqpSessionContext) line: 1375
AmqpProtocolConverter.onLinkOpen(Link) line: 577
...
The created object is returned through the call stack as-is until it reaches the onSenderOpen frame, where it is assigned to the ConsumerInfo. The connection ID is never set.
Proposed fix: in DestinationFactoryImpl.createDestination(ConnectionContext, ActiveMQDestination, DestinationStatistics), after casting destination to ActiveMQTempDestination and before instantiating TempQueue, the connection ID (and any other required information) could be set from the ConnectionContext.
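The idea behind the proposed fix can be sketched with stand-in types as follows. TempDestinationSketch, ConnectionContextSketch and DestinationFactorySketch are hypothetical simplifications, not the ActiveMQ classes, and connection IDs are modelled as plain strings. The sketch also assumes that the connection ID should only be filled in when the destination does not already carry one; whether that is the right policy for the broker is an open question.

```java
// Hypothetical stand-in for a temporary destination created from a bare name.
final class TempDestinationSketch {
    private final String physicalName;
    private String connectionId; // stays null when built from a name only

    TempDestinationSketch(String physicalName) {
        this.physicalName = physicalName;
    }

    String getPhysicalName() {
        return physicalName;
    }

    String getConnectionId() {
        return connectionId;
    }

    void setConnectionId(String connectionId) {
        this.connectionId = connectionId;
    }
}

// Hypothetical stand-in for the per-connection context held by the broker.
final class ConnectionContextSketch {
    private final String connectionId;

    ConnectionContextSketch(String connectionId) {
        this.connectionId = connectionId;
    }

    String getConnectionId() {
        return connectionId;
    }
}

// Hypothetical stand-in for the factory step that runs before the temp queue
// is instantiated.
final class DestinationFactorySketch {

    // Copies the connection ID from the context onto the destination when the
    // destination has none, and leaves an existing ID untouched.
    static TempDestinationSketch assignOwner(ConnectionContextSketch context,
                                             TempDestinationSketch destination) {
        if (destination.getConnectionId() == null) {
            destination.setConnectionId(context.getConnectionId());
        }
        return destination;
    }

    public static void main(String[] args) {
        ConnectionContextSketch context = new ConnectionContextSketch("ID:example-connection-1");
        TempDestinationSketch destination = assignOwner(context, new TempDestinationSketch("test"));
        System.out.println(destination.getPhysicalName() + " owned by " + destination.getConnectionId());
    }
}
```

With the connection ID populated at this point, a later ownership comparison would have a non-null value to call equals() on.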