ActiveMQ / AMQ-3591

bridge transport is stuck on socket write at both ends

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 5.5.0
    • Fix Version/s: None
    • Component/s: Transport
    • Labels:
      None
    • Environment:

      Two Red Hat Linux hosts

      Description

      Broker A is embedded in a JBoss 7 web application on one Linux host and connects over a TCP network bridge (duplex="true") to standalone broker B on a second host. The two brokers exchange messages through 4 shared queues. Each broker has its own persistence store in an MS SQL Server database.

      After a few hours of flawless operation, the bridge stops delivering messages in both directions.

      Broker A, stack trace of the offending thread (the remaining threads are waiting on the oneway mutex):
      BrokerService[as2analyticsEmbedded] Task-15 [RUNNABLE, IN_NATIVE] CPU time: 0:00
      java.net.SocketOutputStream.socketWrite0(FileDescriptor, byte[], int, int)
      java.net.SocketOutputStream.socketWrite(byte[], int, int)
      java.net.SocketOutputStream.write(byte[], int, int)
      org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(byte[], int, int)
      java.io.DataOutputStream.write(byte[], int, int)
      org.apache.activemq.openwire.v7.BaseDataStreamMarshaller.tightMarshalByteSequence2(ByteSequence, DataOutput, BooleanStream)
      org.apache.activemq.openwire.v7.MessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
      org.apache.activemq.openwire.v7.ActiveMQMessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
      org.apache.activemq.openwire.v7.ActiveMQObjectMessageMarshaller.tightMarshal2(OpenWireFormat, Object, DataOutput, BooleanStream)
      org.apache.activemq.openwire.OpenWireFormat.marshal(Object, DataOutput)
      org.apache.activemq.transport.tcp.TcpTransport.oneway(Object)
      org.apache.activemq.transport.InactivityMonitor.oneway(Object)
      org.apache.activemq.transport.TransportFilter.oneway(Object)
      org.apache.activemq.transport.WireFormatNegotiator.oneway(Object)
      org.apache.activemq.transport.failover.FailoverTransport.oneway(Object)
      org.apache.activemq.transport.MutexTransport.oneway(Object)
      org.apache.activemq.transport.ResponseCorrelator.asyncRequest(Object, ResponseCallback)
      org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(Command)
      org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(Object)
      org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
      org.apache.activemq.transport.TransportFilter.onCommand(Object)
      org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport, TransportListener, Object)
      org.apache.activemq.transport.vm.VMTransport.oneway(Object)
      org.apache.activemq.transport.MutexTransport.oneway(Object)
      org.apache.activemq.transport.ResponseCorrelator.oneway(Object)
      org.apache.activemq.broker.TransportConnection.dispatch(Command)
      org.apache.activemq.broker.TransportConnection.processDispatch(Command)
      org.apache.activemq.broker.TransportConnection.iterate()
      org.apache.activemq.thread.PooledTaskRunner.runTask()
      org.apache.activemq.thread.PooledTaskRunner$1.run()
      java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Runnable)
      java.util.concurrent.ThreadPoolExecutor$Worker.run()
      java.lang.Thread.run()
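
      A trace like the one above can also be spotted from inside the JVM without an external profiler. The following is a minimal sketch, not part of the original report: the class name StuckWriteScanner and its methods are hypothetical, and it only matches the frame java.net.SocketOutputStream.socketWrite0 that appears at the top of the traces in this issue. Newer JDKs implement socket writes differently, so the frame name is an assumption tied to the Java version used here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: lists threads whose innermost frame is the native
// socket write seen in the traces of this report.
public class StuckWriteScanner {
    static final String WRITE_CLASS = "java.net.SocketOutputStream";
    static final String WRITE_METHOD = "socketWrite0";

    /** True when the innermost (first) frame is the native socket write. */
    static boolean isInSocketWrite(StackTraceElement[] frames) {
        return frames.length > 0
                && WRITE_CLASS.equals(frames[0].getClassName())
                && WRITE_METHOD.equals(frames[0].getMethodName());
    }

    /** Names of live threads in this JVM that are inside the socket write right now. */
    static List<String> threadsInSocketWrite() {
        List<String> names = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> e
                : Thread.getAllStackTraces().entrySet()) {
            if (isInSocketWrite(e.getValue())) {
                names.add(e.getKey().getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Threads in socket write: " + threadsInSocketWrite());
    }
}
```

      A single snapshot only shows that a thread is writing at that instant. A thread that shows up in several snapshots taken seconds apart is the more likely candidate for a stuck write.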

      Broker B, stack trace of the offending thread (the remaining threads are waiting on the oneway mutex):

      ActiveMQ Connection Dispatcher: vm://analyticsCentral#6 [RUNNABLE, IN_NATIVE] CPU time: 0:00
      java.net.SocketOutputStream.socketWrite0(FileDescriptor, byte[], int, int)
      java.net.SocketOutputStream.socketWrite(byte[], int, int)
      java.net.SocketOutputStream.write(byte[], int, int)
      org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush()
      java.io.DataOutputStream.flush()
      org.apache.activemq.transport.tcp.TcpTransport.oneway(Object)
      org.apache.activemq.transport.InactivityMonitor.oneway(Object)
      org.apache.activemq.transport.TransportFilter.oneway(Object)
      org.apache.activemq.transport.WireFormatNegotiator.oneway(Object)
      org.apache.activemq.transport.MutexTransport.oneway(Object)
      org.apache.activemq.transport.ResponseCorrelator.asyncRequest(Object, ResponseCallback)
      org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(Command)
      org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(Object)
      org.apache.activemq.transport.ResponseCorrelator.onCommand(Object)
      org.apache.activemq.transport.TransportFilter.onCommand(Object)
      org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport, TransportListener, Object)
      org.apache.activemq.transport.vm.VMTransport.oneway(Object)
      org.apache.activemq.transport.MutexTransport.oneway(Object)
      org.apache.activemq.transport.ResponseCorrelator.oneway(Object)
      org.apache.activemq.broker.TransportConnection.dispatch(Command)
      org.apache.activemq.broker.TransportConnection.processDispatch(Command)
      org.apache.activemq.broker.TransportConnection.iterate()
      org.apache.activemq.thread.DedicatedTaskRunner.runTask()
      org.apache.activemq.thread.DedicatedTaskRunner$1.run()

      Both brokers run on a local 1 Gbit network. The hang occurs every time after a few hours of operation and is consistently reproducible.
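
      Both traces show a thread inside socketWrite0. A blocking socket write stays in that call for as long as the kernel cannot accept more data, which is what happens when the other end of the connection is not reading. The report does not establish that this is the cause here, so the following is only an illustration of that general behaviour on a loopback connection, not a reproduction of the bug. The class FullSendBufferDemo and its method are hypothetical names. It uses a non-blocking channel so that the stall shows up as a write returning 0 instead of a hung thread.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo: a peer that never reads eventually makes writes stall.
public class FullSendBufferDemo {
    // Safety cap so the loop terminates even with unusually large buffers.
    static final long MAX_BYTES = 1L << 30;

    /**
     * Writes to a loopback peer that never reads and returns how many bytes
     * the kernel accepted before it refused further data.
     */
    static long bytesAcceptedBeforeStall() throws IOException, InterruptedException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel writer = SocketChannel.open(server.getLocalAddress());
                 SocketChannel silentPeer = server.accept()) {
                // silentPeer is accepted but deliberately never read from.
                writer.configureBlocking(false);
                ByteBuffer chunk = ByteBuffer.allocate(64 * 1024);
                long total = 0;
                int idleRounds = 0;
                // Treat 10 consecutive zero-byte writes as a confirmed stall.
                while (idleRounds < 10 && total < MAX_BYTES) {
                    chunk.clear();
                    int n = writer.write(chunk);
                    if (n > 0) {
                        total += n;
                        idleRounds = 0;
                    } else {
                        idleRounds++;
                        Thread.sleep(20);
                    }
                }
                return total;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Bytes accepted before the write stalled: "
                + bytesAcceptedBeforeStall());
    }
}
```

      With a blocking stream, as used by the TCP transport in the traces, the point where this sketch sees a zero-byte write is where the calling thread would remain inside the native write until the peer reads again or the connection is closed.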

      Broker A relevant configuration:
      <beans
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:amq="http://activemq.apache.org/schema/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd"
      >

      <broker xmlns="http://activemq.apache.org/schema/core" brokerName="as2analyticsEmbedded" persistent="true" dataDirectory="/home/as2/amq_storage" destroyApplicationContextOnStop="true">
      <destinationPolicy>
      <policyMap>
      <policyEntries>
      <policyEntry queue=">" producerFlowControl="false" memoryLimit="1gb">
      <pendingQueuePolicy>
      <fileQueueCursor/>
      </pendingQueuePolicy>
      </policyEntry>
      </policyEntries>
      </policyMap>
      </destinationPolicy>

      <managementContext>
      <managementContext createConnector="false"/>
      </managementContext>

      <networkConnectors>
      <networkConnector uri="static:(failover:(tcp://10.0.213.38:61616))"
      name="bridge"
      duplex="true"
      conduitSubscriptions="true"
      decreaseNetworkConsumerPriority="false">
      </networkConnector>
      </networkConnectors>

      <!-- both FS and DB must be provided. AMQ will use DB for long-term storage.
      FS contents that has not been consumed yet is stored to DB from time to time -->
      <persistenceAdapter>
      <jdbcPersistenceAdapter dataDirectory="/home/as2/amq_storage/as2analyticsEmbedded" dataSource="#mssql-ds" useDatabaseLock="false">
      <adapter>
      <imageBasedJDBCAdaptor/>
      </adapter>
      </jdbcPersistenceAdapter>
      </persistenceAdapter>

      <systemUsage>
      <systemUsage>
      <memoryUsage>
      <memoryUsage limit="2gb"/>
      </memoryUsage>
      <storeUsage>
      <storeUsage limit="80gb"/>
      </storeUsage>
      <tempUsage>
      <tempUsage limit="10gb"/>
      </tempUsage>
      </systemUsage>
      </systemUsage>

      <transportConnectors>
      <transportConnector name="as2analyticsEmbeddedConnector" uri="vm://as2analyticsEmbedded"/>
      </transportConnectors>

      </broker>

      <bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
      <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
      <property name="url" value="jdbc:sqlserver://10.0.213.159:1433;databaseName=obfuscated"/>
      <property name="username" value="obfuscated"/>
      <property name="password" value="obfuscated"/>
      <property name="initialSize" value="1"/>
      <property name="maxActive" value="100"/>
      <property name="poolPreparedStatements" value="true"/>
      </bean>

      </beans>

      Broker B relevant configuration:

      <beans
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:amq="http://activemq.apache.org/schema/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

      <!-- Allows us to use system properties as variables in this configuration file -->
      <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
      <property name="locations">
      <value>file:${activemq.base}/conf/credentials.properties</value>
      </property>
      </bean>

      <!--
      The <broker> element is used to configure the ActiveMQ broker.
      -->
      <broker xmlns="http://activemq.apache.org/schema/core" brokerName="analyticsCentral" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true">

      <!--
      For better performances use VM cursor and small memory limit.
      For more information, see:

      http://activemq.apache.org/message-cursors.html

      Also, if your producer is "hanging", it's probably due to producer flow control.
      For more information, see:
      http://activemq.apache.org/producer-flow-control.html
      -->

      <destinationPolicy>
      <policyMap>
      <policyEntries>
      <policyEntry queue=">" producerFlowControl="false" memoryLimit="1gb">
      <pendingQueuePolicy>
      <fileQueueCursor/>
      </pendingQueuePolicy>
      </policyEntry>
      <policyEntry queue=">">
      <deadLetterStrategy>
      <individualDeadLetterStrategy
      queuePrefix="DLQ." useQueueForQueueMessages="true" />
      </deadLetterStrategy>
      </policyEntry>
      </policyEntries>
      </policyMap>
      </destinationPolicy>

      <!--
      The managementContext is used to configure how ActiveMQ is exposed in
      JMX. By default, ActiveMQ uses the MBean server that is started by
      the JVM. For more information, see:

      http://activemq.apache.org/jmx.html
      -->
      <managementContext>
      <managementContext createConnector="false"/>
      </managementContext>

      <!--
      Configure message persistence for the broker. The default persistence
      mechanism is the KahaDB store (identified by the kahaDB tag).
      For more information, see:

      http://activemq.apache.org/persistence.html
      -->
      <persistenceAdapter>
      <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mssql-ds" useDatabaseLock="false">
      <adapter>
      <imageBasedJDBCAdaptor/>
      </adapter>
      </jdbcPersistenceAdapter>
      </persistenceAdapter>

      <systemUsage>
      <systemUsage>
      <memoryUsage>
      <memoryUsage limit="2gb"/>
      </memoryUsage>
      <storeUsage>
      <storeUsage limit="10gb"/>
      </storeUsage>
      <tempUsage>
      <tempUsage limit="5gb"/>
      </tempUsage>
      </systemUsage>
      </systemUsage>

      <!--
      The transport connectors expose ActiveMQ over a given protocol to
      clients and other brokers. For more information, see:

      http://activemq.apache.org/configuring-transports.html
      -->
      <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
      </transportConnectors>

      </broker>

      <amq:connectionFactory id="jmsFactory" brokerURL="tcp://localhost">
      <amq:redeliveryPolicy>
      <amq:redeliveryPolicy maximumRedeliveries="0"/>
      </amq:redeliveryPolicy>
      </amq:connectionFactory>

      <bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
      <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
      <property name="url" value="jdbc:sqlserver://10.0.213.159:1433;DatabaseName=obfuscated"/>
      <property name="username" value="obfuscated"/>
      <property name="password" value="obfuscated"/>
      <property name="initialSize" value="1"/>
      <property name="maxActive" value="100"/>
      <property name="poolPreparedStatements" value="true"/>
      </bean>

      <!--
      Enable web consoles, REST and Ajax APIs and demos
      It also includes Camel (with its web console), see ${ACTIVEMQ_HOME}/conf/camel.xml for more info

      Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
      -->
      <import resource="jetty.xml"/>

      </beans>


          People

          • Assignee: Unassigned
          • Reporter: Leon Fleysher
          • Votes: 1
          • Watchers: 2
