  1. ActiveMQ Artemis
  2. ARTEMIS-1549

AMQP messages aren't redistributed across cluster bridge, NPE in ServerSessionImpl.send()


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.2.0, 2.3.0, 2.4.0
    • Fix Version/s: None
    • Component/s: AMQP, Broker
    • Labels: None
    • Environment: AMQP.Net Lite client, .NET Core runtime 2.0, connecting to brokers on the local machine (Fedora 26)

    Description

      I have set up a cluster of two brokers using a simple static cluster configuration (see below). Sending a CORE message to broker1 and consuming that message from broker2 works as expected. Attempting the same over AMQP (using the AMQP.Net Lite client) results in an NPE in broker2:

      08:44:28,061 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception: java.lang.NullPointerException
      	at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:613) [artemis-amqp-protocol-2.3.0.jar:]
      	at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:64) [artemis-amqp-protocol-2.3.0.jar:]
      	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1348) [artemis-server-2.3.0.jar:2.3.0]
      	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1309) [artemis-server-2.3.0.jar:2.3.0]
      	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1302) [artemis-server-2.3.0.jar:2.3.0]
      	at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690) [artemis-server-2.3.0.jar:2.3.0]
      	at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290) [artemis-server-2.3.0.jar:2.3.0]
      	at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) [artemis-commons-2.3.0.jar:2.3.0]
      	at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
      	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.3.0.jar:2.3.0]
      	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.3.0.jar:2.3.0]
      	at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
      	at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
      

      To be clear, the NPE occurs on the broker to which the receiver is attached, every time the producer (which is attached to the other broker) sends a message.

      Attempting to send/receive the AMQP messages on the same cluster member works as expected.

      Here is some client code that demonstrates the issue:

      using System;
      using System.Collections.Generic;
      using System.Threading;
      using System.Transactions;
      using Amqp;
      using Amqp.Framing;
      using Amqp.Sasl;
      using Amqp.Types;
      
      namespace Test
      {
          class Program
          {
              static void Main(string[] args)
              {
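                  // url1 is broker1's AMQP acceptor (port 5672 in the config below);
                  // url2 is presumably the AMQP acceptor of broker2.
                  string url1 = "amqp://localhost:5672";
                  string url2 = "amqp://localhost:5673";
                  String ADDRESS = "orders";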
      
                  Connection connection1 = new Connection(new Address(url1));
                  Session session1 = new Session(connection1);
                  ReceiverLink receiver = new ReceiverLink(session1, "sub1", CreateSharedDurableSubscriberSource(ADDRESS), null);
      
                  Connection connection2 = new Connection(new Address(url2));
                  Session session2 = new Session(connection2);
                  SenderLink sender = new SenderLink(session2, "sender", ADDRESS);
      
                  receiver.Start(300, (r,  m) => {
                      r.Accept(m);
                      Console.WriteLine("Got message: " + m.Body);
                  });
              
                  Message outMessage = new Message("order placed at " + DateTime.Now.ToString());
                  outMessage.Header = new Header();
                  outMessage.Header.Durable = true;                    
                  sender.Send(outMessage);
      
                  Thread.CurrentThread.Join();
              }
      
              private static Source CreateSharedDurableSubscriberSource(String address)
              {
                  Source source = new Source();
                  source.Address = address;
      
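                  // Keep the subscription alive after the link detaches.
                  source.ExpiryPolicy = new Symbol("never");
                  
                  // Terminus durability 2 = unsettled-state (AMQP 1.0).
                  source.Durable = 2;
      
                  // Request a shared, global subscription on a topic address.
                  source.Capabilities = new Symbol[]{"topic", "shared", "global"};
                  source.DistributionMode = new Symbol("copy");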
      
                  return source;
              }
          }
      }
      

      Here is the cluster config from broker1; broker2 is configured accordingly:

            <acceptors>
               <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
      
               <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
            </acceptors>
      
            <connectors>
              <connector name="broker1-connector">tcp://localhost:61616</connector>
              <connector name="broker2-connector">tcp://localhost:61617</connector>
            </connectors>
      
            <cluster-user>todd</cluster-user>
            <cluster-password>password</cluster-password>      
      
            <cluster-connections>
              <cluster-connection name="preprod">
                <connector-ref>broker1-connector</connector-ref>
                <retry-interval>500</retry-interval>
                <use-duplicate-detection>true</use-duplicate-detection>
                <message-load-balancing>ON_DEMAND</message-load-balancing>          
                <max-hops>1</max-hops>
                <static-connectors allow-direct-connections-only="true">
                    <connector-ref>broker2-connector</connector-ref>
                </static-connectors>
              </cluster-connection>
            </cluster-connections>
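
      The report does not include the configuration of broker2. A mirrored configuration might look like the sketch below. The acceptor ports 61617 and 5673 are assumptions inferred from broker2-connector and from url2 in the client code; the acceptor parameters are trimmed for brevity, and the actual file may differ.

      ```xml
      <!-- Hypothetical broker2 counterpart; ports are inferred, not taken from the report -->
      <acceptors>
         <acceptor name="artemis">tcp://0.0.0.0:61617?protocols=CORE,AMQP</acceptor>
         <acceptor name="amqp">tcp://0.0.0.0:5673?protocols=AMQP</acceptor>
      </acceptors>

      <connectors>
        <connector name="broker1-connector">tcp://localhost:61616</connector>
        <connector name="broker2-connector">tcp://localhost:61617</connector>
      </connectors>

      <cluster-user>todd</cluster-user>
      <cluster-password>password</cluster-password>

      <cluster-connections>
        <cluster-connection name="preprod">
          <!-- broker2 advertises its own connector and points at broker1 -->
          <connector-ref>broker2-connector</connector-ref>
          <retry-interval>500</retry-interval>
          <use-duplicate-detection>true</use-duplicate-detection>
          <message-load-balancing>ON_DEMAND</message-load-balancing>
          <max-hops>1</max-hops>
          <static-connectors allow-direct-connections-only="true">
              <connector-ref>broker1-connector</connector-ref>
          </static-connectors>
        </cluster-connection>
      </cluster-connections>
      ```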
      

      The message-load-balancing setting has no effect on this issue. redistribution-delay is set to 0 for the address in question.
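
      For reference, a redistribution delay of 0 is normally set through an address-setting entry in broker.xml. The fragment below is a minimal sketch: the match value "orders" is assumed from the ADDRESS used in the client code, and the reporter's actual address-settings block was not included in the report.

      ```xml
      <address-settings>
         <!-- match="orders" is an assumption based on the client code above -->
         <address-setting match="orders">
            <!-- 0 = redistribute as soon as the last local consumer closes -->
            <redistribution-delay>0</redistribution-delay>
         </address-setting>
      </address-settings>
      ```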

            People

              Assignee: Unassigned
              Reporter: Todd Baert (toddbaert)
              Votes: 0
              Watchers: 3
