Uploaded image for project: 'ActiveMQ C++ Client'
  1. ActiveMQ C++ Client
  2. AMQCPP-543

message producer send never blocking when using producer flow control

Details

    • New Feature
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • 3.8.2
    • 3.9.0
    • Openwire
    • None

    Description

      For testing,
      message producer is set to non-persisted mode, with the connection producer window size to 1MB. (the broker enables the producer flow control and set the memory limit to ~10MB with vm only storage)

      I notice that when i don't have any message consumer, the broker notify me that the memory limit is reached, that the producer will be throttled (as i would expect), however the producer never blocks on a send, as if the window size has no effect.

      while digging into ActiveMQProducerKernel.cpp,
      I notice the private member memoryUsage (auto_ptr) is never initialized. and there's a TODO in the code ?

      ActiveMQProducerKernel::ActiveMQProducerKernel(
      
      [...]
      
          // TODO - Check for need of MemoryUsage if there's a producer Windows size
          //        and the Protocol version is greater than 3.
      }
      

      I tried initializing the memoryUsage, and producer seem to block as expected on a send, when the limit is reached.

      ActiveMQProducerKernel::ActiveMQProducerKernel(
      
      [...]
      
          // TODO - Check for need of MemoryUsage if there's a producer Windows size
          //        and the Protocol version is greater than 3.
      
          if (session->getConnection()->getProducerWindowSize()) { 
                  this->memoryUsage.reset( new MemoryUsage(session->getConnection()->getProducerWindowSize()) );
          }
      }
      

      I'm not sure what is the proper fix,

      Attachments

        1. activemq-cpp_AMQCPP-543_v0.patch
          5 kB
          Christian Mamen

        Activity

          You'd need to inspect the Java OpenWire client code to see how everything works and then create tests that can validate your patch to ensure it works and there are no breakages.

          tabish Timothy A. Bish added a comment - You'd need to inspect the Java OpenWire client code to see how everything works and then create tests that can validate your patch to ensure it works and there are no breakages.

          Hi,
          I compare with the java client, and uploaded a patch that I think does pretty much what the java do.

          The java assume the latest version of the protocol by default and when it receives a wireFormatInfo command from the broker, saves the protocol version. this is done in an AtomicInteger.

          ActiveMQMessageProducer.java

                  // Enable producer window flow control if protocol > 3 and the window
                  // size > 0
                  if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
                      producerWindow = new MemoryUsage("Producer Window: " + producerId);
                      producerWindow.setExecutor(session.getConnectionExecutor());
                      producerWindow.setLimit(this.info.getWindowSize());
                      producerWindow.start();
                  }
          

          Can you explain why the comment says protocol > 3, but the code actually does >= 3? Perhaps you can also review the patch?

          cmamen Christian Mamen added a comment - Hi, I compare with the java client, and uploaded a patch that I think does pretty much what the java do. The java assume the latest version of the protocol by default and when it receives a wireFormatInfo command from the broker, saves the protocol version. this is done in an AtomicInteger. ActiveMQMessageProducer.java // Enable producer window flow control if protocol > 3 and the window // size > 0 if (session.connection.getProtocolVersion() >= 3 && this .info.getWindowSize() > 0) { producerWindow = new MemoryUsage( "Producer Window: " + producerId); producerWindow.setExecutor(session.getConnectionExecutor()); producerWindow.setLimit( this .info.getWindowSize()); producerWindow.start(); } Can you explain why the comment says protocol > 3, but the code actually does >= 3? Perhaps you can also review the patch?

          Applied fix on trunk.

          tabish Timothy A. Bish added a comment - Applied fix on trunk.

          People

            tabish Timothy A. Bish
            cmamen Christian Mamen
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: