Details
Description
In the context of a Flask application using multi threads, it is currently not possible to close the DriverRemoteConnection due to two issues. As our Flask application initiates a new connection on every new request (because we don't want the trouble of reusing the connections), the process eventually runs out of file descriptors.
How to reproduce
Given a gremlin server running on 127.0.0.1:8182, this can reproduce the first error:
import threading from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.process.anonymous_traversal import traversal if __name__ == "__main__": def handle_request(): remote_connection = DriverRemoteConnection("ws://127.0.0.1:8182/gremlin", "g") g = traversal().withRemote(remote_connection) print(g.V().limit(1).toList()) remote_connection.close() for i in range(10): t = threading.Thread(target=handle_request) t.start() t.join() print("Press ENTER to terminate") s = input()
Error due to not finding current event loop
When a thread tries to execute remote_connection.close(), the following error happens:
asyncio/events.py", line 639, in get_event_loop raise RuntimeError('There is no current event loop in thread %r.' RuntimeError: There is no current event loop in thread 'Thread-10'.
This is caused by TornadoTransport.close() does not close the websocket in a loop.
I can fix that by providing my own transport to close the websocket with self._loop.run_sync(lambda: self._ws.close()):
import threading from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.driver.tornado.transport import TornadoTransport from gremlin_python.process.anonymous_traversal import traversal class CustomTornadoTransport(TornadoTransport): def close(self): self._loop.run_sync(lambda: self._ws.close()) self._loop.close() if __name__ == "__main__": def handle_request(): remote_connection = DriverRemoteConnection( "ws://127.0.0.1:8182/gremlin", "g", transport_factory=CustomTornadoTransport ) g = traversal().withRemote(remote_connection) print(g.V().limit(1).toList()) remote_connection.close() for i in range(10): t = threading.Thread(target=handle_request) t.start() t.join() print("Press ENTER to terminate") s = input()
Connections are kept in CLOSE_WAIT state
Now, apparently the connection is closed successfully, but if we look at the open connections, we will find a bunch of tcp connections in CLOSE_WAIT state.
For example, using netstat on Linux while the script is still running:
netstat -nt4p | grep 8182 (Not all processes could be identified, non-owned process info will not be shown, you would have to be root to see it all.) tcp 3 0 127.0.0.1:52092 127.0.0.1:8182 CLOSE_WAIT 26886/ld-linux-x86- tcp 3 0 127.0.0.1:52110 127.0.0.1:8182 CLOSE_WAIT 26886/ld-linux-x86- tcp 3 0 127.0.0.1:52098 127.0.0.1:8182 CLOSE_WAIT 26886/ld-linux-x86- tcp 3 0 127.0.0.1:52104 127.0.0.1:8182 CLOSE_WAIT 26886/ld-linux-x86-
Digging into the code, I found out that tornado does not terminate the connection right away. This is what happens when the websocket is closed:
- It sends a message to the server
- It schedules a 5s timer to abort the connection in case the client does not close it
- On the next IO loop iteration, it receives the client message for closing the connection
- As the client closed the connection cleanly, it cancels the timeout
So, for the websocket to properly close, the loop needs to run again for Tornado to receive the client's close message or for the timeout to call abort. But that never happens, because TornadoTransport.close also closes the loop, leaking those connections.
I don't know if that is the best solution, but reading a message from the socket after closing it, makes tornado receive the client message to close the connection:
class CustomTornadoTransport(TornadoTransport): def close(self): self._loop.run_sync(lambda: self._ws.close()) message = self._loop.run_sync(lambda: self._ws.read_message()) # This situation shouldn't really happen. Since the connection was closed, # the next message should be None if message is not None: raise RuntimeError("Connection was not properly closed") self._loop.close()
Now, after running the script with the change above, netstat -nt4p | grep 8182 does not show any connections any more.