Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-19515

Async RequestReply handler concurrency bug

    XMLWordPrintableJSON

Details

    Description

      Async RequestReply handler implemented in https://issues.apache.org/jira/browse/FLINK-18518 has a concurrency problem.

       

      Lines 151 to 152 of https://github.com/apache/flink-statefun/blob/master/statefun-python-sdk/statefun/request_reply.py

      The coro is awaiting and may yield.  Another coro may continue that was yielding and call ic.complete() which sets the ic.context to None

       

      In short:

       

      ic.setup(request_bytes)        
      await self.handle_invocation(ic)        
      return ic.complete()
       
      

      Needs to happen atomically.

       

      I worked around this by creating an AsyncRequestReplyHandler for each request.

       

      It should be possible to re-produce this by putting an await asyncio.sleep(5) in the greeter example and then run in gunicorn with a single asyncio thread/event loop (-w 1).  

       

       

       

          response_data = await handler(request_data)
        File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 152, in __call__
          return ic.complete()
        File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 57, in complete
          self.add_mutations(context, invocation_result)
        File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 82, in add_mutations
          for name, handle in context.states.items():
      AttributeError: 'NoneType' object has no attribute 'states'
      

       

      Attachments

        Activity

          People

            fransking Frans King
            fransking Frans King
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: