ActiveMQ / AMQ-1509

Duplicate topic messages received with network of brokers and selectors

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 4.1.1
    • Fix Version/s: 5.3.0
    • Component/s: Broker, Transport
    • Labels:
      None

      Description

      If you create a network of two brokers, A and B, with one publisher publishing to A and n receivers (where n > 1) with selectors, each receiver receives n messages for every message sent. The key here is the selector: the conduitSubscriptions flag does not appear to work when selectors are used, because the conduit does not properly reconcile consumers that have selectors. A suggested solution: rather than processing each selector independently, the selectors should be OR'ed together, and if any selector evaluates to true, a single message should be sent to the other broker.
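The OR'ing idea suggested above can be sketched in a few lines of plain Java. This is only an illustration of the proposal, not ActiveMQ code: the class `SelectorUnion` and the method `orTogether` are hypothetical names, and the sketch assumes that a consumer without a selector must force the combined selector to "no filter".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of ActiveMQ: builds one selector from many. */
final class SelectorUnion {

    /**
     * Returns a selector that matches whenever any input selector matches,
     * or null (meaning "no filter") if any consumer has no selector.
     */
    static String orTogether(List<String> selectors) {
        if (selectors.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        List<String> parts = new ArrayList<>();
        for (String s : selectors) {
            if (s == null || s.trim().isEmpty()) {
                return null; // an unfiltered consumer needs every message
            }
            // Parenthesise each selector so operator precedence cannot change its meaning.
            parts.add("(" + s.trim() + ")");
        }
        return String.join(" OR ", parts);
    }

    public static void main(String[] args) {
        System.out.println(orTogether(Arrays.asList("dummy = 33", "dummy > 30")));
    }
}
```

With a combined selector like this, the forwarding broker would evaluate one expression per message and send at most one copy across the bridge; the receiving broker would still apply each consumer's own selector locally.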

      In doing research, it would appear that this problem was introduced with bug fix AMQ-810. Another user reported it via email back to the assignee of AMQ-810 and a short dialog transpired. See http://www.mail-archive.com/activemq-users@geronimo.apache.org/msg05198.html.

      1. ActiveMQActor.java
        10 kB
        Giovanni Toffetti

        Issue Links

          Activity

          Giovanni Toffetti made changes -
          Link This issue is cloned as AMQ-3582 [ AMQ-3582 ]
          Giovanni Toffetti added a comment -

          Hi Dejan,

          just by chance I started looking at this issue again. I don't think the problem is fixed: as soon as there is more than one path (of at least 2 hops) between brokers, message duplication occurs.

          Here's a little example:

          FourBrokerTopicNetworkTest
          
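          import java.net.URI;
          import java.util.HashMap;
          
          import javax.jms.Destination;
          import javax.jms.JMSException;
          import javax.jms.Message;
          import javax.jms.MessageConsumer;
          import javax.jms.MessageListener;
          
          import junit.framework.Test;
          
          import org.apache.activemq.JmsMultipleBrokersTestSupport;
          import org.apache.activemq.util.MessageIdList;
          
          public class FourBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport implements MessageListener {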
          	protected static final int MESSAGE_COUNT = 5;
          	public boolean dynamicOnly;
          
          	public void initCombosForTestABandBCbrokerNetworkWithSelectors() {
          		addCombinationValues("dynamicOnly", new Object[] { true, false });
          	}
          
          	/**
          	 * A simple square topology BrokerA <-> BrokerB BrokerA <-> BrokerC BrokerB
          	 * <-> BrokerD BrokerD <-> BrokerC
          	 * 
          	 */
          	public void testSquareConnectedBrokerNetwork2() throws Exception {
          		int networkTTL = 2;
          		boolean conduitSubs = true;
          		boolean dynamicOnly = false;
          
          		// Setup broker networks
          		bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL,
          				conduitSubs);
          		bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL,
          				conduitSubs);
          		bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL,
          				conduitSubs);
          		bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL,
          				conduitSubs);
          
          		bridgeBrokers("BrokerD", "BrokerC", dynamicOnly, networkTTL,
          				conduitSubs);
          		bridgeBrokers("BrokerC", "BrokerD", dynamicOnly, networkTTL,
          				conduitSubs);
          		bridgeBrokers("BrokerD", "BrokerB", dynamicOnly, networkTTL,
          				conduitSubs);
          		bridgeBrokers("BrokerB", "BrokerD", dynamicOnly, networkTTL,
          				conduitSubs);
          
          		startAllBrokers();
          
          		// Setup destination
          		Destination dest = createDestination("TEST.FOO", true);
          
          		// Setup consumers
          		MessageConsumer clientA = createConsumer("BrokerA", dest, "msgId > 0");
          		MessageConsumer clientB = createConsumer("BrokerB", dest, "msgId > 0");
          		MessageConsumer clientC = createConsumer("BrokerC", dest, "msgId > 0");
          		MessageConsumer clientD = createConsumer("BrokerD", dest, "msgId > 0");
          		// let consumers propagate around the network
          		Thread.sleep(5000);
          		
          		clientD.setMessageListener(this);
          
          		// Send messages
          		String[] brokers = { "BrokerA", "BrokerB", "BrokerC", "BrokerD" };
          		HashMap<String, Object> props = new HashMap<String, Object>();
          		for (String broker : brokers) {
          			props.put("sender", broker);
          			for (int i = 1; i <= MESSAGE_COUNT; i++) {
          				props.put("msgId", i);
          				sendMessages(broker, dest, 1, props);
          			}
          		}
          
          		// Get message count
          		MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
          		MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
          		MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
          		MessageIdList msgsD = getConsumerMessages("BrokerD", clientD);
          
          		msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 4);
          		msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 4);
          		msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 4);
          		msgsD.waitForMessagesToArrive(MESSAGE_COUNT * 4);
          		
          		System.out.println(msgsA.toString());
          
          		assertEquals(MESSAGE_COUNT * 4, msgsA.getMessageCount());
          		assertEquals(MESSAGE_COUNT * 4, msgsB.getMessageCount());
          		assertEquals(MESSAGE_COUNT * 4, msgsC.getMessageCount());
          		assertEquals(MESSAGE_COUNT * 4, msgsD.getMessageCount());
          	}
          
          	/**
          	 * A simple square topology BrokerA <-> BrokerB BrokerA <-> BrokerC BrokerB
          	 * <-> BrokerD BrokerD <-> BrokerC
          	 * 
          	 */
          	public void testSquareConnectedBrokerNetwork() throws Exception {
          		int networkTTL = 2;
          		boolean conduitSubs = true;
          		boolean dynamicOnly = false;
          
          		// Setup broker networks
          		bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL,
          				conduitSubs);
          		bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL,
          				conduitSubs);
          		bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL,
          				conduitSubs);
          		bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL,
          				conduitSubs);
          
          		bridgeBrokers("BrokerD", "BrokerC", dynamicOnly, networkTTL,
          				conduitSubs);
          		bridgeBrokers("BrokerC", "BrokerD", dynamicOnly, networkTTL,
          				conduitSubs);
          		bridgeBrokers("BrokerD", "BrokerB", dynamicOnly, networkTTL,
          				conduitSubs);
          		bridgeBrokers("BrokerB", "BrokerD", dynamicOnly, networkTTL,
          				conduitSubs);
          
          		startAllBrokers();
          
          		// Setup destination
          		Destination dest = createDestination("TEST.FOO", true);
          
          		// Setup consumers
          		MessageConsumer clientA = createConsumer("BrokerA", dest);
          		MessageConsumer clientB = createConsumer("BrokerB", dest);
          		MessageConsumer clientC = createConsumer("BrokerC", dest);
          		MessageConsumer clientD = createConsumer("BrokerD", dest);
          		// let consumers propagate around the network
          		Thread.sleep(5000);
          
          		// Send messages
          		sendMessages("BrokerA", dest, MESSAGE_COUNT);
          		sendMessages("BrokerB", dest, MESSAGE_COUNT);
          		sendMessages("BrokerC", dest, MESSAGE_COUNT);
          		sendMessages("BrokerD", dest, MESSAGE_COUNT);
          
          		// Get message count
          		MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
          		MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
          		MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
          		MessageIdList msgsD = getConsumerMessages("BrokerD", clientD);
          
          		msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 4);
          		msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 4);
          		msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 4);
          		msgsD.waitForMessagesToArrive(MESSAGE_COUNT * 4);
          
          		assertEquals(MESSAGE_COUNT * 4, msgsA.getMessageCount());
          		assertEquals(MESSAGE_COUNT * 4, msgsB.getMessageCount());
          		assertEquals(MESSAGE_COUNT * 4, msgsC.getMessageCount());
          		assertEquals(MESSAGE_COUNT * 4, msgsD.getMessageCount());
          	}
          
          	public void setUp() throws Exception {
          		super.setAutoFail(true);
          		super.setUp();
          		String options = "?persistent=false&useJmx=false";
          		createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
          		createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
          		createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
          		createBroker(new URI("broker:(tcp://localhost:61619)/BrokerD" + options));
          	}
          
          	public static Test suite() {
          		return suite(FourBrokerTopicNetworkTest.class);
          	}
          
          	@Override
          	public void onMessage(Message message) {
          		try {
          			System.err.println(message.getStringProperty("sender") + " msgID:" + message.getIntProperty("msgId") );
          		} catch (JMSException e) {
          			// TODO Auto-generated catch block
          			e.printStackTrace();
          		}
          		
          	}
          }
          
          

          I don't know if there's anything wrong with this test, or if I should use different configurations of TTL, conduit, and dynamicOnly. I tested it with the latest AMQ I could get (5.5.1).

          As you can see, more messages are delivered than the expected 20: there are 25. The reason can be seen in the testSquareConnectedBrokerNetwork2 method: clientD prints every message coming from BrokerA twice, because the messages are forwarded by both BrokerB and BrokerC along two different paths.
          And of course this is a major problem whenever a broker network has multiple paths as message duplication becomes so severe that it basically kills the whole thing.
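A toy model shows how the square topology could produce 25 deliveries instead of 20. It is not ActiveMQ's forwarding logic. It assumes that every broker forwards each message to all neighbours not already on the message's path, that a message travels at most networkTTL hops, and that every broker it reaches delivers one copy to its local consumer. The class `SquareTopologyModel` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy flooding model of the square A-B, A-C, B-D, C-D (assumption, not ActiveMQ code). */
final class SquareTopologyModel {
    static final Map<String, List<String>> LINKS = new HashMap<>();
    static {
        LINKS.put("A", Arrays.asList("B", "C"));
        LINKS.put("B", Arrays.asList("A", "D"));
        LINKS.put("C", Arrays.asList("A", "D"));
        LINKS.put("D", Arrays.asList("B", "C"));
    }

    /** Number of copies of one message from origin that reach target within ttl hops. */
    static int copiesDelivered(String origin, String target, int ttl) {
        Deque<String> path = new ArrayDeque<>();
        path.addLast(origin);
        return walk(origin, target, ttl, path);
    }

    private static int walk(String current, String target, int hopsLeft, Deque<String> path) {
        int copies = current.equals(target) ? 1 : 0;
        if (hopsLeft == 0) {
            return copies; // TTL exhausted, no further forwarding
        }
        for (String next : LINKS.get(current)) {
            if (path.contains(next)) {
                continue; // never send a message back to a broker it already visited
            }
            path.addLast(next);
            copies += walk(next, target, hopsLeft - 1, path);
            path.removeLast();
        }
        return copies;
    }

    /** Total deliveries at target when every broker publishes messagesPerBroker messages. */
    static int totalAt(String target, int ttl, int messagesPerBroker) {
        int total = 0;
        for (String origin : LINKS.keySet()) {
            total += copiesDelivered(origin, target, ttl) * messagesPerBroker;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalAt("D", 2, 5)); // prints 25
    }
}
```

Under these assumptions a message from BrokerA reaches BrokerD twice (via BrokerB and via BrokerC), while messages from BrokerB, BrokerC and BrokerD itself arrive once, giving (2 + 1 + 1 + 1) * 5 = 25, the count reported above.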

          Please let me know if the test is correct as I'd like to have some more insight about why this is happening. Also my colleagues and I have some ideas about the correct way to fix it.

          Regards,

          g

          Jeff Turner made changes -
          Project Import Fri Nov 26 22:32:02 EST 2010 [ 1290828722158 ]
          Dejan Bosanac made changes -
          Status Reopened [ 4 ] Resolved [ 5 ]
          Resolution Fixed [ 1 ]
          Dejan Bosanac added a comment -

          We did some refactoring in this area for another issue, but this generally should be working.

          Here's the test case that tries to reproduce the problem (org.apache.activemq.usecases.ThreeBrokerQueueNetworkTest)

              public void testABandBCbrokerNetworkWithSelectors() throws Exception {
                  // Setup broker networks
                  bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, 2, true);
                  bridgeBrokers("BrokerB", "BrokerC", dynamicOnly, 2, true);
          
                  startAllBrokers();
          
                  // Setup destination
                  Destination dest = createDestination("TEST.FOO", true);
          
                  // Setup consumers
                  MessageConsumer clientA = createConsumer("BrokerC", dest, "dummy = 33");
                  MessageConsumer clientB = createConsumer("BrokerC", dest, "dummy > 30");
                  MessageConsumer clientC = createConsumer("BrokerC", dest, "dummy = 34");
          
                  // let consumers propagate around the network
                  Thread.sleep(2000);
                  // Send messages
                  // Send messages for broker A
                  HashMap<String, Object> props = new HashMap<String, Object>();
                  props.put("dummy", 33);
                  sendMessages("BrokerA", dest, MESSAGE_COUNT, props);
                  props.put("dummy", 34);
                  sendMessages("BrokerA", dest, MESSAGE_COUNT * 2, props);
          
                  // Get message count
                  MessageIdList msgsA = getConsumerMessages("BrokerC", clientA);
                  MessageIdList msgsB = getConsumerMessages("BrokerC", clientB);
                  MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
          
                  msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
                  msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3);
                  msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 2);
          
                  assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
                  assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount());
                  assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount());
              }

          If you find it is still not working, please submit a test case similar to the one above.
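The expected counts in the assertions above follow directly from the three selectors. As a minimal sketch, each selector can be modelled as a predicate on the `dummy` property. The class `ExpectedSelectorCounts` is a hypothetical helper, and `messageCount` stands in for the test's MESSAGE_COUNT, whose value is not shown here.

```java
import java.util.function.IntPredicate;

/** Hypothetical helper that reproduces the arithmetic behind the test's assertions. */
final class ExpectedSelectorCounts {

    /**
     * The test sends messageCount messages with dummy = 33 and
     * 2 * messageCount messages with dummy = 34.
     */
    static int expected(IntPredicate selector, int messageCount) {
        int count = 0;
        if (selector.test(33)) {
            count += messageCount;
        }
        if (selector.test(34)) {
            count += 2 * messageCount;
        }
        return count;
    }

    public static void main(String[] args) {
        int n = 10; // arbitrary stand-in for MESSAGE_COUNT
        System.out.println(expected(d -> d == 33, n)); // "dummy = 33"
        System.out.println(expected(d -> d > 30, n));  // "dummy > 30"
        System.out.println(expected(d -> d == 34, n)); // "dummy = 34"
    }
}
```

Any count above these values for a consumer would indicate duplicate delivery across the bridge.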

          Gary Tully made changes -
          Fix Version/s 5.2.0 [ 11841 ]
          Fix Version/s 5.3.0 [ 11914 ]
          Giovanni Toffetti made changes -
          Attachment ActiveMQActor.java [ 16477 ]
          Giovanni Toffetti added a comment -

          These are the clients I use to reproduce the bug on cyclic topologies.

          Giovanni Toffetti added a comment -

          Hi Rob,

          I saw you committed the changes to fix issue AMQ-1509. I downloaded the latest svn snapshot on May 8th and tested it on my usual 4-nodes-in-a-cycle topology. Your patch seems to improve things: my test runs now finish cleanly (before, the duplication was so massive that clients were not able to finish their workload). Alas, my clients keep receiving duplicate messages.

          I will attach the code of my clients for completeness. Either there's something wrong with them, or the bug is still there for bigger cyclic topologies.

          Hiram Chirino made changes -
          Fix Version/s 5.2.0 [ 11841 ]
          Fix Version/s 5.1.0 [ 11802 ]
          Rob Davies added a comment -

          The only 'safe' way to do this is the simple fix suggested by Howard. If we try and OR selectors together - we have to deal with the timing around updating that selector state - and this could be prone to some difficult to track down timing issues.

          If you want to shape the traffic across networks, the best way would be to use destinations, wildcards and network filters.
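To illustrate how wildcard destinations can stand in for selectors when shaping traffic, here is a simplified matcher for ActiveMQ-style destination patterns, where "." separates name elements, "*" matches exactly one element and ">" matches the rest of the name. The class `WildcardSketch` is a hypothetical, stand-alone sketch and not the broker's own filter implementation; check the ActiveMQ wildcard documentation for exact edge-case behaviour.

```java
/** Simplified destination wildcard matcher (illustrative assumption, not ActiveMQ code). */
final class WildcardSketch {

    static boolean matches(String pattern, String destination) {
        String[] p = pattern.split("\\.");
        String[] d = destination.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                return true; // in this sketch ">" accepts zero or more remaining elements
            }
            if (i >= d.length) {
                return false; // pattern is longer than the destination name
            }
            if (!p[i].equals("*") && !p[i].equals(d[i])) {
                return false; // literal element differs
            }
        }
        return p.length == d.length; // no trailing, unmatched elements allowed
    }

    public static void main(String[] args) {
        System.out.println(matches("TEST.*", "TEST.FOO"));
        System.out.println(matches("TEST.>", "TEST.FOO.BAR"));
    }
}
```

The design idea is to encode the filtering criterion in the topic name (for example one topic per value that a selector would otherwise test) so that a network bridge configured with an include or exclude pattern forwards only the matching topics.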

          Rob Davies made changes -
          Status Resolved [ 5 ] Reopened [ 4 ]
          Resolution Fixed [ 1 ]
          Rob Davies added a comment -

          Yep - not fixed

          Giovanni Toffetti added a comment -

          Thank you Howard.

          I'm afraid the temporary fix you propose wouldn't be of much use in my case. I am currently comparing how well our in-house distributed pub/sub solution routes packets against ActiveMQ on a generic network of brokers. If I implemented the temporary fix, ActiveMQ's performance probably wouldn't be as good as it should be, which would invalidate my test. I guess I'll have to find another MOM for the experiments while ActiveMQ gets fixed. Has any of you ever used another JMS implementation that supports generic (cyclic) networks of brokers?

          Using pub/sub on networks of brokers must not be so common after all; I would have expected this bug to receive many more comments.

          Thank you again.

          Howard Orner added a comment -

          Here's a description of how to fix it. Sorry, I don't have the source available to me right now to upload it, but it's pretty straightforward. I'm also doing this from memory, so you may have to look at the code a bit and make sure I'm not missing anything.

          The ideal solution would actually apply an 'or' of all selectors before forwarding messages between brokers. However, that appears very difficult to do because of all the classes involved and the fact that the selector is parsed and cached where it's used in other places. So instead, the org.apache.activemq.network.ConduitBridge and DurableConduitBridge were modified to change all incoming subscription selectors to null before adding the subscription to the list of interested consumers. This results in excess network traffic, because all messages get forwarded regardless of whether the 'other' brokers will actually deliver them after they apply individual selectors. But it works.

          In ConduitBridge.addToAlreadyInterestedConsumers(), remove the lines that check for a selector. Then add a line that sets the selector to null before doCreateDemandSubscription is called in createDemandSubscription. Do basically the same in DurableConduitBridge.
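The effect of the workaround described above can be shown with a small stand-alone model. It is not the ConduitBridge code. It assumes, as this thread describes, that a remote consumer with a selector is never merged into an existing demand subscription, and that every message matches every selector. The class `ConduitSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of conduit subscriptions on one bridge (assumption, not ActiveMQ code). */
final class ConduitSketch {

    private static final class Demand {
        final String destination;
        final String selector; // null means "forward every message"

        Demand(String destination, String selector) {
            this.destination = destination;
            this.selector = selector;
        }
    }

    private final List<Demand> demands = new ArrayList<>();
    private final boolean stripSelectors;

    ConduitSketch(boolean stripSelectors) {
        this.stripSelectors = stripSelectors;
    }

    /** Registers a consumer on the remote broker; merges it only if it has no selector. */
    void addRemoteConsumer(String destination, String selector) {
        String effective = stripSelectors ? null : selector; // the workaround: drop the selector
        if (effective == null) {
            for (Demand d : demands) {
                if (d.selector == null && d.destination.equals(destination)) {
                    return; // an existing demand subscription already covers this consumer
                }
            }
        }
        demands.add(new Demand(destination, effective));
    }

    /** Copies sent over the bridge per message, assuming the message matches every selector. */
    int copiesForwarded(String destination) {
        int copies = 0;
        for (Demand d : demands) {
            if (d.destination.equals(destination)) {
                copies++;
            }
        }
        return copies;
    }
}
```

With three selector consumers on the same topic, the model forwards three copies when selectors are kept and one copy when they are stripped. The cost is the one named above: messages that no remote selector accepts still cross the network.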

          Rob - If this hasn't really been fixed (and it appears not to be), I, and I'm sure others, would appreciate it if you would incorporate this into the code. It would be better to take my original suggestion of 'or'ing selectors together, but that looks like a lot of code changes, so this at least fixes the bug with just a few lines of code.

          Giovanni Toffetti added a comment - - edited

          Hi all,

          I'm also wondering how to get the patch for this issue. It doesn't seem to be included in the RC4 release of 5.1.0, as I can still reproduce the problem using 4 brokers.
          Did anybody manage to solve the problem?

          Thank you.

          Vui Nguyen added a comment -

          Hello Rob. I have been assigned to fix this bug for a version of activemq at work. Since we need this fix fairly soon and it would be a shame to have to reinvent the wheel, I'd really appreciate it if you could share your solution to this bug. I'm actually working on this task for Howard Orner, who submitted the previous comments. Thanks.

          Howard Orner added a comment -

          I do not see any subversion commits listed nor do I see code changes in files I would have expected to change on the main subversion trunk. Is there a way to get the code changes associated with this resolution? Thanks!

          Rob Davies made changes -
          Fix Version/s 5.1.0 [ 11802 ]
          Status Open [ 1 ] Resolved [ 5 ]
          Resolution Fixed [ 1 ]
          Rob Davies made changes -
          Field Original Value New Value
          Assignee Rob Davies [ rajdavies ]
          Howard Orner created issue -

            People

            • Assignee:
              Rob Davies
              Reporter:
              Howard Orner
            • Votes:
              0
              Watchers:
              1
