Details
Description
Assuming broker1 and broker2 exist in a cluster, if an AMQP Source constituting a shared durable subscription (having capabilities "shared", "global" and "topic") is created on broker1, and attempted to be recovered from a client connected to broker2 (by creating an attach frame with a null source, with desired capabilities "shared", "global" and "topic"), an Amqp.AmqpException is thrown.
In a cluster, shouldn't it be possible to recover subscriptions that exist on another broker? Shouldn't the search for this subscription occur on all cluster members?
I don't believe this is happening. It looks like we just search locally (not sure about this) - but certainly the exception is thrown and the sub is not recovered:
See code below from org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext (comments are not my own, they are from source):
if (source == null) { // Attempt to recover a previous subscription happens when a link reattach happens on a // subscription queue String clientId = getClientId(); String pubId = sender.getName(); global = hasRemoteDesiredCapability(sender, GLOBAL); queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, true, global, false); QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false); multicast = true; routingTypeToUse = RoutingType.MULTICAST; // Once confirmed that the address exists we need to return a Source that reflects // the lifetime policy and capabilities of the new subscription. if (result.isExists()) {
I have a C# sample that demonstrates the issue:
using System; using System.Collections.Generic; using System.Threading; using System.Transactions; using Amqp; using Amqp.Framing; using Amqp.Sasl; using Amqp.Types; namespace amqp_client_demo { class Program { static void Main(string[] args) { string url1 = "amqp://localhost:5672"; string url2 = "amqp://localhost:5673"; String ADDRESS = "orders"; Connection connection1 = new Connection(new Address(url1)); Session session1 = new Session(connection1); ReceiverLink receiver1 = new ReceiverLink(session1, "test", new Source(){Durable = 2, Capabilities = new Symbol[]{"topic", "shared", "global"}, Address = ADDRESS}, null); SenderLink sender = new SenderLink(session1, "sender", ADDRESS); sender.Send(new Message("test message")); Connection connection2 = new Connection(new Address(url2)); Session session2 = new Session(connection2); ReceiverLink receiver2 = new ReceiverLink(session2, "test", new Attach(){Source = null, DesiredCapabilities = new Symbol[]{"topic", "shared", "global"}}, null); Console.WriteLine(receiver2.Receive(TimeSpan.FromSeconds(1)).Body); //Unhandled Exception: Amqp.AmqpException: Unknown subscription link: test } } }
If url1 and url2 are modified to point to the same cluster members, this works fine. Otherwise the exception (see comment) is thrown.