Description
We recently had a use case where we needed to wrap the Samza process() method in some code. The TaskLifecycleListener was insufficient for this. It provides beforeProcess and afterProcess callbacks, but what we really wanted was to run process() inside a callback of our own:
def wrapProcess(...) {
  foo.doSomething(new Wrapper() {
    task.process(...)
  })
}
We ended up just writing a wrapper task, and having the normal code defined via a subtask config:
task.class=foo.bar.WrapperTask
task.subtask.class=foo.bar.NormalTask
Both of these tasks implement StreamTask. Samza sees only WrapperTask and treats it like a normal task. WrapperTask instantiates the subtask and manages its lifecycle internally.
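The wrapper arrangement described above can be sketched roughly as follows. This is a minimal, self-contained illustration, not the code we actually run: the `Task` interface is a simplified, hypothetical stand-in for Samza's StreamTask (the real process() method takes an envelope, a collector, and a coordinator), the config is a plain map, and the "begin"/"end" markers stand in for whatever the wrapping code does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class WrapperSketch {
    /** Hypothetical, simplified stand-in for Samza's StreamTask and its init hook. */
    interface Task {
        void init(Map<String, String> config);
        void process(String message, List<String> output);
    }

    /** The "normal" task that holds the business logic. */
    static final class NormalTask implements Task {
        public void init(Map<String, String> config) { }

        public void process(String message, List<String> output) {
            output.add(message.toUpperCase());
        }
    }

    /** The only task the framework sees; it owns the subtask's lifecycle. */
    static final class WrapperTask implements Task {
        private Task subtask;

        public void init(Map<String, String> config) {
            // Look up the subtask class from config and instantiate it reflectively.
            String className = config.get("task.subtask.class");
            try {
                subtask = (Task) Class.forName(className)
                        .getDeclaredConstructor()
                        .newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate subtask " + className, e);
            }
            subtask.init(config);
        }

        public void process(String message, List<String> output) {
            // Code runs around the subtask's process(), not just before and after it.
            output.add("begin");
            try {
                subtask.process(message, output);
            } finally {
                output.add("end");
            }
        }
    }

    /** Wires the wrapper up the way the two config lines would. */
    static List<String> demo() {
        Task task = new WrapperTask();
        task.init(Map.of("task.subtask.class", NormalTask.class.getName()));
        List<String> output = new ArrayList<>();
        task.process("hello", output);
        return output;
    }
}
```

Because the subtask call sits inside a try/finally in the wrapper, the wrapping code surrounds the call itself, which separate before and after callbacks cannot express.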
This approach seems superior to the TaskLifecycleListener for several reasons:
- Allows tasks to be composed multiple times.
- Removes this complexity from the Samza framework, and makes it a concern of the job owner.
- Allows the wrapper task to do things like filtering messages, tweaking configs and serialization, catching exceptions, etc.
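The composition and filtering points in the list above might look something like the sketch below. It again uses a hypothetical, simplified `Task` interface rather than Samza's real StreamTask, and the wrapper names (`FilteringTask`, `ErrorCatchingTask`) are made up for illustration. Constructor injection is used here only to keep the example short; a config-driven subtask lookup would work the same way.

```java
import java.util.ArrayList;
import java.util.List;

final class ComposeSketch {
    /** Hypothetical, simplified stand-in for Samza's StreamTask. */
    interface Task {
        void process(String message, List<String> output);
    }

    /** Wrapper that drops empty messages before they reach the inner task. */
    static final class FilteringTask implements Task {
        private final Task inner;

        FilteringTask(Task inner) {
            this.inner = inner;
        }

        public void process(String message, List<String> output) {
            if (!message.isEmpty()) {
                inner.process(message, output);
            }
        }
    }

    /** Wrapper that catches failures from the inner task and records them. */
    static final class ErrorCatchingTask implements Task {
        private final Task inner;

        ErrorCatchingTask(Task inner) {
            this.inner = inner;
        }

        public void process(String message, List<String> output) {
            try {
                inner.process(message, output);
            } catch (RuntimeException e) {
                output.add("error: " + e.getMessage());
            }
        }
    }

    /** Stacks two wrappers around a "normal" task and feeds it three messages. */
    static List<String> demo() {
        Task normal = (message, output) -> {
            if (message.equals("bad")) {
                throw new IllegalStateException("cannot handle bad");
            }
            output.add("ok: " + message);
        };
        // Wrappers compose because each one is itself just a Task.
        Task task = new FilteringTask(new ErrorCatchingTask(normal));
        List<String> output = new ArrayList<>();
        for (String message : List.of("a", "", "bad")) {
            task.process(message, output);
        }
        return output;
    }
}
```

In this sketch the empty message is filtered out, the failing message is turned into an error record, and neither wrapper needs any support from the framework.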
Given this, it seems that TaskLifecycleListener is a degenerate case, and adds complexity to the framework. I propose removing it.
Attachments
Issue Links
- is related to SAMZA-364 Refactor lifecycle listener APIs (Open)