Details
- Bug
- Status: Closed
- Major
- Resolution: Fixed
Description
With the Async High Level API changes, we changed StreamOperatorTask from a StreamTask to an AsyncStreamTask. As a result, its processAsync() is invoked directly on the run loop instead of going through the AsyncStreamTaskAdapter. Consequently, job.container.thread.pool.size has no effect, and the entire DAG (outside of asyncFlatMap) is executed on the run loop thread, one TaskInstance at a time.
...
    at org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:184)
    at org.apache.samza.task.StreamOperatorTask.processAsync(StreamOperatorTask.java:109)
    at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:176)
    at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:174)
    at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:470)
    at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:412)
    at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:346)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:233)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:165)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:763)
    at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:150)
    at org.apache.samza.runtime.ContainerLaunchUtil.run(ContainerLaunchUtil.java:78)
    at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:76)
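
To illustrate the behavior described above, here is a minimal, self-contained sketch in plain Java. It does not use any Samza classes: the AsyncTask interface, the direct() and viaPool() helpers, and the class name RunLoopDispatchSketch are all hypothetical stand-ins chosen for this example. It only shows the general pattern, assuming that an adapter-style wrapper hands work to a thread pool while a direct processAsync() call runs the work on the calling (run loop) thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

public class RunLoopDispatchSketch {
    // Hypothetical stand-in for an async task callback; NOT Samza's interface.
    interface AsyncTask {
        void processAsync(String message, Runnable onComplete);
    }

    // Direct invocation: the work runs on whichever thread calls processAsync,
    // so a configured thread pool is never involved.
    static AsyncTask direct(Consumer<String> work) {
        return (msg, done) -> {
            work.accept(msg);
            done.run();
        };
    }

    // Adapter-style invocation (assumed pattern): the work is submitted to a pool
    // and the caller returns immediately.
    static AsyncTask viaPool(Consumer<String> work, ExecutorService pool) {
        return (msg, done) -> pool.execute(() -> {
            work.accept(msg);
            done.run();
        });
    }

    // Sends `messages` messages from the current thread (playing the run loop)
    // and records the names of the threads that actually executed the work.
    static Set<String> threadsUsed(Function<Consumer<String>, AsyncTask> factory, int messages)
            throws InterruptedException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch latch = new CountDownLatch(messages);
        AsyncTask task = factory.apply(msg -> names.add(Thread.currentThread().getName()));
        for (int i = 0; i < messages; i++) {
            task.processAsync("m" + i, latch::countDown);
        }
        latch.await();
        return names;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println("direct:   " + threadsUsed(RunLoopDispatchSketch::direct, 8));
            System.out.println("via pool: " + threadsUsed(w -> viaPool(w, pool), 8));
        } finally {
            pool.shutdown();
        }
    }
}
```

In the direct case every message is processed on the calling thread, which mirrors the stack trace above, where OperatorImpl.onMessageAsync appears under AsyncRunLoop.run. In the pooled case the calling thread never executes the work itself.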