  1. ActiveMQ Classic
  2. AMQ-3728

Broker stops accepting messages


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 5.5.0
    • Fix Version/s: AGING_TO_DIE
    • Component/s: None
    • Labels: None
    • Environment: Linux, 64bit, JDK 6u30 64bit

    Description

      I have the following configuration:
      1. One producer.
      2. One broker on the same machine as the producer.
      3. One offline durable subscriber. I have enough storage, so it should not overflow.

      I run our import test, which sends about 10 messages/s. After some hours of running I start to get many of these:

      javax.jms.JMSException: org.apache.activemq.transport.RequestTimedOutIOException

      (I set a send timeout of 2 s to avoid blocking indefinitely if the broker storage is full. So the first question is why this happens when there should be enough space. My broker configuration is given below.)

      Then I get:

      javax.jms.JMSException: Cannot send, channel has already failed: usfr-cmsnpdev.insideidc.com/10.1.4.42:61616
      javax.jms.JMSException: Cannot send, channel has already failed: usfr-cmsnpdev.insideidc.com/10.1.4.42:61616
      (ActiveMQ Connection Executor: tcp://usfr-cmsnpdev.insideidc.com/10.1.4.42:61616) Failed to send message.: javax.jms.JMSException: Broken pipe
      

      then some more:

      javax.jms.JMSException: Cannot send, channel has already failed: usfr-cmsnpdev.insideidc.com/10.1.4.42:61616
      

      and then all the time:

      javax.jms.IllegalStateException: The Session is closed
      

      My producer code is simple. It is a Spring bean. I create the connection and session when the bean is created. For every message sent, I create a producer and close it:

      // Init code
      connection = connectionFactory.createConnection();
      connection.setExceptionListener(new ExceptionListener() {
          public void onException(JMSException ex) {
              logger.error("Failed to send message.", ex);
          }
      });
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      ....
      // Send code
      private void sendMessage(String xmlMessage, Topic topic, String containerId) {
          MessageProducer producer = null;
          try {
              // Create the producer.
              producer = session.createProducer(topic);
              TextMessage txtMessage = session.createTextMessage(xmlMessage);
              producer.send(txtMessage);
          } catch (JMSException ex) {
              logger.error("", ex);
          } finally {
              try {
                  if (producer != null) {
                      producer.close();
                  }
              } catch (JMSException ex) {
                  // Ignored: nothing useful can be done if close fails.
              }
          }
      }
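
The exception listener above only logs, so once the transport has failed the bean keeps using the same closed session. A minimal sketch of one possible alternative, which discards a failed channel and opens a fresh one on the next send, is shown here. `Channel`, `ReconnectingSender` and the demo class are hypothetical names, not ActiveMQ or JMS API; `Channel` stands in for the connection and session pair, and `IOException` stands in for `JMSException`.

```java
import java.io.IOException;
import java.util.function.Supplier;

/** Hypothetical stand-in for a JMS connection + session pair. */
interface Channel {
    void send(String message) throws IOException;
    void close();
}

/** Discards a failed channel and opens a fresh one on the next send. */
final class ReconnectingSender {
    private final Supplier<Channel> channelFactory;
    private Channel current; // null until first use or after a failure

    ReconnectingSender(Supplier<Channel> channelFactory) {
        this.channelFactory = channelFactory;
    }

    /** Sends one message; returns false if the channel failed and was discarded. */
    synchronized boolean send(String message) {
        if (current == null) {
            current = channelFactory.get();
        }
        try {
            current.send(message);
            return true;
        } catch (IOException e) {
            current.close();
            current = null; // force a new channel on the next call
            return false;
        }
    }
}

final class ReconnectingSenderDemo {
    /** Simulates a first channel that breaks on its second send; returns how many channels were opened. */
    static int channelsOpenedAfterFailure() {
        int[] opened = {0};
        Supplier<Channel> factory = () -> {
            final int id = ++opened[0];
            return new Channel() {
                private int sent;

                @Override
                public void send(String message) throws IOException {
                    if (id == 1 && ++sent == 2) {
                        throw new IOException("Broken pipe");
                    }
                }

                @Override
                public void close() {
                }
            };
        };
        ReconnectingSender sender = new ReconnectingSender(factory);
        sender.send("a"); // succeeds on channel 1
        sender.send("b"); // fails, channel 1 is discarded
        sender.send("c"); // succeeds on a newly opened channel 2
        return opened[0];
    }
}
```

In this sketch the message that hit the failure is not retried; whether to resend it is left to the caller.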
      

      I use the default connection factory:

          <bean id="idc_metacop_jmsFactory.prototype"
                class="org.apache.activemq.ActiveMQConnectionFactory"
                abstract="true">
              <property name="brokerURL" value="${metacop.jms.brokerURL}"/>
              <property name="sendTimeout" value="2000"/>
          </bean>
      

      Log from broker:

      2012-02-17 21:55:40,389 | INFO  | Transport failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: /10.1.4.42:56166 | org.apache.activemq.broker.TransportConnection.Transport | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@fcc9c76
      2012-02-17 21:55:42,445 | INFO  | Transport failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: /10.1.4.42:39395 | org.apache.activemq.broker.TransportConnection.Transport | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@fcc9c76
      

      Broker configuration:

         <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true">
              <destinationPolicy>
                  <policyMap>
                    <policyEntries>
                      <policyEntry topic=">" producerFlowControl="true" memoryLimit="200mb">
                      </policyEntry>
                      <policyEntry queue=">" producerFlowControl="true" memoryLimit="200mb">
                      </policyEntry>
                    </policyEntries>
                  </policyMap>
              </destinationPolicy>
      
              <managementContext>
                  <managementContext createConnector="false"/>
              </managementContext>
      
              <persistenceAdapter>
                  <kahaDB directory="${activemq.base}/data/kahadb"/>
              </persistenceAdapter>
              <systemUsage>
                  <systemUsage>
                      <memoryUsage>
                          <memoryUsage limit="500 mb"/>
                      </memoryUsage>
                      <storeUsage>
                          <storeUsage limit="2 gb"/>
                      </storeUsage>
                      <tempUsage>
                          <tempUsage limit="2 gb"/>
                      </tempUsage>
                  </systemUsage>
              </systemUsage>
      
              <transportConnectors>
                  <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
              </transportConnectors>
      
          </broker>
      

      I can provide full call stacks if necessary. One thing that comes to mind: when I have just one connection, is calling producer.send thread-safe? I do not have any synchronization there.
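
On the thread-safety question: the JMS specification treats a Session, and the producers created from it, as a single-threaded context, while a Connection may be shared between threads. If several threads call sendMessage, one way to rule out concurrent use of the shared session is to serialize the sends. The sketch below illustrates the idea with plain JDK classes; `MessageSink` is a hypothetical stand-in for the sendMessage path, and an unsynchronized `ArrayList` plays the role of the non-thread-safe session.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the send path (session.createProducer + producer.send). */
interface MessageSink {
    void send(String xmlMessage) throws Exception;
}

/** Wraps a sink that is not thread-safe so that only one thread uses it at a time. */
final class SerializedSender implements MessageSink {
    private final MessageSink delegate;
    private final Object lock = new Object();

    SerializedSender(MessageSink delegate) {
        this.delegate = delegate;
    }

    @Override
    public void send(String xmlMessage) throws Exception {
        synchronized (lock) {
            delegate.send(xmlMessage);
        }
    }
}

final class SerializedSenderDemo {
    /** Sends from several threads through one serialized sink and returns how many messages arrived. */
    static int sendConcurrently(int threads, int messagesPerThread) {
        List<String> received = new ArrayList<>(); // deliberately not thread-safe
        MessageSink sender = new SerializedSender(received::add);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.execute(() -> {
                for (int i = 0; i < messagesPerThread; i++) {
                    try {
                        sender.send("<msg thread='" + id + "' seq='" + i + "'/>");
                    } catch (Exception e) {
                        throw new IllegalStateException(e);
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("senders did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received.size();
    }

    public static void main(String[] args) {
        System.out.println(sendConcurrently(4, 250) + " messages received");
    }
}
```

An alternative with the same effect is to give each sending thread its own session created from the shared connection.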

      By the way, is there any way to purge undelivered messages from the broker storage, or to monitor the number of messages in storage? I tried deleting the offline durable subscribers from the Web Admin, but the data folder size does not change, so I cannot tell whether the messages were really removed from storage (and the db is just not compacted) or whether they are still there.
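
For the monitoring part, one option is to read the broker's usage attributes over JMX. The following is a minimal sketch under several assumptions: remote JMX access is enabled (the configuration above sets createConnector="false"), the broker MBean is registered with the older 5.x name layout `org.apache.activemq:BrokerName=...,Type=Broker`, and the attribute names match ActiveMQ's `BrokerViewMBean`. The class name `BrokerUsageReader` is hypothetical, and the names should be checked in jconsole against the running broker.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

final class BrokerUsageReader {
    // Assumed name pattern for the broker MBean (older 5.x layout); newer releases differ.
    static final String BROKER_PATTERN = "org.apache.activemq:Type=Broker,*";

    // Assumed attribute names, taken from ActiveMQ's BrokerViewMBean.
    static final String[] ATTRIBUTES = {
        "TotalMessageCount", "StorePercentUsage", "MemoryPercentUsage"
    };

    /** Reads the usage attributes of every broker MBean visible on the connection. */
    static Map<String, Object> readBrokerUsage(MBeanServerConnection connection) {
        Map<String, Object> usage = new LinkedHashMap<>();
        try {
            for (ObjectName broker : connection.queryNames(new ObjectName(BROKER_PATTERN), null)) {
                for (String attribute : ATTRIBUTES) {
                    usage.put(broker.getKeyProperty("BrokerName") + "." + attribute,
                              connection.getAttribute(broker, attribute));
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException("JMX query failed", e);
        }
        return usage;
    }

    /** Connects to the given JMX service URL and prints the broker usage values. */
    static void printUsage(String jmxUrl) throws Exception {
        try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl))) {
            readBrokerUsage(connector.getMBeanServerConnection())
                .forEach((name, value) -> System.out.println(name + " = " + value));
        }
    }
}
```

With the broker's own connector enabled, `printUsage` would typically be called with a URL such as `service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi`; the host and port here are assumptions.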

          People

            Assignee: Unassigned
            Reporter: Marek Slama (mslama)
            Votes: 2
            Watchers: 4
