Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: bsp core
    • Labels:
      None

      Description

      Add a StreamInputFormat that reads records newly appended to the input since the previous superstep.

      My rough guess is that this is possible using the reopen() method and a saved file offset.

          Activity

          bsikander Behroz Sikander added a comment -

          Edward J. Yoon, this seems to be an interesting feature. Can you add some more details?

          udanax Edward J. Yoon added a comment -

          As I mentioned in the Description, we can simply check whether there are newly appended records in the input file, keeping track of the last read offset.

          To implement this, first of all, you should look at the InputFormat interface. The tricky issue is how to implement the getSplits() method and handle multiple tasks.

          At the moment, my simple idea is that one BSP task acts as a "stream input queue", without implementing StreamInputFormat or changing the framework core. For example, we set the file path in the job configuration. The master task then behaves as follows:

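          if (isMaster(peer.me)) {
            while (true) {
               peer.reopen();      // reopen the input so appended data becomes visible
               peer.skip(offset);  // jump to the last read offset
               if (peer.readNext()) {
                  // load balancing happens here
                  sendTo("send a newly appended record to free slave tasks");
                  // offset must then be advanced past the record just read
               } else {
                  Thread.sleep(1000); // nothing new yet; the interval is arbitrary
               }
            }
          }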
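
          The "reopen and skip to the last offset" idea can be tried outside the framework with plain java.io. The following is only a minimal sketch under assumptions: the class name AppendedRecordReader, the poll() method, and the newline-delimited record format are hypothetical and are not part of any existing BSP peer API. It reopens the file on every poll, seeks to the saved offset, and advances the offset only past complete records, so a half-written trailing record is picked up on a later poll.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not an existing framework class.
class AppendedRecordReader {
    private final String path;
    private long offset = 0L; // position just after the last complete record read

    AppendedRecordReader(String path) {
        this.path = path;
    }

    // Reopens the file, jumps to the saved offset, and returns every complete
    // newline-terminated record appended since the previous call.
    List<String> poll() throws IOException {
        List<String> records = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            long length = file.length();
            if (length <= offset) {
                return records; // nothing new was appended
            }
            file.seek(offset);
            // Assumption for this sketch: the unread tail fits in memory.
            byte[] tail = new byte[(int) (length - offset)];
            file.readFully(tail);
            int start = 0;
            for (int i = 0; i < tail.length; i++) {
                if (tail[i] == '\n') {
                    records.add(new String(tail, start, i - start, StandardCharsets.UTF_8));
                    start = i + 1;
                }
            }
            // Advance only past complete records; a partial last line is re-read later.
            offset += start;
        }
        return records;
    }

    long offset() {
        return offset;
    }
}
```

          A master task could call poll() inside its loop, hand the returned records to free slave tasks, and sleep when the list is empty.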
          
          udanax Edward J. Yoon added a comment -

          If we can hide these implementation details behind simplified APIs for processing stream data, I think that approach is better.

          bsikander Behroz Sikander added a comment -

          Your idea seems to be workable. I will study the InputFormat interface and will update you soon.


            People

            • Assignee:
              Unassigned
              Reporter:
              udanax Edward J. Yoon
            • Votes:
              0
              Watchers:
              3
