Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Incomplete
-
5.5.0
-
None
-
None
-
Linux, 64bit, JDK 6u30 64bit
Description
I have following configuration:
1.One producer
2.One broker on the same machine as producer.
3.One offline durable subscriber but I have enough storage size so it should not overflow.
I run our import test and it sends about 10 messages/s. After some hours of run I start to get:
many
javax.jms.JMSException: org.apache.activemq.transport.RequestTimedOutIOException
(I set send timeout 2s to avoid infinite block in case that broker storage is full. So first question is why it happens when there should be enough space. I will provide my broker configuration below.)
Then I get:
javax.jms.JMSException: Cannot send, channel has already failed: usfr-cmsnpdev.insideidc.com/10.1.4.42:61616
javax.jms.JMSException: Cannot send, channel has already failed: usfr-cmsnpdev.insideidc.com/10.1.4.42:61616
(ActiveMQ Connection Executor: tcp://usfr-cmsnpdev.insideidc.com/10.1.4.42:61616) Failed to send message.: javax.jms.JMSException: Broken pipe
some more
javax.jms.JMSException: Cannot send, channel has already failed: usfr-cmsnpdev.insideidc.com/10.1.4.42:61616
and then all the time:
javax.jms.IllegalStateException: The Session is closed
I have simple producer code. It is Spring bean. I create connection and session when bean is created. For every send message I create producer and close it:
//Init code connection = connectionFactory.createConnection(); connection.setExceptionListener(new ExceptionListener() { public void onException(JMSException ex) { logger.error("Failed to send message.", ex); } }); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); .... //Send code private void sendMessage(String xmlMessage, Topic topic, String containerId) { MessageProducer producer = null; try { // Create the producer. producer = session.createProducer(topic); TextMessage txtMessage = session.createTextMessage(xmlMessage); producer.send(txtMessage); } catch (JMSException ex) { logger.error("", ex); } finally { try { if (producer != null) { producer.close(); } } catch (JMSException ex) { } } }
Use default connection factory:
<bean id="idc_metacop_jmsFactory.prototype" class="org.apache.activemq.ActiveMQConnectionFactory" abstract="true"> <property name="brokerURL" value="${metacop.jms.brokerURL}"/> <property name="sendTimeout" value="2000"/> </bean>
Log from broker:
2012-02-17 21:55:40,389 | INFO | Transport failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: /10.1.4.42:56166 | org.apache.activemq.broker.TransportConnection.Transport | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@fcc9c76 2012-02-17 21:55:42,445 | INFO | Transport failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: /10.1.4.42:39395 | org.apache.activemq.broker.TransportConnection.Transport | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@fcc9c76
Broker configuration:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" producerFlowControl="true" memoryLimit="200mb"> </policyEntry> <policyEntry queue=">" producerFlowControl="true" memoryLimit="200mb"> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <managementContext> <managementContext createConnector="false"/> </managementContext> <persistenceAdapter> <kahaDB directory="${activemq.base}/data/kahadb"/> </persistenceAdapter> <systemUsage> <systemUsage> <memoryUsage> <memoryUsage limit="500 mb"/> </memoryUsage> <storeUsage> <storeUsage limit="2 gb"/> </storeUsage> <tempUsage> <tempUsage limit="2 gb"/> </tempUsage> </systemUsage> </systemUsage> <transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/> </transportConnectors> </broker>
I can provide full call stacks if necessary. One thing which comes to my mind: When I have just one connection is call of producer.send thread safe? I do not have any synchronization there.
BTW is there any way how to purge even undelivered messages from broker storage? Or how to monitor number of messages in storage? I tried to delete offline durable subscribers from Web Admin but data folder size does not change but I have no way how to see if messages was really removed from storage (and db is not compacted only or if messages stay in storage.