Details
-
Bug
-
Status: Closed
-
Minor
-
Resolution: Fixed
-
statefun-2.2.0
-
None
Description
Async RequestReply handler implemented in https://issues.apache.org/jira/browse/FLINK-18518 has a concurrency problem.
Lines 151 to 152 of https://github.com/apache/flink-statefun/blob/master/statefun-python-sdk/statefun/request_reply.py
The coro is awaiting and may yield. Another coro may continue that was yielding and call ic.complete() which sets the ic.context to None
In short:
ic.setup(request_bytes)
await self.handle_invocation(ic)
return ic.complete()
Needs to happen atomically.
I worked around this by creating an AsyncRequestReplyHandler for each request.
It should be possible to re-produce this by putting an await asyncio.sleep(5) in the greeter example and then run in gunicorn with a single asyncio thread/event loop (-w 1).
response_data = await handler(request_data) File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 152, in __call__ return ic.complete() File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 57, in complete self.add_mutations(context, invocation_result) File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 82, in add_mutations for name, handle in context.states.items(): AttributeError: 'NoneType' object has no attribute 'states'