Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: 0.24.0
    • Component/s: None
    • Labels: None

      Description

Currently, master-worker scenarios are force-fitted into Map-Reduce. With YARN, these can be first-class applications, which would benefit real-time and near-real-time workloads and make more effective use of cluster resources.

Attachments

      1. MAPREDUCE-3315.patch
        78 kB
        Nikhil S. Ketkar
      2. MAPREDUCE-3315-1.patch
        79 kB
        Nikhil S. Ketkar
      3. MAPREDUCE-3315-2.patch
        79 kB
        Nikhil S. Ketkar
      4. MAPREDUCE-3315-3.patch
        76 kB
        Nikhil S. Ketkar

        Activity

        Sharad Agarwal created issue -
        Sharad Agarwal added a comment -

some thoughts:

• The AM decides the Tasks.
• Workers keep polling the AM to get the next Task instance (see the sketch below).
• It should be possible to dynamically add or shut down workers: have minWorkers, maxWorkers, keepAlive.
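To make the polling idea concrete, here is a minimal illustrative sketch of a worker-side polling loop; TaskSource and every other name in it are hypothetical stand-ins, not classes from any patch:

    // Purely illustrative: TaskSource stands in for whatever RPC interface
    // the AM would expose to workers.
    interface TaskSource {
        Runnable nextTask(String workerId); // returns null when no task is available
    }

    class PollingWorker implements Runnable {
        private final TaskSource master;
        private final String workerId;
        private volatile boolean running = true;

        PollingWorker(TaskSource master, String workerId) {
            this.master = master;
            this.workerId = workerId;
        }

        public void shutdown() {
            running = false;
        }

        public void run() {
            while (running) {
                Runnable task = master.nextTask(workerId);
                if (task != null) {
                    task.run(); // execute the Task instance handed out by the AM
                } else {
                    try {
                        Thread.sleep(1000); // nothing to do yet; poll again shortly
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        }
    }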
        Nikhil S. Ketkar added a comment -

I have started some work on this issue and wanted to describe the overall approach I am taking, to get some feedback.

Let me start by describing how an application would be written using Master-Worker. The pseudocode below illustrates the usage.

class WordCount {
    // Word count implementation using the Master-Worker paradigm.
    // The Master sends portions of the input file to Workers.
    // Each Worker builds a word-count hash map (word -> count) and sends it back as a ResultUnit.
    // The Master uses the results to update the master dictionary.

    class WorkUnit {
        // A list of words
    }

    class ResultUnit {
        // Map of word -> count
    }

    class Master extends MasterRunner<WorkUnit, ResultUnit> {
        public void manageWorkers() {
            // Spawn a bunch of workers using spawnNewWorker();
            // for each spawned worker, spawnNewWorker() returns a WorkerReference; keep it around for bookkeeping.
            // Read a file and create WorkUnits with a fixed number of lines for each worker.
            // Assign WorkUnits to Workers using assignWork(), using the previously obtained WorkerReference.
            // Wait for ResultUnits using waitForResult().
            // Update the master dictionary based on each received ResultUnit.
            // After the work is done, kill the workers using terminateWorker().
        }
    }

    class Worker extends WorkerRunner<WorkUnit, ResultUnit> {
        public ResultUnit doWork(WorkUnit wu) {
            // Build a word-count hash map (word -> count) and return it as a ResultUnit.
        }
    }

    // User writes code to set up and launch the job
    public static void main(String[] args) throws Exception {
        MWJobConf conf = new MWJobConf(WordCount.class);
        conf.setJobName("WordCount");
        conf.setMasterClass(Master.class);
        conf.setWorkerClass(Worker.class);
        conf.setWorkUnitClass(WorkUnit.class);
        conf.setResultUnitClass(ResultUnit.class);
        MWJobClient.runJob(conf);
    }
}

        

Key functionality for the Master-Worker framework will be implemented in the following classes.

abstract class MasterRunner<W, R> {
    // Spawn a new Worker
    protected WorkerReference spawnNewWorker();

    // Spawn new Workers
    protected ArrayList<WorkerReference> spawnNewWorkers();

    // Assign work to any Worker; returns the WorkerReference it was assigned to
    protected WorkerReference assignWork(W wu);

    // Assign work to a specific Worker
    protected void assignWork(WorkerReference wf, W wu);

    // Wait for a result from any Worker. Blocking call
    protected R waitForResult();

    // Wait for a result from a specific Worker. Blocking call
    protected R waitForResult(WorkerReference wf);

    // Is this specific Worker alive? Blocking call
    protected boolean isWorkerAlive(WorkerReference wf);

    // Get all alive Workers. Blocking call
    protected ArrayList<WorkerReference> getAliveWorkers();

    // Terminate a specific Worker
    protected void terminateWorker(WorkerReference wf);

    // To be implemented by the user
    public abstract void manageWorkers();
}

abstract class WorkerRunner<W, R> {
    // To be implemented by the user
    public abstract R doWork(W wu);
}

        
        class WorkUnitContainer<W> {
          // The framework passes around WorkUnitContainers which contain the user defined 
          // WorkUnit and some additional bookkeeping information
        }
        
        class ResultUnitContainer<R> {
         // The framework passes around ResultUnitContainers which contain the user defined 
         // ResultUnit and some additional bookkeeping information
        }
        
        class WorkerReference {
         // Uniquely identifies the Workers
        }
        

        A few questions I have been thinking about are:

        1. Should I use Hadoop IPC or RMI or something else?
        2. Should the Master be "in" the ApplicationManager or be run as a Container?
        Sharad Agarwal added a comment -

        Should I use Hadoop IPC or RMI

        Hadoop IPC

        Should the Master be "in" the ApplicationManager or be run as a Container?

For the Master, you need to write a YARN ApplicationMaster (AM); see http://hadoop.apache.org/common/docs/r0.23.0/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html
The AM runs in one of the containers on the cluster.

Also, it would be good to clearly separate the user API (for writing master-worker apps) from the runtime implementation.

        Nikhil S. Ketkar added a comment -

I have a preliminary implementation done (as described above) and would like to submit a patch. Where should I place the implementation in the source hierarchy? There are two Maven projects: first, the Master-Worker framework itself, and second, a very simple example application using the framework.

        Sharad Agarwal added a comment -

You can have it in the hadoop-yarn-applications module. The example app could be a sub-module of the master-worker app.

        Nikhil S. Ketkar added a comment -

        This is a preliminary patch, only for feedback.

        The directory hadoop-yarn-applications-masterworker-core contains the framework. A simple example using the framework is placed in hadoop-yarn-applications-masterworker-example.

        Nikhil S. Ketkar made changes -
        Attachment MAPREDUCE-3315.patch [ 12521145 ]
        Vinod Kumar Vavilapalli added a comment -

        We have a hadoop-yarn/hadoop-yarn-applications/ module under which you can put your framework (as a sibling to hadoop-yarn-applications-distributedshell/). The example could go as a sub-module under yours.

Taking a step back, I think you should start by writing down the APIs:
(1) The client API, for submission and getting the app status.
(2) The task API, which users can override to provide their own implementation.
Advanced: (3) The master API, covering how to order the tasks (a queue?), etc.

        Nikhil S. Ketkar added a comment -

Here is a brief description of the API in the current patch. This is quite preliminary and I will be improving on it based on feedback.

In order to implement a new Master-Worker job, the user has to implement four classes: WorkUnit, ResultUnit, Master, and Worker. The WorkUnit and ResultUnit classes extend the abstract MWMessage class, which is part of the Master-Worker framework. Similarly, Master extends MWApplicationMaster and Worker extends MWWorkerRunner. Let's look at each of these classes one by one.

Here is the code for the WorkUnit. Note that a single integer has been added as the payload; it represents the data that the Master will populate and the Worker will work on. The framework passes around MWMessage objects and is unaware of any additional data contained in an MWMessage. It is the user's responsibility to populate and extract the payload (in the Master and Worker classes) and also to provide methods to serialize and deserialize the payload data.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class WorkUnit extends MWMessage {
          int data;
        
          public int getData() {
            return data;
          }
        
          public void setData(int data) {
            this.data = data;
          }
        
          @Override
          public void writeWorkUnit(DataOutput out) throws IOException {
            out.writeInt(data);
          }
        
          @Override
          public void readFieldsWorkUnit(DataInput in) throws IOException {
            data = in.readInt();
          }
        }
        

Similarly, here is the code for the ResultUnit. For our simple example it is nearly identical to the WorkUnit. As with the WorkUnit, the ResultUnit contains the payload, and it is the user's responsibility to populate and extract it and to provide the functionality to serialize and deserialize it.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ResultUnit extends MWMessage {
          int data;
          
          public int getData() {
            return data;
          }
        
          public void setData(int data) {
            this.data = data;
          }
        
          @Override
          public void writeWorkUnit(DataOutput out) throws IOException {
            out.writeInt(data);   
          }
        
          @Override
          public void readFieldsWorkUnit(DataInput in) throws IOException {
            data = in.readInt();
  }
}


Now let's look at the API for the Worker. Any Worker should extend the MWWorkerRunner class and override the doWork method, which receives a WorkUnit and returns a ResultUnit. For this simple example, I simply populate the ResultUnit with the data from the WorkUnit.

        public class MWWorker extends MWWorkerRunner {
        
          public static void main(String[] args) {
            MWWorker curr = new MWWorker();
            try {
              curr.init("localhost", 16001);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        
          @Override
          public MWMessage doWork(MWMessage workUnit) {
            ResultUnit result = new ResultUnit();
            int got = ((WorkUnit) workUnit).getData();
            result.setData(got);
            return result;
          }
        
        }
        

Now, on to the Master. Any Master extends the MWApplicationMaster class and overrides the manageWorkers method. There are four methods in MWApplicationMaster that the Master can use: addWorker, which simply adds a worker; killWorker, which kills a worker (together these let the user add workers and get rid of them based on the workload); addWork, which assigns work and takes a WorkUnit as a parameter (a non-blocking call); and waitForResult, which returns a ResultUnit (a blocking call).

public class MWMaster extends MWApplicationMaster {
  private static final Log LOG = LogFactory.getLog(MWMaster.class);

  public static void main(String[] args) throws InterruptedException, ParseException, IOException, URISyntaxException {
    MWMaster curr = new MWMaster();
    curr.initiate(args);
    curr.terminate();
  }

  @Override
  public void manageWorkers() {
    addWorker();
    addWorker();

    for (int i = 0; i < 100; i++) {
      WorkUnit curr = new WorkUnit();
      curr.setData(i);
      addWork(curr);
    }

    for (int i = 0; i < 100; i++) {
      try {
        ResultUnit curr = (ResultUnit) waitForResult();
        LOG.info("Receiving result " + curr.getData());
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    killWorker();
    killWorker();
  }
}
        

Currently, I have not provided a Client API for submission. The Client is basically a part of the framework; its code can be found in the MWClient class.

Here is how the example application is launched. There are four required parameters: the Master-Worker library jar (--masterworkerlib), which contains the client code; the Master-Worker application jar (--masterworkerapp), which contains the user's application; and the main classes for the Master and the Worker (--masterclass and --workerclass, respectively).

hadoop jar masterworker-0.0.1-SNAPSHOT.jar org.apache.hadoop.yarn.applications.masterworker.MWClient \
  --masterworkerlib hadoop-yarn-applications-masterworker-core-3.0.0-SNAPSHOT.jar \
  --masterworkerapp hadoop-yarn-applications-masterworker-example-3.0.0-SNAPSHOT.jar \
  --masterclass org.apache.hadoop.yarn.applications.masterworkerexample.MWMaster \
  --workerclass org.apache.hadoop.yarn.applications.masterworkerexample.MWWorker
        
        Sharad Agarwal added a comment -

        Thanks Nikhil for the patch. Will have a look at it.

        Nikhil S. Ketkar added a comment -

I have added a new patch which includes the previous work, with one additional change regarding the hostname of the Master (ApplicationMaster), which runs a Hadoop IPC server (the Workers/Containers are Hadoop IPC clients). Earlier I had hardcoded the hostname; now the hostname of the Master is determined at runtime and sent to each of the workers (containers) as an environment variable.

The port is still hardcoded, which is not good: it means that one physical node can run only a single Master. I am thinking about specifying a list of ports; if one is occupied, the next one in the list can be tried. I suppose it would be sufficient to have about 10-20. Any suggestions?
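For illustration, the worker's startup could then look roughly like this (MW_MASTER_HOST is an assumed variable name, not necessarily what the patch uses; the port is still the hardcoded value under discussion):

    // Hypothetical sketch only: read the Master's hostname from the
    // environment instead of hardcoding "localhost".
    public static void main(String[] args) throws InterruptedException {
      String masterHost = System.getenv("MW_MASTER_HOST");
      int masterPort = 16001; // still hardcoded, as noted above
      new MWWorker().init(masterHost, masterPort);
    }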

        Nikhil S. Ketkar made changes -
        Attachment MAPREDUCE-3315-1.patch [ 12521307 ]
        Robert Joseph Evans added a comment -

You could use an ephemeral port rather than limiting the set of ports needed. Either way, you are going to need to send the port to the workers, probably as an environment variable. Without it, workers could join the wrong master, and a malicious or buggy master could accept them and steal the workers.
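As a plain-Java illustration of the ephemeral-port idea (independent of Hadoop IPC): bind to port 0, let the OS choose a free port, and read the assigned port back so it can be shipped to the workers.

    import java.net.ServerSocket;

    // Minimal standalone sketch: port 0 asks the OS for any free port; the
    // actual port can then be queried and advertised to the workers.
    public class EphemeralPortDemo {
      public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(0);
        int port = server.getLocalPort(); // the OS-assigned port
        System.out.println("Master would advertise port " + port);
        server.close();
      }
    }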

        Nikhil S. Ketkar added a comment -

This patch contains the previous implementation, with the hardcoding of the Hadoop IPC port (in the Master and Workers) removed.

We now iterate over the Linux ephemeral port range, 32768 to 61000, catching the BindException that indicates a port is in use and moving on to the next port. If no free port is found in that range, we throw a BindException.

We keep track of the chosen port and send it to the Workers (Containers) as an environment variable.

        Nikhil S. Ketkar made changes -
        Attachment MAPREDUCE-3315-2.patch [ 12521354 ]
        Robert Joseph Evans added a comment -

If you pass in a port number of 0, the OS will pick a free port. You can then query the server to find out which port it actually bound to.

        Nikhil S. Ketkar added a comment -

I am using org.apache.hadoop.ipc.RPC.getServer(Class protocol, Object instance, String bindAddress, int port, Configuration conf) to create an RPC server, as follows.

    int linuxEphemeralPortRangeStart = 32768;
    int linuxEphemeralPortRangeEnd = 61000;
    int port = -1;
    for (int i = linuxEphemeralPortRangeStart; i <= linuxEphemeralPortRangeEnd; i++) {
      try {
        server = RPC.getServer(MWProtocol.class, this, hostname, i, conf);
        port = i;
        break;
      } catch (BindException e) {
        // Port is in use; try the next one.
      }
    }
    if (port == -1) {
      throw new BindException();
    } else {
      server.start();
      return port;
    }
        

Does RPC.getServer also pick a free port if 0 is specified? I tried to dig around but could not find any information. What code should I be looking at to confirm this?

        Robert Joseph Evans added a comment -

Yes, it does.

        server = RPC.getServer(MWProtocol.class, this, hostname, 0, conf);
        port = server.getPort();
        server.start();
        
        Nikhil S. Ketkar added a comment -

A new patch with all the previous changes, plus the change suggested by Robert Joseph Evans for finding an unused port for the Master.

        Nikhil S. Ketkar made changes -
        Attachment MAPREDUCE-3315-3.patch [ 12521465 ]
        Sharad Agarwal added a comment -

Thanks Nikhil. Overall a good start. It needs some changes, primarily to make the framework scalable and fault tolerant.

Specific comments follow:

• MWProtocol: It would be better to have a heartbeat-style interface between worker and master, via which the worker sends status reports and the results of completed WorkUnits as a HeartbeatRequest, and gets more WorkUnits and instructions back as a HeartbeatResponse. Using MWMessage for passing the WorkUnit and result is confusing; the WorkUnit and result could just be Writable objects. Special instructions like kill should be given via separate Action commands in the HeartbeatResponse.
• MWWorkerRunner: if it is unable to contact the Master for some time, the worker should commit suicide.
• MWMasterRunner: if it doesn't receive a heartbeat from a worker for a certain time period, it should mark the worker as killed and launch a new worker. (The assumption is that doWork is idempotent.)
• AMRMProtocolWraper: requestContainer() is currently blocking. This will have a high startup cost if the number of workers is high.
• MWApplicationMaster: addWorker is blocking, and requesting a container and launching it happen sequentially, so worker startup cost will be high. This should be done via a thread pool for parallel launches; see ContainerLauncher in the MR ApplicationMaster, and the sketch after this list. Also, a new ContainerManagerWrapper thread is created for each worker launch, which is not scalable.
• Extend org.apache.hadoop.yarn.service.AbstractService or CompositeService for all moving components.
• Add code comments, and Javadocs for public and protected methods.
• Add unit tests.
• Dynamic worker pool: minWorkers/maxWorkers. This needs a client protocol, say MWClientProtocol, to see the status of the overall application and potentially submit new WorkUnits, kill workers, add workers, etc.
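On the thread-pool point, here is a minimal sketch of launching workers in parallel; launchWorker() is a hypothetical placeholder for the request-container-and-launch logic, not a method from the patch:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Illustrative only: submit container requests/launches to a pool instead
    // of performing one blocking addWorker() call at a time.
    public class ParallelLauncherSketch {
      static void launchWorker(int id) {
        // hypothetical placeholder: request a container and start the worker in it
        System.out.println("launching worker " + id);
      }

      public static void main(String[] args) throws InterruptedException {
        ExecutorService launcherPool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
          final int id = i;
          launcherPool.submit(new Runnable() {
            public void run() {
              launchWorker(id);
            }
          });
        }
        launcherPool.shutdown();
        launcherPool.awaitTermination(1, TimeUnit.MINUTES);
      }
    }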
        Nikhil S. Ketkar added a comment -

Thanks. I will address these comments and submit a patch ASAP.

        Nikhil S. Ketkar added a comment -

Sharad, based on your comment:

        "MWProtocol: Would be better to have a heartbeat kind of interface between worker and master via which worker sends the status reports and results of completed workunits as HeartbeatRequest and gets more WorkUnits and instructions as HeartbeatResponse. Using MWMessage for passing workunit and result is confusing. workunit and result could be just Writable objects. special instructions like kill should be given via separate Action commands in the HeartbeatResponse."

        Do you mean this?

public interface MWProtocol {
  public static final long versionID = 1L;
  Writable heartBeat(Writable workUnit);
  // The user implements action commands and status reports himself.
  // The user's Worker should handle all action commands, take appropriate action (like dying), and also send status.
  // The user's Master should construct and send action commands, consume status reports, and take the proper action.
}
        

        Or this?

public interface MWProtocol {
  public static final long versionID = 1L;
  HeartbeatResponse heartBeat(HeartbeatRequest request);
}

public class HeartbeatResponse implements Writable {
  // This class has an action command which is hidden from the user.
  // The framework acts on the action command, e.g. causing a worker to die.
  // The user extends this class to carry the Work.
}

public class HeartbeatRequest implements Writable {
  // This class has the worker status, which is hidden from the user.
  // The user extends this class to carry the Result.
}
        

        Or, did I misunderstand the comment?

        Sharad Agarwal added a comment -

I meant the latter one. The only difference is that I think it is better not to expose HeartbeatRequest and HeartbeatResponse to users, because certain fields, like the actions, are framework-level. Instead, these classes should compose the fields required by the user.
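In other words, something like the following sketch of the composition idea (the field names here are made up, not from the patch):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    // Sketch: the framework owns HeartbeatRequest and composes the user's
    // payload as a Writable field, rather than letting the user extend it.
    public class HeartbeatRequest implements Writable {
      private int workerStatus; // framework-level field, hidden from the user
      private Writable result;  // the user's ResultUnit, carried as an opaque payload

      public void write(DataOutput out) throws IOException {
        out.writeInt(workerStatus);
        result.write(out);
      }

      public void readFields(DataInput in) throws IOException {
        workerStatus = in.readInt();
        result.readFields(in); // assumes the framework instantiated the concrete result type
      }
    }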

        Nikhil S. Ketkar added a comment -

        Sharad, you mean the following, right?

public interface MWProtocol {
  public static final long versionID = 1L;
  HeartbeatResponse heartBeat(HeartbeatRequest request);
}

public class HeartbeatResponse implements Writable {
}

public class HeartbeatRequest implements Writable {
}

// The user extends MasterRunner and WorkerRunner

public abstract class WorkerRunner {
  public abstract Writable doWork(Writable workUnit); // implemented by the user
}

public abstract class MasterRunner {

  public void addWorker() {
    // Implementation
  }

  public void killWorker() {
    // Implementation
  }

  public void assignWork(Writable workUnit) {
    // Implementation
    // uses heartBeat internally, but HeartbeatResponse/HeartbeatRequest are not exposed
  }

  public Writable waitForResult() {
    // Implementation
    // uses heartBeat internally, but HeartbeatResponse/HeartbeatRequest are not exposed
  }

  // Abstract, implemented by the user; can use addWorker(), killWorker(), assignWork(), and waitForResult()
  public abstract void manageWorkers();
}


People

  • Assignee: Sharad Agarwal
  • Reporter: Sharad Agarwal
  • Votes: 2
  • Watchers: 17
