  1. TinkerPop
  2. TINKERPOP-2277

Python sdk postpone the timing to create transport


    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.4.2
    • Fix Version/s: 3.3.9, 3.4.4, 3.5.0
    • Component/s: python
    • Labels:
      None

      Description

      The transport is currently created inside `Connection.__init__`. This works in any single-process deployment.

      However, if the Gremlin Python SDK runs inside a service such as `celery` that uses a multi-process model, this implementation can cause errors.

      Because the transport (which wraps a socket) is created in the main process, it is inherited by the child processes. Threads inside each child's ThreadPoolExecutor then read from the same socket, so the following can happen:

      • Child process 1 sends a request with request id 'xxx' over socket s1.
      • Child process 2 sends another request with request id 'yyy', also over socket s1. It then reads the response for request id 'xxx' but cannot find 'xxx' in its own '_results'.

       When this happens, a KeyError is raised, for example:

      [2019-08-06 15:31:21,822: WARNING/ForkPoolWorker-3] Traceback (most recent call last):
        File "/root/celery_gremlin/gremlin_test/tasks.py", line 15, in graph_query_test
          res = client.submit(dsl).one()
        File "/usr/local/lib/python3.6/site-packages/gremlin_python/driver/resultset.py", line 83, in one
          return self.done.result()
        File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 425, in result
          return self.__get_result()
        File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
          raise self._exception
        File "/usr/lib64/python3.6/concurrent/futures/thread.py", line 56, in run
          result = self.fn(*self.args, **self.kwargs)
        File "/usr/local/lib/python3.6/site-packages/gremlin_python/driver/connection.py", line 80, in _receive
          status_code = self._protocol.data_received(data, self._results)
        File "/usr/local/lib/python3.6/site-packages/gremlin_python/driver/protocol.py", line 83, in data_received
          result_set = results_dict[request_id]
      KeyError: '1ff0ad4a-dcea-45d1-b240-27c2dc792f73'
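
      The failure mode can be illustrated without a server or real child processes. The following is a minimal simulation under stated assumptions, not gremlin_python code: `shared_socket` (a plain queue) stands in for the inherited socket s1, and the hypothetical `Worker` class stands in for a child process that keeps its own private `_results` dictionary.

      ```python
      import queue
      import uuid


      class Worker:
          """Stand-in for one child process: private _results, shared socket."""

          def __init__(self, shared_socket):
              self._socket = shared_socket  # inherited from the parent process
              self._results = {}            # private to this "process"

          def send(self):
              request_id = str(uuid.uuid4())
              self._results[request_id] = []
              # The simulated server echoes the request id back on the same socket.
              self._socket.put(request_id)
              return request_id

          def receive(self):
              request_id = self._socket.get_nowait()
              # Same lookup pattern as data_received in the traceback above.
              return self._results[request_id]


      shared_socket = queue.Queue()
      child_1 = Worker(shared_socket)
      child_2 = Worker(shared_socket)

      child_1.send()  # request 'xxx' goes out on s1
      child_2.send()  # request 'yyy' goes out on the same s1

      try:
          child_2.receive()  # child 2 reads the response meant for child 1
          outcome = "ok"
      except KeyError:
          outcome = "KeyError"

      print(outcome)
      ```

      Each worker only knows the request ids it issued itself, so whichever worker reads first from the shared channel may pick up an id it never registered.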
      

      Steps to reproduce:

      1. Follow the Celery [tutorial|https://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html] to create a simple app named `tasks`.
      2. Refactor `tasks.py` to send a request through the Gremlin Python SDK inside the task method:
        from celery import Celery
        from gremlin_python.driver import client
        
        username = "test"
        password = "test"
        
        my_client = client.Client('ws://localhost:8182/gremlin', 'g',
                                  username=username, password=password,
                                  pool_size=1, max_workers=1)
        app = Celery('tasks', broker='redis://localhost', backend='redis://localhost:6379/0')
        
        
        @app.task
        def add(x, y):
            dsl = "g.V().count()"
            my_client.submit(dsl).one()
            return x + y
        
      3. Invoke `add.delay(1,2)`

      A quick fix is to change a few lines in `connection.py` so that the transport is created on the first write rather than in the constructor, without breaking other code:

      class Connection:
      
          def __init__(self, url, traversal_source, protocol, transport_factory,
                       executor, pool):
              self._url = url
              self._traversal_source = traversal_source
              self._protocol = protocol
              self._transport_factory = transport_factory
              self._executor = executor
              self._transport = None
              self._pool = pool
              self._results = {}
              self._inited = False
      
          def connect(self):
              if self._transport:
                  self._transport.close()
              self._transport = self._transport_factory()
              self._transport.connect(self._url)
              self._protocol.connection_made(self._transport)
              self._inited = True
      
          def close(self):
              if self._inited:
                  self._transport.close()
      
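          def write(self, request_message):
              # Connect lazily: the transport is created on the first write.
              if not self._inited:
                  self.connect()
              # ... (the rest of write() is not shown in the original report)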
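
      The effect of deferring transport creation can be checked in isolation. The sketch below is a simplified, hypothetical reduction of the class above: `LazyConnection` drops the protocol, executor, and pool arguments, and `FakeTransport` is an assumed stand-in that records calls instead of opening a socket.

      ```python
      class FakeTransport:
          """Hypothetical transport that records calls instead of opening a socket."""

          created = 0  # number of transports constructed so far

          def __init__(self):
              FakeTransport.created += 1
              self.sent = []
              self.closed = False

          def connect(self, url):
              self.url = url

          def write(self, message):
              self.sent.append(message)

          def close(self):
              self.closed = True


      class LazyConnection:
          def __init__(self, url, transport_factory):
              self._url = url
              self._transport_factory = transport_factory
              self._transport = None  # nothing is opened in the constructor
              self._inited = False

          def connect(self):
              if self._transport:
                  self._transport.close()
              self._transport = self._transport_factory()
              self._transport.connect(self._url)
              self._inited = True

          def close(self):
              if self._inited:
                  self._transport.close()

          def write(self, request_message):
              if not self._inited:
                  self.connect()  # the first write opens the transport
              self._transport.write(request_message)


      conn = LazyConnection("ws://localhost:8182/gremlin", FakeTransport)
      created_before_write = FakeTransport.created  # still 0: no transport yet

      conn.write("g.V().count()")
      conn.write("g.V().count()")
      created_after_write = FakeTransport.created   # one transport, reused
      ```

      Under a forking worker model, this means a client constructed at module level in the parent opens no socket there. Each child would open its own on its first request, provided the parent does not submit a request before forking.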
      

       


            People

            • Assignee: Stephen Mallette (spmallette)
            • Reporter: selfish finch (sel-fish)
