
Key: MAPREDUCE-205
Type: New Feature
Status: Open
Priority: Major
Assignee: Owen O'Malley
Reporter: Andrzej Bialecki
Votes: 0
Watchers: 4
Hadoop Map/Reduce

Add ability to send "signals" to jobs and tasks

Created: 29/Aug/06 08:06 PM   Updated: 20/Jun/09 07:51 AM
Component/s: None
Affects Version/s: None
Fix Version/s: None

Time Tracking:
Not Specified

Issue Links:
Reference
 


Description
In some cases it would be useful to be able to "signal" a job and its tasks about some external condition, or to broadcast a specific message to all tasks in a job. Currently the only pseudo-signal we can send is to kill a job.

Example 1: some jobs could be terminated gracefully even if they haven't completed all their work. For example, the Fetcher in Nutch may run for a very long time if it blocks on the relatively few sites left over from the fetchlist. In such a case it would be very useful to send it a message requesting that it discard the rest of its input and gracefully complete its map tasks.
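To make Example 1 concrete, here is a minimal Java sketch of a fetch-style loop that stops consuming input once a "finish" signal has been received. It assumes that some delivery mechanism (which this issue leaves unspecified) calls `requestFinish()` from another thread; `GracefulFetchLoop` and its methods are hypothetical names, not part of Hadoop or Nutch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a fetch-style map loop that honors a "finish now" signal.
// None of these names exist in Hadoop or Nutch.
class GracefulFetchLoop {
    // Set (possibly from another thread) when the hypothetical "finish" signal arrives.
    private final AtomicBoolean finishRequested = new AtomicBoolean(false);

    void requestFinish() {
        finishRequested.set(true);
    }

    // Processes records until the input is exhausted or a finish request is seen.
    // Returns the records actually processed; the remaining input is discarded.
    List<String> run(Iterator<String> input) {
        List<String> processed = new ArrayList<>();
        while (input.hasNext()) {
            if (finishRequested.get()) {
                break; // discard the rest of the input and complete normally
            }
            String url = input.next();
            processed.add(url); // stand-in for actually fetching the URL
        }
        return processed;
    }
}
```

Because the flag is only checked between records, a record that is already being fetched is completed before the loop exits.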

Example 2: the bandwidth available for fetching may vary with the time of day (e.g. daytime vs. nighttime) or with total external link usage by other applications. Fetcher jobs often run for several hours. It would be good to be able to send a "signal" to the Fetcher to throttle or un-throttle its bandwidth usage depending on external conditions.
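For Example 2, a task would need a throttle whose limit can change while the job runs. The following is a crude sketch under that assumption: `AdjustableThrottle` is a hypothetical class, and "pause after each chunk" is just one simple way to cap average bandwidth, not an existing Nutch or Hadoop facility.

```java
// Hypothetical sketch: a bandwidth throttle whose limit can be changed while
// the job runs, e.g. when a "throttle" or "un-throttle" signal arrives.
final class AdjustableThrottle {
    // Bytes per second; 0 means unlimited. volatile so that a signal-handling
    // thread can update it while the fetching thread reads it.
    private volatile long bytesPerSecond;

    AdjustableThrottle(long bytesPerSecond) {
        setBytesPerSecond(bytesPerSecond);
    }

    void setBytesPerSecond(long bytesPerSecond) {
        if (bytesPerSecond < 0) {
            throw new IllegalArgumentException("negative rate: " + bytesPerSecond);
        }
        this.bytesPerSecond = bytesPerSecond;
    }

    // How long to pause after transferring `bytes` so that the average rate
    // stays at or below the current limit.
    long pauseMillis(long bytes) {
        long rate = bytesPerSecond; // read once: the limit may change concurrently
        return rate == 0 ? 0 : (bytes * 1000L) / rate;
    }

    // Called by the fetcher after each chunk it transfers.
    void afterTransfer(long bytes) throws InterruptedException {
        Thread.sleep(pauseMillis(bytes));
    }
}
```

A signal handler would call `setBytesPerSecond(...)` when a throttle or un-throttle message arrives; the next `afterTransfer` call picks up the new limit.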

Job implementations could react to these messages either by implementing a method, or by registering a listener, whichever seems more natural.
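The listener variant could look roughly like the sketch below. `JobSignal`, `JobSignalListener` and `SignalDispatcher` are hypothetical names invented for illustration; the framework side that would call `deliver()` is exactly the part this issue leaves open. The "implementing a method" variant would amount to the same callback declared on the job's own class instead of on a separate listener.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical API sketch; none of these types exist in Hadoop.
final class JobSignal {
    final String name;    // e.g. "finish" or "throttle"
    final String payload; // free-form argument, e.g. a bandwidth limit

    JobSignal(String name, String payload) {
        this.name = name;
        this.payload = payload;
    }
}

interface JobSignalListener {
    void onSignal(JobSignal signal);
}

// Per-task registry: the framework would (hypothetically) call deliver() when
// a signal for this job arrives, and user code registers listeners for it.
final class SignalDispatcher {
    // Copy-on-write so listeners can be registered while a delivery is in progress.
    private final List<JobSignalListener> listeners = new CopyOnWriteArrayList<>();

    void register(JobSignalListener listener) {
        listeners.add(listener);
    }

    // Hands the signal to every registered listener, in registration order.
    void deliver(JobSignal signal) {
        for (JobSignalListener listener : listeners) {
            listener.onSignal(signal);
        }
    }
}
```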

I'm not quite sure how to go about implementing it. I guess this would have to be part of TaskUmbilicalProtocol, but my knowledge here is a bit fuzzy. Comments are welcome.



Comments
Doug Cutting added a comment - 29/Aug/06 08:16 PM
The mapper could periodically poll a server for new messages. For example, a DFS directory could be used per job with a message per file, named with a timestamp. This would not require changes to the MapReduce system. Would this be impractical for the fetcher application?
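Doug's polling suggestion can be sketched with the local filesystem standing in for DFS. The layout (one directory per job, one file per message, file name = timestamp in milliseconds) follows the comment; the `MessageDirPoller` class and the use of `java.nio.file` instead of Hadoop's `FileSystem` API are assumptions made to keep the example self-contained.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch of "one file per message, named with a timestamp", using the local
// filesystem as a stand-in for DFS. Assumed layout:
//   <jobMessageDir>/<millisSinceEpoch>   (file content = message text)
final class MessageDirPoller {
    private final Path dir;
    private long lastSeen = Long.MIN_VALUE; // newest timestamp already delivered

    MessageDirPoller(Path dir) {
        this.dir = dir;
    }

    // Returns the messages newer than the previous poll, oldest first.
    List<String> poll() throws IOException {
        TreeMap<Long, Path> fresh = new TreeMap<>(); // sorted by timestamp
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path f : files) {
                long ts;
                try {
                    ts = Long.parseLong(f.getFileName().toString());
                } catch (NumberFormatException e) {
                    continue; // ignore files that don't follow the naming scheme
                }
                if (ts > lastSeen) {
                    fresh.put(ts, f);
                }
            }
        }
        List<String> messages = new ArrayList<>();
        for (Path f : fresh.values()) {
            messages.add(new String(Files.readAllBytes(f), StandardCharsets.UTF_8));
        }
        if (!fresh.isEmpty()) {
            lastSeen = fresh.lastKey();
        }
        return messages;
    }
}
```

A mapper would call `poll()` every few seconds or minutes. This simple scheme misses a message whose timestamp is older than one already seen, and it could read a file that is still being written; a real implementation would need to handle both.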

Andrzej Bialecki added a comment - 29/Aug/06 08:24 PM
It could address this particular problem, yes. However, application writers would have to design their own way to do this each time; it would be better if the framework provided some support for this.

Doug Cutting added a comment - 29/Aug/06 09:09 PM
> each time application writers would have to design their own way to do this

I prefer to wait until a few application writers have done it, then generalize, rather than try to guess what is universal. Otherwise the framework gets bloated with features that are only used by one application.

Are there other folks who need to send messages to running map and reduce tasks?


Andrzej Bialecki added a comment - 15/Sep/06 09:16 PM
This implementation is not Nutch-specific and can easily be moved to Hadoop if users find it useful.

Andrzej Bialecki added a comment - 16/Sep/06 11:21 AM
(Oops, I thought JIRA would include the link in the comment).

NUTCH-368 provides an implementation of a filesystem-based message queue. This implementation is not Nutch-specific and can easily be moved to Hadoop if users find it useful.


eric baldeschwieler added a comment - 19/Sep/06 05:41 AM
-1

Unless I'm reading this wrong, a file per message would kill the namenode at any scale. Also, in a large task, the cost of having every mapper/task scan all the messages could be fairly prohibitive.
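A back-of-envelope estimate illustrates the scanning concern. Assume T concurrently running tasks each list the job's message directory once every p seconds, and the directory holds M message files; these symbols and the numbers below are illustrative, not measurements.

```latex
R_{\text{list}} = \frac{T}{p}, \qquad
E_{\text{entries}} = \frac{T \cdot M}{p}
```

With T = 2000, p = 10 s and M = 50, that is 200 listing requests and 10,000 returned directory entries per second, all served by the namenode; raising p to 60 s cuts the request rate to about 33 per second. Separately, every message file is one more object in the namenode's namespace.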

I'd suggest making it available in contrib or some other mechanism until we see how much uptake it gets. This would leave specific applications free to use it. Perhaps if this gains wide acceptance we could explore moving the concepts into core, but we would need to address the scaling issues to make a general facility.

A very interesting set of ideas here, but very complicated if you want to make it work in large general cases.


Andrzej Bialecki added a comment - 19/Sep/06 07:47 AM
Re: namenode issue: yes, that's a good point. I didn't think of that, mainly because I'm working with smaller clusters (dozens of machines at most).

Re: cost of scanning: that's true as well, although tasks don't have to poll that often; in some cases the poll interval could be configured to be in the range of minutes. However, this points back to a deficiency in the current framework, namely that there is no support for sending arbitrary messages to tasks. If there were a way to do this (well, then the issue would be solved and we wouldn't need this MQ API...), we could run a separate filesystem monitor that dispatches events about filesystem changes, such as file/directory creation, deletion, or update, to all listening tasks.

Overall, I'm aware that this is a less-than-ideal solution to the problem; IMHO the original proposal in this issue would be better.


Andrzej Bialecki added a comment - 23/Feb/07 08:50 PM
I'd like to call for a re-evaluation of this issue. With the introduction of TaskTrackerAction, it seems to me that signals could be accommodated more easily than before, simply by sending yet another type of TaskTrackerAction. The original reasons for this issue are still valid: the need to pass bits of information to all tasks in a job.
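The TaskTrackerAction idea could be modeled roughly as follows. This is a self-contained model, not Hadoop code: `ActionKind`, `TrackerAction`, `TrackerModel` and the `SIGNAL_JOB` action type are hypothetical, and the per-task inbox stands in for whatever channel (for example, the umbilical protocol) would actually carry the message into the task's JVM.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Self-contained model, NOT Hadoop code: all names here are hypothetical.
enum ActionKind { LAUNCH_TASK, KILL_TASK, KILL_JOB, SIGNAL_JOB }

final class TrackerAction {
    final ActionKind kind;
    final String jobId;
    final String message; // only meaningful for SIGNAL_JOB

    TrackerAction(ActionKind kind, String jobId, String message) {
        this.kind = kind;
        this.jobId = jobId;
        this.message = message;
    }
}

// Tracker-side routing: a SIGNAL_JOB action is fanned out to every task of
// that job currently running on this tracker.
final class TrackerModel {
    // jobId -> inboxes of the tasks of that job running on this tracker
    private final Map<String, List<List<String>>> runningTasks = new HashMap<>();

    // Registers a running task and returns its (simplified) message inbox.
    List<String> startTask(String jobId) {
        List<String> inbox = new ArrayList<>();
        runningTasks.computeIfAbsent(jobId, k -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    void handle(TrackerAction action) {
        switch (action.kind) {
            case SIGNAL_JOB:
                for (List<String> inbox
                        : runningTasks.getOrDefault(action.jobId, Collections.emptyList())) {
                    inbox.add(action.message);
                }
                break;
            case KILL_JOB:
                runningTasks.remove(action.jobId);
                break;
            default:
                // LAUNCH_TASK / KILL_TASK handling is omitted from this sketch
                break;
        }
    }
}
```

If signal actions were piggybacked on heartbeat responses like the other action types, a signal would reach a task after roughly one heartbeat interval plus however long the task takes to check its inbox.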

The message queue approach mentioned before has been tested in practice, and found useful for small-scale clusters and infrequent (control-type) messages. However, it's not scalable due to the heavy load it puts on the namenode.