Details
Description
I have setup a cluster of 2 brokers, using a simple static cluster configuration (see below). Sending a CORE message to broker1, and consuming that message from broker2 works as expected. Attempting the same over AMQP (using the AMQP .Net Lite client) results in an NPE in broker2:
08:44:28,061 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception: java.lang.NullPointerException at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:613) [artemis-amqp-protocol-2.3.0.jar:] at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:64) [artemis-amqp-protocol-2.3.0.jar:] at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1348) [artemis-server-2.3.0.jar:2.3.0] at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1309) [artemis-server-2.3.0.jar:2.3.0] at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1302) [artemis-server-2.3.0.jar:2.3.0] at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690) [artemis-server-2.3.0.jar:2.3.0] at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290) [artemis-server-2.3.0.jar:2.3.0] at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) [artemis-commons-2.3.0.jar:2.3.0] at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0] at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.3.0.jar:2.3.0] at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.3.0.jar:2.3.0] at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151] at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
To be clear, the NPE exists on the broker to which the receiver is attached, every time a message is SENT by the producer (which is attached to the other broker).
Attempting to send/receive the AMQP messages on the same cluster member works as expected.
Here is some client code that demonstrates the issue:
using System; using System.Collections.Generic; using System.Threading; using System.Transactions; using Amqp; using Amqp.Framing; using Amqp.Sasl; using Amqp.Types; namespace Test { class Program { static void Main(string[] args) { string url1 = "amqp://localhost:5672"; string url2 = "amqp://localhost:5673"; String ADDRESS = "orders"; Connection connection1 = new Connection(new Address(url1)); Session session1 = new Session(connection1); ReceiverLink receiver = new ReceiverLink(session1, "sub1", CreateSharedDurableSubscriberSource(ADDRESS), null); Connection connection2 = new Connection(new Address(url2)); Session session2 = new Session(connection2); SenderLink sender = new SenderLink(session2, "sender", ADDRESS); receiver.Start(300, (r, m) => { r.Accept(m); Console.WriteLine("Got message: " + m.Body); }); Message outMessage = new Message("order placed at " + DateTime.Now.ToString()); outMessage.Header = new Header(); outMessage.Header.Durable = true; sender.Send(outMessage); Thread.CurrentThread.Join(); } private static Source CreateSharedDurableSubscriberSource(String address) { Source source = new Source(); source.Address = address; source.ExpiryPolicy = new Symbol("never"); source.Durable = 2; source.Capabilities = new Symbol[]{"topic", "shared", "global"}; source.DistributionMode = new Symbol("copy"); return source; } } }
Here is the cluster config from broker1, broker2 is configured accordingly
<acceptors> <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor> <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor> </acceptors> <connectors> <connector name="broker1-connector">tcp://localhost:61616</connector> <connector name="broker2-connector">tcp://localhost:61617</connector> </connectors> <cluster-user>todd</cluster-user> <cluster-password>password</cluster-password> <cluster-connections> <cluster-connection name="preprod"> <connector-ref>broker1-connector</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> <message-load-balancing>ON_DEMAND</message-load-balancing> <max-hops>1</max-hops> <static-connectors allow-direct-connections-only="true"> <connector-ref>broker2-connector</connector-ref> </static-connectors> </cluster-connection> </cluster-connections>
message-load-balancing has no effect on this issue, redistributiuon-delay is set to 0 for the address in question.
Attachments
Issue Links
- is duplicated by
-
ARTEMIS-1542 AMQPMessage can throw / not check for null
- Closed