Details

    • Type: New Feature
    • Status: Reopened
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      In some cases it would be useful to be able to "signal" a job and its tasks about some external condition, or to broadcast a specific message to all tasks in a job. Currently we can send only a single pseudo-signal: the one that kills a job.

      Example 1: some jobs could be gracefully terminated even if they haven't completed all their work. For example, the Fetcher in Nutch may run for a very long time if it blocks on the relatively few sites left over from the fetchlist. In such a case it would be very useful to send it a message requesting that it discard the rest of its input and gracefully complete its map tasks.

      Example 2: the bandwidth available for fetching may differ at different times of day (e.g. daytime vs. nighttime), or vary with the total external link usage by other applications. Fetcher jobs often run for several hours. It would be good to be able to send a "signal" to the Fetcher to throttle or un-throttle its bandwidth usage depending on external conditions.

      Job implementations could react to these messages either by implementing a method, or by registering a listener, whichever seems more natural.
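      The listener-style option can be illustrated with a minimal Python sketch. Everything in it is hypothetical and chosen only for illustration: `SignalDispatcher`, `FetcherTask`, the signal names "stop" and "throttle", and the `max_kbps` setting are not part of Hadoop or Nutch, and the sketch says nothing about how a message would actually reach the task.

```python
from typing import Callable, Dict, List

# Hypothetical names throughout: none of these exist in Hadoop or Nutch.
Listener = Callable[[str, str], None]


class SignalDispatcher:
    """Delivers (signal, payload) pairs to listeners registered by a task."""

    def __init__(self) -> None:
        self._listeners: Dict[str, List[Listener]] = {}

    def register(self, signal: str, listener: Listener) -> None:
        """Remember that `listener` wants to hear about `signal`."""
        self._listeners.setdefault(signal, []).append(listener)

    def deliver(self, signal: str, payload: str = "") -> int:
        """Call every listener for `signal`; return how many were called."""
        listeners = self._listeners.get(signal, [])
        for listener in listeners:
            listener(signal, payload)
        return len(listeners)


class FetcherTask:
    """Toy stand-in for a fetcher map task that reacts to two signals."""

    def __init__(self, dispatcher: SignalDispatcher) -> None:
        self.stop_requested = False
        self.max_kbps = 1000  # assumed default bandwidth cap
        dispatcher.register("stop", self.on_stop)
        dispatcher.register("throttle", self.on_throttle)

    def on_stop(self, signal: str, payload: str) -> None:
        # Example 1: discard the rest of the input and finish gracefully.
        self.stop_requested = True

    def on_throttle(self, signal: str, payload: str) -> None:
        # Example 2: the payload carries the new bandwidth cap.
        self.max_kbps = int(payload)

    def run(self, urls: List[str]) -> List[str]:
        """'Fetch' URLs until the input ends or a stop was requested."""
        fetched = []
        for url in urls:
            if self.stop_requested:
                break
            fetched.append(url)  # a real task would fetch the page here
        return fetched


if __name__ == "__main__":
    dispatcher = SignalDispatcher()
    task = FetcherTask(dispatcher)
    dispatcher.deliver("throttle", "250")
    print(task.max_kbps)
    dispatcher.deliver("stop")
    print(task.run(["http://example.com/a", "http://example.com/b"]))
```

      The "implementing a method" alternative would look the same from the task's side, with `on_stop` and `on_throttle` replaced by a single method that the framework calls for every message.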

      I'm not quite sure how to go about implementing it. I guess this would have to be a part of TaskUmbilicalProtocol, but my knowledge here is a bit fuzzy. Comments are welcome.

          Activity

          Harsh J added a comment -

          In hindsight, I think this was valid, but it still has some stale information, and the discussion would be better moved to a new ticket so that it fits in with the MR2 model we have in trunk/0.23 now, instead of the older protocols.

          Harsh J added a comment -

          This never seemed to generate any demand and the discussion has also grown stale. Should be doable in user-land in MR2 though, so closing out for now.

          Andrzej Bialecki added a comment -

          I'd like to call for re-evaluation of this issue. With the introduction of TaskTrackerAction it seems to me that signals could be accommodated more easily than before, simply by sending yet another type of TaskTrackerAction. The original reasons for this issue are still valid - the need to pass bits of information to all tasks in a job.

          The message queue approach mentioned before has been tested in practice, and found useful for small-scale clusters and infrequent (control-type) messages. However, it's not scalable due to the heavy load it puts on the namenode.

          Andrzej Bialecki added a comment -

          Re: namenode issue: yes, that's a good point - I didn't think of that, mainly because I'm working with smaller clusters (dozens of machines at most).

          Re: cost of scanning: that's true as well, although tasks don't have to poll so often; in some cases you could configure the poll interval to be in the range of minutes. However, this points back to a deficiency in the current framework, namely that there is no support for sending arbitrary messages to tasks. If there were a way to do this (well, then the issue would be solved and we wouldn't need this MQ API ...), then we could run a separate filesystem monitor, which would dispatch events to all listening tasks concerning filesystem changes such as file/dir creation/deletion/update.

          Overall, I'm aware that this is a less than ideal solution to the problem - IMHO my original proposal explained in this issue would be better.

          eric baldeschwieler added a comment -

          -1

          Unless I'm reading this wrong, a file per message would kill the name node at any scale. Also, in a large task, the cost of having every mapper/task scan all the messages could be fairly prohibitive.
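          The scaling concern can be made concrete with a back-of-the-envelope calculation. The Python sketch below is only an estimate under stated assumptions: each poll is taken to cost one directory listing plus one file open per pending message, tasks are assumed not to remember what they have already read, and the function name `polling_load` and all input numbers are hypothetical.

```python
from typing import Tuple


def polling_load(tasks: int, poll_interval_s: float,
                 pending_messages: int) -> Tuple[float, float]:
    """Estimate namenode request rates for file-per-message polling.

    Assumption: every poll by every task issues one directory listing
    and then opens each pending message file once.
    Returns (listings per second, file opens per second).
    """
    listings_per_s = tasks / poll_interval_s
    opens_per_s = listings_per_s * pending_messages
    return listings_per_s, opens_per_s


if __name__ == "__main__":
    # Hypothetical job sizes, each polling once a minute with 5 pending messages.
    for tasks in (50, 600, 10_000):
        listings, opens = polling_load(tasks, poll_interval_s=60,
                                       pending_messages=5)
        print(f"{tasks:>6} tasks: {listings:8.2f} listings/s, "
              f"{opens:8.2f} opens/s")
```

          Under these assumptions the request rate grows linearly with the number of tasks and with the number of pending messages, and shrinks linearly as the poll interval grows.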

          I'd suggest making it available in contrib or some other mechanism until we see how much uptake it gets. This would leave specific applications free to use it. Perhaps if this gains wide acceptance we could explore moving the concepts into core, but we would need to address the scaling issues to make a general facility.

          A very interesting set of ideas here, but very complicated if you want to make it work in large general cases.

          Andrzej Bialecki added a comment -

          (Oops, I thought JIRA would include the link in the comment).

          NUTCH-368 provides an implementation of a filesystem-based message queue system. This implementation is not Nutch specific, and can be easily moved to Hadoop if users find it useful.

          Andrzej Bialecki added a comment -

          This implementation is not Nutch specific, and can be easily moved to Hadoop if users find it useful.

          Doug Cutting added a comment -

          > each time application writers would have to design their own way to do this

          I prefer to wait until a few application writers have done it, then generalize, rather than try to guess what is universal. Otherwise the framework gets bloated with features that are only used by one application.

          Are there other folks who need to send messages to running map and reduce tasks?

          Andrzej Bialecki added a comment -

          It could address this particular problem, yes. However, each time application writers would have to design their own way to do this - it would be better if the framework provided some support for this.

          Doug Cutting added a comment -

          The mapper could periodically poll a server for new messages. For example, a DFS directory could be used per job with a message per file, named with a timestamp. This would not require changes to the MapReduce system. Would this be impractical for the fetcher application?
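          The polling scheme suggested in this comment can be sketched roughly as follows. This is a minimal Python sketch, not an implementation against Hadoop: a local directory stands in for the per-job DFS directory, and `post_message`, `MessagePoller`, the `.msg` suffix, and the zero-padded millisecond timestamp format are all hypothetical choices made for illustration.

```python
import os
import tempfile
import time
from typing import List, Optional, Tuple

SUFFIX = ".msg"  # hypothetical naming convention for message files


def post_message(job_dir: str, text: str,
                 timestamp_ms: Optional[int] = None) -> str:
    """Write one message as one file named after its timestamp."""
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)
    # Zero-padded so that sorting by name is sorting by time.
    name = f"{timestamp_ms:020d}{SUFFIX}"
    tmp_path = os.path.join(job_dir, "." + name + ".tmp")
    final_path = os.path.join(job_dir, name)
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(text)
    # Rename so that pollers never see a half-written message.
    # Limitation: two messages with the same timestamp overwrite each other.
    os.replace(tmp_path, final_path)
    return final_path


class MessagePoller:
    """Used by a task to pick up messages it has not seen yet."""

    def __init__(self, job_dir: str) -> None:
        self.job_dir = job_dir
        self.last_seen_ms = -1

    def poll(self) -> List[Tuple[int, str]]:
        """Return (timestamp, text) for messages newer than the last poll."""
        fresh = []
        for name in sorted(os.listdir(self.job_dir)):
            if not name.endswith(SUFFIX):
                continue  # skip temporary and unrelated files
            timestamp_ms = int(name[:-len(SUFFIX)])
            if timestamp_ms <= self.last_seen_ms:
                continue  # already delivered by an earlier poll
            path = os.path.join(self.job_dir, name)
            with open(path, encoding="utf-8") as f:
                fresh.append((timestamp_ms, f.read()))
        if fresh:
            self.last_seen_ms = fresh[-1][0]
        return fresh


if __name__ == "__main__":
    job_dir = tempfile.mkdtemp(prefix="job-messages-")
    poller = MessagePoller(job_dir)
    post_message(job_dir, "throttle 250")
    for timestamp_ms, text in poller.poll():
        print(timestamp_ms, text)
```

          A map task would call `poll()` between records or on a timer and act on whatever comes back. Because each poller remembers only the newest timestamp it has seen, a message that is published late but carries an older timestamp would be missed; the sketch does not handle that case.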


            People

            • Assignee: Unassigned
            • Reporter: Andrzej Bialecki
            • Votes: 0
            • Watchers: 5