Flink / FLINK-18518

Add Async RequestReply handler for the Python SDK

    Details

      Description

      I/O-bound stateful functions can benefit from Python's built-in asyncio support, but the RequestReply handler is not asyncio-compatible. See this question on Stack Overflow.
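
To illustrate why I/O-bound functions benefit from asyncio, here is a minimal standard-library sketch. It is not part of the SDK: fake_io_call, run_concurrently and timed_run are hypothetical names, and asyncio.sleep stands in for a real network request. The waits overlap, so the total time is close to the longest single wait rather than the sum.

```python
import asyncio
import time


async def fake_io_call(delay_seconds):
    # Hypothetical stand-in for a network request; asyncio.sleep yields
    # control to the event loop while "waiting".
    await asyncio.sleep(delay_seconds)
    return delay_seconds


async def run_concurrently(delays):
    # gather schedules all coroutines at once, so their waits overlap.
    return await asyncio.gather(*(fake_io_call(d) for d in delays))


def timed_run(delays):
    # Run the coroutines to completion and measure wall-clock time.
    start = time.perf_counter()
    results = asyncio.run(run_concurrently(delays))
    elapsed = time.perf_counter() - start
    return results, elapsed
```

Calling timed_run([0.1, 0.1, 0.1]) should take roughly 0.1 seconds instead of 0.3, since the three waits run concurrently on one event loop.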

       

      An asyncio-compatible handler would let functions use async libraries such as aiohttp, for example:

       

      import aiohttp
      import asyncio
      
      ...
      
      async def fetch(session, url):
          async with session.get(url) as response:
              return await response.text()
      
      @function.bind("example/hello")
      async def hello(context, message):
          async with aiohttp.ClientSession() as session:
              html = await fetch(session, 'http://python.org')
              context.pack_and_reply(SomeProtobufMessage(html))
      
      
      from aiohttp import web
      
      handler = AsyncRequestReplyHandler(functions)
      
      async def handle(request):
          req = await request.read()
          res = await handler(req)
          return web.Response(body=res, content_type="application/octet-stream")
      
      app = web.Application()
      app.add_routes([web.post('/statefun', handle)])
      if __name__ == '__main__':
          web.run_app(app, port=5000)
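
One way such a handler could support both plain and async functions is to call the bound function and await the result only when it is awaitable. The sketch below is an assumption about a possible design, not the SDK's implementation: SketchAsyncHandler, sync_upper and async_reverse are hypothetical, and the real handler works on serialized request bytes rather than a type name and a string.

```python
import asyncio
import inspect


class SketchAsyncHandler:
    """Hypothetical stand-in; not the statefun SDK's AsyncRequestReplyHandler."""

    def __init__(self, functions):
        # functions: dict mapping a function type name to a callable.
        self.functions = functions

    async def __call__(self, typename, message):
        fn = self.functions[typename]
        result = fn(message)
        # An async def function returns an awaitable; a plain function
        # returns its value directly.
        if inspect.isawaitable(result):
            result = await result
        return result


def sync_upper(message):
    # Plain function: no awaiting needed.
    return message.upper()


async def async_reverse(message):
    await asyncio.sleep(0)  # simulated I/O wait
    return message[::-1]


handler = SketchAsyncHandler({
    "example/upper": sync_upper,
    "example/reverse": async_reverse,
})
```

With this shape, existing synchronous functions keep working unchanged, while async def functions can await I/O without blocking the event loop.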
       

       

              People

              • Assignee: Igal Shilman
              • Reporter: Igal Shilman