ActiveMQ
  1. ActiveMQ
  2. AMQ-1573

ConduitBridge merges destinations x.a and x.>, resulting in loss of subscription

    Details

    • Type: Bug Bug
    • Status: Open
    • Priority: Major Major
    • Resolution: Unresolved
    • Affects Version/s: 5.0.0
    • Fix Version/s: NEEDS_REVIEW
    • Component/s: None
    • Labels:
      None

      Description

      Two brokers A <-> B.

      1. B subscribes to x.a and x.>.
      2. A registers subscription x.a in ConduitBridge
      3. A examines subscription x.> in ConduitBridge:

      • A DestinationFilter is created for x.>
      • The filter matches the old subscription for x.a
      • Consumer B is added as "interested" in x.a
        4. Consumer on B will only receive messages for x.a, but not x.b.

        Activity

        Marcus Nilsson created issue -
        Rob Davies made changes -
        Field Original Value New Value
        Assignee Rob Davies [ rajdavies ]
        Rob Davies made changes -
        Fix Version/s 5.3.0 [ 11914 ]
        Hide
        Fran Vázquez added a comment -

        A simple test to reproduce this issue (just execute several times):

        import java.net.URI;

        import javax.jms.BytesMessage;
        import javax.jms.Destination;
        import javax.jms.MessageConsumer;
        import javax.jms.MessageListener;
        import javax.jms.MessageProducer;
        import javax.jms.Session;
        import javax.jms.TextMessage;

        import org.apache.activemq.ActiveMQConnection;
        import org.apache.activemq.ActiveMQConnectionFactory;
        import org.apache.activemq.broker.BrokerService;
        import org.apache.activemq.broker.TransportConnector;
        import org.apache.activemq.command.ActiveMQMessage;
        import org.apache.activemq.command.ActiveMQTopic;

        public class ActiveMQSubscriptionTest {

        /**

        • @param args
          */
          public static void main(String[] args) {
          try {
          URI discoveryUri = new URI("multicast://default?group=onegroup");
          URI localPort = new URI("tcp://localhost:0");

        BrokerService broker = new BrokerService();
        broker.setUseJmx(false);
        broker.setPersistent(false);

        TransportConnector connector = broker.addConnector(localPort);
        connector.setDiscoveryUri(discoveryUri);
        broker.addNetworkConnector(discoveryUri);
        broker.start();

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
        ActiveMQConnection.DEFAULT_PASSWORD, "vm://"+ broker.getBrokerName());

        ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(session.createTopic("ActiveMQ.Advisory.>"));
        TestListener listener= new TestListener("ActiveMQ.Advisory.>");
        consumer.setMessageListener(listener);

        Destination d= new ActiveMQTopic("one.>");
        consumer = session.createConsumer(d);
        listener = new TestListener("one.>");
        consumer.setMessageListener(listener);

        d= new ActiveMQTopic("one.two.>");
        consumer = session.createConsumer(d);
        listener = new TestListener("one.two.>");
        consumer.setMessageListener(listener);

        d= new ActiveMQTopic("one.two.three");
        consumer = session.createConsumer(d);
        listener = new TestListener("one.two.three");
        consumer.setMessageListener(listener);

        d= new ActiveMQTopic("one.four.five");
        MessageProducer producer = session.createProducer(d);
        TextMessage msg = session.createTextMessage();

        int i = 0;
        String ID = connection.getConnectionInfo().getClientId();
        while(true){
        synchronized(session)

        { session.wait(10000); msg.setText(ID +" ----------- " + i +" -----------" ); producer.send(msg); i++; }

        }
        }catch(Exception e)

        { System.out.println(e); }
        }

        }

        class TestListener implements MessageListener {

        private String topic;

        public TestListener(String topic)
        { this.topic = topic; }

        public void onMessage(javax.jms.Message message) {
        try{

        System.out.println(topic + " Message : " + message.getJMSDestination().toString());
        String content = "Not known";
        byte[] body;
        if(message instanceof BytesMessage)
        { body = new byte[(int)((BytesMessage)message).getBodyLength()]; ((BytesMessage)message).readBytes(body); content = new String(body); }
        else if (message instanceof TextMessage)
        { content = ((TextMessage)message).getText(); }
        else if (message instanceof ActiveMQMessage){ content = ""; }
        System.out.println(content);
        }catch(Exception e){ System.out.println(e); }

        }
        }

        Show
        Fran Vázquez added a comment - A simple test to reproduce this issue (just execute several times): import java.net.URI; import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; public class ActiveMQSubscriptionTest { /** @param args */ public static void main(String[] args) { try { URI discoveryUri = new URI("multicast://default?group=onegroup"); URI localPort = new URI("tcp://localhost:0"); BrokerService broker = new BrokerService(); broker.setUseJmx(false); broker.setPersistent(false); TransportConnector connector = broker.addConnector(localPort); connector.setDiscoveryUri(discoveryUri); broker.addNetworkConnector(discoveryUri); broker.start(); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "vm://"+ broker.getBrokerName()); ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(session.createTopic("ActiveMQ.Advisory.>")); TestListener listener= new TestListener("ActiveMQ.Advisory.>"); consumer.setMessageListener(listener); Destination d= new ActiveMQTopic("one.>"); consumer = session.createConsumer(d); listener = new TestListener("one.>"); consumer.setMessageListener(listener); d= new ActiveMQTopic("one.two.>"); consumer = session.createConsumer(d); listener = new TestListener("one.two.>"); consumer.setMessageListener(listener); d= new ActiveMQTopic("one.two.three"); consumer = session.createConsumer(d); listener = new TestListener("one.two.three"); consumer.setMessageListener(listener); d= new ActiveMQTopic("one.four.five"); MessageProducer producer = session.createProducer(d); TextMessage msg = session.createTextMessage(); int i = 0; String ID = connection.getConnectionInfo().getClientId(); while(true){ synchronized(session) { session.wait(10000); msg.setText(ID +" ----------- " + i +" -----------" ); producer.send(msg); i++; } } }catch(Exception e) { System.out.println(e); } } } class TestListener implements MessageListener { private String topic; public TestListener(String topic) { this.topic = topic; } public void onMessage(javax.jms.Message message) { try{ System.out.println(topic + " Message : " + message.getJMSDestination().toString()); String content = "Not known"; byte[] body; if(message instanceof BytesMessage) { body = new byte[(int)((BytesMessage)message).getBodyLength()]; ((BytesMessage)message).readBytes(body); content = new String(body); } else if (message instanceof TextMessage) { content = ((TextMessage)message).getText(); } else if (message instanceof ActiveMQMessage){ content = ""; } System.out.println(content); }catch(Exception e){ System.out.println(e); } } }
        Gary Tully made changes -
        Fix Version/s 5.3.0 [ 11914 ]
        Fix Version/s 5.4.0 [ 12110 ]
        Rob Davies made changes -
        Fix Version/s 5.4.0 [ 12110 ]
        Fix Version/s NEEDS_REVIEWED [ 12186 ]
        Jeff Turner made changes -
        Project Import Fri Nov 26 22:32:02 EST 2010 [ 1290828722158 ]

          People

          • Assignee:
            Rob Davies
            Reporter:
            Marcus Nilsson
          • Votes:
            1 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

            • Created:
              Updated:

              Development