Uploaded image for project: 'ActiveMQ Classic'
  1. ActiveMQ Classic
  2. AMQ-5711

[AMQP] Consumer on named temporary queue results in NullPointerException

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • 5.10.1, 5.10.2, 5.11.1
    • 5.12.0
    • AMQP

    Description

      Attempting to create a consumer on a named temporary queue ("temp-queue://<name>") via AMQP results in a NullPointerException. Following minimal test client using the Qpid AMQP client (qpid-amqp-1-0-client-jms) demonstrates the problem (imports elided for brevity):

      public class MinimalAmqpTest {
          public static void main(String[] args) throws Exception {
              try (InputStream propertiesStream =
                      MinimalAmqpTest.class.getResourceAsStream("amqp.properties")) {
                  Properties properties = new Properties();
                  properties.load(propertiesStream);
                  Context context = new InitialContext(properties);
                  ConnectionFactory connectionFactory =
                      (ConnectionFactory) context.lookup("localhost");
                  Connection connection = connectionFactory.createConnection();            
                  connection.start();
       
                  Queue queue = (Queue) context.lookup("test");
                  Session consumerSession =
                      connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                  consumerSession.createConsumer(queue);
              }
          }
      }
      

      Where amqp.properties looks something like this:

      java.naming.factory.initial = org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory
      connectionfactory.localhost = amqp://172.16.133.147:5672?clientid=test-client
      queue.test = temp-queue://test
      

      Running this test against an AMQP-enabled ActiveMQ 5.11.1, 5.10.2 or 5.10.1 broker results in an NPE:

      java.lang.NullPointerException
              at org.apache.activemq.broker.region.TempQueue.addSubscription(TempQueue.java:72)
              at org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:319)
              at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:427)
              at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)
              at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)
              at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)
              at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)
              at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:107)
              at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:663)
              at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:348)
              at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334)
              at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)
              at org.apache.activemq.transport.amqp.AmqpTransportFilter.sendToActiveMQ(AmqpTransportFilter.java:114)
              at org.apache.activemq.transport.amqp.AmqpProtocolConverter.sendToActiveMQ(AmqpProtocolConverter.java:1497)
              at org.apache.activemq.transport.amqp.AmqpProtocolConverter.onSenderOpen(AmqpProtocolConverter.java:1421)
              at org.apache.activemq.transport.amqp.AmqpProtocolConverter.onLinkOpen(AmqpProtocolConverter.java:577)
              at org.apache.activemq.transport.amqp.AmqpProtocolConverter.processLinkEvent(AmqpProtocolConverter.java:389)
              at org.apache.activemq.transport.amqp.AmqpProtocolConverter.onFrame(AmqpProtocolConverter.java:334)
              at org.apache.activemq.transport.amqp.AmqpProtocolConverter.onAMQPData(AmqpProtocolConverter.java:275)
              at org.apache.activemq.transport.amqp.AmqpTransportFilter.onCommand(AmqpTransportFilter.java:98)
              at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
              at org.apache.activemq.transport.amqp.AmqpFrameParser$1.onFrame(AmqpFrameParser.java:57)
              at org.apache.activemq.transport.amqp.AmqpFrameParser$5.parse(AmqpFrameParser.java:205)
              at org.apache.activemq.transport.amqp.AmqpFrameParser$4.parse(AmqpFrameParser.java:178)
              at org.apache.activemq.transport.amqp.AmqpFrameParser.parse(AmqpFrameParser.java:73)
              at org.apache.activemq.transport.amqp.AmqpNioTransport.serviceRead(AmqpNioTransport.java:118)
              at org.apache.activemq.transport.amqp.AmqpNioTransport.access$000(AmqpNioTransport.java:44)
              at org.apache.activemq.transport.amqp.AmqpNioTransport$1.onSelect(AmqpNioTransport.java:75)
              at org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:97)
              at org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:745)
      

      The reason for the NPE is that in the following excerpt from TempQueue.java, tempDest.getConnectionId() returns null:

              if (!context.isFaultTolerant()
                      && (!context.isNetworkConnection() && !tempDest
                              .getConnectionId().equals(
                                      sub.getConsumerInfo().getConsumerId()
                                              .getConnectionId()))) {
      

      The ActiveMQTempDestination tempDest is created by ActiveMQDestination.createDestination(String, byte):

      ActiveMQTempQueue(ActiveMQTempDestination).<init>(String) line: 39	
      ActiveMQTempQueue.<init>(String) line: 36	
      ActiveMQDestination.createDestination(String, byte) line: 95	
      AmqpProtocolConverter.createDestination(Object) line: 951	
      AmqpProtocolConverter.onSenderOpen(Sender, AmqpProtocolConverter$AmqpSessionContext) line: 1375	
      AmqpProtocolConverter.onLinkOpen(Link) line: 577	
      ...
      

      The created object is returned through the call stack as-is until it reaches the onSenderOpen frame, where it is assigned to the ConsumerInfo. The connection ID is never set.

      Proposed fix: in DestinationFactoryImpl.createDestination(ConnectionContext, ActiveMQDestination, DestinationStatistics), after casting destination to ActiveMQTempDestination and before instantiating TempQueue, the connection ID (and any other required information) could be set from the ConnectionContext.

      Attachments

        Activity

          People

            tabish Timothy A. Bish
            dake Daniel Keyhani
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: