Issue Details (XML | Word | Printable)

Key: AMQ-882
Type: Bug Bug
Status: Resolved Resolved
Resolution: Fixed
Priority: Major Major
Assignee: Unassigned
Reporter: Kai Pruente
Votes: 0
Watchers: 0
Operations

If you were logged in you would be able to see more operations.
ActiveMQ

TopicPublisher.publish(topicSession.createTextMessage("Hello World") hangs and throws a JMSException

Created: 17/Aug/06 06:08 AM   Updated: 17/Aug/06 07:40 PM
Return to search
Component/s: Transport
Affects Version/s: 4.0.1
Fix Version/s: 4.0.1, 4.0.2, 4.1.0

Time Tracking:
Not Specified

Environment:
Server: Suse Linux 2.6.5-7.244-smp, JDK 1.5.0_07
Client: Windows XP SP2, JDK 1.5.0_06


 Description  « Hide
Scenario:
ActiveMQ and the publisher process running on the same server.
Several clients are running on several Windows-XP clients

Publisher code:

// initializing
connection = msgFactory.createTopicConnection();
connection.setExceptionListener(new JMSExceptionListener());
connection.start();

topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topic = topicSession.createTopic(MFS_LOCATION_CHANGE_EVENT_TOPIC);

publisher = topicSession.createPublisher(topic);
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
[...]
// sending several times with the same topic
try {
  publisher.publish(topicSession.createTextMessage(location.getName()));
} catch (JMSException e) {
  log.fatal("Problems during informing Workplace topic:" + location.getName(), e);
  [...]

Subscriber code:

topicConnection = msgFactory.createTopicConnection();
topicConnection.start();
topicConnection.setExceptionListener(new JMSExceptionListener(listeners, this));

topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = topicSession.createTopic(EventSender.MFS_LOCATION_CHANGE_EVENT_TOPIC);

topicSubscriber = topicSession.createSubscriber(topic);
topicSubscriber.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
          if (message instanceof TextMessage) {
[...]
      topicConnection.start();

After some thound of messages publish hangs for more than 1 minute and then throws a JMSException (see logs below). After searching in ActiveMQ mailing lists I changed the broker url of the publisher from:

  • tcp://arvwms:61616
    to
  • tcp://arvwms:61616?soTimeout=2000&connectionTimeout=10000&socketBufferSize=1024&wireFormat.maxInactivityDuration=0

This works some days fine, but now we get the exceptions again.

Is this a problem between topic publisher and ActiveMQ or could it be also a problem between ActiveMQ and topic subscriber? If it is real a problem between publisher and ActiveMQ it can't be a network problem, becuase it is the same server.

This problems occur about 10 times the day. With the exception and losing of 10 messages the day I could live, but the hanging about 1 minute is terrible for our application.

The Exception:

DEBUG 2006-08-17 12:45:53.738 portConfirmationDefaultHandler :    -:     - handle telegram: com.ssn.acx.extensions.logistics.mfsadapter.telegram.receiving.TransportOrderCompletionTelegram@1385c9f[ID=139887884,loadUnit=01900,lastLocation=SCS_CS,weight=<null>,orientation=0,infoType=COMPLETE,wmsID=1039340,reason=OK]
FATAL 2006-08-17 12:47:12.534 EventSender                    :    -:     - Problems during informing Workplace topic:SCS_CS
javax.jms.JMSException: Broken pipe
  at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:57)
  at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1094)
  at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1553)
  at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:462)
  at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:356)
  at org.apache.activemq.ActiveMQTopicPublisher.publish(ActiveMQTopicPublisher.java:128)
  at com.ssn.acx.extensions.logistics.mfsadapter.event.EventSender.sendLocationChangeEvent(EventSender.java:150)
  at com.ssn.acx.extensions.logistics.mfsadapter.MFSTransactionWithSendingTrigger.commit(MFSTransactionWithSendingTrigger.java:109)
  at com.ssn.acx.core.common.transaction.GlobalTransactionImpl.commit(GlobalTransactionImpl.java:198)
  at com.ssn.acx.core.common.adapterservice.TelegramDispatcher.handleTelegram(TelegramDispatcher.java:306)
  at com.ssn.acx.core.common.adapterservice.TelegramDispatcher.dispatch(TelegramDispatcher.java:180)
  at com.ssn.acx.core.common.adapterservice.AbstractCollector.dispatch(AbstractCollector.java:81)
  at com.ssn.acx.api.common.adapterservice.TriggeredCollector.dispatch(TriggeredCollector.java:87)
  at com.ssn.acx.core.logistics.mfsadapter.MFSCollector.collectTelegrams(MFSCollector.java:122)
  at com.ssn.acx.core.logistics.mfsadapter.WakeUpListener.run(WakeUpListener.java:142)
  at java.lang.Thread.run(Thread.java:595)
Caused by: java.net.SocketException: Broken pipe
  at java.net.SocketOutputStream.socketWrite0(Native Method)
  at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
  at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
  at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:108)
  at java.io.DataOutputStream.flush(DataOutputStream.java:106)
  at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:125)
  at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:141)
  at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:78)
  at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:77)
  at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
  at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
  at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1092)
  ... 14 more
WARN  2006-08-17 12:47:12.555 EventSender                    :    -:     - Destroy EventSender and cleanup JMS resources failed! Caught: javax.jms.JMSException: Cannot write to the stream any more it has already been closed
javax.jms.JMSException: Cannot write to the stream any more it has already been closed
  at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:57)
  at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1094)
  at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1655)
  at org.apache.activemq.ActiveMQMessageProducer.close(ActiveMQMessageProducer.java:315)
  at com.ssn.acx.extensions.logistics.mfsadapter.event.EventSender.destroy(EventSender.java:84)
  at com.ssn.acx.extensions.logistics.mfsadapter.event.EventSender.sendLocationChangeEvent(EventSender.java:155)
  at com.ssn.acx.extensions.logistics.mfsadapter.MFSTransactionWithSendingTrigger.commit(MFSTransactionWithSendingTrigger.java:109)
  at com.ssn.acx.core.common.transaction.GlobalTransactionImpl.commit(GlobalTransactionImpl.java:198)
  at com.ssn.acx.core.common.adapterservice.TelegramDispatcher.handleTelegram(TelegramDispatcher.java:306)
  at com.ssn.acx.core.common.adapterservice.TelegramDispatcher.dispatch(TelegramDispatcher.java:180)
  at com.ssn.acx.core.common.adapterservice.AbstractCollector.dispatch(AbstractCollector.java:81)
  at com.ssn.acx.api.common.adapterservice.TriggeredCollector.dispatch(TriggeredCollector.java:87)
  at com.ssn.acx.core.logistics.mfsadapter.MFSCollector.collectTelegrams(MFSCollector.java:122)
  at com.ssn.acx.core.logistics.mfsadapter.WakeUpListener.run(WakeUpListener.java:142)
  at java.lang.Thread.run(Thread.java:595)
Caused by: java.io.EOFException: Cannot write to the stream any more it has already been closed
  at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.checkClosed(TcpBufferedOutputStream.java:131)
  at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(TcpBufferedOutputStream.java:69)
  at java.io.DataOutputStream.writeInt(DataOutputStream.java:180)
  at org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:238)
  at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:124)
  at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:141)
  at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:78)
  at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:77)
  at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
  at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
  at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1092)
  ... 13 more
INFO  2006-08-17 12:47:12.555 EventSender                    :    -:     - Destroyed event sender


 All   Comments   Work Log   Change History   Subversion Commits   FishEye   Crucible      Sort Order: Ascending order - Click to sort in descending order
James Strachan added a comment - 17/Aug/06 07:40 PM
This problem looks like the socket is being closed for some reason on your network. Not sure why - could be a dodgy router or some intermittent issue.

The workaround is to enable failover (putting failover: before the tcp

http://incubator.apache.org/activemq/how-can-i-support-auto-reconnection.html