From dd59d6030561846ad230182d222dab02ddf6df7e Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Sat, 2 May 2015 18:44:23 -0700 Subject: [PATCH] Abstraction for local disk path allocation. LocalDirAllocator is used to do a round robin based disk allocation for paths that need to be created on the local disk. There is a need to inject a different implementation for some use cases. So an interface called LocalDiskPathAllocator is provided. The actual implementation can be retrieved using LocalDiskUtil. DFSLocalDirAllocator: An implementation based on accessing node local paths using DFS. Testing: Unit test created - TestDFSLocalDirAllocator. --- .../org/apache/hadoop/fs/DFSLocalDirAllocator.java | 106 +++++++++++++++++++ .../org/apache/hadoop/fs/LocalDirAllocator.java | 8 +- .../apache/hadoop/fs/LocalDiskPathAllocator.java | 115 +++++++++++++++++++++ .../java/org/apache/hadoop/fs/LocalDiskUtil.java | 104 +++++++++++++++++++ .../src/main/java/org/apache/hadoop/net/DNS.java | 4 +- .../apache/hadoop/fs/TestDFSLocalDirAllocator.java | 75 ++++++++++++++ 6 files changed, 409 insertions(+), 3 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DFSLocalDirAllocator.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDiskPathAllocator.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDiskUtil.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDFSLocalDirAllocator.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DFSLocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DFSLocalDirAllocator.java new file mode 100644 index 0000000..4ebe66a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DFSLocalDirAllocator.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; + +/** + * Local disk path allocator for each node on DFS. + * In order to identify each node on the DFS namespace, its hostname will be + * used as part of the directory path. The assumption is that the DFS will be + * able to create this directory on the local disk of the node specified by + * the hostname. + */ +public class DFSLocalDirAllocator implements LocalDiskPathAllocator { + private static final String NODE_LOCAL_DIR_PREFIX = "local.dir.prefix"; + private static final String NODE_LOCAL_DIR_SUFFIX = "local.dir.suffix"; + private static final String DEFAULT_NODE_LOCAL_DIR_PREFIX = "/tmp"; + + /** + * Absolute path of the local directory designated for each node. 
+ */ + private final Path nodeLocalDir; + + public DFSLocalDirAllocator(String hostname, String contextCfgItemName, + Configuration conf) { + + String nodeLocalDirPrefix = conf.get(NODE_LOCAL_DIR_PREFIX, + DEFAULT_NODE_LOCAL_DIR_PREFIX); + + String nodeLocalDirSuffix = conf.get(NODE_LOCAL_DIR_SUFFIX); + StringBuilder sb = new StringBuilder(); + sb.append(nodeLocalDirPrefix).append(Path.SEPARATOR) + .append(hostname); + + if (nodeLocalDirSuffix != null) { + sb.append(Path.SEPARATOR).append(nodeLocalDirSuffix); + } + + this.nodeLocalDir = new Path(sb.toString()); + } + + private Path getPath(String pathStr) { + return new Path(nodeLocalDir, pathStr); + } + + public Path getLocalPathForWrite(String pathStr, + Configuration conf) throws IOException { + + return getPath(pathStr); + } + + public Path getLocalPathForWrite(String pathStr, long size, + Configuration conf) throws IOException { + + return getPath(pathStr); + } + + public Path getLocalPathForWrite(String pathStr, long size, + Configuration conf, + boolean checkWrite) throws IOException { + + return getPath(pathStr); + } + + public Path getLocalPathToRead(String pathStr, + Configuration conf) throws IOException { + + return getPath(pathStr); + } + + public Iterable getAllLocalPathsToRead(String pathStr, + Configuration conf) throws IOException { + + return Arrays.asList(getPath(pathStr)); + } + + public boolean ifExists(String pathStr, Configuration conf) + throws IOException { + + FileSystem fs = FileSystem.get(conf); + return fs.exists(getPath(pathStr)); + } + + public String[] getLocalDirs(Configuration conf) { + return new String[] {nodeLocalDir.toString()}; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java index 6fb34e3..9c2b917 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java +++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java @@ -64,8 +64,8 @@ */ @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) @InterfaceStability.Unstable -public class LocalDirAllocator { - +public class LocalDirAllocator implements LocalDiskPathAllocator { + //A Map from the config item names like "mapred.local.dir" //to the instance of the AllocatorPerContext. This //is a static object to make sure there exists exactly one instance per JVM @@ -546,4 +546,8 @@ public synchronized boolean ifExists(String pathStr,Configuration conf) { return false; } } + + public String[] getLocalDirs(Configuration conf) { + return conf.getTrimmedStrings(contextCfgItemName); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDiskPathAllocator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDiskPathAllocator.java new file mode 100644 index 0000000..49b966d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDiskPathAllocator.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; + +/** + * Interface to get file/directory paths from the local disk namespace. + * Modules that need to get a handle to the implementation should do so using + * {@link org.apache.hadoop.fs.LocalDiskUtil}. + * + * This abstraction is provided for 2 reasons: + *
+ * 1. There can be more than one algorithm to determine how to allocate local
+ * disks.
+ *
+ * 2. A distributed file system that supports full POSIX read/write can choose
+ * to expose local disks using the HDFS interface itself and thereby have a way
+ * to create node local paths inside the DFS. E.g., the local dir for node1
+ * and node2 on DFS can be:
+ *
+ * /path/to/local/dir/node1_fqdn/
+ * /path/to/local/dir/node2_fqdn/
+ *
+ * where these directories actually reside on node1 and node2 respectively.
+ * 
+ */ +public interface LocalDiskPathAllocator { + /** Get a path from the local disk. This method should be used if the size of + * the file is not known apriori. + * @param pathStr the requested path + * @param conf the Configuration object + * @return the complete path to the file on a local disk + * @throws IOException + */ + Path getLocalPathForWrite(String pathStr, + Configuration conf) throws IOException; + + /** Get a path from the local disk. Pass size as + * SIZE_UNKNOWN if not known apriori. + * @param pathStr the requested path + * @param size the size of the file that is going to be written + * @param conf the Configuration object + * @return the complete path to the file on a local disk + * @throws IOException + */ + Path getLocalPathForWrite(String pathStr, long size, + Configuration conf) throws IOException; + + /** Get a path from the local disk. Pass size as + * SIZE_UNKNOWN if not known apriori. + * @param pathStr the requested path + * @param size the size of the file that is going to be written + * @param conf the Configuration object + * @param checkWrite ensure that the path is writable + * @return the complete path to the file on a local disk + * @throws IOException + */ + Path getLocalPathForWrite(String pathStr, long size, + Configuration conf, + boolean checkWrite) throws IOException; + + /** Get a path from the local disk for reading. + * @param pathStr the requested file (this will be searched) + * @param conf the Configuration object + * @return the complete path to the file on a local disk + * @throws IOException + */ + Path getLocalPathToRead(String pathStr, + Configuration conf) throws IOException; + + /** + * Get all of the paths that currently exist in the working directories. 
+ * @param pathStr the path underneath the roots + * @param conf the configuration to look up the roots in + * @return all of the paths that exist under any of the roots + * @throws IOException + */ + Iterable getAllLocalPathsToRead(String pathStr, + Configuration conf + ) throws IOException; + + /** We search through all the configured dirs for the file's existence + * and return true when we find one. + * @param pathStr the requested file (this will be searched) + * @param conf the Configuration object + * @return true if files exist. false otherwise + * @throws IOException + */ + boolean ifExists(String pathStr, Configuration conf) + throws IOException; + + /** + * Returns the local directories used to construct the paths. + */ + String[] getLocalDirs(Configuration conf); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDiskUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDiskUtil.java new file mode 100644 index 0000000..624eaa9 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDiskUtil.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.DNS; + +/** + * Helper class to get a handle to LocalDiskPathAllocator and the FileSystem + * that is used to manage those paths. + */ +public final class LocalDiskUtil { + private LocalDiskUtil() { + } + + /** + * Determines if a Distributed File System should be used to manage the + * allocated local disk paths. This defaults to false. + */ + private static final String USE_DFS_LOCAL_DISK_PATH_ALLOCATOR = + "local.disk.path.allocator.use-dfs"; + + /** + * Property that determines the custom class for local disk path allocation. + */ + private static final String LOCAL_DISK_PATH_ALLOCATOR_CLASS = + "local.disk.path.allocator.class"; + + /** + * Returns the local disk path allocator for given host. + */ + public static LocalDiskPathAllocator getPathAllocator(String hostname, + Configuration conf, String ctxName) { + + String customClass = conf.get(LOCAL_DISK_PATH_ALLOCATOR_CLASS); + if (customClass == null) { + if (isManagedByDFS(conf)) { + return new DFSLocalDirAllocator(hostname, ctxName, conf); + } else { + return new LocalDirAllocator(ctxName); + } + } else { + Class clazz = conf.getClass( + LOCAL_DISK_PATH_ALLOCATOR_CLASS, + null, + LocalDiskPathAllocator.class); + + try { + return clazz.getDeclaredConstructor(String.class, String.class, + Configuration.class) + .newInstance(hostname, ctxName, conf); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * Returns the local disk path allocator for current host. + */ + public static LocalDiskPathAllocator getPathAllocator(Configuration conf, + String ctxName) { + + String hostname = DNS.resolveLocalHostname(); + return getPathAllocator(hostname, conf, ctxName); + } + + /** + * Returns the file system that is used to manage the allocated local disk + * paths. 
+ */ + public static FileSystem getFileSystem(Configuration conf) + throws IOException { + + return isManagedByDFS(conf) + ? FileSystem.get(conf) + : FileSystem.getLocal(conf); + } + + /** + * Determines if the local disk path allocation is managed by DFS. + */ + public static boolean isManagedByDFS(Configuration conf) { + return conf.getBoolean(USE_DFS_LOCAL_DISK_PATH_ALLOCATOR, false); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNS.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNS.java index f19e802..16c76b9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNS.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/DNS.java @@ -258,11 +258,13 @@ public static String getDefaultIP(String strInterface) * If we cannot determine our host name, return "localhost" * @return the local hostname or "localhost" */ - private static String resolveLocalHostname() { + public static String resolveLocalHostname() { String localhost; try { localhost = InetAddress.getLocalHost().getCanonicalHostName(); + System.out.println("hehe : " + localhost); } catch (UnknownHostException e) { + System.out.println("hehe failed"); LOG.warn("Unable to determine local hostname " + "-falling back to \"" + LOCALHOST + "\"", e); localhost = LOCALHOST; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDFSLocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDFSLocalDirAllocator.java new file mode 100644 index 0000000..94463c6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDFSLocalDirAllocator.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.DNS; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestDFSLocalDirAllocator { + private static final boolean isWindows = + System.getProperty("os.name").startsWith("Windows"); + + private void run(Configuration conf, String baseDir, + String pathStr) throws Exception { + + if (isWindows) return; + + String hostname = DNS.resolveLocalHostname(); + String ctxName = ""; + DFSLocalDirAllocator allocator = new DFSLocalDirAllocator(hostname, + ctxName, conf); + + assertEquals(new Path(baseDir + "/" + pathStr), + allocator.getLocalPathForWrite(pathStr, conf)); + + String[] dirs = allocator.getLocalDirs(conf); + assertEquals(1, dirs.length); + assertEquals(baseDir, dirs[0]); + } + + @Test + public void testGetDFSLocalPathWithDefaultConf() throws Exception { + String hostname = DNS.resolveLocalHostname(); + Configuration conf = new Configuration(); + String baseDir = "/tmp/" + hostname; + String pathStr = "a/b"; + + run(conf, baseDir, pathStr); + } + + @Test + public void testGettDFSLocalPath() throws Exception { + String hostname = DNS.resolveLocalHostname(); + + String prefix = "/my/prefix"; + String suffix = "my/suffix"; + + Configuration 
conf = new Configuration(); + conf.set("local.dir.prefix", prefix); + conf.set("local.dir.suffix", suffix); + + String baseDir = prefix + "/" + hostname + "/" + suffix; + String pathStr = "a/b"; + + run(conf, baseDir, pathStr); + } +} -- 2.1.2