Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: proton-j-0.33.3
Fix Version/s: None
Component/s: None
Description
Invoking free() on a link, or processing a received detach frame, may result in the following exception if the preconditions described below are met:
java.lang.IllegalStateException
    at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:54)
    at org.apache.qpid.proton.engine.impl.LinkImpl.postFinal(LinkImpl.java:128)
    at org.apache.qpid.proton.engine.impl.EndpointImpl.decref(EndpointImpl.java:52)
    at org.apache.qpid.proton.engine.impl.TransportLink.clearRemoteHandle(TransportLink.java:125)
    at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:1379)
    at org.apache.qpid.proton.engine.impl.TransportImpl.handleDetach(TransportImpl.java:70)
    at org.apache.qpid.proton.amqp.transport.Detach.invoke(Detach.java:86)
    at org.apache.qpid.proton.engine.impl.TransportImpl.handleFrame(TransportImpl.java:1453)
    at org.apache.qpid.proton.engine.impl.FrameParser.input(FrameParser.java:425)
    at org.apache.qpid.proton.engine.impl.FrameParser.process(FrameParser.java:536)
    at org.apache.qpid.proton.engine.impl.TransportImpl.process(TransportImpl.java:1570)
    at org.apache.qpid.proton.engine.impl.TransportImpl.processInput(TransportImpl.java:1528)
The scenario:
We have implemented AMQP link creation with a timeout mechanism: after invoking link.open(), we wait for a predefined time, and if we haven't received the server's attach frame by then, we call link.close() and then link.free() to avoid a memory leak.
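The timeout handling described above can be sketched roughly as follows. This is a minimal, hypothetical model, not the actual implementation and not proton-j API: LinkHandle, RemoteState, RecordingLink and checkTimeout are illustrative names standing in for proton-j's Link, its EndpointState and whatever timer the application uses. It assumes the check runs on the same thread that drives the proton-j engine (for example a Vert.x timer on the connection's context), since the engine objects are not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the link establishment timeout described above.
// LinkHandle and RemoteState stand in for proton-j's Link and EndpointState;
// they are illustrative names, not proton-j API.
public class LinkEstablishmentTimeout {

    public enum RemoteState { UNINITIALIZED, ACTIVE, CLOSED }

    public interface LinkHandle {
        void open();
        void close();
        void free();
        RemoteState getRemoteState();
    }

    // Test double that records which calls were made.
    public static class RecordingLink implements LinkHandle {
        public final List<String> calls = new ArrayList<>();
        public RemoteState remoteState = RemoteState.UNINITIALIZED;

        @Override public void open() { calls.add("open"); }
        @Override public void close() { calls.add("close"); }
        @Override public void free() { calls.add("free"); }
        @Override public RemoteState getRemoteState() { return remoteState; }
    }

    private final LinkHandle link;
    private final long deadlineMillis;
    private boolean checked;

    public LinkEstablishmentTimeout(LinkHandle link, long nowMillis, long timeoutMillis) {
        this.link = link;
        this.deadlineMillis = nowMillis + timeoutMillis;
        link.open(); // marks the link open; the attach goes out with the next transport output
    }

    // Assumed to be called periodically on the thread that drives the engine.
    // Returns true only on the call that gives up on the link.
    public boolean checkTimeout(long nowMillis) {
        if (checked || nowMillis < deadlineMillis) {
            return false; // deadline not reached yet, or already handled
        }
        checked = true;
        if (link.getRemoteState() != RemoteState.UNINITIALIZED) {
            return false; // the server's attach (or detach) arrived in time
        }
        // No attach from the server: close and free, as described in the report.
        // The reported exception can occur after this free() call.
        link.close();
        link.free();
        return true;
    }

    public static void main(String[] args) {
        RecordingLink link = new RecordingLink();
        LinkEstablishmentTimeout timeout = new LinkEstablishmentTimeout(link, 0L, 1000L);
        timeout.checkTimeout(500L);  // too early: nothing happens
        timeout.checkTimeout(1000L); // deadline reached without an attach
        System.out.println(link.calls); // [open, close, free]
    }
}
```

The check fires at most once per link, so a late attach that arrives after the deadline does not trigger a second close()/free() from this handler.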
So far this has mostly worked well. When the server did not send its attach frame in time, it usually responded to sender.close() with a detach frame, without sending an attach in between.
Lately, however, when testing with a large number of links (>10000), we sometimes encountered the following server behaviour (with Qpid Dispatch Router as the server):
- the client sends an attach frame
- linkEstablishmentTimeout: the server doesn't respond in time, so the client invokes link.close() and then link.free()
- the server sends an attach frame
- the server sends a detach frame
If this happens multiple times on the same session, the above exception occurs when calling link.free(). Afterwards there are 'socket closed' exceptions.
If we don't invoke link.free() as part of the linkEstablishmentTimeout handling, the exception doesn't occur.
However, not invoking link.free() would create a memory leak if the server never responds to the attach frame.
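The stack trace shows the failure path: decref() on the link reaches zero, LinkImpl.postFinal() then calls decref() on a parent endpoint (presumably the session), and that call throws because the parent's count has gone negative. The toy model below replays the reported frame sequence to illustrate one way this could happen. It is a simplified sketch, not proton-j's EndpointImpl/LinkImpl code; Endpoint, Result and replay are hypothetical names, and the step marked HYPOTHESIS (a late attach taking a new reference on an already-freed link) is an assumption consistent with the trace, not a confirmed root cause.

```java
// Toy model of parent/child reference counting. NOT proton-j's source code:
// Endpoint, Result and replay() are hypothetical names used for illustration.
public class RefCountModel {

    static final class Endpoint {
        final String name;
        final Endpoint parent;
        int refcount = 1; // the application's reference, dropped by free()

        Endpoint(String name, Endpoint parent) {
            this.name = name;
            this.parent = parent;
            if (parent != null) {
                parent.incref(); // a live child keeps its parent referenced
            }
        }

        void incref() {
            refcount++;
        }

        void decref() {
            refcount--;
            if (refcount == 0) {
                postFinal();
            } else if (refcount < 0) {
                throw new IllegalStateException(name + " refcount is " + refcount);
            }
        }

        // Reaching zero releases the reference this endpoint holds on its parent.
        void postFinal() {
            if (parent != null) {
                parent.decref();
            }
        }
    }

    public static final class Result {
        public final int failedIteration; // -1 if no exception was thrown
        public final int leakedLinks;     // links whose refcount never reached zero

        Result(int failedIteration, int leakedLinks) {
            this.failedIteration = failedIteration;
            this.leakedLinks = leakedLinks;
        }
    }

    // Replays the reported frame sequence for several links on one session.
    public static Result replay(int iterations, boolean freeOnTimeout) {
        Endpoint session = new Endpoint("session", null);
        int leaked = 0;
        for (int i = 0; i < iterations; i++) {
            Endpoint link = new Endpoint("link" + i, session); // client attach
            try {
                if (freeOnTimeout) {
                    link.decref(); // timeout: close() + free()
                }
                link.incref(); // HYPOTHESIS: late server attach references the link again
                link.decref(); // server detach: remote handle cleared
            } catch (IllegalStateException e) {
                return new Result(i, leaked);
            }
            if (link.refcount > 0) {
                leaked++; // never finalized: the leak the report worries about
            }
        }
        return new Result(-1, leaked);
    }

    public static void main(String[] args) {
        System.out.println("free on timeout:    failed in iteration " + replay(5, true).failedIteration);
        System.out.println("no free on timeout: leaked links = " + replay(5, false).leakedLinks);
    }
}
```

Running main reports a failure in iteration 1 when free() is called on timeout, while skipping free() avoids the exception but leaves all five links with a positive count, mirroring the trade-off described above. The iteration number is specific to the model; in proton-j other objects also hold references, so the real counts differ.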
—
The issue can be reproduced with the following test method (add it to the "org.apache.qpid.proton.systemtests.FreeTest" class and run it):
// Imports used by the test (some may already be present in FreeTest).
// bold(), LOGGER and the pump/assert helpers are assumed to come from FreeTest
// and its test base classes.
import static java.util.EnumSet.of;
import static org.apache.qpid.proton.engine.EndpointState.ACTIVE;
import static org.apache.qpid.proton.engine.EndpointState.CLOSED;
import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED;

import java.lang.reflect.Field;
import java.nio.ByteBuffer;

import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.impl.EndpointImpl;
import org.junit.Test;

@Test
public void testFreeOnLinkEstablishmentTimeout() throws Exception {
    LOGGER.fine(bold("======== About to create transports"));
    getClient().transport = Proton.transport();
    ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
    getServer().transport = Proton.transport();
    ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX);

    getClient().connection = Proton.connection();
    getClient().transport.bind(getClient().connection);
    getServer().connection = Proton.connection();
    getServer().transport.bind(getServer().connection);

    LOGGER.fine(bold("======== About to open connections"));
    getClient().connection.open();
    getServer().connection.open();
    doOutputInputCycle();

    LOGGER.fine(bold("======== About to open session"));
    getClient().session = getClient().connection.session();
    getClient().session.open();
    pumpClientToServer();
    getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
    assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
    getServer().session.open();
    assertEndpointState(getServer().session, ACTIVE, ACTIVE);
    pumpServerToClient();
    assertEndpointState(getClient().session, ACTIVE, ACTIVE);

    for (int i = 0; i < 5; i++) {
        LOGGER.fine("\n\n");
        LOGGER.fine(bold("======== About to create client sender " + i + "; refcount on session: " + getSessionRefCount(getClient().session)));
        getClient().source = new Source();
        getClient().source.setAddress(null);
        getClient().target = new Target();
        getClient().target.setAddress("myQueue");
        getClient().sender = getClient().session.sender("sender" + i);
        getClient().sender.setTarget(getClient().target);
        getClient().sender.setSource(getClient().source);
        getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED);
        getClient().sender.open();
        assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED);
        pumpClientToServer();

        LOGGER.fine(bold("======== client: invoke close & free already, as would be done in linkEstablishmentTimeout handling"));
        getClient().sender.close();
        // If this free() is skipped, the error doesn't occur; but how can a memory leak
        // be prevented if no attach frame is ever received?
        getClient().sender.free();
        // Write the client's detach to its output buffer now, but don't deliver it yet
        ByteBuffer clientBuffer = getClient().transport.getOutputBuffer();

        LOGGER.fine(bold("======== About to set up server receiver (not having gotten the detach yet)"));
        getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
        getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode());
        getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode());
        org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget();
        assertTerminusEquals(getClient().target, serverRemoteTarget);
        getServer().receiver.setTarget(serverRemoteTarget);
        assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE);
        getServer().receiver.open();
        assertEndpointState(getServer().receiver, ACTIVE, ACTIVE);
        // Write the server's attach to its output buffer now, but don't deliver it yet
        ByteBuffer serverBuffer = getServer().transport.getOutputBuffer();

        LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent clientBuffer (with 'detach') to the server"));
        getServer().transport.getInputBuffer().put(clientBuffer);
        getClient().transport.outputConsumed();
        getServer().transport.processInput().checkIsOk();

        LOGGER.fine(bold("~~~~~~~~~~ now write the yet-unsent serverBuffer (with 'attach') to the client"));
        getClient().transport.getInputBuffer().put(serverBuffer);
        getClient().transport.processInput().checkIsOk();
        getServer().transport.outputConsumed();

        // Assert that the server got the detach
        assertEndpointState(getServer().receiver, ACTIVE, CLOSED);
        // Let the server respond with a detach
        getServer().receiver.close();
        LOGGER.fine(bold("~~~~~~~~~~ pump detach to client"));
        pumpServerToClient();
        assertEndpointState(getClient().sender, CLOSED, CLOSED);
        getClient().sender.free();
    }

    getClient().transport.unbind();
    LOGGER.fine(bold("======== About to close and free client's connection"));
    getClient().connection.close();
    getClient().connection.free();
}

// Reads the private EndpointImpl.refcount field via reflection (for logging only).
private Integer getSessionRefCount(final Session protonSession) throws NoSuchFieldException, IllegalAccessException {
    if (!(protonSession instanceof EndpointImpl)) {
        return null;
    }
    final Field refcountField = EndpointImpl.class.getDeclaredField("refcount");
    refcountField.setAccessible(true);
    return (Integer) refcountField.get(protonSession);
}
(Test code using vertx-proton and separate client/server classes can also be provided if needed.)