Hama / HAMA-567

BSPPeer should provide means for chaining supersteps to share data among them.

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.6.0
    • Fix Version/s: None
    • Component/s: bsp core
    • Labels:
      None

      Description

      In most scenarios, a superstep needs certain values or objects that were computed in a previous superstep. When using the chained Superstep design to implement BSP algorithms, this is awkward to implement. BSPPeer should provide a means (preferably a Map<String, Object>) so that a later Superstep can query the map with a String token to retrieve values saved by an earlier superstep. This map could also be checkpointed periodically in the background so that the state of a task can be fully recovered after a failure. The BSPPeer object should have dedicated get and set functions for updating values in the peer.

      1. Mapper.java
        4 kB
        Suraj Menon

        Issue Links

          Activity

          Suraj Menon created issue -
          Thomas Jungblut made changes -
          Field Original Value New Value
          Link This issue is related to HAMA-546 [ HAMA-546 ]
          Thomas Jungblut added a comment -

          Highly correlated with HAMA-546, since this is effectively "remote" memory access to localhost.

          Suraj Menon added a comment -

          Let me know whether the following API in BSPPeer is sufficient and named correctly. Shall we add this in the 0.5 release, considering it is a simple change (adding a Map<String, Object>) and would be needed by anyone who uses the Superstep class? Checkpointing can be added in the next release.

          
            /**
             * Save object in peer for future reference. This could be used by 
             * sequence of supersteps. If there is already an object saved under the key, 
             * calling this function overwrites the object for the key. 
             * @param token The String value that should be used as key to save object.
             * @param value The object to be saved for the key.
             */
            public void save(String token, Object value);
          
            /**
             * Returns true if there is already an object saved for the key.
             * @param token The key for which we check if there is already an object 
             * present.
             * @return Return true if object is present for the key.
             */
            public boolean isObjectPresentFor(String token);
          
            /**
             * Returns the object saved for the key.
             * @param token The key for which an object is to be retrieved.
             * @return object The object saved with the key. If the key is not present 
             * the function returns null.
             */
            public Object getSavedObject(String token);
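
A minimal sketch of how the three proposed methods could be backed by a single map, assuming supersteps in a chain run one after another inside the same peer task. The class name `PeerStateSketch` and the token `"partitionCount"` are hypothetical and only stand in for the real BSPPeer; nothing here is taken from the Hama code base.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for per-peer state; not part of the Hama API. */
class PeerStateSketch {
  // One map per peer task. Supersteps in a chain are assumed to run
  // sequentially in that task, so a plain HashMap is used here.
  private final Map<String, Object> saved = new HashMap<>();

  public void save(String token, Object value) {
    saved.put(token, value); // overwrites any earlier value for the token
  }

  public boolean isObjectPresentFor(String token) {
    return saved.containsKey(token);
  }

  public Object getSavedObject(String token) {
    return saved.get(token); // null when the token was never saved
  }

  public static void main(String[] args) {
    PeerStateSketch peer = new PeerStateSketch();
    // "Superstep 1" stores a value it computed.
    peer.save("partitionCount", 4);
    // "Superstep 2" looks it up by token and casts it back.
    if (peer.isObjectPresentFor("partitionCount")) {
      int count = (Integer) peer.getSavedObject("partitionCount");
      System.out.println("partitionCount = " + count);
    }
  }
}
```

The cast in the second step is where the untyped Object value shows its cost: a wrong token or a wrong expected type only fails at run time.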
          
          Thomas Jungblut added a comment -

          Sorry, but I dislike this use of Object. That's what generics are for. I currently have no clue how to make it better, but maybe multiple Maps, each generic for the type of object added.

          We can delegate to the Map method names: "isObjectPresentFor" is a bit too verbose and can be contains; likewise "getSavedObject" can be get, and save can be put.
          No matter what, we have to extract an interface for that (I prefer generics) and add implementations for off-heap caching solutions like direct memory. These things are very GC heavy, and it is better not to store them on the heap directly.

          "Shall we add this in 0.5 release"

          Let's put that into 0.6.0 when we try to add the remote memory things.
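
One way to read the suggestion above is an extracted interface that is generic over the value type, with an on-heap and an off-heap implementation behind it. The sketch below is only an illustration under that assumption: `PeerStore`, `HeapPeerStore` and `DirectStringStore` are hypothetical names, and the off-heap variant handles only String values to keep serialization out of the picture.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical storage interface, generic over the value type. */
interface PeerStore<V> {
  void put(String key, V value);
  V get(String key);
  boolean contains(String key);
}

/** On-heap variant: values stay ordinary Java objects. */
class HeapPeerStore<V> implements PeerStore<V> {
  private final Map<String, V> map = new HashMap<>();
  public void put(String key, V value) { map.put(key, value); }
  public V get(String key) { return map.get(key); }
  public boolean contains(String key) { return map.containsKey(key); }
}

/** Off-heap variant for String values: the bytes live in direct buffers. */
class DirectStringStore implements PeerStore<String> {
  private final Map<String, ByteBuffer> buffers = new HashMap<>();

  public void put(String key, String value) {
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocateDirect(bytes.length);
    buf.put(bytes);
    buf.flip(); // make the written bytes readable
    buffers.put(key, buf);
  }

  public String get(String key) {
    ByteBuffer buf = buffers.get(key);
    if (buf == null) return null;
    byte[] bytes = new byte[buf.remaining()];
    buf.duplicate().get(bytes); // duplicate() leaves the stored position untouched
    return new String(bytes, StandardCharsets.UTF_8);
  }

  public boolean contains(String key) { return buffers.containsKey(key); }
}
```

In this sketch the keys and the buffer handles are still heap objects; only the value payload sits in direct memory, which is the part expected to be large.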

          Suraj Menon added a comment -

          I was looking for something along the lines of how we save a reference to an object in an HttpSession ( http://docs.oracle.com/javaee/5/api/javax/servlet/http/HttpSession.html#setAttribute(java.lang.String, java.lang.Object) ) across multiple requests. Let's set aside remote access to peers for a moment. The first requirement is to let supersteps running in the same peer use the output of a previous superstep. Say you have defined 10 superstep classes to run in sequence for the job. What if the 10th superstep needs information that the 1st superstep computed? To keep these values from going out of scope, the user today would have to create singleton(s) for every object they want to share across supersteps. Even if we use some other distributed caching framework, what if the value you need in the 10th superstep cannot be accessed through hard-coded references and can only be inferred in one of the previous supersteps?

          Introducing generics would be restrictive. I might want to save a reference to a DiskQueue, or just a String, or a List of Integers, etc. This would be difficult to achieve. Heap usage is something a programmer always keeps (or is expected to keep) in mind for solutions on huge data sets. This map is intended to hold references to only a handful of already instantiated objects, and only if needed.

          Suraj Menon added a comment -

          Oops, I forgot to answer on the API function names. put, get and containsKey are the names expected of a Map, but BSPPeer is not a map, so I thought the names should say more about what they do in that class. Hence the extra verbosity.

          I am just throwing this idea out: should containsKey and get take the class of the expected object to prevent ClassCastExceptions?
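
The class-token idea could look roughly like the sketch below, which keeps the untyped map but checks the stored value against the expected class before returning it. `TypedPeerState` and its method shapes are hypothetical; returning null on a type mismatch is one possible choice, assumed here for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical variant of the peer state with class-checked lookups. */
class TypedPeerState {
  private final Map<String, Object> saved = new HashMap<>();

  public void save(String token, Object value) {
    saved.put(token, value);
  }

  /** True only if a value exists for the token and is an instance of type. */
  public boolean containsKey(String token, Class<?> type) {
    return type.isInstance(saved.get(token)); // isInstance(null) is false
  }

  /** Returns the value cast to T, or null if absent or of another type. */
  public <T> T get(String token, Class<T> type) {
    Object value = saved.get(token);
    return type.isInstance(value) ? type.cast(value) : null;
  }
}
```

A caller would write `Integer n = state.get("n", Integer.class);` and handle null, instead of casting and risking a ClassCastException.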

          Suraj Menon added a comment -

          Hi, please check the simplest Mapper that I have written. It is a work in progress and not tested at all. The WritableKeyValues class is WritableComparable on the key. The idea is that every mapper reads its input and exchanges its key distribution with the other peers while writing everything to a disk queue. I am working on a spilling queue with a combiner. So in the first step, all the mapper supersteps learn the global key distribution and assign each peer responsibility for a partition of keys such that a minimum of messages is exchanged. The message exchange happens in the next superstep. Hence I need to provide a reference to the message queue to the next superstep. I also want to achieve parallelism by having a thread work on the combiners during the expensive sync operation. You can also see how ugly getting the peer ID is today; we need a new API to find the peer ID from the given task ID. All this made me feel the API changes are necessary.
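
The attached Mapper.java is not reproduced here, so the following is only a guess at what "assign keys so that a minimum of messages is exchanged" could mean in its simplest form: once every peer knows every other peer's per-key record counts, each key goes to the peer that already holds most of its records. `KeyAssignmentSketch` and `assign` are hypothetical names, and the greedy rule ignores load balancing.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical greedy key-to-peer assignment; not taken from Mapper.java. */
class KeyAssignmentSketch {
  /**
   * counts.get(p) maps each key to the number of records peer p holds for it.
   * Returns key -> index of the peer that already holds most of its records,
   * so that the fewest records for that key have to be sent elsewhere.
   */
  static Map<String, Integer> assign(List<Map<String, Integer>> counts) {
    Map<String, Integer> owner = new HashMap<>();
    Map<String, Integer> best = new HashMap<>();
    for (int peer = 0; peer < counts.size(); peer++) {
      for (Map.Entry<String, Integer> e : counts.get(peer).entrySet()) {
        Integer seen = best.get(e.getKey());
        // Strict comparison: on a tie the lower peer index keeps the key.
        if (seen == null || e.getValue() > seen) {
          best.put(e.getKey(), e.getValue());
          owner.put(e.getKey(), peer);
        }
      }
    }
    return owner;
  }
}
```

Because every peer computes the same assignment from the same exchanged counts, no further coordination is needed before the message exchange in the next superstep.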

          Suraj Menon made changes -
          Attachment Mapper.java [ 12525436 ]
          Thomas Jungblut added a comment -

          Hey Suraj, please give me a bit of time to sort out what we need. I wanted to try a prototype for the other issue first; we can then discuss which parts can be shared between both cases.

          I wanted to work through some caching solutions like EHCache or DirectMemory and see how they do that.
          For us I guess retrieval by int should be enough, so we can use the Trove library to get low-overhead hash maps.

          However, there are cases where someone wants to build their own key objects, so a generic solution wouldn't be so wrong.
          So there must be some polymorphism in this case.

          BTW, I think your mapper can be drastically improved. Maybe we can have a talk later.

          ChiaHung Lin added a comment -

          My original thought was that we can make use of the command pattern (as stated in HAMA-503), with which ideally we can still construct the flow as usual. For example, constructing a for loop:

          // "for" is a reserved word in Java, so the variable is named loop
          For loop = new For(condition);
          loop.add(superstepN).add(superstepN1)...;
          configurator.add(superstep1).add(superstep2).add(loop)...;

          ...
          class For implements Command {
            For(... conditions) { ... }

            public void execute() {
              for (/* extract from conditions */) {
                superstep.execute(); // extract superstep and then execute
              }
            }
          }

          With such a method, it looks like variables can be shared across several supersteps.
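
To make the command-pattern idea concrete, here is a small self-contained sketch in which plain steps and a loop are all commands, and a variable in the enclosing scope is visible to each of them. `Command`, `For` and `CommandChainDemo` are illustrative names only, the fixed iteration count stands in for the unspecified conditions, and none of this is the HAMA-503 design itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical command interface; a superstep would be one implementation. */
interface Command {
  void execute();
}

/** Composite command that runs its child commands a fixed number of times. */
class For implements Command {
  private final int iterations;
  private final List<Command> body = new ArrayList<>();

  For(int iterations) { this.iterations = iterations; }

  For add(Command c) { body.add(c); return this; }

  public void execute() {
    for (int i = 0; i < iterations; i++) {
      for (Command c : body) c.execute();
    }
  }
}

class CommandChainDemo {
  static int run() {
    // A variable in the enclosing scope is shared by every step below.
    int[] value = {0};
    Command init = () -> value[0] = 1;
    Command doubleIt = () -> value[0] *= 2;
    For loop = new For(3).add(doubleIt);

    List<Command> flow = new ArrayList<>();
    flow.add(init);
    flow.add(loop);
    for (Command c : flow) c.execute();
    return value[0];
  }

  public static void main(String[] args) {
    System.out.println(run()); // 1, then doubled three times
  }
}
```

The sharing works here because all commands are built in one scope; it does not by itself address steps defined as separate classes.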

          Suraj Menon added a comment -

          Yes, we can share the variables in this case. I would like to understand more about this design. Is this code inside the bsp function, and does it have to be written by a Hama user every time they use the chaining model? The above code struck me as code that we would write in BSPPeer to support this chaining framework. The question here is: how would a developer share objects they create across the different classes they design, which are going to run on different machines?

          Suraj Menon made changes -
          Link This issue relates to HAMA-639 [ HAMA-639 ]
          Edward J. Yoon made changes -
          Fix Version/s 0.7.0 [ 12320349 ]
          Fix Version/s 0.6.0 [ 12319740 ]
          Edward J. Yoon made changes -
          Fix Version/s 0.7.0 [ 12320349 ]

            People

            • Assignee:
              Unassigned
              Reporter:
              Suraj Menon
            • Votes:
              0
              Watchers:
              0
