diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 0eeae19ab8d..cf886afc065 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapred; +import org.apache.hadoop.yarn.api.AuxiliaryLocalPathHandler; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import static org.fusesource.leveldbjni.JniDBFactory.asString; import static org.fusesource.leveldbjni.JniDBFactory.bytes; import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; @@ -855,8 +857,6 @@ public ChannelPipeline getPipeline() throws Exception { private static final int ALLOWED_CONCURRENCY = 16; private final Configuration conf; private final IndexCache indexCache; - private final LocalDirAllocator lDirAlloc = - new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); private int port; private final LoadingCache pathCache = CacheBuilder.newBuilder().expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES, @@ -889,10 +889,10 @@ public AttemptPathInfo load(AttemptPathIdentifier key) throws Exception { String base = getBaseLocation(key.jobId, key.user); String attemptBase = base + key.attemptId; - Path indexFileName = lDirAlloc.getLocalPathToRead( - attemptBase + "/" + INDEX_FILE_NAME, conf); - Path mapOutputFileName = lDirAlloc.getLocalPathToRead( - attemptBase + "/" + DATA_FILE_NAME, conf); + Path indexFileName = getAuxiliaryLocalPathHandler() + .getLocalPathForRead(attemptBase + "/" + INDEX_FILE_NAME); + Path mapOutputFileName = getAuxiliaryLocalPathHandler() + .getLocalPathForRead(attemptBase + "/" + DATA_FILE_NAME); if (LOG.isDebugEnabled()) { LOG.debug("Loaded : " + key + " via loader"); diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java index 849ce1a1563..90ea365c027 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertGauge; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import org.apache.hadoop.yarn.api.AuxiliaryLocalPathHandler; import static org.junit.Assert.assertTrue; import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK; @@ -101,6 +102,7 @@ private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class); class MockShuffleHandler extends org.apache.hadoop.mapred.ShuffleHandler { + AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); @Override protected Shuffle getShuffle(final Configuration conf) { return new Shuffle(conf) { @@ -140,11 +142,83 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, } }; } + + @Override + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return pathHandler; + } } - private static class MockShuffleHandler2 extends org.apache.hadoop.mapred.ShuffleHandler { - boolean socketKeepAlive = false; + class TestShuffleHandler1 extends ShuffleHandler { + + protected File absLogDir; + public TestShuffleHandler1() { + absLogDir = new File("target", + TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); + } + + public File getAbsLogDir() { + return absLogDir; + } + AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler(); + @Override + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return pathHandler; + } + } + + private class TestAuxiliaryLocalPathHandler implements AuxiliaryLocalPathHandler { + @Override + public Path getLocalPathForRead(String path) throws IOException { + File absLogDir = new File("target", + TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); + + return new Path(absLogDir.getAbsolutePath(), path); + } + + @Override + public Path getLocalPathForWrite(String path) throws IOException { + File absLogDir = new File("target", + TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); + + return new Path(absLogDir.getAbsolutePath()); + } + + @Override + public Path getLocalPathForWrite(String path, long size) throws IOException { + File absLogDir = new File("target", + TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); + + return new Path(absLogDir.getAbsolutePath()); + } + } + + private class TestAuxiliaryLocalPathHandler1 implements AuxiliaryLocalPathHandler { + @Override + public Path getLocalPathForRead(String path) throws IOException { + throw new IOException("Path not found!"); + } + + @Override + public Path getLocalPathForWrite(String path) throws IOException { + File absLogDir = new File("target", + TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); + return new Path(absLogDir.getAbsolutePath()); + } + + @Override + public Path getLocalPathForWrite(String path, long size) throws IOException { + File absLogDir = new File("target", + TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); + + return new Path(absLogDir.getAbsolutePath()); + } + } + + private class MockShuffleHandler2 extends org.apache.hadoop.mapred.ShuffleHandler { + boolean socketKeepAlive = false; + AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler1(); @Override protected Shuffle getShuffle(final Configuration conf) { return new Shuffle(conf) { @@ -161,6 +235,10 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx, protected boolean isSocketKeepAlive() { return socketKeepAlive; } + @Override + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return pathHandler; + } } /** @@ -801,10 +879,12 @@ public void testRecovery() throws IOException { final File tmpDir = new File(System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), TestShuffleHandler.class.getName()); + ShuffleHandler shuffle = new TestShuffleHandler1(); Configuration conf = new Configuration(); - conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); - conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); - ShuffleHandler shuffle = new ShuffleHandler(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY,0); + conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS,3); + conf.set(YarnConfiguration.NM_LOCAL_DIRS,( + (TestShuffleHandler1)shuffle).getAbsLogDir().getAbsolutePath()); // emulate aux services startup with recovery enabled shuffle.setRecoveryPath(new Path(tmpDir.toString())); tmpDir.mkdirs(); @@ -829,7 +909,7 @@ public void testRecovery() throws IOException { // emulate shuffle handler restart shuffle.close(); - shuffle = new ShuffleHandler(); + shuffle = new TestShuffleHandler1(); shuffle.setRecoveryPath(new Path(tmpDir.toString())); shuffle.init(conf); shuffle.start(); @@ -871,7 +951,9 @@ public void testRecoveryFromOtherVersions() throws IOException { Configuration conf = new Configuration(); conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); - ShuffleHandler shuffle = new ShuffleHandler(); + ShuffleHandler shuffle = new TestShuffleHandler1(); + conf.set(YarnConfiguration.NM_LOCAL_DIRS,( + (TestShuffleHandler1)shuffle).getAbsLogDir().getAbsolutePath()); // emulate aux services startup with recovery enabled shuffle.setRecoveryPath(new Path(tmpDir.toString())); tmpDir.mkdirs(); @@ -896,7 +978,7 @@ public void testRecoveryFromOtherVersions() throws IOException { // emulate shuffle handler restart shuffle.close(); - shuffle = new ShuffleHandler(); + shuffle = new TestShuffleHandler1(); shuffle.setRecoveryPath(new Path(tmpDir.toString())); shuffle.init(conf); shuffle.start(); @@ -913,7 +995,7 @@ public void testRecoveryFromOtherVersions() throws IOException { shuffle.storeVersion(version11); Assert.assertEquals(version11, shuffle.loadVersion()); shuffle.close(); - shuffle = new ShuffleHandler(); + shuffle = new TestShuffleHandler1(); shuffle.setRecoveryPath(new Path(tmpDir.toString())); shuffle.init(conf); shuffle.start(); @@ -929,7 +1011,7 @@ public void testRecoveryFromOtherVersions() throws IOException { shuffle.storeVersion(version21); Assert.assertEquals(version21, shuffle.loadVersion()); shuffle.close(); - shuffle = new ShuffleHandler(); + shuffle = new TestShuffleHandler1(); shuffle.setRecoveryPath(new Path(tmpDir.toString())); shuffle.init(conf); @@ -990,6 +1072,28 @@ public void testGetMapOutputInfo() throws Exception { createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, conf, fileMap); ShuffleHandler shuffleHandler = new ShuffleHandler() { + AuxiliaryLocalPathHandler pathHandler = new AuxiliaryLocalPathHandler() { + @Override + public Path getLocalPathForRead(String path) throws IOException { + return new Path(absLogDir.getAbsolutePath(),path); + } + + @Override + public Path getLocalPathForWrite(String path) throws IOException { + File absLogDir = new File("target", + TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); + + return new Path(absLogDir.getAbsolutePath()); + } + + @Override + public Path getLocalPathForWrite(String path, long size) throws IOException { + File absLogDir = new File("target", + TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); + + return new Path(absLogDir.getAbsolutePath()); + } + }; @Override protected Shuffle getShuffle(Configuration conf) { // replace the shuffle handler with one stubbed for testing @@ -1031,6 +1135,10 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, } }; } + @Override + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return pathHandler; + } }; shuffleHandler.init(conf); try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AuxiliaryLocalPathHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AuxiliaryLocalPathHandler.java new file mode 100644 index 00000000000..9ea4b690f9e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AuxiliaryLocalPathHandler.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +public interface AuxiliaryLocalPathHandler { + public Path getLocalPathForRead(String path) throws IOException; + public Path getLocalPathForWrite(String path) throws IOException; + public Path getLocalPathForWrite(String path, long size) throws IOException; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java index 58b1d4a61a3..490f5415cef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -40,6 +41,7 @@ public abstract class AuxiliaryService extends AbstractService { private Path recoveryPath = null; + private AuxiliaryLocalPathHandler auxiliaryLocalPathHandler; protected AuxiliaryService(String name) { super(name); @@ -123,4 +125,10 @@ public void stopContainer(ContainerTerminationContext stopContainerContext) { public void setRecoveryPath(Path recoveryPath) { this.recoveryPath = recoveryPath; } + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return this.auxiliaryLocalPathHandler; + } + public void setAuxiliaryLocalPathHandler(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) { + this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index 5e0f2936f83..839402e5083 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -36,6 +36,7 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; @@ -56,6 +57,7 @@ protected final Map serviceMap; protected final Map serviceMetaData; + protected AuxiliaryLocalPathHandler auxiliaryLocalPathHandler; private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$"); @@ -154,6 +156,7 @@ public void serviceInit(Configuration conf) throws Exception { +"Service Meta Data may have issues unless the refer to " +"the name in the config."); } + s.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler); addService(sName, s); if (recoveryEnabled) { Path storePath = new Path(stateStoreRoot, sName); @@ -285,4 +288,13 @@ private void logWarningWhenAuxServiceThrowExceptions(AuxiliaryService service, : "The auxService name is " + service.getName()) + " and it got an error at event: " + eventType, th); } + + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return auxiliaryLocalPathHandler; + } + + public void setAuxiliaryLocalPathHandler(AuxiliaryLocalPathHandler + auxiliaryLocalPathHandler) { + this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 38eb636f746..891691eb9ca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; @@ -38,6 +39,7 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.Service; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest; @@ -217,6 +219,7 @@ private AMRMProxyService amrmProxyService; protected boolean amrmProxyEnabled = false; private final ContainerScheduler containerScheduler; + protected AuxiliaryLocalPathHandler auxiliaryLocalPathHandler; private long waitForContainersOnShutdownMillis; @@ -249,6 +252,8 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, // Start configurable services auxiliaryServices = new AuxServices(); + auxiliaryLocalPathHandler = new AuxiliaryLocalPathHandlerImpl(); + auxiliaryServices.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler); auxiliaryServices.registerServiceListener(this); addService(auxiliaryServices); @@ -1514,6 +1519,19 @@ public void handle(LocalizationEvent event) { } } + public class AuxiliaryLocalPathHandlerImpl implements AuxiliaryLocalPathHandler { + @Override + public Path getLocalPathForRead(String path) throws IOException { + return dirsHandler.getLocalPathForRead(path); + } + public Path getLocalPathForWrite(String path) throws IOException { + return dirsHandler.getLocalPathForWrite(path); + } + public Path getLocalPathForWrite(String path, long size) throws IOException { + return dirsHandler.getLocalPathForWrite(path, size, false); + } + } + @SuppressWarnings("unchecked") @Override public void handle(ContainerManagerEvent event) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index fc9e6c417ce..f05068ed2f6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -190,6 +190,7 @@ public void setup() throws IOException { conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:" + ServerSocketUtil.getPort(8040, 10)); + conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 1000); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); @@ -207,6 +208,7 @@ public void setup() throws IOException { nodeStatusUpdater.init(conf); containerManager.init(conf); nodeStatusUpdater.start(); + dirsHandler.start(); ((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater); ((NMContext)context).setContainerStateTransitionListener( new NodeManager.DefaultContainerStateListener()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index bfe87c690d2..dbe6aa8ac2c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.service.Service.STATE.INITED; import static org.apache.hadoop.service.Service.STATE.STARTED; import static org.apache.hadoop.service.Service.STATE.STOPPED; +import org.apache.hadoop.yarn.api.AuxiliaryLocalPathHandler; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -343,8 +344,26 @@ public void testAuxServices() { conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); final AuxServices aux = new AuxServices(); - aux.init(conf); + AuxiliaryLocalPathHandler auxiliaryLocalPathHandler = + new AuxiliaryLocalPathHandler() { + @Override + public Path getLocalPathForRead(String path) throws IOException { + return null; + } + + @Override + public Path getLocalPathForWrite(String path) throws IOException { + return null; + } + @Override + public Path getLocalPathForWrite(String path, long size) + throws IOException { + return null; + } + }; + aux.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler); + aux.init(conf); int latch = 1; for (Service s : aux.getServices()) { assertEquals(INITED, s.getServiceState()); @@ -356,6 +375,8 @@ public void testAuxServices() { aux.start(); for (Service s : aux.getServices()) { assertEquals(STARTED, s.getServiceState()); + assertEquals(((AuxiliaryService)s).getAuxiliaryLocalPathHandler(), + auxiliaryLocalPathHandler); } aux.stop(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 6e8c005d62f..7dfef5c862b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; @@ -41,6 +42,7 @@ import com.google.common.base.Supplier; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.UserGroupInformation; @@ -313,6 +315,40 @@ public void testContainerSetup() throws Exception { Assert.assertEquals(null, reader.readLine()); } + @Test (timeout = 10000L) + public void testContainerAuxPathHandler() throws Exception { + + containerManager.start(); + + // ////// Create the resources for the container + String basePath = containerManager.dirsHandler.getLocalDirsForRead().get(0); + File newFile = new File(basePath, "test.txt"); + File baseDir = new File(basePath); + newFile.createNewFile(); + baseDir.setWritable(false); + baseDir.setReadable(false); + int count = 0; + while ( containerManager.dirsHandler.getLocalDirsForRead().size() != 0 + && count < 3) { + Thread.currentThread().sleep(conf.getLong( + YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 1000)); + count++; + } + Assert.assertEquals(0, + containerManager.dirsHandler.getLocalDirsForRead().size()); + try { + containerManager.auxiliaryLocalPathHandler.getLocalPathForRead( + "test.txt"); + fail("The call to getLocalPathForRead should fail with an IOException"); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("Could not find")); + } finally { + baseDir.setWritable(true); + baseDir.setReadable(true); + FileUtil.fullyDelete(baseDir); + } + } + //@Test public void testContainerLaunchAndStop() throws IOException, InterruptedException, YarnException {