HAMA-511

Submitting heterogeneous supersteps with precedence constraints on Hama

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      Hama should support submission of jobs that can:
      1) Skip unwanted superstep synchronization.
      2) Run supersteps with heterogeneous computations.
      3) Schedule supersteps with precedence constraints.

      An explanation of these is provided in the attachment.
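A minimal sketch of what a precedence-constrained superstep plan could look like (all names here are hypothetical, not part of Hama's API): the supersteps form a DAG, and any topological order of that DAG is a valid execution order, with independent steps free to run without an intervening barrier.

```java
import java.util.*;

// Hypothetical sketch (not Hama's API): a job as a DAG of named supersteps
// with precedence constraints. Steps with no path between them are
// independent (requirement 1), each step may carry a different computation
// (requirement 2), and edges encode precedence (requirement 3).
public class SuperstepGraph {
    private final Map<String, List<String>> edges = new LinkedHashMap<>();

    public void addStep(String name) {
        edges.putIfAbsent(name, new ArrayList<>());
    }

    // "before" must complete (and sync) before "after" starts.
    public void addConstraint(String before, String after) {
        addStep(before);
        addStep(after);
        edges.get(before).add(after);
    }

    // Kahn's algorithm: returns one execution order respecting all constraints.
    public List<String> executionOrder() {
        Map<String, Integer> indegree = new HashMap<>();
        for (String s : edges.keySet()) indegree.put(s, 0);
        for (List<String> targets : edges.values())
            for (String t : targets) indegree.merge(t, 1, Integer::sum);

        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : indegree.entrySet())
            if (e.getValue() == 0) ready.add(e.getKey());

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String s = ready.poll();
            order.add(s);
            for (String t : edges.get(s))
                if (indegree.merge(t, -1, Integer::sum) == 0) ready.add(t);
        }
        if (order.size() != edges.size())
            throw new IllegalStateException("cycle in precedence constraints");
        return order;
    }

    public static void main(String[] args) {
        SuperstepGraph g = new SuperstepGraph();
        g.addConstraint("readInput", "aggregate");
        g.addConstraint("readInput", "filter");   // aggregate and filter are independent
        g.addConstraint("aggregate", "writeOutput");
        g.addConstraint("filter", "writeOutput");
        System.out.println(g.executionOrder());
    }
}
```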

        Activity

        Suraj Menon created issue -
        Suraj Menon added a comment -

        An informal explanation of HAMA-511. It has scope for more changes and updates.

        Suraj Menon made changes -
        Attachment: Defining supersteps for BSP.pdf [ 12513964 ]
        Thomas Jungblut added a comment -

        We need to let the user define a "flow-graph" of how the supersteps should be executed, and an engine that executes this graph.

        Also, we need a mapping between the input and a specific superstep.

        This isn't something hacked together in a few hours; it is a larger task.
        However, our current "execution" is just a very simple graph, so we can make the whole system much more flexible while keeping the current functionality.

        So I would be +1.
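An illustrative sketch of such an engine (names and types are assumptions for illustration, not Hama's API): each step in the ordered plan carries its own computation, an optional input mapping, and a flag saying whether a barrier sync follows it, so barriers happen only where the flow-graph demands them.

```java
import java.util.*;

// Illustrative sketch only (not Hama's API): an engine that walks an
// already-ordered plan of supersteps and performs a barrier sync only after
// the steps that request one.
public class FlowEngine {

    public static final class Step {
        final String name;
        final Runnable compute;     // the step's own (heterogeneous) computation
        final String input;         // input mapped to this specific superstep, or null
        final boolean syncAfter;    // false => skip the barrier after this step

        public Step(String name, Runnable compute, String input, boolean syncAfter) {
            this.name = name;
            this.compute = compute;
            this.input = input;
            this.syncAfter = syncAfter;
        }
    }

    private final List<Step> plan;
    private int barriers;

    public FlowEngine(List<Step> plan) { this.plan = plan; }

    // Runs the plan and returns how many barrier syncs were performed.
    public int run() {
        for (Step s : plan) {
            s.compute.run();
            if (s.syncAfter) barriers++;  // stand-in for a real ZooKeeper barrier
        }
        return barriers;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Step> plan = Arrays.asList(
            new Step("parse", () -> log.add("parse"), "/input/part-0", false),
            new Step("aggregate", () -> log.add("aggregate"), null, true),
            new Step("emit", () -> log.add("emit"), null, true));
        int syncs = new FlowEngine(plan).run();
        System.out.println(log + ", syncs=" + syncs);  // [parse, aggregate, emit], syncs=2
    }
}
```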

        Edward J. Yoon added a comment -

        I like the idea of supporting selective synchronization for high-level BSP developers.

        BTW, I'm reading Valiant's BSP and MS's Dryad papers and just wondered why these jobs should run in a timeshared fashion on a cluster. If we schedule jobs based on a resource-sharing architecture like Hadoop, it may be possible. But what are the disadvantages?

        I would recommend starting this as a sub-module.

        Thomas Jungblut added a comment -

        I see this as a bit more important than you do.

        Currently GoldenOrb has folded, so our only "competitor" is Giraph.
        Giraph is focused solely on graph computing with the "simple" BSP case.

        Suraj is right, we should support a flexible engine for every kind of execution. We can still cover the simple case with the normal BSP and Pregel APIs, but the system can be much more widely usable.

        An interesting point as well is that this system is similar to Dryad.
        Dryad has been dropped by Microsoft in favor of Hadoop, and we can bring it back on top of Hadoop.
        We should just ask ourselves why they dropped Dryad.

        I wouldn't make this a submodule; in any case, this would be a larger change throughout the whole code base. But afterwards our system will be much more powerful than it is now.

        BTW, I'm reading Valiant's BSP and MS's Dryad papers and just wondered why these jobs should run in a timeshared fashion on a cluster. If we schedule jobs based on a resource-sharing architecture like Hadoop, it may be possible. But what are the disadvantages?

        I believe it has performance reasons. Timesharing looks like a more ancient scheduling approach to me.

        Suraj Menon added a comment -

        Nice to see some views expressed. There is no way we should be giving up the simple, or rather the new simple fault-tolerant, BSP API.
        The reason I rushed to express this idea is for us to keep it in mind while we design and implement fault tolerance, which is our current focus; there are people already working on it.

        As an example, when making checkpointing configurable with simple modulo logic today, I was making it a modulo function of a counter of the number of times the sync() function is called. Now, with selective superstep synchronization in mind, I have to make the checkpointing logic a function of the current superstep number.

        Regarding the changes to be made, I was encouraged by the design that Thomas had in his GitHub repo: https://github.com/thomasjungblut/thomasjungblut-common/blob/master/src/de/jungblut/bsp/ft/FaultTolerantBSP.java
        I think it already necessitates sending the Superstep array to BSPPeer. For selective synchronization, we would need a multidimensional array to be sent, along with the column of the array to be executed. The column number would also have to be part of the identity of the ZooKeeper node for synchronization. I agree this is not a small task.

        In offline mode, we can always implement task precedence constraints with multiple batch runs (like Oozie for Hadoop MapReduce). However, I think having this flexibility would be really useful for real-time Hama tasks. I feel this would give Hama the capability to be a framework for implementing distributed real-time computation tasks as well. We can evaluate the designs of S4 (Yahoo!), EarlyBird (Twitter) and others to verify this.
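As an illustration of the ZooKeeper-naming point above (a sketch under assumed path conventions, not Hama's actual scheme), the barrier node for a sync could encode both the superstep number and the column being executed:

```java
// Sketch: if selective synchronization executes column `column` of the
// superstep array, the barrier znode must be distinct per (superstep, column)
// pair, so that peers syncing on different columns never meet at the same
// barrier. The path layout here is hypothetical.
public class BarrierPaths {
    static String barrierPath(String jobId, long superstep, int column) {
        return "/bsp/" + jobId + "/sync/" + superstep + "/col-" + column;
    }

    public static void main(String[] args) {
        System.out.println(barrierPath("job_0001", 5, 2)); // /bsp/job_0001/sync/5/col-2
    }
}
```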

        ChiaHung Lin added a comment -

        Some issues I can think of at the moment:

        For selective synchronization, if recovery needs to take place, a configurable checkpoint might not work well. For example, the user configures a checkpoint every 5 supersteps, but some tasks span more than 5 supersteps (say 7). Checkpoints then cannot correctly serialize messages, because the long task is not yet ready to be checkpointed. Alternatively, the system could checkpoint the process image, which is not portable. The disadvantage would be that, with the checkpoint interval set to 5 and a task spanning 7 supersteps, the system can only checkpoint once every 35 supersteps.
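The once-per-35-supersteps figure is the least common multiple of the two periods; a quick illustrative check:

```java
// With a checkpoint interval of 5 and a task unit of 7 supersteps, the
// checkpoint boundary and the task boundary only line up every
// lcm(5, 7) = 35 supersteps.
public class CheckpointPeriod {
    static long gcd(long a, long b) { return b == 0 ? a : gcd(b, a % b); }

    // lcm(a, b), computed as a / gcd(a, b) * b to avoid overflow
    static long effectivePeriod(long checkpointInterval, long taskSupersteps) {
        return checkpointInterval / gcd(checkpointInterval, taskSupersteps) * taskSupersteps;
    }

    public static void main(String[] args) {
        System.out.println(effectivePeriod(5, 7)); // 35
    }
}
```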

        Concerning task execution/dependency, it might be interesting to have a look at CIEL [1], which allows dynamic task composition and is suitable for iterative and recursive algorithms. It might also be good for the system to let users choose this as an option, because other tasks may not need this style of job execution.

        [1] CIEL is a universal execution engine for distributed computation. http://www.cl.cam.ac.uk/research/srg/netos/ciel/

        Edward J. Yoon added a comment -

        +1 to ChiaHung's comment.

        Suraj Menon added a comment - edited

        Thanks, ChiaHung, for the information. It looks like a good read, and I agree with your opinion that the user should specifically select this as an option.

        Regarding your comment on checkpointing, it would be a bad choice for someone to set the checkpointing interval to 5 when the "largest superstep unit" (if I am allowed to term it so) of their computation is 7. Even if it is set at 5, a failure at superstep 6 would force the large task (the task requiring 7 supersteps) to start over again, but not the smaller ones checkpointed at the 5th superstep. In today's design, the first superstep of the other small tasks would wait until the large task finishes, and a failure at the 6th superstep would require a restart of all the tasks from superstep 0.

        Don't miss the point that the larger task was not receiving messages from other tasks during this period. Please note that the superstep count required for a job would be configurable in such a scenario, and when a task goes into sync it informs ZooKeeper which superstep it is seeking a sync for. getSuperstepCount() for the large task would return its start superstep count + 7. Your situation also reiterates my aforesaid point that instead of coding the checkpoint function as

        private final boolean shouldCheckPointNow() {
          return (conf.getBoolean(Constants.CHECKPOINT_ENABLED, false)
              && (checkPointInterval != 0)
              && (getSuperstepCount() % checkPointInterval) == 0);
        }

        we should have:

        private final boolean shouldCheckPointNow() {
          // previousCheckpointSuperstep is the superstep at which the last checkpoint
          // was taken; comparing the gap against the interval (rather than against 0)
          // keeps the configured interval meaningful
          return (conf.getBoolean(Constants.CHECKPOINT_ENABLED, false)
              && (checkPointInterval != 0)
              && (getSuperstepCount() - previousCheckpointSuperstep) >= checkPointInterval);
        }

        The change is necessary here because we have the selective superstep design in mind.

        ChiaHung Lin added a comment -

        As I understand it, this strategy is called local checkpoint or independent checkpoint. The issue with this design is that it can have a domino effect [1], resulting in the recovery process starting from the initial state, and the recovery frequency may be higher than expected. In addition, due to the lack of a consistent state for the whole system at a specific point in time, no individual checkpointed data can be garbage collected [2], meaning the system needs to preserve a large amount of checkpointed data so that rollback/recovery remains possible.

        A coordinated checkpoint synchronizes at a specific point in time in order to form a consistent state. Although this is not a perfect solution, it is somewhat more reliable than a local checkpoint and relatively simpler than a communication-induced checkpoint.

        [1] Brian Randell. System Structure for Software Fault Tolerance.

        [2] Titos Saridakis. Design Patterns for Checkpoint-Based Rollback Recovery.

        Edward J. Yoon added a comment -

        Do you have a plan on this? If so, please share with me.

        Suraj Menon added a comment -

        HAMA-639 and HAMA-652 should be 50% of the work. For HAMA-652, I have implemented the scenario where selective synchronizations are done on synchronization points pre-determined before execution. It gets a little tricky when the synchronization points (or even the members to sync) are determined during execution. The HAMA-639 API could be finalized once the code for HAMA-652 is finalized.

        The next issue is when a task at some superstep (say number 10) receives messages from a remote peer for a distant superstep (say anything beyond 11 in this case). I think we would need to change the send protocol to handle this case. Also, we would need BSPJobClient to provide input splits for multiple superstep chains.
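One way the receiving side could cope with messages that arrive for a distant superstep (a sketch under assumed names, not a proposed protocol change) is to hold incoming messages keyed by their target superstep and deliver only the ones whose superstep has been reached:

```java
import java.util.*;

// Sketch: messages tagged with a target superstep are held back until the
// local peer reaches that superstep. The message type and method names are
// placeholders, not Hama's messaging API.
public class SuperstepInbox {
    private final SortedMap<Long, List<String>> pending = new TreeMap<>();

    // Called by the transport when a remote message arrives.
    public void receive(long targetSuperstep, String msg) {
        pending.computeIfAbsent(targetSuperstep, k -> new ArrayList<>()).add(msg);
    }

    // Called when the local peer enters `superstep`; returns all messages
    // whose target superstep is now due, removing them from the buffer.
    public List<String> drainUpTo(long superstep) {
        List<String> due = new ArrayList<>();
        Iterator<Map.Entry<Long, List<String>>> it =
            pending.headMap(superstep + 1).entrySet().iterator();
        while (it.hasNext()) {
            due.addAll(it.next().getValue());
            it.remove();
        }
        return due;
    }

    public static void main(String[] args) {
        SuperstepInbox inbox = new SuperstepInbox();
        inbox.receive(12, "late-bound");   // destined for a distant superstep
        inbox.receive(10, "due-now");
        System.out.println(inbox.drainUpTo(10)); // [due-now]
        System.out.println(inbox.drainUpTo(12)); // [late-bound]
    }
}
```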

        Thomas Jungblut added a comment -

        I thought we were going to do something similar to Dryad. This paradigm for branch & bound algorithms would be cool to implement (http://research.microsoft.com/apps/pubs/default.aspx?id=144832).

        Suraj Menon added a comment -

        Yes, we would need the listed issues to be implemented before we can even start working on implementing vertex operators.


          People

          • Assignee:
            Unassigned
          • Reporter:
            Suraj Menon
          • Votes:
            0
          • Watchers:
            4
