QPID-6968

[Java Broker] Decoding of pipelined AMQP 0-9.x frames can fail when multiple frames are received as part of the same byte buffer

Details

    Description

      Decoding can fail with AMQFrameDecodingException when the protocol initiation and the connection-opening frames that follow it (ConnectionStartOk, ConnectionTuneOk, etc.) are pipelined and arrive as part of the same byte buffer.

      The following exception is reported on frame decoding:

      2016-01-04 18:31:37,152         ERROR [IO-/127.0.0.1:33680] o.a.q.s.p.v.AMQPConnection_0_8 Unexpected exception
      org.apache.qpid.framing.AMQFrameDecodingException: Unsupported frame type: 65
              at org.apache.qpid.codec.AMQDecoder.processFrame(AMQDecoder.java:215) ~[qpid-common-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.server.protocol.v0_8.BrokerDecoder.doProcessFrame(BrokerDecoder.java:118) ~[qpid-broker-plugins-amqp-0-8-protocol-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.server.protocol.v0_8.BrokerDecoder.processFrame(BrokerDecoder.java:65) ~[qpid-broker-plugins-amqp-0-8-protocol-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.codec.AMQDecoder.processInput(AMQDecoder.java:185) ~[qpid-common-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.codec.AMQDecoder.decode(AMQDecoder.java:125) ~[qpid-common-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.codec.ServerDecoder.decodeBuffer(ServerDecoder.java:43) ~[qpid-common-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8$1.run(AMQPConnection_0_8.java:266) [qpid-broker-plugins-amqp-0-8-protocol-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8$1.run(AMQPConnection_0_8.java:258) [qpid-broker-plugins-amqp-0-8-protocol-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at java.security.AccessController.doPrivileged(Native Method) [na:1.7.0_80]
              at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8.received(AMQPConnection_0_8.java:257) [qpid-broker-plugins-amqp-0-8-protocol-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.server.transport.MultiVersionProtocolEngine$SelfDelegateProtocolEngine.received(MultiVersionProtocolEngine.java:526) [qpid-broker-core-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.server.transport.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:142) [qpid-broker-core-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.server.transport.NonBlockingConnection.processAmqpData(NonBlockingConnection.java:547) [qpid-broker-core-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.server.transport.NonBlockingConnectionPlainDelegate.processData(NonBlockingConnectionPlainDelegate.java:58) [qpid-broker-core-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.server.transport.NonBlockingConnection.doRead(NonBlockingConnection.java:446) [qpid-broker-core-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.server.transport.NonBlockingConnection.doWork(NonBlockingConnection.java:253) [qpid-broker-core-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.server.transport.NetworkConnectionScheduler.processConnection(NetworkConnectionScheduler.java:108) [qpid-broker-core-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.server.transport.SelectorThread$ConnectionProcessor.processConnection(SelectorThread.java:499) [qpid-broker-core-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.server.transport.SelectorThread$SelectionTask.performSelect(SelectorThread.java:337) [qpid-broker-core-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.server.transport.SelectorThread$SelectionTask.run(SelectorThread.java:86) [qpid-broker-core-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at org.apache.qpid.server.transport.SelectorThread.run(SelectorThread.java:457) [qpid-broker-core-6.1.0-SNAPSHOT.jar:6.1.0-SNAPSHOT]
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_80]
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_80]
              at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80]
      

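      After the protocol initiation is received, QpidByteBuffer.BufferDataInput#_offset is set to 8 (the protocol header is 8 bytes long). When ConnectionStartOk is decoded, org.apache.qpid.codec.AMQDecoder#decodable parses the frame body size and then resets QpidByteBuffer.BufferDataInput, which moves the QpidByteBuffer position to 0 instead of back to 8. The decoder then reads the frame type from the start of the buffer, which would explain the reported value: 65 is ASCII 'A', the first byte of the "AMQP" protocol header.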

      QpidByteBuffer.BufferDataInput#reset sets the position of byte buffer without accounting for QpidByteBuffer.BufferDataInput#_offset:

              public void reset()
              {
                  _buffer.position(_mark);
              }
      

      IMHO, it should be

              public void reset()
              {
                  position(_mark);
              }
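
      To make the difference between the two reset() variants concrete, here is a minimal sketch on top of java.nio.ByteBuffer. DataInputModel, its constructor flag and the relative position()/position(int)/mark() bodies are assumptions made for illustration; this is not the real QpidByteBuffer.BufferDataInput, and the frame is a stand-in with an empty payload rather than a real ConnectionStartOk.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified model of the _offset/_mark bookkeeping described
// in this issue. It is NOT the real QpidByteBuffer.BufferDataInput.
class ResetOffsetSketch
{
    static class DataInputModel
    {
        private final ByteBuffer _buffer;
        private final int _offset;   // buffer position when the input was created
        private final boolean _useProposedReset;
        private int _mark;

        DataInputModel(ByteBuffer buffer, boolean useProposedReset)
        {
            _buffer = buffer;
            _offset = buffer.position();
            _useProposedReset = useProposedReset;
        }

        // Assumed: positions seen by callers are relative to _offset.
        int position() { return _buffer.position() - _offset; }
        void position(int position) { _buffer.position(position + _offset); }
        void mark() { _mark = position(); }

        void reset()
        {
            if (_useProposedReset)
            {
                position(_mark);          // proposed: _offset is added back
            }
            else
            {
                _buffer.position(_mark);  // reported: _offset is ignored
            }
        }

        int readUnsignedByte() { return _buffer.get() & 0xFF; }
        void skipBytes(int count) { _buffer.position(_buffer.position() + count); }
    }

    // Returns the byte a decoder would treat as the frame type after reset().
    static int frameTypeAfterReset(boolean useProposedReset)
    {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.put("AMQP".getBytes(StandardCharsets.US_ASCII)); // 8-byte protocol header
        buffer.put(new byte[] {0, 0, 9, 1});
        buffer.put((byte) 1);        // frame type 1 (method)
        buffer.putShort((short) 0);  // channel
        buffer.putInt(0);            // body size (empty stand-in payload)
        buffer.put((byte) 0xCE);     // frame end marker
        buffer.flip();
        buffer.position(8);          // protocol header already consumed

        DataInputModel in = new DataInputModel(buffer, useProposedReset);
        in.mark();
        in.readUnsignedByte();       // peek at the frame type
        in.skipBytes(6);             // channel and body size
        in.reset();
        return in.readUnsignedByte();
    }

    public static void main(String[] args)
    {
        System.out.println("reported reset: " + frameTypeAfterReset(false)); // 65, i.e. 'A'
        System.out.println("proposed reset: " + frameTypeAfterReset(true));  // 1
    }
}
```

      In this model the reported reset() lands on the first byte of the protocol header, while the proposed one returns to the start of the frame.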
      

      The field QpidByteBuffer.BufferDataInput#_offset looks redundant; at least, I do not see any obvious reason to keep it. Another way to fix the problem is therefore to delete QpidByteBuffer.BufferDataInput#_offset and have org.apache.qpid.bytebuffer.QpidByteBuffer.BufferDataInput#position() and org.apache.qpid.bytebuffer.QpidByteBuffer.BufferDataInput#position(int) get and set the position through the QpidByteBuffer methods directly.

      Additionally, the QpidByteBuffer.BufferDataInput#position... methods could be inlined.
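
      The offset-free alternative could look roughly like the sketch below, again modelled on java.nio.ByteBuffer. The class and method names are hypothetical, and it assumes that nothing else depends on positions relative to where the input was created. mark() and reset() use the buffer's absolute position, so there is no offset left to forget.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of a data input without an _offset field.
// It is NOT the real QpidByteBuffer.BufferDataInput.
class OffsetFreeSketch
{
    static class DataInputModel
    {
        private final ByteBuffer _buffer;
        private int _mark;

        DataInputModel(ByteBuffer buffer)
        {
            _buffer = buffer;
            _mark = buffer.position();
        }

        // Position handling is inlined: both methods talk to the buffer directly.
        void mark() { _mark = _buffer.position(); }
        void reset() { _buffer.position(_mark); }

        int readUnsignedByte() { return _buffer.get() & 0xFF; }
        int readInt() { return _buffer.getInt(); }
    }

    // Returns {buffer position after reset(), next byte read}.
    static int[] resetAfterLeadingBytes(int leadingBytes)
    {
        ByteBuffer buffer = ByteBuffer.allocate(leadingBytes + 5);
        buffer.position(leadingBytes);   // leave zero-filled leading bytes in place
        buffer.put((byte) 1);            // stand-in frame type
        buffer.putInt(42);               // stand-in field
        buffer.flip();
        buffer.position(leadingBytes);   // leading bytes already consumed

        DataInputModel in = new DataInputModel(buffer);
        in.mark();
        in.readUnsignedByte();
        in.readInt();
        in.reset();
        int positionAfterReset = buffer.position();
        return new int[] {positionAfterReset, in.readUnsignedByte()};
    }

    public static void main(String[] args)
    {
        int[] result = resetAfterLeadingBytes(8);
        System.out.println("position after reset: " + result[0]);
        System.out.println("next byte: " + result[1]);
    }
}
```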

      We might want to port the fix to the 6.0.x branch.

          People

            Unassigned Unassigned
            orudyy Alex Rudyy