Uploaded image for project: 'ActiveMQ Classic'
  1. ActiveMQ Classic
  2. AMQ-7229

JmsConnector handling loss of connection [ActiveMQConnection {id=ID:xs-38677-1560088234347-2:100,clientId=ID:xs.test.int-38677-1560088234347-1:100,started=true}] | org.apache.activemq.network.jms.JmsConnector | ActiveMQ Session Task-8514

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Blocker
    • Resolution: Unresolved
    • 5.15.8, 5.15.9, 5.15.10, 5.15.12
    • None
    • Broker, Camel, Connector, JMS client

    Description

      Periodically I am getting the following logged to my production and dev AMQ instances;

      JmsConnector handling loss of connection [ActiveMQConnection
      
      {id=ID:server-38677-1560088234347-2:100,clientId=ID:xs.test.int-38677-1560088234347-1:100,started=true}]|
      org.apache.activemq.network.jms.JmsConnector| ActiveMQ Session Task-8514
      

      This come at random times from two separate servers on separate networks (connecting to a common upstream provider) These errors ALWAYS come in groups of 11, with the time spanning 11 sec - both servers see the same error at the same time (Indicating something at the far end maybe?)

      When I check the connection count in a JMX console it has increased by 11, with none of the old connections seeming to close or get cleaned up.

      System config:

      JMS-JMS Bridge with embedded broker bridging topics from a remote server to local server. Local clients connect with nio+stomp and all have persistent subscriptions to topics.

      The remote connection is openwire.

      When this error occurs it leads to a corrupted message which then causes the DB store to grow until an OOM error occurs and AMQ stops processing. The only way to resolve the issue is to stop all the connected clients and clear there subscriptions and then reconnect.

      I have done wireshark on a dev broker but can't see anything obvious though wireshark doesn't like to dissect openwire so I can't see what's actually going on.

      This error sometimes occurs days apart, or like today multiple times a day.

      The only reference I see in the source code is line 496

       

      The error logged in debug is:

      2019-04-02 19:29:20,360 | TRACE | ID:xs-39837-1553964750839-4:140:2 sending message: ActiveMQTextMessage {commandId = 3319310, responseRequired = false, mes
       sageId = ID:xs-39837-1553964750839-4:140:2:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:xs-39837-1553964750839-4:140:2:1,
       destination = topic://TD_ALL, transactionId = null, expiration = 0, timestamp = 1554229760360, arrival = 0, brokerInTime = 1554229755115, brokerOutTime = 15
       54229755118, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null,
        compressed = true, userID = null, content = org.apache.activemq.util.ByteSequence@2a02b7e9, marshalledProperties = org.apache.activemq.util.ByteSequence@7d
       db0ca1, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = \\\{"SF_MSG":{"time":"1554229754000","area_id":...r":"2W58"}}} | org.apache.activemq.ActiveMQSession | ActiveMQ Session Task-4447
       2019-04-02 19:29:20,360 | TRACE | Running task iteration 0 - vm://xs.test.int#279 | org.apache.activemq.thread.PooledTaskRunner | ActiveMQ VMTransport: vm://xs.test.int#279-1
       2019-04-02 19:29:20,400 | DEBUG | Error occured while processing sync command: ActiveMQTextMessage {commandId = 7, responseRequired = true, messageId = ID:xs-39837-1553964750839-4:140:2:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:xs-39837-1553964750839-4:140:2:1, destination = topic://TD_ALL, transactionId = null, expiration = 0, timestamp = 1554229760360, arrival = 0, brokerInTime = 1554229760361, brokerOutTime = 1554229755118, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = true, userID = null, content = org.apache.activemq.util.ByteSequence@2a02b7e9, marshalledProperties = org.apache.activemq.util.ByteSequence@7ddb0ca1, dataStructure = null, redeliveryCounter = 0, size = 1595, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = \\{"SF_MSG":{"time":"1554229754000","area_id":...r":"2W58"}}}, exception: java.io.EOFException | org.apache.activemq.broker.TransportConnection.Service | ActiveMQ VMTransport: vm://xs.test.int#279-1
       java.io.EOFException
               at java.io.DataInputStream.readUnsignedShort(DataInputStream.java:340)[:1.8.0_201]
               at java.io.DataInputStream.readUTF(DataInputStream.java:589)[:1.8.0_201]
               at java.io.DataInputStream.readUTF(DataInputStream.java:564)[:1.8.0_201]
               at org.apache.activemq.util.MarshallingSupport.unmarshalPrimitiveMap(MarshallingSupport.java:97)[activemq-client-5.15.9.jar:5.15.9]
               at org.apache.activemq.util.MarshallingSupport.unmarshalPrimitiveMap(MarshallingSupport.java:78)[activemq-client-5.15.9.jar:5.15.9]
               at org.apache.activemq.util.MarshallingSupport.unmarshalPrimitiveMap(MarshallingSupport.java:70)[activemq-client-5.15.9.jar:5.15.9]
               at org.apache.activemq.command.Message.unmarsallProperties(Message.java:252)[activemq-client-5.15.9.jar:5.15.9]
               at org.apache.activemq.command.Message.getProperty(Message.java:202)[activemq-client-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy.add(RetainedMessageSubscriptionRecoveryPolicy.java:53)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.region.Topic.dispatch(Topic.java:756)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:556)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.region.Topic.send(Topic.java:484)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.region.DestinationFilter.send(DestinationFilter.java:138)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:508)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:459)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:293)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:293)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:226)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:578)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:768)[activemq-client-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:336)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)[activemq-client-5.15.9.jar:5.15.9]
               at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)[activemq-client-5.15.9.jar:5.15.9]
               at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:275)[activemq-broker-5.15.9.jar:5.15.9]
               at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133)[activemq-client-5.15.9.jar:5.15.9]
               at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)[activemq-client-5.15.9.jar:5.15.9]
               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)[:1.8.0_201]
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)[:1.8.0_201]
               at java.lang.Thread.run(Thread.java:748)[:1.8.0_201]

       

      If I front the connection to the upstream server by using camel - I get the error below which matches up with the error above.. I am currently running camel in full debug to hopefully catch another event. I'm unsure if using the camel configuration will stop the eventual OOM conditrion as seen above but I will try and configure a test client later to watch.

      2019-06-15 19:28:23,855 | WARN  | Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - javax.jms.JMSException: java.io.EOFException] | org.apache.camel.component.jms.EndpointMessageListener | Camel (Piper_Bravo) thread #1 - JmsConsumer[TD_ALL_SIG_AREA]
      org.apache.camel.RuntimeCamelException: javax.jms.JMSException: java.io.EOFException
              at org.apache.camel.component.jms.JmsBinding.extractHeadersFromJms(JmsBinding.java:209)[camel-jms-2.23.0.jar:2.23.0]
              at org.apache.camel.component.jms.JmsMessage.populateInitialHeaders(JmsMessage.java:235)[camel-jms-2.23.0.jar:2.23.0]
              at org.apache.camel.impl.DefaultMessage.createHeaders(DefaultMessage.java:258)[camel-core-2.23.0.jar:2.23.0]
              at org.apache.camel.component.jms.JmsMessage.ensureInitialHeaders(JmsMessage.java:220)[camel-jms-2.23.0.jar:2.23.0]
              at org.apache.camel.component.jms.JmsMessage.getHeader(JmsMessage.java:170)[camel-jms-2.23.0.jar:2.23.0]
              at org.apache.camel.impl.DefaultMessage.getHeader(DefaultMessage.java:94)[camel-core-2.23.0.jar:2.23.0]
              at org.apache.camel.impl.DefaultUnitOfWork.<init>(DefaultUnitOfWork.java:115)[camel-core-2.23.0.jar:2.23.0]
              at org.apache.camel.impl.DefaultUnitOfWork.<init>(DefaultUnitOfWork.java:75)[camel-core-2.23.0.jar:2.23.0]
              at org.apache.camel.impl.DefaultUnitOfWorkFactory.createUnitOfWork(DefaultUnitOfWorkFactory.java:34)[camel-core-2.23.0.jar:2.23.0]
              at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.createUnitOfWork(CamelInternalProcessor.java:695)[camel-core-2.23.0.jar:2.23.0]
              at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:663)[camel-core-2.23.0.jar:2.23.0]
              at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:634)[camel-core-2.23.0.jar:2.23.0]
              at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:149)[camel-core-2.23.0.jar:2.23.0]
              at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)[camel-core-2.23.0.jar:2.23.0]
              at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:113)[camel-jms-2.23.0.jar:2.23.0]
              at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
              at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
              at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
              at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
              at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
              at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1168)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
              at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1160)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
              at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1057)[spring-jms-4.3.18.RELEASE.jar:4.3.18.RELEASE]
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)[:1.8.0_212]
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)[:1.8.0_212]
              at java.lang.Thread.run(Thread.java:748)[:1.8.0_212]

       

      DEBUG Capture of good message:

      2019-06-26 18:50:43,461 | DEBUG |
      nrod://topic:TD_ALL_SIG_AREA?clientId=test.broker1%40test&durableSubscriptionName=td-test-test
      consumer received JMS message: ActiveMQTextMessage {commandId = 2113029,
      responseRequired = false, messageId =
      ID:opendata-backend.rockshore.net-45497-1560093463062-11:26340:1:1:90442,
      originalDestination = null, originalTransactionId = null, producerId =
      ID:opendata-backend.rockshore.net-45497-1560093463062-11:26340:1:1,
      destination = topic://TD_ALL_SIG_AREA, transactionId = null, expiration =
      1561571743449, timestamp = 1561571443449, arrival = 0, brokerInTime =
      1561571443450, brokerOutTime = 1561571443453, correlationId = null, replyTo
      = null, persistent = true, type = null, priority = 4, groupID = null,
      groupSequence = 0, targetConsumerId = null, compressed = true, userID =
      null, content = org.apache.activemq.util.ByteSequence@5b0352b8,
      marshalledProperties = org.apache.activemq.util.ByteSequence@68ec1f70,
      dataStructure = null, redeliveryCounter = 0, size = 0, properties =
      {transformation=jms-xml}, readOnlyProperties = true, readOnlyBody = true,
      droppable = false, jmsXGroupFirstForConsumer = false, text =
      [{"CA_MSG":{"to":"0335","time":"1561571443000...ata":"CA"}}]} |
      org.apache.camel.component.jms.EndpointMessageListener | Camel (Piper_Alpha)
      thread #1 - JmsConsumer[TD_ALL_SIG_AREA]

       

      DEBUG Capture of bad message:

      2019-06-26 18:50:43,202 | DEBUG |
      nrod://topic:TD_ALL_SIG_AREA?clientId=test.broker1%40test&durableSubscriptionName=td-test-test
      consumer received JMS message: ActiveMQTextMessage {commandId = 2113027,
      responseRequired = false, messageId =
      ID:opendata-backend.rockshore.net-45497-1560093463062-11:26340:4:7:90688,
      originalDestination = null, originalTransactionId = null, producerId =
      ID:opendata-backend.rockshore.net-45497-1560093463062-11:26340:4:7,
      destination = topic://TD_ALL_SIG_AREA, transactionId = null, expiration =
      1561571743189, timestamp = 1561571443189, arrival = 0, brokerInTime =
      1561571443190, brokerOutTime = 1561571443194, correlationId = null, replyTo
      = null, persistent = true, type = null, priority = 4, groupID = null,
      groupSequence = 0, targetConsumerId = null, compressed = true, userID =
      null, content = org.apache.activemq.util.ByteSequence@7eea0857,
      marshalledProperties = org.apache.activemq.util.ByteSequence@dc68e3d,
      dataStructure = null, redeliveryCounter = 0, size = 0, properties = null,
      readOnlyProperties = true, readOnlyBody = true, droppable = false,
      jmsXGroupFirstForConsumer = false, text =
      [{"SF_MSG":{"time":"1561571442000","area_id":...ata":"40"}}]} |
      org.apache.camel.component.jms.EndpointMessageListener | Camel (Piper_Alpha)
      thread #1 - JmsConsumer[TD_ALL_SIG_AREA]

      In the JMS header a good message has;

      properties = {transformation=jms-xml}

      But in a 'bad' message it only has

      properties = null 

      Attachments

        1. Screenshot_20220901_160619.jpg
          231 kB
          mohdngah_777
        2. IMG_20220825_135926.jpg
          2.45 MB
          mohdngah_777

        Activity

          People

            jbonofre Jean-Baptiste Onofré
            jj-amq JJ
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:

              Time Tracking

                Estimated:
                Original Estimate - Not Specified
                Not Specified
                Remaining:
                Remaining Estimate - 0h
                0h
                Logged:
                Time Spent - 20m
                20m