diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index b3ea865..67e00ba 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -216,6 +216,8 @@ private int containerMemory = 10; // VirtualCores to request for the container on which the shell command will run private int containerVirtualCores = 1; + // VirtualDisks to request for the container on which the shell command will run + private int containerVirtualDisks = 0; // Priority of the request private int requestPriority; @@ -356,6 +358,8 @@ public boolean init(String[] args) throws ParseException, IOException { "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); + opts.addOption("container_vdisks", true, + "Amount of virtual disks to be requested to run the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("priority", true, "Application Priority. Default 0"); @@ -488,6 +492,8 @@ public boolean init(String[] args) throws ParseException, IOException { "container_memory", "10")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( "container_vcores", "1")); + containerVirtualDisks = Integer.parseInt(cliParser.getOptionValue( + "container_vdisks", "0")); numTotalContainers = Integer.parseInt(cliParser.getOptionValue( "num_containers", "1")); if (numTotalContainers == 0) { @@ -590,6 +596,9 @@ public void run() throws YarnException, IOException { int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores); + int maxVDisks = response.getMaximumResourceCapability().getVirtualDisks(); + LOG.info("Max vdisks capability of resources in this cluster " + maxVDisks); + // A resource ask cannot exceed the max. if (containerMemory > maxMem) { LOG.info("Container memory specified above max threshold of cluster." @@ -605,6 +614,13 @@ public void run() throws YarnException, IOException { containerVirtualCores = maxVCores; } + if (containerVirtualDisks > maxVDisks) { + LOG.info("Container virtual disks specified above max threshold of cluster." + + " Using max value. specified=" + containerVirtualDisks + ", max=" + + maxVDisks); + containerVirtualDisks = maxVDisks; + } + List previousAMRunningContainers = response.getContainersFromPreviousAttempts(); LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() @@ -772,10 +788,12 @@ public void onContainersAllocated(List allocatedContainers) { + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":" + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() - + ", containerResourceMemory" + + ", containerResourceMemory=" + allocatedContainer.getResource().getMemory() - + ", containerResourceVirtualCores" - + allocatedContainer.getResource().getVirtualCores()); + + ", containerResourceVirtualCores=" + + allocatedContainer.getResource().getVirtualCores() + + ", containerResourceVirtualDisks=" + + allocatedContainer.getResource().getVirtualDisks()); // + ", containerToken" // +allocatedContainer.getContainerToken().getIdentifier().toString()); @@ -1039,7 +1057,7 @@ private ContainerRequest setupContainerAskForRM() { // Set up resource type requirements // For now, memory and CPU are supported so we set memory and cpu requirements Resource capability = Resource.newInstance(containerMemory, - containerVirtualCores); + containerVirtualCores, containerVirtualDisks); ContainerRequest request = new ContainerRequest(capability, null, null, pri); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 0e9a4e4..519250d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -129,6 +129,8 @@ private int amMemory = 10; // Amt. of virtual core resource to request for to run the App Master private int amVCores = 1; + // Amt. of virtual disk resource to request for to run the App Master + private int amVDisks = 0; // Application master jar file private String appMasterJar = ""; @@ -150,6 +152,8 @@ private int containerMemory = 10; // Amt. of virtual cores to request for container in which shell script will be executed private int containerVirtualCores = 1; + // Amt. of virtual disks to request for container in which shell script will be executed + private int containerVirtualDisks = 0; // No. of containers in which the shell script needs to be executed private int numContainers = 1; private String nodeLabelExpression = null; @@ -245,6 +249,7 @@ public Client(Configuration conf) throws Exception { opts.addOption("timeout", true, "Application timeout in milliseconds"); opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master"); opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master"); + opts.addOption("master_vdisks", true, "Amount of virtual disks to be requested to run the application master"); opts.addOption("jar", true, "Jar file containing the application master"); opts.addOption("shell_command", true, "Shell command to be executed by " + "the Application Master. Can only specify either --shell_command " + @@ -258,6 +263,7 @@ public Client(Configuration conf) throws Exception { opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers"); opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); + opts.addOption("container_vdisks", true, "Amount of virtual disks to be requested to run the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("log_properties", true, "log4j.properties file"); opts.addOption("keep_containers_across_application_attempts", false, @@ -345,6 +351,7 @@ public boolean init(String[] args) throws ParseException { amQueue = cliParser.getOptionValue("queue", "default"); amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10")); amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1")); + amVDisks = Integer.parseInt(cliParser.getOptionValue("master_vdisks", "0")); if (amMemory < 0) { throw new IllegalArgumentException("Invalid memory specified for application master, exiting." @@ -355,6 +362,11 @@ public boolean init(String[] args) throws ParseException { + " Specified virtual cores=" + amVCores); } + if (amVDisks < 0) { + throw new IllegalArgumentException("Invalid virtual disks specified for application master, exiting." + + " Specified virtual disks=" + amVDisks); + } + if (!cliParser.hasOption("jar")) { throw new IllegalArgumentException("No jar file specified for application master"); } @@ -396,14 +408,17 @@ public boolean init(String[] args) throws ParseException { containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1")); + containerVirtualDisks = Integer.parseInt(cliParser.getOptionValue("container_vdisks", "0")); numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); - if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) { - throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified," + if (containerMemory < 0 || containerVirtualCores < 0 || + containerVirtualDisks < 0 || numContainers < 1) { + throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores/vdisks specified," + " exiting." + " Specified containerMemory=" + containerMemory + ", containerVirtualCores=" + containerVirtualCores + + ", containerVirtualDisks=" + containerVirtualDisks + ", numContainer=" + numContainers); } @@ -507,6 +522,16 @@ public boolean run() throws IOException, YarnException { + ", max=" + maxVCores); amVCores = maxVCores; } + + int maxVDisks = appResponse.getMaximumResourceCapability().getVirtualDisks(); + LOG.info("Max virtual disks capability of resources in this cluster " + maxVDisks); + + if (amVDisks > maxVDisks) { + LOG.info("AM virtual disks specified above max threshold of cluster. " + + "Using max value." + "specified=" + amVDisks + + ", max=" + maxVDisks); + amVDisks = maxVDisks; + } // set the application name ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); @@ -625,6 +650,7 @@ public boolean run() throws IOException, YarnException { // Set params for Application Master vargs.add("--container_memory " + String.valueOf(containerMemory)); vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); + vargs.add("--container_vdisks " + String.valueOf(containerVirtualDisks)); vargs.add("--num_containers " + String.valueOf(numContainers)); if (null != nodeLabelExpression) { appContext.setNodeLabelExpression(nodeLabelExpression); @@ -658,7 +684,7 @@ public boolean run() throws IOException, YarnException { // Set up resource type requirements // For now, both memory and vcores are supported, so we set memory and // vcores requirements - Resource capability = Resource.newInstance(amMemory, amVCores); + Resource capability = Resource.newInstance(amMemory, amVCores, amVDisks); appContext.setResource(capability); // Service data is a binary blob that can be passed to the application diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index a05b3b0..92e6d7c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -159,9 +159,13 @@ public void testDSShell(boolean haveDomain) throws Exception { "512", "--master_vcores", "2", + "--master_vdisks", + "1", "--container_memory", "128", "--container_vcores", + "1", + "--container_vdisks", "1" }; if (haveDomain) { @@ -469,9 +473,13 @@ public void testDSShellWithCustomLogPropertyFile() throws Exception { "512", "--master_vcores", "2", + "--master_vdisks", + "1", "--container_memory", "128", "--container_vcores", + "1", + "--container_vdisks", "1" }; @@ -513,9 +521,13 @@ public void testDSShellWithCommands() throws Exception { "512", "--master_vcores", "2", + "--master_vdisks", + "1", "--container_memory", "128", "--container_vcores", + "1", + "--container_vdisks", "1" }; @@ -547,9 +559,13 @@ public void testDSShellWithMultipleArgs() throws Exception { "512", "--master_vcores", "2", + "--master_vdisks", + "1", "--container_memory", "128", "--container_vcores", + "1", + "--container_vdisks", "1" }; @@ -595,9 +611,13 @@ public void testDSShellWithShellScript() throws Exception { "512", "--master_vcores", "2", + "--master_vdisks", + "1", "--container_memory", "128", "--container_vcores", + "1", + "--container_vdisks", "1" }; @@ -699,9 +719,13 @@ public void testDSShellWithInvalidArgs() throws Exception { "512", "--master_vcores", "-2", + "--master_vdisks", + "1", "--container_memory", "128", "--container_vcores", + "1", + "--container_vdisks", "1" }; client.init(args); @@ -724,10 +748,14 @@ public void testDSShellWithInvalidArgs() throws Exception { "512", "--master_vcores", "2", + "--master_vdisks", + "1", "--container_memory", "128", "--container_vcores", "1", + "--container_vdisks", + "1", "--shell_script", "test.sh" }; @@ -750,9 +778,13 @@ public void testDSShellWithInvalidArgs() throws Exception { "512", "--master_vcores", "2", + "--master_vdisks", + "1", "--container_memory", "128", "--container_vcores", + "1", + "--container_vdisks", "1" }; client.init(args); @@ -817,10 +849,14 @@ public void testDebugFlag() throws Exception { "512", "--master_vcores", "2", + "--master_vdisks", + "1", "--container_memory", "128", "--container_vcores", "1", + "--container_vdisks", + "1", "--debug" };