diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java index 83db5fcf857..1e237ba0fdd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException; @@ -33,6 +34,7 @@ import java.io.*; import java.util.ArrayList; import java.util.BitSet; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -105,7 +107,7 @@ private final PrivilegedOperationExecutor privilegedOperationExecutor; private String tmpDirPath; - private String device; + private Collection devices; private int rootBandwidthMbit; private int yarnBandwidthMbit; private int defaultClassBandwidthMbit; @@ -119,10 +121,10 @@ /** * Bootstrap tc configuration */ - public void bootstrap(String device, int rootBandwidthMbit, int + public void bootstrap(String devices, int rootBandwidthMbit, int yarnBandwidthMbit) throws ResourceHandlerException { - if (device == null) { + if (devices == null) { throw new ResourceHandlerException("device cannot be null!"); } @@ -139,7 +141,7 @@ public void bootstrap(String device, int rootBandwidthMbit, int tmpDirPath); } - this.device = device; + this.devices = StringUtils.getTrimmedStringCollection(devices); this.rootBandwidthMbit = rootBandwidthMbit; this.yarnBandwidthMbit = yarnBandwidthMbit; defaultClassBandwidthMbit = (rootBandwidthMbit - yarnBandwidthMbit) <= 0 @@ -153,7 +155,8 @@ public void bootstrap(String device, int rootBandwidthMbit, int LOG.info("NM recovery is not enabled. We'll wipe tc state before proceeding."); } else { //NM recovery enabled - run a state check - state = readState(); + //check for first device in case of multiple devices + state = readState(this.devices.iterator().next()); if (checkIfAlreadyBootstrapped(state)) { LOG.info("TC configuration is already in place. Not wiping state."); @@ -171,25 +174,27 @@ public void bootstrap(String device, int rootBandwidthMbit, int } private void initializeState() throws ResourceHandlerException { - LOG.info("Initializing tc state."); + for (String device : devices) { + LOG.info("Initializing tc state. device: " + device); + + BatchBuilder builder = new BatchBuilder(PrivilegedOperation. + OperationType.TC_MODIFY_STATE) + .addRootQDisc(device) + .addCGroupFilter(device) + .addClassToRootQDisc(rootBandwidthMbit, device) + .addDefaultClass(defaultClassBandwidthMbit, rootBandwidthMbit, device) + //yarn bandwidth is capped with rate = ceil + .addYARNRootClass(yarnBandwidthMbit, yarnBandwidthMbit, device); + PrivilegedOperation op = builder.commitBatchToTempFile(); - BatchBuilder builder = new BatchBuilder(PrivilegedOperation. - OperationType.TC_MODIFY_STATE) - .addRootQDisc() - .addCGroupFilter() - .addClassToRootQDisc(rootBandwidthMbit) - .addDefaultClass(defaultClassBandwidthMbit, rootBandwidthMbit) - //yarn bandwidth is capped with rate = ceil - .addYARNRootClass(yarnBandwidthMbit, yarnBandwidthMbit); - PrivilegedOperation op = builder.commitBatchToTempFile(); - - try { - privilegedOperationExecutor.executePrivilegedOperation(op, false); - } catch (PrivilegedOperationException e) { - LOG.warn("Failed to bootstrap outbound bandwidth configuration"); + try { + privilegedOperationExecutor.executePrivilegedOperation(op, false); + } catch (PrivilegedOperationException e) { + LOG.warn("Failed to bootstrap outbound bandwidth configuration. device: " + device); - throw new ResourceHandlerException( - "Failed to bootstrap outbound bandwidth configuration", e); + throw new ResourceHandlerException( + "Failed to bootstrap outbound bandwidth configuration. device: " + device, e); + } } } @@ -238,7 +243,7 @@ private boolean checkIfAlreadyBootstrapped(String state) return true; } - private String readState() throws ResourceHandlerException { + private String readState(String device) throws ResourceHandlerException { //Sample state output: // qdisc htb 42: root refcnt 2 r2q 10 default 2 direct_packets_stat 0 // filter parent 42: protocol ip pref 10 cgroup handle 0x1 @@ -251,7 +256,7 @@ private String readState() throws ResourceHandlerException { BatchBuilder builder = new BatchBuilder(PrivilegedOperation. OperationType.TC_READ_STATE) - .readState(); + .readState(device); PrivilegedOperation op = builder.commitBatchToTempFile(); try { @@ -271,20 +276,23 @@ private String readState() throws ResourceHandlerException { } private void wipeState() throws ResourceHandlerException { - BatchBuilder builder = new BatchBuilder(PrivilegedOperation. - OperationType.TC_MODIFY_STATE) - .wipeState(); - PrivilegedOperation op = builder.commitBatchToTempFile(); + for (String device : devices) { + BatchBuilder builder = new BatchBuilder(PrivilegedOperation. + OperationType.TC_MODIFY_STATE) + .wipeState(device); + PrivilegedOperation op = builder.commitBatchToTempFile(); - try { - LOG.info("Wiping tc state."); - privilegedOperationExecutor.executePrivilegedOperation(op, false); - } catch (PrivilegedOperationException e) { - LOG.warn("Failed to wipe tc state. This could happen if the interface" + - " is already in its default state. Ignoring."); - //Ignoring this exception. This could happen if the interface is already - //in its default state. For this reason we don't throw a - //ResourceHandlerException here. + try { + LOG.info("Wiping tc state. device: " + device); + privilegedOperationExecutor.executePrivilegedOperation(op, false); + } catch (PrivilegedOperationException e) { + LOG.warn("Failed to wipe tc state. device: " + device + + " This could happen if the interface" + + " is already in its default state. Ignoring."); + //Ignoring this exception. This could happen if the interface is already + //in its default state. For this reason we don't throw a + //ResourceHandlerException here. + } } } @@ -323,30 +331,38 @@ private void reacquireContainerClasses(String state) { } public Map readStats() throws ResourceHandlerException { - BatchBuilder builder = new BatchBuilder(PrivilegedOperation. - OperationType.TC_READ_STATS) - .readClasses(); - PrivilegedOperation op = builder.commitBatchToTempFile(); + Map classIdAggregatedBytesStats = new HashMap<>(); + for (String device : devices) { + BatchBuilder builder = new BatchBuilder(PrivilegedOperation. + OperationType.TC_READ_STATS) + .readClasses(device); + PrivilegedOperation op = builder.commitBatchToTempFile(); - try { - String output = - privilegedOperationExecutor.executePrivilegedOperation(op, true); - - if (LOG.isDebugEnabled()) { - LOG.debug("TC stats output:" + output); - } + try { + String output = + privilegedOperationExecutor.executePrivilegedOperation(op, true); - Map classIdBytesStats = parseStatsString(output); + if (LOG.isDebugEnabled()) { + LOG.debug("TC stats device: " + device + " output:" + output); + } + Map classIdBytesStats = parseStatsString(output); + if (LOG.isDebugEnabled()) { + LOG.debug("device: " + device + " classId -> bytes sent %n" + classIdBytesStats); + } + for (Integer classId : classIdBytesStats.keySet()) { + Integer bytesStats = classIdBytesStats.get(classId); + if (classIdAggregatedBytesStats.containsKey(classId)) { + bytesStats = bytesStats + classIdAggregatedBytesStats.get(classId); + } + classIdAggregatedBytesStats.put(classId, bytesStats); + } - if (LOG.isDebugEnabled()) { - LOG.debug("classId -> bytes sent %n" + classIdBytesStats); + } catch (PrivilegedOperationException e) { + LOG.warn("Failed to get tc stats"); + throw new ResourceHandlerException("Failed to get tc stats device " + device, e); } - - return classIdBytesStats; - } catch (PrivilegedOperationException e) { - LOG.warn("Failed to get tc stats"); - throw new ResourceHandlerException("Failed to get tc stats", e); } + return classIdAggregatedBytesStats; } private Map parseStatsString(String stats) { @@ -408,7 +424,7 @@ private void reacquireContainerClasses(String state) { * parameters can be supplied - for example, the default 'class' to use for * incoming packets */ - private String getStringForAddRootQDisc() { + private String getStringForAddRootQDisc(String device) { return String.format(FORMAT_QDISC_ADD_TO_ROOT_WITH_DEFAULT, device, ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID); } @@ -417,7 +433,7 @@ private String getStringForAddRootQDisc() { * Returns a formatted string for a filter that matches packets based on the * presence of net_cls classids */ - private String getStringForaAddCGroupFilter() { + private String getStringForaAddCGroupFilter(String device) { return String.format(FORMAT_FILTER_CGROUP_ADD_TO_PARENT, device, ROOT_QDISC_HANDLE); } @@ -479,7 +495,7 @@ public int getClassIdFromFileContents(String input) { /** * Adds a tc class to qdisc at root */ - private String getStringForAddClassToRootQDisc(int rateMbit) { + private String getStringForAddClassToRootQDisc(int rateMbit, String device) { String rateMbitStr = rateMbit + MBIT_SUFFIX; //example : "class add dev eth0 parent 42:0 classid 42:1 htb rate 1000mbit // ceil 1000mbit" @@ -488,7 +504,8 @@ private String getStringForAddClassToRootQDisc(int rateMbit) { rateMbitStr, rateMbitStr); } - private String getStringForAddDefaultClass(int rateMbit, int ceilMbit) { + private String getStringForAddDefaultClass(int rateMbit, int ceilMbit, String + device) { String rateMbitStr = rateMbit + MBIT_SUFFIX; String ceilMbitStr = ceilMbit + MBIT_SUFFIX; //example : "class add dev eth0 parent 42:1 classid 42:2 htb rate 300mbit @@ -498,7 +515,8 @@ private String getStringForAddDefaultClass(int rateMbit, int ceilMbit) { rateMbitStr, ceilMbitStr); } - private String getStringForAddYARNRootClass(int rateMbit, int ceilMbit) { + private String getStringForAddYARNRootClass(int rateMbit, int ceilMbit, String + device) { String rateMbitStr = rateMbit + MBIT_SUFFIX; String ceilMbitStr = ceilMbit + MBIT_SUFFIX; //example : "class add dev eth0 parent 42:1 classid 42:3 htb rate 700mbit @@ -509,7 +527,7 @@ private String getStringForAddYARNRootClass(int rateMbit, int ceilMbit) { } private String getStringForAddContainerClass(int classId, int rateMbit, int - ceilMbit) { + ceilMbit, String device) { String rateMbitStr = rateMbit + MBIT_SUFFIX; String ceilMbitStr = ceilMbit + MBIT_SUFFIX; //example : "class add dev eth0 parent 42:99 classid 42:99 htb rate 50mbit @@ -519,21 +537,21 @@ private String getStringForAddContainerClass(int classId, int rateMbit, int rateMbitStr, ceilMbitStr); } - private String getStringForDeleteContainerClass(int classId) { + private String getStringForDeleteContainerClass(int classId, String device) { //example "class del dev eth0 classid 42:7" return String.format(FORMAT_DELETE_CLASS, device, ROOT_QDISC_HANDLE, classId); } - private String getStringForReadState() { + private String getStringForReadState(String device) { return String.format(FORMAT_READ_STATE, device); } - private String getStringForReadClasses() { + private String getStringForReadClasses(String device) { return String.format(FORMAT_READ_CLASSES, device); } - private String getStringForWipeState() { + private String getStringForWipeState(String device) { return String.format(FORMAT_WIPE_STATE, device); } @@ -556,28 +574,28 @@ public BatchBuilder(PrivilegedOperation.OperationType opType) } } - private BatchBuilder addRootQDisc() { - commands.add(getStringForAddRootQDisc()); + private BatchBuilder addRootQDisc(String device) { + commands.add(getStringForAddRootQDisc(device)); return this; } - private BatchBuilder addCGroupFilter() { - commands.add(getStringForaAddCGroupFilter()); + private BatchBuilder addCGroupFilter(String device) { + commands.add(getStringForaAddCGroupFilter(device)); return this; } - private BatchBuilder addClassToRootQDisc(int rateMbit) { - commands.add(getStringForAddClassToRootQDisc(rateMbit)); + private BatchBuilder addClassToRootQDisc(int rateMbit, String device) { + commands.add(getStringForAddClassToRootQDisc(rateMbit, device)); return this; } - private BatchBuilder addDefaultClass(int rateMbit, int ceilMbit) { - commands.add(getStringForAddDefaultClass(rateMbit, ceilMbit)); + private BatchBuilder addDefaultClass(int rateMbit, int ceilMbit, String device) { + commands.add(getStringForAddDefaultClass(rateMbit, ceilMbit, device)); return this; } - private BatchBuilder addYARNRootClass(int rateMbit, int ceilMbit) { - commands.add(getStringForAddYARNRootClass(rateMbit, ceilMbit)); + private BatchBuilder addYARNRootClass(int rateMbit, int ceilMbit, String device) { + commands.add(getStringForAddYARNRootClass(rateMbit, ceilMbit, device)); return this; } @@ -591,17 +609,21 @@ public BatchBuilder addContainerClass(int classId, int rateMbit, boolean ceilMbit = yarnBandwidthMbit; } - commands.add(getStringForAddContainerClass(classId, rateMbit, ceilMbit)); + for (String device : devices) { + commands.add(getStringForAddContainerClass(classId, rateMbit, ceilMbit, device)); + } return this; } public BatchBuilder deleteContainerClass(int classId) { - commands.add(getStringForDeleteContainerClass(classId)); + for (String device : devices) { + commands.add(getStringForDeleteContainerClass(classId, device)); + } return this; } - private BatchBuilder readState() { - commands.add(getStringForReadState()); + private BatchBuilder readState(String device) { + commands.add(getStringForReadState(device)); return this; } @@ -609,16 +631,16 @@ private BatchBuilder readState() { //when reading stats for all these classes. Stats are fetched using a //different tc cli option (-s). - private BatchBuilder readClasses() { + private BatchBuilder readClasses(String device) { //We'll read all classes, but use a different tc operation type //for reading stats for all these classes. Stats are fetched using a //different tc cli option (-s). - commands.add(getStringForReadClasses()); + commands.add(getStringForReadClasses(device)); return this; } - private BatchBuilder wipeState() { - commands.add(getStringForWipeState()); + private BatchBuilder wipeState(String device) { + commands.add(getStringForWipeState(device)); return this; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java index c5236635fd4..690d53d88f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -81,6 +82,15 @@ "class add dev eth0 parent 42:3 classid 42:%d htb rate 10mbit ceil %dmbit"; private static final String FORAMT_DELETE_CONTAINER_CLASS_FROM_DEVICE = "class del dev eth0 classid 42:%d"; + private static final String MULTIPLE_DEVICES = "eth0,eth1"; + private static final String[] FORMAT_ADD_CONTAINER_CLASS_TO_MULTIPLE_DEVICES = { + "class add dev eth0 parent 42:3 classid 42:%d htb rate 10mbit ceil %dmbit", + "class add dev eth1 parent 42:3 classid 42:%d htb rate 10mbit ceil %dmbit" + }; + private static final String[] FORAMT_DELETE_CONTAINER_CLASS_FROM_MULTIPLE_DEVICES = { + "class del dev eth0 classid 42:%d", + "class del dev eth1 classid 42:%d" + }; private static final int TEST_CLASS_ID = 97; //decimal form of 0x00420097 - when reading a classid file, it is read out @@ -321,6 +331,71 @@ public void testContainerOperations() { } } + @Test + public void testContainerOperationsWhenMultipleDevices() { + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false); + + TrafficController trafficController = new TrafficController(conf, + privilegedOperationExecutorMock); + try { + trafficController + .bootstrap(MULTIPLE_DEVICES, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT); + + int classId = trafficController.getNextClassId(); + + Assert.assertTrue(classId >= MIN_CONTAINER_CLASS_ID); + Assert.assertEquals(String.format(FORMAT_CONTAINER_CLASS_STR, classId), + trafficController.getStringForNetClsClassId(classId)); + + //Verify that the operation is setup correctly with strictMode = false + TrafficController.BatchBuilder builder = trafficController. + new BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE) + .addContainerClass(classId, CONTAINER_BANDWIDTH_MBIT, false); + PrivilegedOperation addClassOp = builder.commitBatchToTempFile(); + + List expectedAddClassCmd = new ArrayList<>(); + for (String format : FORMAT_ADD_CONTAINER_CLASS_TO_MULTIPLE_DEVICES) { + expectedAddClassCmd.add(String.format(format, classId, YARN_BANDWIDTH_MBIT)); + } + verifyTrafficControlOperation(addClassOp, + PrivilegedOperation.OperationType.TC_MODIFY_STATE, + expectedAddClassCmd); + + //Verify that the operation is setup correctly with strictMode = true + TrafficController.BatchBuilder strictModeBuilder = trafficController. + new BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE) + .addContainerClass(classId, CONTAINER_BANDWIDTH_MBIT, true); + PrivilegedOperation addClassStrictModeOp = strictModeBuilder + .commitBatchToTempFile(); + + List expectedAddClassStrictModeCmd = new ArrayList<>(); + for (String format : FORMAT_ADD_CONTAINER_CLASS_TO_MULTIPLE_DEVICES) { + expectedAddClassStrictModeCmd.add(String.format(format, classId, CONTAINER_BANDWIDTH_MBIT)); + } + verifyTrafficControlOperation(addClassStrictModeOp, + PrivilegedOperation.OperationType.TC_MODIFY_STATE, + expectedAddClassStrictModeCmd); + + TrafficController.BatchBuilder deleteBuilder = trafficController.new + BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE) + .deleteContainerClass(classId); + PrivilegedOperation deleteClassOp = deleteBuilder.commitBatchToTempFile(); + + List expectedDeleteClassCmd = new ArrayList<>(); + for (String format : FORAMT_DELETE_CONTAINER_CLASS_FROM_MULTIPLE_DEVICES) { + expectedDeleteClassCmd.add(String.format(format, classId)); + } + verifyTrafficControlOperation(deleteClassOp, + PrivilegedOperation.OperationType.TC_MODIFY_STATE, + expectedDeleteClassCmd); + + } catch (ResourceHandlerException | IOException e) { + LOG.error("Unexpected exception: " + e); + Assert.fail("Caught unexpected exception: " + + e.getClass().getSimpleName()); + } + } + @After public void teardown() { FileUtil.fullyDelete(new File(tmpPath));