Description
Currently the transport is created inside `Connection.__init__`. This works in every scenario where the driver runs in a single process.
However, when the Gremlin Python SDK runs inside a framework that uses a multi-process model, such as `celery`, this can cause errors.
Because the transport (which wraps the socket) is created in the main process, every forked child process inherits it. The `ThreadPoolExecutor` threads in each child then read from the same socket, so the following can happen:
- Child Process 1 sends a request with request id 'xxx' using socket s1.
- Child Process 2 sends another request with request id 'yyy', also using socket s1, and then reads the response to request 'xxx'. Because 'xxx' was registered only in Child Process 1's copy of `_results`, Child Process 2 cannot find it in its own `_results`.
When this happens, the lookup fails with a `KeyError` like this:
```
[2019-08-06 15:31:21,822: WARNING/ForkPoolWorker-3] Traceback (most recent call last):
  File "/root/celery_gremlin/gremlin_test/tasks.py", line 15, in graph_query_test
    res = client.submit(dsl).one()
  File "/usr/local/lib/python3.6/site-packages/gremlin_python/driver/resultset.py", line 83, in one
    return self.done.result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/usr/lib64/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.6/site-packages/gremlin_python/driver/connection.py", line 80, in _receive
    status_code = self._protocol.data_received(data, self._results)
  File "/usr/local/lib/python3.6/site-packages/gremlin_python/driver/protocol.py", line 83, in data_received
    result_set = results_dict[request_id]
KeyError: '1ff0ad4a-dcea-45d1-b240-27c2dc792f73'
```
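The mechanism behind this traceback can be modelled without Celery or a Gremlin server. The simplified sketch below is an illustration, not driver code. A `deque` stands in for the inherited socket s1, and each hypothetical `FakeProcess` holds its own `_results` dict, the same way each forked child holds its own copy of `Connection._results`. Whichever process reads first gets the next response, so child 2 can pick up the response to 'xxx' and fail on the same kind of dictionary lookup that `protocol.data_received` performs.

```python
from collections import deque


class FakeProcess:
    """Hypothetical stand-in for one forked worker with its own `_results` copy."""

    def __init__(self, name, shared_socket):
        self.name = name
        self._socket = shared_socket  # the SAME object in every "process"
        self._results = {}

    def send(self, request_id):
        # Register the pending request locally, then "send" it. For simplicity
        # the fake server's response is appended to the shared socket at once.
        self._results[request_id] = []
        self._socket.append({"requestId": request_id, "data": f"result for {request_id}"})

    def receive(self):
        # Whoever reads first gets the next response, regardless of who sent it.
        message = self._socket.popleft()
        request_id = message["requestId"]
        # Same lookup as protocol.data_received: raises KeyError when this
        # process never registered the request id.
        self._results[request_id].append(message["data"])
        return request_id


shared = deque()
p1 = FakeProcess("child-1", shared)
p2 = FakeProcess("child-2", shared)
p1.send("xxx")
p2.send("yyy")
try:
    p2.receive()  # child 2 reads the response that belongs to child 1
except KeyError as exc:
    print("KeyError:", exc)
```

Running it prints the `KeyError` for 'xxx', which matches the failure in the log above in a much smaller form.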
Steps to reproduce:
- Follow the Celery [tutorial|https://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html] to create a simple app named `tasks`.
- Change `tasks.py` so that the task method sends a request through the Gremlin Python SDK:
```python
from celery import Celery
from gremlin_python.driver import client

username = "test"
password = "test"

# The client (and, with the current driver, its transport/socket) is created at
# import time, i.e. in the Celery main process before the pool workers are forked.
my_client = client.Client('ws://localhost:8182/gremlin', 'g',
                          username=username, password=password,
                          pool_size=1, max_workers=1)

app = Celery('tasks', broker='redis://localhost',
             backend='redis://localhost:6379/0')


@app.task
def add(x, y):
    dsl = "g.V().count()"
    my_client.submit(dsl).one()
    return x + y
```
- Invoke `add.delay(1,2)`
A quick fix is to create the transport lazily on the first `write()` instead of in `__init__`. It changes only a few lines in `connection.py` and should not break other code:
```python
class Connection:

    def __init__(self, url, traversal_source, protocol, transport_factory,
                 executor, pool):
        self._url = url
        self._traversal_source = traversal_source
        self._protocol = protocol
        self._transport_factory = transport_factory
        self._executor = executor
        # No transport (socket) is created here. connect() creates it on the
        # first write, i.e. in the process that actually uses the connection.
        self._transport = None
        self._pool = pool
        self._results = {}
        self._inited = False

    def connect(self):
        if self._transport:
            self._transport.close()
        self._transport = self._transport_factory()
        self._transport.connect(self._url)
        self._protocol.connection_made(self._transport)
        self._inited = True

    def close(self):
        # Nothing to close if the connection was never used.
        if self._inited:
            self._transport.close()

    def write(self, request_message):
        if not self._inited:
            self.connect()
        # ... rest of write() as in the existing connection.py ...
```
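The key point of the fix is that `__init__` never touches the network. The transport is built on the first `write()`, so it belongs to whichever process sends first. The sketch below models only that lifecycle. `LazyConnection` and `FakeTransport` are hypothetical stand-ins with no protocol, executor, or pool, and the fake transport records the PID that opened it.

```python
import os


class FakeTransport:
    """Hypothetical transport that records which process opened it."""

    def __init__(self):
        self.url = None
        self.opened_in_pid = None
        self.closed = False

    def connect(self, url):
        self.url = url
        self.opened_in_pid = os.getpid()

    def close(self):
        self.closed = True


class LazyConnection:
    """Minimal model of the patched Connection: no transport until first write."""

    def __init__(self, url, transport_factory):
        self._url = url
        self._transport_factory = transport_factory
        self._transport = None
        self._inited = False
        self.sent = []

    def connect(self):
        # Replace any existing transport, mirroring Connection.connect().
        if self._transport:
            self._transport.close()
        self._transport = self._transport_factory()
        self._transport.connect(self._url)
        self._inited = True

    def write(self, message):
        if not self._inited:
            self.connect()  # runs in the process that actually sends
        self.sent.append(message)

    def close(self):
        if self._inited:
            self._transport.close()


conn = LazyConnection("ws://localhost:8182/gremlin", FakeTransport)
print(conn._transport)  # None: nothing opened at construction time
conn.write("g.V().count()")
print(conn._transport.opened_in_pid == os.getpid())
```

This only helps if the main process never writes before forking. A connection that was already used in the parent would still carry a shared socket into the children.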