diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 863da7efde9..c2ac7d1e1d6 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapred; +import org.apache.hadoop.yarn.api.AuxiliaryLocalPathHandler; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import static org.fusesource.leveldbjni.JniDBFactory.asString; import static org.fusesource.leveldbjni.JniDBFactory.bytes; import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; @@ -854,8 +856,6 @@ public ChannelPipeline getPipeline() throws Exception { private static final int ALLOWED_CONCURRENCY = 16; private final Configuration conf; private final IndexCache indexCache; - private final LocalDirAllocator lDirAlloc = - new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); private int port; private final LoadingCache pathCache = CacheBuilder.newBuilder().expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES, @@ -888,10 +888,10 @@ public AttemptPathInfo load(AttemptPathIdentifier key) throws Exception { String base = getBaseLocation(key.jobId, key.user); String attemptBase = base + key.attemptId; - Path indexFileName = lDirAlloc.getLocalPathToRead( - attemptBase + "/" + INDEX_FILE_NAME, conf); - Path mapOutputFileName = lDirAlloc.getLocalPathToRead( - attemptBase + "/" + DATA_FILE_NAME, conf); + Path indexFileName = getAuxiliaryLocalPathHandler() + .getLocalPathForRead(attemptBase + "/" + INDEX_FILE_NAME); + Path mapOutputFileName = getAuxiliaryLocalPathHandler() + .getLocalPathForRead(attemptBase + "/" + DATA_FILE_NAME); if (LOG.isDebugEnabled()) { LOG.debug("Loaded : " + key + " via loader"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AuxiliaryLocalPathHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AuxiliaryLocalPathHandler.java new file mode 100644 index 00000000000..9ea4b690f9e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AuxiliaryLocalPathHandler.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +public interface AuxiliaryLocalPathHandler { + public Path getLocalPathForRead(String path) throws IOException; + public Path getLocalPathForWrite(String path) throws IOException; + public Path getLocalPathForWrite(String path, long size) throws IOException; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java index 58b1d4a61a3..490f5415cef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/AuxiliaryService.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.AuxiliaryLocalPathHandler; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -40,6 +41,7 @@ public abstract class AuxiliaryService extends AbstractService { private Path recoveryPath = null; + private AuxiliaryLocalPathHandler auxiliaryLocalPathHandler; protected AuxiliaryService(String name) { super(name); @@ -123,4 +125,10 @@ public void stopContainer(ContainerTerminationContext stopContainerContext) { public void setRecoveryPath(Path recoveryPath) { this.recoveryPath = recoveryPath; } + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return this.auxiliaryLocalPathHandler; + } + public void setAuxiliaryLocalPathHandler(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) { + this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/AuxiliaryLocalPathHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/AuxiliaryLocalPathHandler.java new file mode 100644 index 00000000000..03ac1b2f5ae --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/AuxiliaryLocalPathHandler.java @@ -0,0 +1,7 @@ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +/** + * Created by kshukla on 9/29/17. + */ +public class AuxiliaryLocalPathHandler { +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index 5e0f2936f83..3e145692738 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.regex.Pattern; + +import org.apache.hadoop.yarn.api.AuxiliaryLocalPathHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +58,7 @@ protected final Map serviceMap; protected final Map serviceMetaData; + protected AuxiliaryLocalPathHandler auxiliaryLocalPathHandler; private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$"); @@ -154,6 +157,7 @@ public void serviceInit(Configuration conf) throws Exception { +"Service Meta Data may have issues unless the refer to " +"the name in the config."); } + s.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler); addService(sName, s); if (recoveryEnabled) { Path storePath = new Path(stateStoreRoot, sName); @@ -285,4 +289,13 @@ private void logWarningWhenAuxServiceThrowExceptions(AuxiliaryService service, : "The auxService name is " + service.getName()) + " and it got an error at event: " + eventType, th); } + + public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() { + return auxiliaryLocalPathHandler; + } + + public void setAuxiliaryLocalPathHandler(AuxiliaryLocalPathHandler + auxiliaryLocalPathHandler) { + this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index e497f62264b..649867143e7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -20,6 +20,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.AuxiliaryLocalPathHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -217,6 +219,7 @@ private AMRMProxyService amrmProxyService; protected boolean amrmProxyEnabled = false; private final ContainerScheduler containerScheduler; + protected AuxiliaryLocalPathHandler auxiliaryLocalPathHandler; private long waitForContainersOnShutdownMillis; @@ -249,6 +252,8 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, // Start configurable services auxiliaryServices = new AuxServices(); + auxiliaryLocalPathHandler = new AuxiliaryLocalPathHandlerImpl(); + auxiliaryServices.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler); auxiliaryServices.registerServiceListener(this); addService(auxiliaryServices); @@ -1537,6 +1542,19 @@ public void handle(LocalizationEvent event) { } } + public class AuxiliaryLocalPathHandlerImpl implements AuxiliaryLocalPathHandler { + @Override + public Path getLocalPathForRead(String path) throws IOException { + return dirsHandler.getLocalPathForRead(path); + } + public Path getLocalPathForWrite(String path) throws IOException { + return dirsHandler.getLocalPathForWrite(path); + } + public Path getLocalPathForWrite(String path, long size) throws IOException { + return dirsHandler.getLocalPathForWrite(path, size, false); + } + } + @SuppressWarnings("unchecked") @Override public void handle(ContainerManagerEvent event) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 3cafcbda0a5..b478eb91e70 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -190,6 +190,7 @@ public void setup() throws IOException { conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:" + ServerSocketUtil.getPort(8040, 10)); + conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 1000); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); @@ -207,6 +208,7 @@ public void setup() throws IOException { nodeStatusUpdater.init(conf); containerManager.init(conf); nodeStatusUpdater.start(); + dirsHandler.start(); ((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index bfe87c690d2..dbe6aa8ac2c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.service.Service.STATE.INITED; import static org.apache.hadoop.service.Service.STATE.STARTED; import static org.apache.hadoop.service.Service.STATE.STOPPED; +import org.apache.hadoop.yarn.api.AuxiliaryLocalPathHandler; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -343,8 +344,26 @@ public void testAuxServices() { conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"), ServiceB.class, Service.class); final AuxServices aux = new AuxServices(); - aux.init(conf); + AuxiliaryLocalPathHandler auxiliaryLocalPathHandler = + new AuxiliaryLocalPathHandler() { + @Override + public Path getLocalPathForRead(String path) throws IOException { + return null; + } + + @Override + public Path getLocalPathForWrite(String path) throws IOException { + return null; + } + @Override + public Path getLocalPathForWrite(String path, long size) + throws IOException { + return null; + } + }; + aux.setAuxiliaryLocalPathHandler(auxiliaryLocalPathHandler); + aux.init(conf); int latch = 1; for (Service s : aux.getServices()) { assertEquals(INITED, s.getServiceState()); @@ -356,6 +375,8 @@ public void testAuxServices() { aux.start(); for (Service s : aux.getServices()) { assertEquals(STARTED, s.getServiceState()); + assertEquals(((AuxiliaryService)s).getAuxiliaryLocalPathHandler(), + auxiliaryLocalPathHandler); } aux.stop(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 6eea77b5a12..7f0e8226ab7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; @@ -41,6 +42,7 @@ import com.google.common.base.Supplier; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.UserGroupInformation; @@ -284,6 +286,40 @@ public void testContainerSetup() throws Exception { Assert.assertEquals(null, reader.readLine()); } + @Test (timeout = 10000L) + public void testContainerAuxPathHandler() throws Exception { + + containerManager.start(); + + // ////// Create the resources for the container + String basePath = containerManager.dirsHandler.getLocalDirsForRead().get(0); + File newFile = new File(basePath, "test.txt"); + File baseDir = new File(basePath); + newFile.createNewFile(); + baseDir.setWritable(false); + baseDir.setReadable(false); + int count = 0; + while ( containerManager.dirsHandler.getLocalDirsForRead().size() != 0 + && count < 3) { + Thread.currentThread().sleep(conf.getLong( + YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 1000)); + count++; + } + Assert.assertEquals(0, + containerManager.dirsHandler.getLocalDirsForRead().size()); + try { + containerManager.auxiliaryLocalPathHandler.getLocalPathForRead( + "test.txt"); + fail("The call to getLocalPathForRead should fail with an IOException"); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("Could not find")); + } finally { + baseDir.setWritable(true); + baseDir.setReadable(true); + FileUtil.fullyDelete(baseDir); + } + } + //@Test public void testContainerLaunchAndStop() throws IOException, InterruptedException, YarnException {