Qpid Proton / PROTON-2177

IllegalStateException when freeing link as part of timeout handling


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: proton-j-0.33.3
    • Fix Version/s: None
    • Component/s: proton-j
    • Labels:
      None

      Description

      Invoking free() on a link, or processing a received detach frame, may result in the following exception if the preconditions described below are met:

      java.lang.IllegalStateException
      	at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54)
      	at org.apache.qpid.proton.engine.impl.LinkImpl.postFinal(LinkImpl.java:128)
      	at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:52)
      	at org.apache.qpid.proton.engine.impl.TransportLink.clearRemoteHandle(TransportLink.java:125)
      	at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:1379)
      	at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:70)
      	at org.apache.qpid.proton.amqp.transport.Detach.invoke(Detach.java:86)
      	at org.apache.qpid.proton.engine.impl.TransportImpl.handleFrame(TransportImpl.java:1453)
      	at org.apache.qpid.proton.engine.impl.FrameParser.input(FrameParser.java:425)
      	at org.apache.qpid.proton.engine.impl.FrameParser.process(FrameParser.java:536)
      	at org.apache.qpid.proton.engine.impl.TransportImpl.process(TransportImpl.java:1570)
      	at org.apache.qpid.proton.engine.impl.TransportImpl.processInput(TransportImpl.java:1528)
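      Read from the bottom up, the trace shows an incoming detach being handled by TransportLink.clearRemoteHandle(), which decrements the link's reference count. The count reaches zero, LinkImpl.postFinal() runs and decrements a second endpoint (presumably the link's session, which is why the test method logs the session refcount), and that second decrement throws. The toy model below is a sketch under assumptions, not proton-j code: RefcountModel, Endpoint and linksUntilFailure are hypothetical, and it assumes, as the line numbers in the trace suggest, that decref() calls postFinal() at zero and throws IllegalStateException below zero. It illustrates one hypothetical way to reach the failure. A link that is referenced again after reaching zero (for example by the late attach) and then released again runs postFinal() twice, so the session loses one reference too many each time until its count goes negative.

```java
final class RefcountModel {

    /** Hypothetical, simplified endpoint with a reference count; NOT proton-j's EndpointImpl. */
    static final class Endpoint {
        private final Endpoint parent; // null for the top-level endpoint (the "session" here)
        private int refcount = 1;      // the reference held by the application

        Endpoint(Endpoint parent) {
            this.parent = parent;
            if (parent != null) {
                parent.incref();       // a child keeps its parent alive
            }
        }

        void incref() {
            refcount++;
        }

        void decref() {
            refcount--;
            if (refcount == 0) {
                postFinal();
            } else if (refcount < 0) {
                throw new IllegalStateException("refcount dropped below zero");
            }
        }

        /** Last reference gone: give back the reference this endpoint holds on its parent. */
        private void postFinal() {
            if (parent != null) {
                parent.decref();
            }
        }

        int refcount() {
            return refcount;
        }
    }

    /** Returns the index of the link whose release makes the session throw, or -1 if none does. */
    static int linksUntilFailure(int links) {
        Endpoint session = new Endpoint(null);
        for (int i = 0; i < links; i++) {
            Endpoint link = new Endpoint(session);
            try {
                link.decref(); // early free(): link reaches 0, session loses one reference
                link.incref(); // hypothetical: the late attach references the link again
                link.decref(); // detach: link reaches 0 again, session loses another reference
            } catch (IllegalStateException e) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(linksUntilFailure(5)); // 1
    }
}
```

      In this model the first link only drives the session count down to zero; the second link's repeated release then pushes it below zero, with the same decref() → postFinal() → decref() shape as the trace.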
      

      The scenario:
      We have implemented AMQP link creation with a timeout mechanism: after invoking link.open(), we wait for a predefined time, and if we still haven't received the attach frame from the server by then, we call link.close() followed by link.free() to avoid a memory leak.
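      The timeout handling described above can be sketched as follows. This is a minimal sketch, assuming a hypothetical LinkLike interface that stands in for the proton-j Link methods named in this report (open(), close(), free()) plus a hypothetical remoteAttachReceived() check; with proton-j that check would typically compare link.getRemoteState() against EndpointState.UNINITIALIZED. LinkEstablishmentTimeout, onTimeout and RecordingLink are also hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class LinkEstablishmentTimeout {

    /** Hypothetical stand-in for the proton-j Link calls used in this report. */
    interface LinkLike {
        void open();
        void close();
        void free();

        /** True once the peer's attach frame has been processed. */
        boolean remoteAttachReceived();
    }

    /** Builds the task to run when the link establishment timeout expires. */
    static Runnable onTimeout(LinkLike link) {
        return () -> {
            if (!link.remoteAttachReceived()) {
                link.close(); // queue our detach frame
                link.free();  // release the link immediately (the step that later fails)
            }
        };
    }

    /** Fake link that records which calls were made, for demonstration only. */
    static final class RecordingLink implements LinkLike {
        final List<String> calls = new ArrayList<>();
        boolean attached; // set to true to simulate a timely remote attach

        @Override public void open() { calls.add("open"); }
        @Override public void close() { calls.add("close"); }
        @Override public void free() { calls.add("free"); }
        @Override public boolean remoteAttachReceived() { return attached; }
    }

    public static void main(String[] args) throws InterruptedException {
        // One timer thread only. proton-j objects are not thread-safe, so in real code the
        // timeout must fire on the thread that drives the transport (e.g. a Vert.x context).
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        RecordingLink link = new RecordingLink();
        link.open();
        timer.schedule(onTimeout(link), 100, TimeUnit.MILLISECONDS);
        timer.shutdown(); // already-scheduled delayed tasks still run by default
        timer.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println(link.calls); // [open, close, free]
    }
}
```

      Returning the task as a Runnable keeps the timeout decision separate from the scheduling mechanism, so the same logic can be handed to whatever timer the surrounding framework provides.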

      This has mostly worked well so far. When the server did not send the attach frame in time, it usually responded to sender.close() with only a detach frame (without sending an attach first).

      Recently, however, when testing with a large number of links (>10000), we sometimes encountered the following server behaviour (with Qpid Dispatch Router as the server):

      • client sends attach frame
      • linkEstablishmentTimeout: server doesn't respond in time so client invokes link.close() and then link.free()
      • server sends an attach frame
      • server sends a detach frame

      If this happens multiple times on the same session, the above exception occurs when calling link.free(). Afterwards, 'socket closed' exceptions occur.

      If we don't invoke link.free() as part of the linkEstablishmentTimeout handling, the exception doesn't occur.
      However, not invoking link.free() would cause a memory leak if the server never responds to the attach frame.
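      One possible mitigation for the leak concern, offered only as a sketch and not verified against proton-j: on timeout, call close() but defer free() until the remote detach has been processed (skipping the early free() is what, per this report, avoids the exception), and free any links still waiting when the connection is torn down. LinkLike, DeferredFree and its methods are hypothetical; in real code the callbacks would be driven by proton-j events such as Event.Type.LINK_REMOTE_CLOSE.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class DeferredFree {

    /** Hypothetical stand-in for the proton-j Link calls used here. */
    interface LinkLike {
        void close();
        void free();
    }

    /** Links closed by the timeout handler whose detach has not come back yet. */
    private final Set<LinkLike> awaitingDetach = new LinkedHashSet<>();

    /** Timeout expired without a remote attach: send our detach but keep the link allocated. */
    void onEstablishmentTimeout(LinkLike link) {
        link.close();
        awaitingDetach.add(link);
    }

    /** Remote detach processed (possibly after a late attach): now it is safe to free. */
    void onRemoteDetach(LinkLike link) {
        if (awaitingDetach.remove(link)) {
            link.free();
        }
    }

    /** Connection is going away: free whatever never got a detach back. */
    void onConnectionClosed() {
        for (LinkLike link : awaitingDetach) {
            link.free();
        }
        awaitingDetach.clear();
    }

    int pendingCount() {
        return awaitingDetach.size();
    }

    /** Fake link that records which calls were made, for demonstration only. */
    static final class RecordingLink implements LinkLike {
        final List<String> calls = new ArrayList<>();

        @Override public void close() { calls.add("close"); }
        @Override public void free() { calls.add("free"); }
    }

    public static void main(String[] args) {
        DeferredFree tracker = new DeferredFree();
        RecordingLink answered = new RecordingLink();
        RecordingLink silent = new RecordingLink();
        tracker.onEstablishmentTimeout(answered);
        tracker.onEstablishmentTimeout(silent);
        tracker.onRemoteDetach(answered);   // peer eventually sent attach + detach
        System.out.println(answered.calls); // [close, free]
        System.out.println(silent.calls);   // [close]
        tracker.onConnectionClosed();
        System.out.println(silent.calls);   // [close, free]
    }
}
```

      The trade-off is that a link whose peer never answers stays allocated until the connection closes, rather than being released as soon as the timeout fires.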


      The issue can be reproduced with the following test method (to be added to the "org.apache.qpid.proton.systemtests.FreeTest" class):

          @Test
          public void testFreeOnLinkEstablishmentTimeout() throws Exception {
              LOGGER.fine(bold("======== About to create transports"));
              getClient().transport = Proton.transport();
              ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
              getServer().transport = Proton.transport();
              ProtocolTracerEnabler.setProtocolTracer(getServer().transport, "            " + TestLoggingHelper.SERVER_PREFIX);
              getClient().connection = Proton.connection();
              getClient().transport.bind(getClient().connection);
              getServer().connection = Proton.connection();
              getServer().transport.bind(getServer().connection);
              LOGGER.fine(bold("======== About to open connections"));
              getClient().connection.open();
              getServer().connection.open();
              doOutputInputCycle();
              LOGGER.fine(bold("======== About to open session"));
              getClient().session = getClient().connection.session();
              getClient().session.open();
              pumpClientToServer();
              getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
              assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
              getServer().session.open();
              assertEndpointState(getServer().session, ACTIVE, ACTIVE);
              pumpServerToClient();
              assertEndpointState(getClient().session, ACTIVE, ACTIVE);
      
              for (int i = 0; i < 5; i++) {
                  LOGGER.fine("\n\n");
                  LOGGER.fine(bold("======== About to create client sender " + i + "; refcount on session: " + getSessionRefCount(getClient().session)));
                  getClient().source = new Source();
                  getClient().source.setAddress(null);
                  getClient().target = new Target();
                  getClient().target.setAddress("myQueue");
                  getClient().sender = getClient().session.sender("sender" + i);
                  getClient().sender.setTarget(getClient().target);
                  getClient().sender.setSource(getClient().source);
                  getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
                  getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
                  assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED);
                  getClient().sender.open();
                  assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED);
                  pumpClientToServer();
      
                  LOGGER.fine(bold("======== client: invoke close & free already, as would be done in linkEstablishmentTimeout handling"));
                  getClient().sender.close();
                  getClient().sender.free();  // skipping this, the error doesn't occur; but how to prevent a memory leak if no attach frame ever is received?
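                  // getOutputBuffer() makes the client transport encode its pending frames (the detach) now, without delivering them to the server yet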
                  ByteBuffer clientBuffer = getClient().transport.getOutputBuffer();
      
                  LOGGER.fine(bold("======== About to set up server receiver (not having gotten the detach yet)"));
                  getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
                  getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode());
                  getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode());
                  org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget();
                  assertTerminusEquals(getClient().target, serverRemoteTarget);
                  getServer().receiver.setTarget(serverRemoteTarget);
                  assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE);
                  getServer().receiver.open();
                  assertEndpointState(getServer().receiver, ACTIVE, ACTIVE);
      
                  // let the server transport encode its attach frame now, without delivering it to the client yet
                  ByteBuffer serverBuffer = getServer().transport.getOutputBuffer();
      
                  LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent clientBuffer (with 'detach') to the server"));
                  getServer().transport.getInputBuffer().put(clientBuffer);
                  getClient().transport.outputConsumed();
                  getServer().transport.processInput().checkIsOk();
      
                  LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent serverBuffer (with 'attach') to the client"));
                  getClient().transport.getInputBuffer().put(serverBuffer);
                  getClient().transport.processInput().checkIsOk();
                  getServer().transport.outputConsumed();
      
                  // assert the server got the detach
                  assertEndpointState(getServer().receiver, ACTIVE, CLOSED);
      
                  // let the server respond with a detach
                  getServer().receiver.close();
                  LOGGER.fine(bold("~~~~~~~~~~ pump detach to client"));
                  pumpServerToClient();
                  assertEndpointState(getClient().sender, CLOSED, CLOSED);
                  getClient().sender.free();
              }
      
              getClient().transport.unbind();
              LOGGER.fine(bold("======== About to close and free client's connection"));
              getClient().connection.close();
              getClient().connection.free();
          }
      
          private Integer getSessionRefCount(final Session protonSession) throws NoSuchFieldException, IllegalAccessException {
              if (!(protonSession instanceof EndpointImpl)) {
                  return null;
              }
              final Field refcountField = EndpointImpl.class.getDeclaredField("refcount");
              refcountField.setAccessible(true);
              return (Integer) refcountField.get(protonSession);
          }
      

      (Test code using vertx-proton with separate client and server classes can also be provided if needed.)


            People

            • Assignee:
              Unassigned
              Reporter:
              Carsten Lohmann (calohmn)
            • Votes:
              0
              Watchers:
              2
