Uploaded image for project: 'Qpid'
  1. Qpid
  2. QPID-5282

Sender timeouts may allow the peer to suffer an AMQFrameDecodingException

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    Description

      If an IoSender#send suffers a timeout (IoSender.java#159) whilst awaiting space in the sender's buffer, the socket which was being written to is left open and a SenderException is thrown to the caller. The bytes that were to be sent are lost. If the socket is subsequent written to again, the peer may suffer an AMQFrameDecodingException as is tries to decode an incomplete stream of data.

      We saw this scenario in a support call. The working theory is that the application'sJVM (using a JMS consumer) was under pressue, and this caused TCP/IP back pressure to be felt by the broker. The Broker IoSender#send method timed-out, but as the socket remained open, when a later message was sent to the same JMS consumer application, the consuming application failed with an AMQFrameDecodingException: the lost bytes meant it tried to process message payload as if it were an AMQFrame.

      The two exceptions of interest are (reproduced on trunk):

      Broker side:

      2013-10-31 14:41:18,471 ERROR [IoReceiver - /127.0.0.1:63867] (v0_8.AMQProtocolEngine) - Error informing channel that receiving is complete. Channel: [/127.0.0.1:63867(guest):1]org.apache.qpid.transport.SenderException: write timed out: -2147471360, -2147475456
          at org.apache.qpid.transport.network.io.IoSender.send(IoSender.java:159)
          at org.apache.qpid.transport.network.io.IoSender.send(IoSender.java:40)
          at org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine.writeFrame(AMQProtocolEngine.java:689)
          at org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterImpl.writeFrame(ProtocolOutputConverterImpl.java:337)
          at org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterImpl.writeMessageDelivery(ProtocolOutputConverterImpl.java:127)
          at org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterImpl.writeMessageDelivery(ProtocolOutputConverterImpl.java:97)
          at org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterImpl.writeDeliver(ProtocolOutputConverterImpl.java:72)
          at org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine$WriteDeliverMethod.deliverToClient(AMQProtocolEngine.java:1679)
          at org.apache.qpid.server.protocol.v0_8.SubscriptionImpl.sendToClient(SubscriptionImpl.java:693)
          at org.apache.qpid.server.protocol.v0_8.SubscriptionImpl$AckSubscription.send(SubscriptionImpl.java:303)
          at org.apache.qpid.server.queue.SimpleAMQQueue.deliverMessage(SimpleAMQQueue.java:826)
          at org.apache.qpid.server.queue.SimpleAMQQueue.deliverToSubscription(SimpleAMQQueue.java:745)
          at org.apache.qpid.server.queue.SimpleAMQQueue.enqueue(SimpleAMQQueue.java:693)
          at org.apache.qpid.server.protocol.v0_8.AMQChannel$MessageDeliveryAction.postCommit(AMQChannel.java:1245)
          at org.apache.qpid.server.protocol.v0_8.AMQChannel$AsyncCommand.complete(AMQChannel.java:1623)
          at org.apache.qpid.server.protocol.v0_8.AMQChannel.sync(AMQChannel.java:1593)
          at org.apache.qpid.server.protocol.v0_8.AMQChannel.receivedComplete(AMQChannel.java:218)
          at org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine.receivedComplete(AMQProtocolEngine.java:324)
          at org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine.received(AMQProtocolEngine.java:304)
          at org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine.received(AMQProtocolEngine.java:104)
          at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:131)
          at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:47)
          at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:161)
          at java.lang.Thread.run(Thread.java:662)
      

      Then later client side:

      2013-10-31 14:47:49,099 DEBUG [Dispatcher-1-Conn-1] [Dispatcher] Dispatcher-1-Conn-1 thread terminating for channel 1:org.apache.qpid.client.AMQSession_0_8@560932fe
      javax.jms.JMSException: Message consumer forcibly closed due to error: org.apache.qpid.framing.AMQFrameDecodingException: End of frame marker not found. Read 53 length=65527 type=3
      	at org.apache.qpid.client.BasicMessageConsumer.returnMessageOrThrow(BasicMessageConsumer.java:531)
      	at org.apache.qpid.client.BasicMessageConsumer.receive(BasicMessageConsumer.java:419)
      	at org.apache.qpid.client.BasicMessageConsumer.receive(BasicMessageConsumer.java:393)
      	at org.apache.qpid.example.Hello.runTest(Hello.java:60)
      	at org.apache.qpid.example.Hello.main(Hello.java:40)
      Caused by: org.apache.qpid.framing.AMQFrameDecodingException: End of frame marker not found. Read 53 length=65527 type=3
      	at org.apache.qpid.framing.AMQDataBlockDecoder.createAndPopulateFrame(AMQDataBlockDecoder.java:104)
      	at org.apache.qpid.codec.AMQDecoder.decodeBuffer(AMQDecoder.java:250)
      	at org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:451)
      	at org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:1)
      	at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:161)
      	at java.lang.Thread.run(Thread.java:662)
      

      Attachments

        Issue Links

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            orudyy Alex Rudyy
            kwall Keith Wall
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment