Details
- New Feature
- Status: Resolved
- Minor
- Resolution: Fixed
- None
- None
Description
https://github.com/nathanmarz/storm/issues/168
void progress(Tuple input)
This would send a message back to the spout (through the acker) to extend the timeout for the roots of the tuple. It would be useful when the processing time of a tuple is highly variable. The timeout should reset to the current time plus TOPOLOGY_MESSAGE_TIMEOUT.
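The reset rule above (new deadline = current time + TOPOLOGY_MESSAGE_TIMEOUT) can be sketched as a small stand-alone model. This is only an illustration under stated assumptions: the class `PendingTimeouts` and its methods are hypothetical names, not Storm code, and timestamps are passed in explicitly so the behaviour is easy to follow.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of per-root timeout tracking; not Storm's implementation. */
public class PendingTimeouts {
    // Stands in for TOPOLOGY_MESSAGE_TIMEOUT, expressed in milliseconds.
    private final long timeoutMs;
    // Root tuple id -> absolute deadline in milliseconds.
    private final Map<Long, Long> deadlines = new HashMap<>();

    public PendingTimeouts(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Called when a root tuple is emitted: the first deadline is now + timeout. */
    public void emit(long rootId, long nowMs) {
        deadlines.put(rootId, nowMs + timeoutMs);
    }

    /**
     * Called when a progress message arrives for a root.
     * Resets the deadline to now + timeout, but only if the root is still pending.
     * Returns false when the root is unknown (already acked, failed, or timed out).
     */
    public boolean progress(long rootId, long nowMs) {
        return deadlines.computeIfPresent(rootId, (id, old) -> nowMs + timeoutMs) != null;
    }

    /** True when the root is still pending and its deadline has passed. */
    public boolean isExpired(long rootId, long nowMs) {
        Long deadline = deadlines.get(rootId);
        return deadline != null && nowMs >= deadline;
    }
}
```

With a 30-second timeout, a root emitted at t=0 that reports progress at t=20s would expire at t=50s instead of t=30s. Note that the deadline is reset, not accumulated: repeated progress calls never push the deadline more than one timeout period past the latest call.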
-----------
xumingming: what's the content of the tuple passed to the method progress?
-----------
nathanmarz: It would be the tuple that was passed to "execute". For example, in pseudocode:
execute(Tuple input):
    while(...):
        // do some processing
        _collector.progress(input)
    _collector.ack(input)
So the progress method would extend the timeout for the spout tuples at the root of "input".
Just a note – a "progress" method could be a key to implementing MapReduce on top of Storm (along with more powerful grouping/scheduling capabilities and the ability to make use of local disk).
-----------
rohitprasad15: I wanted to take up this issue. I learnt Clojure and Storm a week back, but want to get deep into both.
I can briefly describe my approach:
1. Modify task.clj. Implement ^void progress [this ^Tuple tuple] as part of output-collector. Send a message to the acker, similar to how the ack method does it.
2. Modify acker.clj. Change execute() of the IBolt implementation. Need to add ACKER-PROGRESS-STREAM-ID to distinguish that it's a progress message, and then finally reset the timeout to maxTopologyMessageTimeout.
Is this approach in the right direction?
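The acker-side half of this approach, dispatching on a dedicated progress stream, might look roughly like the sketch below. Everything here is an assumption made for illustration: `AckerSketch`, the stream id strings, and the `Outcome` enum are hypothetical, the XOR checksum is a simplified stand-in for the acker's bookkeeping, and the deadline is kept inside the acker only to keep the example small (the thread does not settle where the timeout state actually lives).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical acker model that treats progress messages separately from acks. */
public class AckerSketch {
    public static final String ACK_STREAM = "ack";            // hypothetical id
    public static final String PROGRESS_STREAM = "progress";  // stands in for ACKER-PROGRESS-STREAM-ID

    public enum Outcome { PENDING, COMPLETED, TIMEOUT_RESET, UNKNOWN_ROOT }

    private static final class Entry {
        long checksum;    // XOR of outstanding tuple ids; zero means the tree is done
        long deadlineMs;  // absolute time at which the root would time out
    }

    private final long timeoutMs;
    private final Map<Long, Entry> pending = new HashMap<>();

    public AckerSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Registers a new root tuple with its initial checksum. */
    public void init(long rootId, long checksum, long nowMs) {
        Entry e = new Entry();
        e.checksum = checksum;
        e.deadlineMs = nowMs + timeoutMs;
        pending.put(rootId, e);
    }

    /** Handles one incoming message; the stream id decides how it is interpreted. */
    public Outcome execute(String streamId, long rootId, long value, long nowMs) {
        Entry e = pending.get(rootId);
        if (e == null) {
            return Outcome.UNKNOWN_ROOT;
        }
        if (PROGRESS_STREAM.equals(streamId)) {
            // Progress only moves the deadline; the checksum is left untouched.
            e.deadlineMs = nowMs + timeoutMs;
            return Outcome.TIMEOUT_RESET;
        }
        // Any other stream is treated as an ack in this sketch.
        e.checksum ^= value;
        if (e.checksum == 0) {
            pending.remove(rootId);
            return Outcome.COMPLETED;
        }
        return Outcome.PENDING;
    }

    /** Returns the current deadline for a root, or -1 if it is not pending. */
    public long deadlineOf(long rootId) {
        Entry e = pending.get(rootId);
        return e == null ? -1L : e.deadlineMs;
    }
}
```

The point of the separate stream id is that a progress message must not be folded into the checksum the way an ack value is; it only touches the deadline, so the tuple tree still completes through the normal ack path.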
Issue Links
- is duplicated by STORM-1549 Add support for extending tuple tree timeout (Resolved)