From 9a0741aef44bb11a3c2bd70c276c1d17434c1ce7 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Fri, 18 Jan 2019 14:53:12 -0800 Subject: [PATCH] HBASE-21735 Port HBASE-18784 (Use of filesystem that requires hflush / hsync / append / etc should query outputstream capabilities) to branch-1 HBASE-18784 if available, query underlying outputstream capabilities where we need hflush/hsync. * pull things that don't rely on HDFS in hbase-server/FSUtils into hbase-common/CommonFSUtils * refactor setStoragePolicy so that it can move into hbase-common/CommonFSUtils * refactor WALProcedureStore so that it handles its own FS interactions * add a reflection-based lookup of stream capabilities * call said lookup in places where we make WALs to make sure hflush/hsync is available. --- .../hadoop/hbase/util/CommonFSUtils.java | 908 ++++++++++++++++++ .../hadoop/hbase/util/TestCommonFSUtils.java | 164 ++++ .../store/wal/WALProcedureStore.java | 31 +- .../procedure2/ProcedureTestingUtility.java | 12 +- .../hbase/procedure2/TestChildProcedures.java | 2 +- .../procedure2/TestProcedureExecution.java | 2 +- .../hbase/procedure2/TestProcedureNonce.java | 2 +- .../procedure2/TestProcedureRecovery.java | 2 +- .../procedure2/TestProcedureReplayOrder.java | 2 +- .../hbase/procedure2/TestYieldProcedures.java | 9 +- ...ocedureWALLoaderPerformanceEvaluation.java | 2 +- .../ProcedureWALPerformanceEvaluation.java | 11 +- .../wal/TestStressWALProcedureStore.java | 2 +- .../store/wal/TestWALProcedureStore.java | 7 +- .../apache/hadoop/hbase/fs/HFileSystem.java | 3 - .../apache/hadoop/hbase/master/HMaster.java | 25 +- .../hadoop/hbase/master/MasterFileSystem.java | 3 +- .../procedure/MasterProcedureConstants.java | 3 - .../regionserver/wal/ProtobufLogWriter.java | 8 +- .../org/apache/hadoop/hbase/util/FSUtils.java | 679 +------------ .../hadoop/hbase/fs/TestBlockReorder.java | 3 +- .../TestWALProcedureStoreOnHDFS.java | 2 +- .../hbase/regionserver/wal/TestWALReplay.java | 7 +- .../apache/hadoop/hbase/util/TestFSUtils.java | 238 ++--- .../hadoop/hbase/wal/IOTestProvider.java | 15 +- 25 files changed, 1242 insertions(+), 900 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java new file mode 100644 index 0000000000..179a912b8a --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java @@ -0,0 +1,908 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Utility methods for interacting with the underlying file system. + */ +@InterfaceAudience.Private +public abstract class CommonFSUtils { + private static final Log LOG = LogFactory.getLog(CommonFSUtils.class); + + /** Parameter name for HBase WAL directory */ + public static final String HBASE_WAL_DIR = "hbase.wal.dir"; + + /** Full access permissions (starting point for a umask) */ + public static final String FULL_RWX_PERMISSIONS = "777"; + + protected CommonFSUtils() { + super(); + } + + /** + * Compare of path component. Does not consider schema; i.e. if schemas + * different but path starts with rootPath, + * then the function returns true + * @param rootPath value to check for + * @param path subject to check + * @return True if path starts with rootPath + */ + public static boolean isStartingWithPath(final Path rootPath, final String path) { + String uriRootPath = rootPath.toUri().getPath(); + String tailUriPath = (new Path(path)).toUri().getPath(); + return tailUriPath.startsWith(uriRootPath); + } + + /** + * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the + * '/a/b/c' part. Does not consider schema; i.e. if schemas different but path or subpath matches, + * the two will equate. + * @param pathToSearch Path we will be trying to match against. + * @param pathTail what to match + * @return True if pathTail is tail on the path of pathToSearch + */ + public static boolean isMatchingTail(final Path pathToSearch, String pathTail) { + return isMatchingTail(pathToSearch, new Path(pathTail)); + } + + /** + * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the + * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider + * schema; i.e. if schemas different but path or subpath matches, the two will equate. 
+ * @param pathToSearch Path we will be trying to match agains against + * @param pathTail what to match + * @return True if pathTail is tail on the path of pathToSearch + */ + public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) { + if (pathToSearch.depth() != pathTail.depth()) { + return false; + } + Path tailPath = pathTail; + String tailName; + Path toSearch = pathToSearch; + String toSearchName; + boolean result = false; + do { + tailName = tailPath.getName(); + if (tailName == null || tailName.length() <= 0) { + result = true; + break; + } + toSearchName = toSearch.getName(); + if (toSearchName == null || toSearchName.length() <= 0) { + break; + } + // Move up a parent on each path for next go around. Path doesn't let us go off the end. + tailPath = tailPath.getParent(); + toSearch = toSearch.getParent(); + } while(tailName.equals(toSearchName)); + return result; + } + + /** + * Delete if exists. + * @param fs filesystem object + * @param dir directory to delete + * @return True if deleted dir + * @throws IOException e + */ + public static boolean deleteDirectory(final FileSystem fs, final Path dir) + throws IOException { + return fs.exists(dir) && fs.delete(dir, true); + } + + /** + * Return the number of bytes that large input files should be optimally + * be split into to minimize i/o time. + * + * use reflection to search for getDefaultBlockSize(Path f) + * if the method doesn't exist, fall back to using getDefaultBlockSize() + * + * @param fs filesystem object + * @return the default block size for the path's filesystem + * @throws IOException e + */ + public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException { + Method m = null; + Class cls = fs.getClass(); + try { + m = cls.getMethod("getDefaultBlockSize", new Class[] { Path.class }); + } catch (NoSuchMethodException e) { + LOG.info("FileSystem doesn't support getDefaultBlockSize"); + } catch (SecurityException e) { + LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e); + m = null; // could happen on setAccessible() + } + if (m == null) { + return fs.getDefaultBlockSize(path); + } else { + try { + Object ret = m.invoke(fs, path); + return ((Long)ret).longValue(); + } catch (Exception e) { + throw new IOException(e); + } + } + } + + /* + * Get the default replication. + * + * use reflection to search for getDefaultReplication(Path f) + * if the method doesn't exist, fall back to using getDefaultReplication() + * + * @param fs filesystem object + * @param f path of file + * @return default replication for the path's filesystem + * @throws IOException e + */ + public static short getDefaultReplication(final FileSystem fs, final Path path) + throws IOException { + Method m = null; + Class cls = fs.getClass(); + try { + m = cls.getMethod("getDefaultReplication", new Class[] { Path.class }); + } catch (NoSuchMethodException e) { + LOG.info("FileSystem doesn't support getDefaultReplication"); + } catch (SecurityException e) { + LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e); + m = null; // could happen on setAccessible() + } + if (m == null) { + return fs.getDefaultReplication(path); + } else { + try { + Object ret = m.invoke(fs, path); + return ((Number)ret).shortValue(); + } catch (Exception e) { + throw new IOException(e); + } + } + } + + /** + * Returns the default buffer size to use during writes. 
+ * + * The size of the buffer should probably be a multiple of hardware + * page size (4096 on Intel x86), and it determines how much data is + * buffered during read and write operations. + * + * @param fs filesystem object + * @return default buffer size to use during writes + */ + public static int getDefaultBufferSize(final FileSystem fs) { + return fs.getConf().getInt("io.file.buffer.size", 4096); + } + + /** + * Create the specified file on the filesystem. By default, this will: + *
+ * <ol>
+ *   <li>apply the umask in the configuration (if it is enabled)</li>
+ *   <li>use the fs configured buffer size (or 4096 if not set)</li>
+ *   <li>use the default replication</li>
+ *   <li>use the default block size</li>
+ *   <li>not track progress</li>
+ * </ol>
+ * + * @param fs {@link FileSystem} on which to write the file + * @param path {@link Path} to the file to write + * @param perm intial permissions + * @param overwrite Whether or not the created file should be overwritten. + * @return output stream to the created file + * @throws IOException if the file cannot be created + */ + public static FSDataOutputStream create(FileSystem fs, Path path, + FsPermission perm, boolean overwrite) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite); + } + return fs.create(path, perm, overwrite, getDefaultBufferSize(fs), + getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null); + } + + /** + * Get the file permissions specified in the configuration, if they are + * enabled. + * + * @param fs filesystem that the file will be created on. + * @param conf configuration to read for determining if permissions are + * enabled and which to use + * @param permssionConfKey property key in the configuration to use when + * finding the permission + * @return the permission to use when creating a new file on the fs. If + * special permissions are not specified in the configuration, then + * the default permissions on the the fs will be returned. + */ + public static FsPermission getFilePermissions(final FileSystem fs, + final Configuration conf, final String permssionConfKey) { + boolean enablePermissions = conf.getBoolean( + HConstants.ENABLE_DATA_FILE_UMASK, false); + + if (enablePermissions) { + try { + FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS); + // make sure that we have a mask, if not, go default. + String mask = conf.get(permssionConfKey); + if (mask == null) { + return FsPermission.getFileDefault(); + } + // appy the umask + FsPermission umask = new FsPermission(mask); + return perm.applyUMask(umask); + } catch (IllegalArgumentException e) { + LOG.warn( + "Incorrect umask attempted to be created: " + + conf.get(permssionConfKey) + + ", using default file permissions.", e); + return FsPermission.getFileDefault(); + } + } + return FsPermission.getFileDefault(); + } + + /** + * Verifies root directory path is a valid URI with a scheme + * + * @param root root directory path + * @return Passed root argument. + * @throws IOException if not a valid URI with a scheme + */ + public static Path validateRootPath(Path root) throws IOException { + try { + URI rootURI = new URI(root.toString()); + String scheme = rootURI.getScheme(); + if (scheme == null) { + throw new IOException("Root directory does not have a scheme"); + } + return root; + } catch (URISyntaxException e) { + IOException io = new IOException("Root directory path is not a valid " + + "URI -- check your " + HConstants.HBASE_DIR + " configuration"); + io.initCause(e); + throw io; + } + } + + /** + * Checks for the presence of the WAL log root path (using the provided conf object) in the given + * path. If it exists, this method removes it and returns the String representation of remaining + * relative path. + * @param path must not be null + * @param conf must not be null + * @return String representation of the remaining relative path + * @throws IOException from underlying filesystem + */ + public static String removeWALRootPath(Path path, final Configuration conf) throws IOException { + Path root = getWALRootDir(conf); + String pathStr = path.toString(); + // check that the path is absolute... it has the root path in it. 
+ if (!pathStr.startsWith(root.toString())) { + return pathStr; + } + // if not, return as it is. + return pathStr.substring(root.toString().length() + 1);// remove the "/" too. + } + + /** + * Return the 'path' component of a Path. In Hadoop, Path is an URI. This + * method returns the 'path' component of a Path's URI: e.g. If a Path is + * hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir, + * this method returns /hbase_trunk/TestTable/compaction.dir. + * This method is useful if you want to print out a Path without qualifying + * Filesystem instance. + * @param p Filesystem Path whose 'path' component we are to return. + * @return Path portion of the Filesystem + */ + public static String getPath(Path p) { + return p.toUri().getPath(); + } + + /** + * @param c configuration + * @return {@link Path} to hbase root directory from + * configuration as a qualified Path. + * @throws IOException e + */ + public static Path getRootDir(final Configuration c) throws IOException { + Path p = new Path(c.get(HConstants.HBASE_DIR)); + FileSystem fs = p.getFileSystem(c); + return p.makeQualified(fs); + } + + public static void setRootDir(final Configuration c, final Path root) throws IOException { + c.set(HConstants.HBASE_DIR, root.toString()); + } + + public static void setFsDefault(final Configuration c, final Path root) throws IOException { + c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+ + } + + public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException { + Path p = getRootDir(c); + return p.getFileSystem(c); + } + + /** + * @param c configuration + * @return {@link Path} to hbase log root directory: e.g. {@value HBASE_WAL_DIR} from + * configuration as a qualified Path. Defaults to HBase root dir. + * @throws IOException e + */ + public static Path getWALRootDir(final Configuration c) throws IOException { + Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR))); + if (!isValidWALRootDir(p, c)) { + return getRootDir(c); + } + FileSystem fs = p.getFileSystem(c); + return p.makeQualified(fs); + } + + @VisibleForTesting + public static void setWALRootDir(final Configuration c, final Path root) throws IOException { + c.set(HBASE_WAL_DIR, root.toString()); + } + + public static FileSystem getWALFileSystem(final Configuration c) throws IOException { + Path p = getWALRootDir(c); + return p.getFileSystem(c); + } + + private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException { + Path rootDir = getRootDir(c); + if (walDir != rootDir) { + if (walDir.toString().startsWith(rootDir.toString() + "/")) { + throw new IllegalStateException("Illegal WAL directory specified. 
" + + "WAL directories are not permitted to be under the root directory if set."); + } + } + return true; + } + + /** + * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under + * path rootdir + * + * @param rootdir qualified path of HBase root directory + * @param tableName name of table + * @return {@link org.apache.hadoop.fs.Path} for table + */ + public static Path getTableDir(Path rootdir, final TableName tableName) { + return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()), + tableName.getQualifierAsString()); + } + + /** + * Returns the {@link org.apache.hadoop.hbase.TableName} object representing + * the table directory under + * path rootdir + * + * @param tablePath path of table + * @return {@link org.apache.hadoop.fs.Path} for table + */ + public static TableName getTableName(Path tablePath) { + return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName()); + } + + /** + * Returns the {@link org.apache.hadoop.fs.Path} object representing + * the namespace directory under path rootdir + * + * @param rootdir qualified path of HBase root directory + * @param namespace namespace name + * @return {@link org.apache.hadoop.fs.Path} for table + */ + public static Path getNamespaceDir(Path rootdir, final String namespace) { + return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, + new Path(namespace))); + } + + /** + * Sets storage policy for given path according to config setting. + * If the passed path is a directory, we'll set the storage policy for all files + * created in the future in said directory. Note that this change in storage + * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle. + * If we're running on a FileSystem implementation that doesn't support the given storage policy + * (or storage policies at all), then we'll issue a log message and continue. + * + * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html + * + * @param fs We only do anything it implements a setStoragePolicy method + * @param conf used to look up storage policy with given key; not modified. + * @param path the Path whose storage policy is to be set + * @param policyKey Key to use pulling a policy from Configuration: + * e.g. HConstants.WAL_STORAGE_POLICY (hbase.wal.storage.policy). + * @param defaultPolicy if the configured policy is equal to this policy name, we will skip + * telling the FileSystem to set a storage policy. + */ + public static void setStoragePolicy(final FileSystem fs, final Configuration conf, + final Path path, final String policyKey, final String defaultPolicy) { + String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT); + if (storagePolicy.equals(defaultPolicy)) { + if (LOG.isTraceEnabled()) { + LOG.trace("default policy of " + defaultPolicy + " requested, exiting early."); + } + return; + } + setStoragePolicy(fs, path, storagePolicy); + } + + private static final Map warningMap = + new ConcurrentHashMap(); + + /** + * Sets storage policy for given path. + *
+ * If the passed path is a directory, we'll set the storage policy for all files + * created in the future in said directory. Note that this change in storage + * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle. + * If we're running on a version of HDFS that doesn't support the given storage policy + * (or storage policies at all), then we'll issue a log message and continue. + * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html + * for possible list e.g 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'. + * + * @param fs We only do anything if an instance of DistributedFileSystem + * @param path the Path whose storage policy is to be set + * @param storagePolicy Policy to set on path + */ + public static void setStoragePolicy(final FileSystem fs, final Path path, + final String storagePolicy) { + try { + setStoragePolicy(fs, path, storagePolicy, false); + } catch (IOException e) { + // should never arrive here + LOG.warn("We have chosen not to throw exception but some unexpectedly thrown out", e); + } + } + + static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy, + boolean throwException) throws IOException { + if (storagePolicy == null) { + if (LOG.isTraceEnabled()) { + LOG.trace("We were passed a null storagePolicy, exiting early."); + } + return; + } + String trimmedStoragePolicy = storagePolicy.trim(); + if (trimmedStoragePolicy.isEmpty()) { + if (LOG.isTraceEnabled()) { + LOG.trace("We were passed an empty storagePolicy, exiting early."); + } + return; + } else { + trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT); + } + if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) { + if (LOG.isTraceEnabled()) { + LOG.trace( + "We were passed the defer-to-hdfs policy " + trimmedStoragePolicy + ", exiting early."); + } + return; + } + try { + invokeSetStoragePolicy(fs, path, trimmedStoragePolicy); + } catch (IOException e) { + if (!warningMap.containsKey(fs)) { + warningMap.put(fs, true); + LOG.warn("Failed to invoke set storage policy API on FS; presuming it doesn't " + + "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy + + " on path=" + path); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Failed to invoke set storage policy API on FS; presuming it doesn't " + + "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy + + " on path=" + path); + } + if (throwException) { + throw e; + } + } + } + + /* + * All args have been checked and are good. Run the setStoragePolicy invocation. 
+ */ + private static void invokeSetStoragePolicy(final FileSystem fs, final Path path, + final String storagePolicy) throws IOException { + Method m = null; + Exception toThrow = null; + try { + m = fs.getClass().getDeclaredMethod("setStoragePolicy", + new Class[] { Path.class, String.class }); + m.setAccessible(true); + } catch (NoSuchMethodException e) { + toThrow = e; + final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584 not available"; + if (!warningMap.containsKey(fs)) { + warningMap.put(fs, true); + LOG.warn(msg, e); + } else if (LOG.isDebugEnabled()) { + LOG.debug(msg, e); + } + m = null; + } catch (SecurityException e) { + toThrow = e; + final String msg = "No access to setStoragePolicy on FileSystem; HDFS-6584 not available"; + if (!warningMap.containsKey(fs)) { + warningMap.put(fs, true); + LOG.warn(msg, e); + } else if (LOG.isDebugEnabled()) { + LOG.debug(msg, e); + } + m = null; // could happen on setAccessible() + } + if (m != null) { + try { + m.invoke(fs, path, storagePolicy); + if (LOG.isDebugEnabled()) { + LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path); + } + } catch (Exception e) { + toThrow = e; + // This swallows FNFE, should we be throwing it? seems more likely to indicate dev + // misuse than a runtime problem with HDFS. + if (!warningMap.containsKey(fs)) { + warningMap.put(fs, true); + LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e); + } + // check for lack of HDFS-7228 + if (e instanceof InvocationTargetException) { + final Throwable exception = e.getCause(); + if (exception instanceof RemoteException && + HadoopIllegalArgumentException.class.getName().equals( + ((RemoteException)exception).getClassName())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " + + "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " + + "trying to use SSD related policies then you're likely missing HDFS-7228. For " + + "more information see the 'ArchivalStorage' docs for your Hadoop release."); + } + } + } + } + } + if (toThrow != null) { + throw new IOException(toThrow); + } + } + + + /** + * @param conf must not be null + * @return True if this filesystem whose scheme is 'hdfs'. + * @throws IOException from underlying FileSystem + */ + public static boolean isHDFS(final Configuration conf) throws IOException { + FileSystem fs = FileSystem.get(conf); + String scheme = fs.getUri().getScheme(); + return scheme.equalsIgnoreCase("hdfs"); + } + + /** + * Checks if the given path is the one with 'recovered.edits' dir. + * @param path must not be null + * @return True if we recovered edits + */ + public static boolean isRecoveredEdits(Path path) { + return path.toString().contains(HConstants.RECOVERED_EDITS_DIR); + } + + /** + * @param conf must not be null + * @return Returns the filesystem of the hbase rootdir. + * @throws IOException from underlying FileSystem + */ + public static FileSystem getCurrentFileSystem(Configuration conf) + throws IOException { + return getRootDir(conf).getFileSystem(conf); + } + + /** + * Calls fs.listStatus() and treats FileNotFoundException as non-fatal + * This accommodates differences between hadoop versions, where hadoop 1 + * does not throw a FileNotFoundException, and return an empty FileStatus[] + * while Hadoop 2 will throw FileNotFoundException. 
+ * + * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem, + * Path, FileStatusFilter) instead. + * + * @param fs file system + * @param dir directory + * @param filter path filter + * @return null if dir is empty or doesn't exist, otherwise FileStatus array + */ + public static FileStatus [] listStatus(final FileSystem fs, + final Path dir, final PathFilter filter) throws IOException { + FileStatus [] status = null; + try { + status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter); + } catch (FileNotFoundException fnfe) { + // if directory doesn't exist, return null + if (LOG.isTraceEnabled()) { + LOG.trace(dir + " doesn't exist"); + } + } + if (status == null || status.length < 1) { + return null; + } + return status; + } + + /** + * Calls fs.listStatus() and treats FileNotFoundException as non-fatal + * This would accommodates differences between hadoop versions + * + * @param fs file system + * @param dir directory + * @return null if dir is empty or doesn't exist, otherwise FileStatus array + */ + public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException { + return listStatus(fs, dir, null); + } + + /** + * Calls fs.listFiles() to get FileStatus and BlockLocations together for reducing rpc call + * + * @param fs file system + * @param dir directory + * @return LocatedFileStatus list + */ + public static List listLocatedStatus(final FileSystem fs, + final Path dir) throws IOException { + List status = null; + try { + RemoteIterator locatedFileStatusRemoteIterator = fs + .listFiles(dir, false); + while (locatedFileStatusRemoteIterator.hasNext()) { + if (status == null) { + status = Lists.newArrayList(); + } + status.add(locatedFileStatusRemoteIterator.next()); + } + } catch (FileNotFoundException fnfe) { + // if directory doesn't exist, return null + if (LOG.isTraceEnabled()) { + LOG.trace(dir + " doesn't exist"); + } + } + return status; + } + + /** + * Calls fs.delete() and returns the value returned by the fs.delete() + * + * @param fs must not be null + * @param path must not be null + * @param recursive delete tree rooted at path + * @return the value returned by the fs.delete() + * @throws IOException from underlying FileSystem + */ + public static boolean delete(final FileSystem fs, final Path path, final boolean recursive) + throws IOException { + return fs.delete(path, recursive); + } + + /** + * Calls fs.exists(). 
Checks if the specified path exists + * + * @param fs must not be null + * @param path must not be null + * @return the value returned by fs.exists() + * @throws IOException from underlying FileSystem + */ + public static boolean isExists(final FileSystem fs, final Path path) throws IOException { + return fs.exists(path); + } + + /** + * Log the current state of the filesystem from a certain root directory + * @param fs filesystem to investigate + * @param root root file/directory to start logging from + * @param LOG log to output information + * @throws IOException if an unexpected exception occurs + */ + public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG) + throws IOException { + LOG.debug("Current file system:"); + logFSTree(LOG, fs, root, "|-"); + } + + /** + * Recursive helper to log the state of the FS + * + * @see #logFileSystemState(FileSystem, Path, Log) + */ + private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix) + throws IOException { + FileStatus[] files = listStatus(fs, root, null); + if (files == null) { + return; + } + + for (FileStatus file : files) { + if (file.isDirectory()) { + LOG.debug(prefix + file.getPath().getName() + "/"); + logFSTree(LOG, fs, file.getPath(), prefix + "---"); + } else { + LOG.debug(prefix + file.getPath().getName()); + } + } + } + + public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest) + throws IOException { + // set the modify time for TimeToLive Cleaner + fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1); + return fs.rename(src, dest); + } + + /** + * Do our short circuit read setup. + * Checks buffer size to use and whether to do checksumming in hbase or hdfs. + * @param conf must not be null + */ + public static void setupShortCircuitRead(final Configuration conf) { + // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property. + boolean shortCircuitSkipChecksum = + conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false); + boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true); + if (shortCircuitSkipChecksum) { + LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " + + "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " + + "it, see https://issues.apache.org/jira/browse/HBASE-6868." : "")); + assert !shortCircuitSkipChecksum; //this will fail if assertions are on + } + checkShortCircuitReadBufferSize(conf); + } + + /** + * Check if short circuit read buffer size is set and if not, set it to hbase value. + * @param conf must not be null + */ + public static void checkShortCircuitReadBufferSize(final Configuration conf) { + final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2; + final int notSet = -1; + // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2 + final String dfsKey = "dfs.client.read.shortcircuit.buffer.size"; + int size = conf.getInt(dfsKey, notSet); + // If a size is set, return -- we will use it. + if (size != notSet) { + return; + } + // But short circuit buffer size is normally not set. Put in place the hbase wanted size. + int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize); + conf.setIfUnset(dfsKey, Integer.toString(hbaseSize)); + } + + // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and + // not until we attempt to reference it. 
+ private static class StreamCapabilities { + public static final boolean PRESENT; + public static final Class CLASS; + public static final Method METHOD; + static { + boolean tmp = false; + Class clazz = null; + Method method = null; + try { + clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities"); + method = clazz.getMethod("hasCapability", String.class); + tmp = true; + } catch(ClassNotFoundException|NoSuchMethodException|SecurityException exception) { + LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " + + "HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " + + "support hflush/hsync. If you are running on top of HDFS this probably just " + + "means you have an older version and this can be ignored. If you are running on " + + "top of an alternate FileSystem implementation you should manually verify that " + + "hflush and hsync are implemented; otherwise you risk data loss and hard to " + + "diagnose errors when our assumptions are violated."); + LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.", + exception); + } finally { + PRESENT = tmp; + CLASS = clazz; + METHOD = method; + } + } + } + + /** + * If our FileSystem version includes the StreamCapabilities class, check if + * the given stream has a particular capability. + * @param stream capabilities are per-stream instance, so check this one specifically. must not be + * null + * @param capability what to look for, per Hadoop Common's FileSystem docs + * @return true if there are no StreamCapabilities. false if there are, but this stream doesn't + * implement it. return result of asking the stream otherwise. + */ + public static boolean hasCapability(FSDataOutputStream stream, String capability) { + // be consistent whether or not StreamCapabilities is present + if (stream == null) { + throw new NullPointerException("stream parameter must not be null."); + } + // If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything + // otherwise old versions of Hadoop will break. + boolean result = true; + if (StreamCapabilities.PRESENT) { + // if StreamCapabilities is present, but the stream doesn't implement it + // or we run into a problem invoking the method, + // we treat that as equivalent to not declaring anything + result = false; + if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) { + try { + result = ((Boolean)StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue(); + } catch (IllegalAccessException|IllegalArgumentException|InvocationTargetException + exception) { + LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " + + "our understanding of how it's supposed to work. Please file a JIRA and include " + + "the following stack trace. In the mean time we're interpreting this behavior " + + "difference as a lack of capability support, which will probably cause a failure.", + exception); + } + } + } + return result; + } + + /** + * Helper exception for those cases where the place where we need to check a stream capability + * is not where we have the needed context to explain the impact and mitigation for a lack. 
+ */ + public static class StreamLacksCapabilityException extends IOException { + private static final long serialVersionUID = 1L; + public StreamLacksCapabilityException(String message, Throwable cause) { + super(message, cause); + } + public StreamLacksCapabilityException(String message) { + super(message); + } + } + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java new file mode 100644 index 0000000000..7ff579277c --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java @@ -0,0 +1,164 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test {@link CommonFSUtils}. + */ +@Category({MiscTests.class, MediumTests.class}) +public class TestCommonFSUtils { + private static final Log LOG = LogFactory.getLog(TestCommonFSUtils.class); + + private HBaseCommonTestingUtility htu; + private Configuration conf; + + @Before + public void setUp() throws IOException { + htu = new HBaseCommonTestingUtility(); + conf = htu.getConfiguration(); + } + + /** + * Test path compare and prefix checking. 
+ */ + @Test + public void testMatchingTail() throws IOException { + Path rootdir = htu.getDataTestDir(); + final FileSystem fs = rootdir.getFileSystem(conf); + assertTrue(rootdir.depth() > 1); + Path partPath = new Path("a", "b"); + Path fullPath = new Path(rootdir, partPath); + Path fullyQualifiedPath = fs.makeQualified(fullPath); + assertFalse(CommonFSUtils.isMatchingTail(fullPath, partPath)); + assertFalse(CommonFSUtils.isMatchingTail(fullPath, partPath.toString())); + assertTrue(CommonFSUtils.isStartingWithPath(rootdir, fullPath.toString())); + assertTrue(CommonFSUtils.isStartingWithPath(fullyQualifiedPath, fullPath.toString())); + assertFalse(CommonFSUtils.isStartingWithPath(rootdir, partPath.toString())); + assertFalse(CommonFSUtils.isMatchingTail(fullyQualifiedPath, partPath)); + assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fullPath)); + assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fullPath.toString())); + assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fs.makeQualified(fullPath))); + assertTrue(CommonFSUtils.isStartingWithPath(rootdir, fullyQualifiedPath.toString())); + assertFalse(CommonFSUtils.isMatchingTail(fullPath, new Path("x"))); + assertFalse(CommonFSUtils.isMatchingTail(new Path("x"), fullPath)); + } + + private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) + throws Exception { + FSDataOutputStream out = fs.create(file); + byte [] data = new byte[dataSize]; + out.write(data, 0, dataSize); + out.close(); + } + + @Test + public void testSetWALRootDir() throws Exception { + Path p = new Path("file:///hbase/root"); + CommonFSUtils.setWALRootDir(conf, p); + assertEquals(p.toString(), conf.get(CommonFSUtils.HBASE_WAL_DIR)); + } + + @Test + public void testGetWALRootDir() throws IOException { + Path root = new Path("file:///hbase/root"); + Path walRoot = new Path("file:///hbase/logroot"); + CommonFSUtils.setRootDir(conf, root); + assertEquals(CommonFSUtils.getRootDir(conf), root); + assertEquals(CommonFSUtils.getWALRootDir(conf), root); + CommonFSUtils.setWALRootDir(conf, walRoot); + assertEquals(CommonFSUtils.getWALRootDir(conf), walRoot); + } + + @Test(expected=IllegalStateException.class) + public void testGetWALRootDirIllegalWALDir() throws IOException { + Path root = new Path("file:///hbase/root"); + Path invalidWALDir = new Path("file:///hbase/root/logroot"); + CommonFSUtils.setRootDir(conf, root); + CommonFSUtils.setWALRootDir(conf, invalidWALDir); + CommonFSUtils.getWALRootDir(conf); + } + + @Test + public void testRemoveWALRootPath() throws Exception { + CommonFSUtils.setRootDir(conf, new Path("file:///user/hbase")); + Path testFile = new Path(CommonFSUtils.getRootDir(conf), "test/testfile"); + Path tmpFile = new Path("file:///test/testfile"); + assertEquals(CommonFSUtils.removeWALRootPath(testFile, conf), "test/testfile"); + assertEquals(CommonFSUtils.removeWALRootPath(tmpFile, conf), tmpFile.toString()); + CommonFSUtils.setWALRootDir(conf, new Path("file:///user/hbaseLogDir")); + assertEquals(CommonFSUtils.removeWALRootPath(testFile, conf), testFile.toString()); + Path logFile = new Path(CommonFSUtils.getWALRootDir(conf), "test/testlog"); + assertEquals(CommonFSUtils.removeWALRootPath(logFile, conf), "test/testlog"); + } + + @Test(expected=NullPointerException.class) + public void streamCapabilitiesDoesNotAllowNullStream() { + CommonFSUtils.hasCapability(null, "hopefully any string"); + } + + private static final boolean STREAM_CAPABILITIES_IS_PRESENT; + static { + boolean tmp = false; + try { + 
Class.forName("org.apache.hadoop.fs.StreamCapabilities"); + tmp = true; + LOG.debug("Test thought StreamCapabilities class was present."); + } catch (ClassNotFoundException exception) { + LOG.debug("Test didn't think StreamCapabilities class was present."); + } finally { + STREAM_CAPABILITIES_IS_PRESENT = tmp; + } + } + + @Test + public void checkStreamCapabilitiesOnKnownNoopStream() throws IOException { + FSDataOutputStream stream = new FSDataOutputStream(new ByteArrayOutputStream(), null); + assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " + + "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT, + CommonFSUtils.hasCapability(stream, "hsync")); + assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " + + "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT, + CommonFSUtils.hasCapability(stream, "hflush")); + assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " + + "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT, + CommonFSUtils.hasCapability(stream, "a capability that hopefully no filesystem will " + + "implement.")); + } +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index e125c6aea5..f5d463bc2c 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -48,6 +48,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.procedure2.Procedure; @@ -56,6 +57,7 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; @@ -67,6 +69,9 @@ import org.apache.hadoop.ipc.RemoteException; public class WALProcedureStore extends ProcedureStoreBase { private static final Log LOG = LogFactory.getLog(WALProcedureStore.class); + /** Used to construct the name of the log directory for master procedures */ + public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs"; + public interface LeaseRecovery { void recoverFileLease(FileSystem fs, Path path) throws IOException; } @@ -170,12 +175,21 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir, - final LeaseRecovery leaseRecovery) { - this.fs = fs; + public WALProcedureStore(final Configuration conf, final Path walDir, + final LeaseRecovery leaseRecovery) throws IOException { this.conf = conf; this.walDir = walDir; this.leaseRecovery = leaseRecovery; + this.fs = walDir.getFileSystem(conf); + // Create the log directory for the procedure store + if (!fs.exists(walDir)) { + if (!fs.mkdirs(walDir)) { + throw new IOException("Unable to 
mkdir " + walDir); + } + } + // Now that it exists, set the log policy + CommonFSUtils.setStoragePolicy(fs, conf, walDir, HConstants.WAL_STORAGE_POLICY, + HConstants.DEFAULT_WAL_STORAGE_POLICY); } @Override @@ -844,6 +858,17 @@ public class WALProcedureStore extends ProcedureStoreBase { LOG.warn("failed to create log file with id=" + logId, re); return false; } + // After we create the stream but before we attempt to use it at all + // ensure that we can provide the level of data safety we're configured + // to provide. + final String durability = useHsync ? "hsync" : "hflush"; + if (!(CommonFSUtils.hasCapability(newStream, durability))) { + throw new IllegalStateException("The procedure WAL relies on the ability to " + durability + + " for proper operation during component failures, but the underlying filesystem does " + + "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY + + "' to set the desired level of robustness and ensure the config value of '" + + CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it."); + } try { ProcedureWALFormat.writeHeader(newStream, header); startPos = newStream.getPos(); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index c4b4b9f953..79b4e2bca6 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -53,14 +53,14 @@ public class ProcedureTestingUtility { private ProcedureTestingUtility() { } - public static ProcedureStore createStore(final Configuration conf, final FileSystem fs, - final Path baseDir) throws IOException { - return createWalStore(conf, fs, baseDir); + public static ProcedureStore createStore(final Configuration conf, final Path dir) + throws IOException { + return createWalStore(conf, dir); } - public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs, - final Path walDir) throws IOException { - return new WALProcedureStore(conf, fs, walDir, new WALProcedureStore.LeaseRecovery() { + public static WALProcedureStore createWalStore(final Configuration conf, final Path dir) + throws IOException { + return new WALProcedureStore(conf, dir, new WALProcedureStore.LeaseRecovery() { @Override public void recoverFileLease(FileSystem fs, Path path) throws IOException { // no-op diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java index 0937f70b51..731e6f6067 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java @@ -62,7 +62,7 @@ public class TestChildProcedures { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); - procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java index a259f49f8a..b2251554f7 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java @@ -64,7 +64,7 @@ public class TestProcedureExecution { assertTrue(testDir.depth() > 1); logDir = new Path(testDir, "proc-logs"); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore); procStore.start(PROCEDURE_EXECUTOR_SLOTS); procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java index f275426628..772a37638d 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java @@ -69,7 +69,7 @@ public class TestProcedureNonce { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); - procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index fa9ddfa678..6add884062 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -70,7 +70,7 @@ public class TestProcedureRecovery { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); - procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java index 2557c982b4..06b66d2d8a 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java @@ -69,7 +69,7 @@ public class TestProcedureReplayOrder { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcedureEnv(); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); 
procStore.start(NUM_THREADS); procExecutor.start(1, true); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java index ba362917ed..fc35e118ba 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java @@ -22,8 +22,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -33,20 +31,15 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; -import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.junit.After; import org.junit.Before; -import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; @Category(SmallTests.class) public class TestYieldProcedures { @@ -72,7 +65,7 @@ public class TestYieldProcedures { assertTrue(testDir.depth() > 1); logDir = new Path(testDir, "proc-logs"); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procRunnables = new TestRunQueue(); procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java index d34d2594db..9749925b9f 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java @@ -132,7 +132,7 @@ public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool { Path logDir = new Path(testDir, "proc-logs"); System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n"); fs.delete(logDir, true); - store = ProcedureTestingUtility.createWalStore(conf, fs, logDir); + store = ProcedureTestingUtility.createWalStore(conf, logDir); store.start(1); store.recoverLease(); store.load(new LoadCounter()); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java index 7565c41a38..1970d3870c 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java @@ -93,9 
+93,9 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { System.out.println("Logs directory : " + logDir.toString()); fs.delete(logDir, true); if ("nosync".equals(syncType)) { - store = new NoSyncWalProcedureStore(conf, fs, logDir); + store = new NoSyncWalProcedureStore(conf, logDir); } else { - store = ProcedureTestingUtility.createWalStore(conf, fs, logDir); + store = ProcedureTestingUtility.createWalStore(conf, logDir); } store.start(numThreads); store.recoverLease(); @@ -237,10 +237,9 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { } } - public static class NoSyncWalProcedureStore extends WALProcedureStore { - public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs, - final Path logDir) { - super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() { + private class NoSyncWalProcedureStore extends WALProcedureStore { + public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException { + super(conf, logDir, new WALProcedureStore.LeaseRecovery() { @Override public void recoverFileLease(FileSystem fs, Path path) throws IOException { // no-op diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java index 8b1c49af2c..d066b743af 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java @@ -74,7 +74,7 @@ public class TestStressWALProcedureStore { assertTrue(testDir.depth() > 1); logDir = new Path(testDir, "proc-logs"); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procStore.start(PROCEDURE_STORE_SLOTS); procStore.recoverLease(); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 2bf95561dc..50089fce84 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -24,7 +24,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; import java.util.Comparator; -import java.util.Iterator; import java.util.HashSet; import java.util.Set; @@ -47,7 +46,6 @@ import org.apache.hadoop.io.IOUtils; import org.junit.After; import org.junit.Before; -import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -61,7 +59,6 @@ public class TestWALProcedureStore { private static final Log LOG = LogFactory.getLog(TestWALProcedureStore.class); private static final int PROCEDURE_STORE_SLOTS = 1; - private static final Procedure NULL_PROC = null; private WALProcedureStore procStore; @@ -78,7 +75,7 @@ public class TestWALProcedureStore { assertTrue(testDir.depth() > 1); logDir = new Path(testDir, "proc-logs"); - procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procStore.start(PROCEDURE_STORE_SLOTS); 
procStore.recoverLease(); procStore.load(new LoadCounter()); @@ -458,7 +455,7 @@ public class TestWALProcedureStore { assertEquals(procs.length + 2, status.length); // simulate another active master removing the wals - procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir, + procStore = new WALProcedureStore(htu.getConfiguration(), logDir, new WALProcedureStore.LeaseRecovery() { private int count = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java index e55916f88a..c6ad4a2474 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java @@ -66,9 +66,6 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_DIR; public class HFileSystem extends FilterFileSystem { public static final Log LOG = LogFactory.getLog(HFileSystem.class); - /** Parameter name for HBase WAL directory */ - public static final String HBASE_WAL_DIR = "hbase.wal.dir"; - private final FileSystem noChecksumFs; // read hfile data from storage private final boolean useHBaseChecksum; private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0d0b5c1e85..9d986d9a70 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -49,7 +49,6 @@ import javax.servlet.http.HttpServletResponse; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.ClusterStatus; @@ -156,10 +155,10 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.ConfigUtil; import org.apache.hadoop.hbase.util.EncryptionTest; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.ModifyRegionUtils; @@ -451,7 +450,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { this.rsFatals = new MemoryBoundedLogMessageBuffer( conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024)); - LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) + + LOG.info("hbase.rootdir=" + CommonFSUtils.getRootDir(this.conf) + ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false)); // Disable usage of meta replicas in the master @@ -1310,23 +1309,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server { private void startProcedureExecutor() throws IOException { final MasterProcedureEnv procEnv = new MasterProcedureEnv(this); - final Path walDir = new Path(FSUtils.getWALRootDir(this.conf), - MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR); - - final FileSystem walFs = walDir.getFileSystem(conf); - - // Create the log directory for the procedure store - if 
(!walFs.exists(walDir)) { - if (!walFs.mkdirs(walDir)) { - throw new IOException("Unable to mkdir " + walDir); - } - } - // Now that it exists, set the log policy - String storagePolicy = - conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); - FSUtils.setStoragePolicy(walFs, walDir, storagePolicy); - - procedureStore = new WALProcedureStore(conf, walFs, walDir, + final Path walDir = new Path(CommonFSUtils.getWALRootDir(this.conf), + WALProcedureStore.MASTER_PROCEDURE_LOGDIR); + procedureStore = new WALProcedureStore(conf, walDir, new MasterProcedureEnv.WALStoreLeaseRecovery(this)); procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index c1bd9307bc..892bcf002d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; @@ -166,7 +167,7 @@ public class MasterFileSystem { checkRootDir(this.rootdir, conf, this.fs, HConstants.HBASE_DIR, HBASE_DIR_PERMS); // if the log directory is different from root, check if it exists if (!this.walRootDir.equals(this.rootdir)) { - checkRootDir(this.walRootDir, conf, this.walFs, HFileSystem.HBASE_WAL_DIR, HBASE_WAL_DIR_PERMS); + checkRootDir(this.walRootDir, conf, this.walFs, CommonFSUtils.HBASE_WAL_DIR, HBASE_WAL_DIR_PERMS); } // check if temp directory exists and clean it diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java index c21137d949..6d7d09a0fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java @@ -24,9 +24,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; public final class MasterProcedureConstants { private MasterProcedureConstants() {} - /** Used to construct the name of the log directory for master procedures */ - public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs"; - /** Number of threads used by the procedure executor */ public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads"; public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 4; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index 2bfc6a8e77..69647621a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -36,6 +36,8 @@ import 
org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -77,7 +79,7 @@ public class ProtobufLogWriter extends WriterBase { @Override @SuppressWarnings("deprecation") public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) - throws IOException { + throws IOException { super.init(fs, path, conf, overwritable); assert this.output == null; boolean doCompress = initializeCompressionContext(conf, path); @@ -87,6 +89,10 @@ public class ProtobufLogWriter extends WriterBase { "hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path)); long blockSize = WALUtil.getWALBlockSize(conf, fs, path); output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null); + // TODO Be sure to add a check for hsync if this branch includes HBASE-19024 + if (!(CommonFSUtils.hasCapability(output, "hflush"))) { + throw new StreamLacksCapabilityException("hflush"); + } output.write(ProtobufLogReader.PB_WAL_MAGIC); boolean doTagCompress = doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index f558618de2..299abea746 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.util; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import com.google.common.primitives.Ints; import edu.umd.cs.findbugs.annotations.CheckForNull; @@ -36,8 +35,6 @@ import java.io.InterruptedIOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.InetSocketAddress; -import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -60,17 +57,14 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.ClusterId; @@ -94,185 +88,29 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.ipc.RemoteException; import 
org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; - -import static org.apache.hadoop.hbase.HConstants.HBASE_DIR; - /** * Utility methods for interacting with the underlying file system. */ @InterfaceAudience.Private -public abstract class FSUtils { +public abstract class FSUtils extends CommonFSUtils { private static final Log LOG = LogFactory.getLog(FSUtils.class); - /** Full access permissions (starting point for a umask) */ - public static final String FULL_RWX_PERMISSIONS = "777"; private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize"; private static final int DEFAULT_THREAD_POOLSIZE = 2; /** Set to true on Windows platforms */ + @VisibleForTesting // currently only used in testing. TODO refactor into a test class public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows"); protected FSUtils() { super(); } - private static final Map warningMap = - new ConcurrentHashMap(); - - /** - * Sets storage policy for given path. - *
- * If the passed path is a directory, we'll set the storage policy for all files - * created in the future in said directory. Note that this change in storage - * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle. - * If we're running on a version of HDFS that doesn't support the given storage policy - * (or storage policies at all), then we'll issue a log message and continue. - * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html - * for possible list e.g 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'. - * - * @param fs We only do anything if an instance of DistributedFileSystem - * @param path the Path whose storage policy is to be set - * @param storagePolicy Policy to set on path - */ - public static void setStoragePolicy(final FileSystem fs, final Path path, - final String storagePolicy) { - try { - setStoragePolicy(fs, path, storagePolicy, false); - } catch (IOException e) { - // should never arrive here - LOG.warn("We have chosen not to throw exception but some unexpectedly thrown out", e); - } - } - - static void setStoragePolicy(final FileSystem fs, final Path path, final String storagePolicy, - boolean throwException) throws IOException { - if (storagePolicy == null) { - if (LOG.isTraceEnabled()) { - LOG.trace("We were passed a null storagePolicy, exiting early."); - } - return; - } - String trimmedStoragePolicy = storagePolicy.trim(); - if (trimmedStoragePolicy.isEmpty()) { - if (LOG.isTraceEnabled()) { - LOG.trace("We were passed an empty storagePolicy, exiting early."); - } - return; - } else { - trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT); - } - if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) { - if (LOG.isTraceEnabled()) { - LOG.trace( - "We were passed the defer-to-hdfs policy " + trimmedStoragePolicy + ", exiting early."); - } - return; - } - boolean distributed = false; - try { - distributed = isDistributedFileSystem(fs); - } catch (IOException ioe) { - if (!warningMap.containsKey(fs)) { - warningMap.put(fs, true); - LOG.warn("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " - + "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy - + " on path=" + path); - } else if (LOG.isDebugEnabled()) { - LOG.debug("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " - + "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy - + " on path=" + path); - } - return; - } - if (distributed) { - try { - invokeSetStoragePolicy(fs, path, trimmedStoragePolicy); - } catch (IOException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Failed to invoke set storage policy API on FS", e); - } - if (throwException) { - throw e; - } - } - } - } - - /* - * All args have been checked and are good. Run the setStoragePolicy invocation. 
- */ - private static void invokeSetStoragePolicy(final FileSystem fs, final Path path, - final String storagePolicy) throws IOException { - Method m = null; - Exception toThrow = null; - try { - m = fs.getClass().getDeclaredMethod("setStoragePolicy", - new Class[] { Path.class, String.class }); - m.setAccessible(true); - } catch (NoSuchMethodException e) { - toThrow = e; - final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584 not available"; - if (!warningMap.containsKey(fs)) { - warningMap.put(fs, true); - LOG.warn(msg, e); - } else if (LOG.isDebugEnabled()) { - LOG.debug(msg, e); - } - m = null; - } catch (SecurityException e) { - toThrow = e; - final String msg = "No access to setStoragePolicy on FileSystem; HDFS-6584 not available"; - if (!warningMap.containsKey(fs)) { - warningMap.put(fs, true); - LOG.warn(msg, e); - } else if (LOG.isDebugEnabled()) { - LOG.debug(msg, e); - } - m = null; // could happen on setAccessible() - } - if (m != null) { - try { - m.invoke(fs, path, storagePolicy); - if (LOG.isDebugEnabled()) { - LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path); - } - } catch (Exception e) { - toThrow = e; - // This swallows FNFE, should we be throwing it? seems more likely to indicate dev - // misuse than a runtime problem with HDFS. - if (!warningMap.containsKey(fs)) { - warningMap.put(fs, true); - LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e); - } else if (LOG.isDebugEnabled()) { - LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e); - } - // check for lack of HDFS-7228 - if (e instanceof InvocationTargetException) { - final Throwable exception = e.getCause(); - if (exception instanceof RemoteException && - HadoopIllegalArgumentException.class.getName().equals( - ((RemoteException)exception).getClassName())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " + - "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " + - "trying to use SSD related policies then you're likely missing HDFS-7228. For " + - "more information see the 'ArchivalStorage' docs for your Hadoop release."); - } - } - } - } - } - if (toThrow != null) { - throw new IOException(toThrow); - } - } - /** * @return True is fs is instance of DistributedFileSystem * @throws IOException @@ -287,32 +125,6 @@ public abstract class FSUtils { return fileSystem instanceof DistributedFileSystem; } - /** - * Compare of path component. Does not consider schema; i.e. if schemas - * different but path starts with rootPath, - * then the function returns true - * @param rootPath - * @param path - * @return True if path starts with rootPath - */ - public static boolean isStartingWithPath(final Path rootPath, final String path) { - String uriRootPath = rootPath.toUri().getPath(); - String tailUriPath = (new Path(path)).toUri().getPath(); - return tailUriPath.startsWith(uriRootPath); - } - - /** - * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the - * '/a/b/c' part. Does not consider schema; i.e. if schemas different but path or subpath matches, - * the two will equate. - * @param pathToSearch Path we will be trying to match. 
- * @param pathTail - * @return True if pathTail is tail on the path of pathToSearch - */ - public static boolean isMatchingTail(final Path pathToSearch, String pathTail) { - return isMatchingTail(pathToSearch, new Path(pathTail)); - } - /** * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true. Does not consider @@ -356,18 +168,6 @@ public abstract class FSUtils { return fsUtils; } - /** - * Delete if exists. - * @param fs filesystem object - * @param dir directory to delete - * @return True if deleted dir - * @throws IOException e - */ - public static boolean deleteDirectory(final FileSystem fs, final Path dir) - throws IOException { - return fs.exists(dir) && fs.delete(dir, true); - } - /** * Delete the region directory if exists. * @param conf @@ -383,89 +183,7 @@ public abstract class FSUtils { new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName())); } - /** - * Return the number of bytes that large input files should be optimally - * be split into to minimize i/o time. - * - * use reflection to search for getDefaultBlockSize(Path f) - * if the method doesn't exist, fall back to using getDefaultBlockSize() - * - * @param fs filesystem object - * @return the default block size for the path's filesystem - * @throws IOException e - */ - public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException { - Method m = null; - Class cls = fs.getClass(); - try { - m = cls.getMethod("getDefaultBlockSize", new Class[] { Path.class }); - } catch (NoSuchMethodException e) { - LOG.info("FileSystem doesn't support getDefaultBlockSize"); - } catch (SecurityException e) { - LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e); - m = null; // could happen on setAccessible() - } - if (m == null) { - return fs.getDefaultBlockSize(path); - } else { - try { - Object ret = m.invoke(fs, path); - return ((Long)ret).longValue(); - } catch (Exception e) { - throw new IOException(e); - } - } - } - - /* - * Get the default replication. - * - * use reflection to search for getDefaultReplication(Path f) - * if the method doesn't exist, fall back to using getDefaultReplication() - * - * @param fs filesystem object - * @param f path of file - * @return default replication for the path's filesystem - * @throws IOException e - */ - public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException { - Method m = null; - Class cls = fs.getClass(); - try { - m = cls.getMethod("getDefaultReplication", new Class[] { Path.class }); - } catch (NoSuchMethodException e) { - LOG.info("FileSystem doesn't support getDefaultReplication"); - } catch (SecurityException e) { - LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e); - m = null; // could happen on setAccessible() - } - if (m == null) { - return fs.getDefaultReplication(path); - } else { - try { - Object ret = m.invoke(fs, path); - return ((Number)ret).shortValue(); - } catch (Exception e) { - throw new IOException(e); - } - } - } - - /** - * Returns the default buffer size to use during writes. - * - * The size of the buffer should probably be a multiple of hardware - * page size (4096 on Intel x86), and it determines how much data is - * buffered during read and write operations. 
- * - * @param fs filesystem object - * @return default buffer size to use during writes - */ - public static int getDefaultBufferSize(final FileSystem fs) { - return fs.getConf().getInt("io.file.buffer.size", 4096); - } - - /** + /** * Create the specified file on the filesystem. By default, this will: *
<ol>
 * <li>overwrite the file if it exists</li>
@@ -518,71 +236,6 @@ public abstract class FSUtils { return create(fs, path, perm, true); }
- /**
- * Create the specified file on the filesystem. By default, this will:
- * <ol>
- * <li>apply the umask in the configuration (if it is enabled)</li>
- * <li>use the fs configured buffer size (or 4096 if not set)</li>
- * <li>use the default replication</li>
- * <li>use the default block size</li>
- * <li>not track progress</li>
- * </ol>
    - * - * @param fs {@link FileSystem} on which to write the file - * @param path {@link Path} to the file to write - * @param perm - * @param overwrite Whether or not the created file should be overwritten. - * @return output stream to the created file - * @throws IOException if the file cannot be created - */ - public static FSDataOutputStream create(FileSystem fs, Path path, - FsPermission perm, boolean overwrite) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite); - } - return fs.create(path, perm, overwrite, getDefaultBufferSize(fs), - getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null); - } - - /** - * Get the file permissions specified in the configuration, if they are - * enabled. - * - * @param fs filesystem that the file will be created on. - * @param conf configuration to read for determining if permissions are - * enabled and which to use - * @param permssionConfKey property key in the configuration to use when - * finding the permission - * @return the permission to use when creating a new file on the fs. If - * special permissions are not specified in the configuration, then - * the default permissions on the the fs will be returned. - */ - public static FsPermission getFilePermissions(final FileSystem fs, - final Configuration conf, final String permssionConfKey) { - boolean enablePermissions = conf.getBoolean( - HConstants.ENABLE_DATA_FILE_UMASK, false); - - if (enablePermissions) { - try { - FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS); - // make sure that we have a mask, if not, go default. - String mask = conf.get(permssionConfKey); - if (mask == null) - return FsPermission.getFileDefault(); - // appy the umask - FsPermission umask = new FsPermission(mask); - return perm.applyUMask(umask); - } catch (IllegalArgumentException e) { - LOG.warn( - "Incorrect umask attempted to be created: " - + conf.get(permssionConfKey) - + ", using default file permissions.", e); - return FsPermission.getFileDefault(); - } - } - return FsPermission.getFileDefault(); - } - /** * Checks to see if the specified file system is available * @@ -1025,46 +678,6 @@ public abstract class FSUtils { } } - /** - * Verifies root directory path is a valid URI with a scheme - * - * @param root root directory path - * @return Passed root argument. - * @throws IOException if not a valid URI with a scheme - */ - public static Path validateRootPath(Path root) throws IOException { - try { - URI rootURI = new URI(root.toString()); - String scheme = rootURI.getScheme(); - if (scheme == null) { - throw new IOException("Root directory does not have a scheme"); - } - return root; - } catch (URISyntaxException e) { - IOException io = new IOException("Root directory path is not a valid " + - "URI -- check your " + HBASE_DIR + " configuration"); - io.initCause(e); - throw io; - } - } - - /** - * Checks for the presence of the WAL log root path (using the provided conf object) in the given path. If - * it exists, this method removes it and returns the String representation of remaining relative path. - * @param path - * @param conf - * @return String representation of the remaining relative path - * @throws IOException - */ - public static String removeWALRootPath(Path path, final Configuration conf) throws IOException { - Path root = getWALRootDir(conf); - String pathStr = path.toString(); - // check that the path is absolute... it has the root path in it. 
- if (!pathStr.startsWith(root.toString())) return pathStr; - // if not, return as it is. - return pathStr.substring(root.toString().length() + 1);// remove the "/" too. - } - /** * If DFS, check safe mode and if so, wait until we clear it. * @param conf configuration @@ -1088,81 +701,6 @@ public abstract class FSUtils { } } - /** - * Return the 'path' component of a Path. In Hadoop, Path is an URI. This - * method returns the 'path' component of a Path's URI: e.g. If a Path is - * hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir, - * this method returns /hbase_trunk/TestTable/compaction.dir. - * This method is useful if you want to print out a Path without qualifying - * Filesystem instance. - * @param p Filesystem Path whose 'path' component we are to return. - * @return Path portion of the Filesystem - */ - public static String getPath(Path p) { - return p.toUri().getPath(); - } - - /** - * @param c configuration - * @return {@link Path} to hbase root directory: i.e. {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR} from - * configuration as a qualified Path. - * @throws IOException e - */ - public static Path getRootDir(final Configuration c) throws IOException { - Path p = new Path(c.get(HBASE_DIR)); - FileSystem fs = p.getFileSystem(c); - return p.makeQualified(fs); - } - - public static void setRootDir(final Configuration c, final Path root) throws IOException { - c.set(HBASE_DIR, root.toString()); - } - - public static void setFsDefault(final Configuration c, final Path root) throws IOException { - c.set("fs.defaultFS", root.toString()); // for hadoop 0.21+ - } - - public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException { - Path p = getRootDir(c); - return p.getFileSystem(c); - } - - /** - * @param c configuration - * @return {@link Path} to hbase log root directory: i.e. {@value org.apache.hadoop.hbase.fs.HFileSystem#HBASE_WAL_DIR} from - * configuration as a qualified Path. Defaults to {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR} - * @throws IOException e - */ - public static Path getWALRootDir(final Configuration c) throws IOException { - Path p = new Path(c.get(HFileSystem.HBASE_WAL_DIR, c.get(HBASE_DIR))); - if (!isValidWALRootDir(p, c)) { - return FSUtils.getRootDir(c); - } - FileSystem fs = p.getFileSystem(c); - return p.makeQualified(fs); - } - - @VisibleForTesting - public static void setWALRootDir(final Configuration c, final Path root) throws IOException { - c.set(HFileSystem.HBASE_WAL_DIR, root.toString()); - } - - public static FileSystem getWALFileSystem(final Configuration c) throws IOException { - Path p = getWALRootDir(c); - return p.getFileSystem(c); - } - - private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException { - Path rootDir = FSUtils.getRootDir(c); - if (!walDir.equals(rootDir)) { - if (walDir.toString().startsWith(rootDir.toString() + "/")) { - throw new IllegalStateException("Illegal WAL directory specified. 
" + - "WAL directories are not permitted to be under the root directory if set."); - } - } - return true; - } - /** * Returns the WAL region directory based on the region info * @param conf configuration to determine WALRootDir @@ -1172,9 +710,9 @@ public abstract class FSUtils { */ public static Path getWALRegionDir(final Configuration conf, final HRegionInfo regionInfo) - throws IOException { + throws IOException { return new Path(getWALTableDir(conf, regionInfo.getTable()), - regionInfo.getEncodedName()); + regionInfo.getEncodedName()); } /** @@ -1314,19 +852,6 @@ public abstract class FSUtils { return frags; } - /** - * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under - * path rootdir - * - * @param rootdir qualified path of HBase root directory - * @param tableName name of table - * @return {@link org.apache.hadoop.fs.Path} for table - */ - public static Path getTableDir(Path rootdir, final TableName tableName) { - return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()), - tableName.getQualifierAsString()); - } - /** * Returns the Table directory under the WALRootDir for the specified table name * @param conf configuration used to get the WALRootDir @@ -1337,32 +862,7 @@ public abstract class FSUtils { public static Path getWALTableDir(final Configuration conf, final TableName tableName) throws IOException { return new Path(new Path(getWALRootDir(conf), tableName.getNamespaceAsString()), - tableName.getQualifierAsString()); - } - - /** - * Returns the {@link org.apache.hadoop.hbase.TableName} object representing - * the table directory under - * path rootdir - * - * @param tablePath path of table - * @return {@link org.apache.hadoop.fs.Path} for table - */ - public static TableName getTableName(Path tablePath) { - return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName()); - } - - /** - * Returns the {@link org.apache.hadoop.fs.Path} object representing - * the namespace directory under path rootdir - * - * @param rootdir qualified path of HBase root directory - * @param namespace namespace name - * @return {@link org.apache.hadoop.fs.Path} for table - */ - public static Path getNamespaceDir(Path rootdir, final String namespace) { - return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR, - new Path(namespace))); + tableName.getQualifierAsString()); } /** @@ -1494,17 +994,6 @@ public abstract class FSUtils { return append; } - /** - * @param conf - * @return True if this filesystem whose scheme is 'hdfs'. - * @throws IOException - */ - public static boolean isHDFS(final Configuration conf) throws IOException { - FileSystem fs = FileSystem.get(conf); - String scheme = fs.getUri().getScheme(); - return scheme.equalsIgnoreCase("hdfs"); - } - /** * Recover file lease. Used when a file might be suspect * to be had been left open by another process. @@ -1546,15 +1035,6 @@ public abstract class FSUtils { return tabledirs; } - /** - * Checks if the given path is the one with 'recovered.edits' dir. - * @param path - * @return True if we recovered edits - */ - public static boolean isRecoveredEdits(Path path) { - return path.toString().contains(HConstants.RECOVERED_EDITS_DIR); - } - /** * Filter for all dirs that don't start with '.' */ @@ -1731,18 +1211,6 @@ public abstract class FSUtils { } } - - /** - * @param conf - * @return Returns the filesystem of the hbase rootdir. 
- * @throws IOException - */ - public static FileSystem getCurrentFileSystem(Configuration conf) - throws IOException { - return getRootDir(conf).getFileSystem(conf); - } - - /** * Runs through the HBase rootdir/tablename and creates a reverse lookup map for * table StoreFile names to the full Path. @@ -2040,101 +1508,6 @@ public abstract class FSUtils { } } - /** - * Calls fs.listStatus() and treats FileNotFoundException as non-fatal - * This accommodates differences between hadoop versions, where hadoop 1 - * does not throw a FileNotFoundException, and return an empty FileStatus[] - * while Hadoop 2 will throw FileNotFoundException. - * - * Where possible, prefer {@link #listStatusWithStatusFilter(FileSystem, - * Path, FileStatusFilter)} instead. - * - * @param fs file system - * @param dir directory - * @param filter path filter - * @return null if dir is empty or doesn't exist, otherwise FileStatus array - */ - public static FileStatus [] listStatus(final FileSystem fs, - final Path dir, final PathFilter filter) throws IOException { - FileStatus [] status = null; - try { - status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter); - } catch (FileNotFoundException fnfe) { - // if directory doesn't exist, return null - if (LOG.isTraceEnabled()) { - LOG.trace(dir + " doesn't exist"); - } - } - if (status == null || status.length < 1) return null; - return status; - } - - /** - * Calls fs.listStatus() and treats FileNotFoundException as non-fatal - * This would accommodates differences between hadoop versions - * - * @param fs file system - * @param dir directory - * @return null if dir is empty or doesn't exist, otherwise FileStatus array - */ - public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException { - return listStatus(fs, dir, null); - } - - /** - * Calls fs.listFiles() to get FileStatus and BlockLocations together for reducing rpc call - * - * @param fs file system - * @param dir directory - * @return LocatedFileStatus list - */ - public static List listLocatedStatus(final FileSystem fs, - final Path dir) throws IOException { - List status = null; - try { - RemoteIterator locatedFileStatusRemoteIterator = fs - .listFiles(dir, false); - while (locatedFileStatusRemoteIterator.hasNext()) { - if (status == null) { - status = Lists.newArrayList(); - } - status.add(locatedFileStatusRemoteIterator.next()); - } - } catch (FileNotFoundException fnfe) { - // if directory doesn't exist, return null - if (LOG.isTraceEnabled()) { - LOG.trace(dir + " doesn't exist"); - } - } - return status; - } - - /** - * Calls fs.delete() and returns the value returned by the fs.delete() - * - * @param fs - * @param path - * @param recursive - * @return the value returned by the fs.delete() - * @throws IOException - */ - public static boolean delete(final FileSystem fs, final Path path, final boolean recursive) - throws IOException { - return fs.delete(path, recursive); - } - - /** - * Calls fs.exists(). Checks if the specified path exists - * - * @param fs - * @param path - * @return the value returned by fs.exists() - * @throws IOException - */ - public static boolean isExists(final FileSystem fs, final Path path) throws IOException { - return fs.exists(path); - } - /** * Throw an exception if an action is not permitted by a user on a file. 
* @@ -2171,46 +1544,6 @@ public abstract class FSUtils { return false; } - /** - * Log the current state of the filesystem from a certain root directory - * @param fs filesystem to investigate - * @param root root file/directory to start logging from - * @param LOG log to output information - * @throws IOException if an unexpected exception occurs - */ - public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG) - throws IOException { - LOG.debug("Current file system:"); - logFSTree(LOG, fs, root, "|-"); - } - - /** - * Recursive helper to log the state of the FS - * - * @see #logFileSystemState(FileSystem, Path, Log) - */ - private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix) - throws IOException { - FileStatus[] files = FSUtils.listStatus(fs, root, null); - if (files == null) return; - - for (FileStatus file : files) { - if (file.isDirectory()) { - LOG.debug(prefix + file.getPath().getName() + "/"); - logFSTree(LOG, fs, file.getPath(), prefix + "---"); - } else { - LOG.debug(prefix + file.getPath().getName()); - } - } - } - - public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest) - throws IOException { - // set the modify time for TimeToLive Cleaner - fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1); - return fs.rename(src, dest); - } - /** * This function is to scan the root path of the file system to get the * degree of locality for each region on each of the servers having at least diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java index 91e11dc26e..7f2d9dfaa2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -463,7 +464,7 @@ public class TestBlockReorder { // Should be reordered, as we pretend to be a file name with a compliant stuff Assert.assertNotNull(conf.get(HConstants.HBASE_DIR)); Assert.assertFalse(conf.get(HConstants.HBASE_DIR).isEmpty()); - String pseudoLogFile = conf.get(HFileSystem.HBASE_WAL_DIR) + "/" + + String pseudoLogFile = conf.get(CommonFSUtils.HBASE_WAL_DIR) + "/" + HConstants.HREGION_LOGDIR_NAME + "/" + host1 + ",6977,6576" + "/mylogfile"; // Check that it will be possible to extract a ServerName from our construction diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java index abd16f78d2..28728afb0f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java @@ -99,7 +99,7 @@ public class TestWALProcedureStoreOnHDFS { MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3); Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs"); - store = 
ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), dfs.getFileSystem(), logDir); + store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir); store.registerListener(stopProcedureListener); store.start(8); store.recoverLease(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 10e7412361..018bab6ee3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -57,11 +57,9 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; @@ -70,8 +68,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; @@ -90,6 +86,7 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; @@ -147,7 +144,7 @@ public class TestWALReplay { Path hbaseWALRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbaselog")); LOG.info(HConstants.HBASE_DIR + "=" + hbaseRootDir); - LOG.info(HFileSystem.HBASE_WAL_DIR + "=" + hbaseWALRootDir); + LOG.info(CommonFSUtils.HBASE_WAL_DIR + "=" + hbaseWALRootDir); FSUtils.setRootDir(conf, hbaseRootDir); FSUtils.setWALRootDir(conf, hbaseWALRootDir); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java index 38a78ee052..52027a9c95 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java @@ -37,16 +37,15 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.fs.HFileSystem; import 
org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -57,58 +56,17 @@ import org.junit.experimental.categories.Category; public class TestFSUtils { private static final Log LOG = LogFactory.getLog(TestFSUtils.class); - /** - * Test path compare and prefix checking. - * @throws IOException - */ - @Test - public void testMatchingTail() throws IOException { - HBaseTestingUtility htu = new HBaseTestingUtility(); - final FileSystem fs = htu.getTestFileSystem(); - Path rootdir = htu.getDataTestDir(); - assertTrue(rootdir.depth() > 1); - Path partPath = new Path("a", "b"); - Path fullPath = new Path(rootdir, partPath); - Path fullyQualifiedPath = fs.makeQualified(fullPath); - assertFalse(FSUtils.isMatchingTail(fullPath, partPath)); - assertFalse(FSUtils.isMatchingTail(fullPath, partPath.toString())); - assertTrue(FSUtils.isStartingWithPath(rootdir, fullPath.toString())); - assertTrue(FSUtils.isStartingWithPath(fullyQualifiedPath, fullPath.toString())); - assertFalse(FSUtils.isStartingWithPath(rootdir, partPath.toString())); - assertFalse(FSUtils.isMatchingTail(fullyQualifiedPath, partPath)); - assertTrue(FSUtils.isMatchingTail(fullyQualifiedPath, fullPath)); - assertTrue(FSUtils.isMatchingTail(fullyQualifiedPath, fullPath.toString())); - assertTrue(FSUtils.isMatchingTail(fullyQualifiedPath, fs.makeQualified(fullPath))); - assertTrue(FSUtils.isStartingWithPath(rootdir, fullyQualifiedPath.toString())); - assertFalse(FSUtils.isMatchingTail(fullPath, new Path("x"))); - assertFalse(FSUtils.isMatchingTail(new Path("x"), fullPath)); - } + private HBaseTestingUtility htu; + private Configuration conf; - @Test - public void testVersion() throws DeserializationException, IOException { - HBaseTestingUtility htu = new HBaseTestingUtility(); - final FileSystem fs = htu.getTestFileSystem(); - final Path rootdir = htu.getDataTestDir(); - assertNull(FSUtils.getVersion(fs, rootdir)); - // Write out old format version file. See if we can read it in and convert. - Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); - FSDataOutputStream s = fs.create(versionFile); - final String version = HConstants.FILE_SYSTEM_VERSION; - s.writeUTF(version); - s.close(); - assertTrue(fs.exists(versionFile)); - FileStatus [] status = fs.listStatus(versionFile); - assertNotNull(status); - assertTrue(status.length > 0); - String newVersion = FSUtils.getVersion(fs, rootdir); - assertEquals(version.length(), newVersion.length()); - assertEquals(version, newVersion); - // File will have been converted. 
Exercise the pb format - assertEquals(version, FSUtils.getVersion(fs, rootdir)); - FSUtils.checkVersion(fs, rootdir, true); + @Before + public void setUp() throws IOException { + htu = new HBaseTestingUtility(); + conf = htu.getConfiguration(); } - @Test public void testIsHDFS() throws Exception { + @Test + public void testIsHDFS() throws Exception { HBaseTestingUtility htu = new HBaseTestingUtility(); htu.getConfiguration().setBoolean("dfs.support.append", false); assertFalse(FSUtils.isHDFS(htu.getConfiguration())); @@ -236,10 +194,32 @@ public class TestFSUtils { } @Test - public void testPermMask() throws Exception { + public void testVersion() throws DeserializationException, IOException { + final Path rootdir = htu.getDataTestDir(); + final FileSystem fs = rootdir.getFileSystem(conf); + assertNull(FSUtils.getVersion(fs, rootdir)); + // Write out old format version file. See if we can read it in and convert. + Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); + FSDataOutputStream s = fs.create(versionFile); + final String version = HConstants.FILE_SYSTEM_VERSION; + s.writeUTF(version); + s.close(); + assertTrue(fs.exists(versionFile)); + FileStatus [] status = fs.listStatus(versionFile); + assertNotNull(status); + assertTrue(status.length > 0); + String newVersion = FSUtils.getVersion(fs, rootdir); + assertEquals(version.length(), newVersion.length()); + assertEquals(version, newVersion); + // File will have been converted. Exercise the pb format + assertEquals(version, FSUtils.getVersion(fs, rootdir)); + FSUtils.checkVersion(fs, rootdir, true); + } - Configuration conf = HBaseConfiguration.create(); - FileSystem fs = FileSystem.get(conf); + @Test + public void testPermMask() throws Exception { + final Path rootdir = htu.getDataTestDir(); + final FileSystem fs = rootdir.getFileSystem(conf); // default fs permission FsPermission defaultFsPerm = FSUtils.getFilePermissions(fs, conf, @@ -278,10 +258,9 @@ public class TestFSUtils { @Test public void testDeleteAndExists() throws Exception { - HBaseTestingUtility htu = new HBaseTestingUtility(); - Configuration conf = htu.getConfiguration(); + final Path rootdir = htu.getDataTestDir(); + final FileSystem fs = rootdir.getFileSystem(conf); conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true); - FileSystem fs = FileSystem.get(conf); FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); // then that the correct file is created String file = UUID.randomUUID().toString(); @@ -306,6 +285,7 @@ public class TestFSUtils { } } + @Test public void testFilteredStatusDoesNotThrowOnNotFound() throws Exception { HBaseTestingUtility htu = new HBaseTestingUtility(); @@ -357,6 +337,26 @@ public class TestFSUtils { } } + @Test + public void testSetStoragePolicyDefault() throws Exception { + verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY); + } + + /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */ + @Test + public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception { + verifyFileInDirWithStoragePolicy("ALL_SSD"); + } + + /* should log a warning, but still work. 
(different warning on Hadoop < 2.6.0) */ + @Test + public void testSetStoragePolicyInvalid() throws Exception { + verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY); + } + + final String INVALID_STORAGE_POLICY = "1772"; + + // Here instead of TestCommonFSUtils because we need a minicluster private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception { HBaseTestingUtility htu = new HBaseTestingUtility(); Configuration conf = htu.getConfiguration(); @@ -397,108 +397,42 @@ public class TestFSUtils { } } - @Test - public void testSetStoragePolicyDefault() throws Exception { - verifyNoHDFSApiInvocationForDefaultPolicy(); - verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY); + private void cleanupFile(FileSystem fileSys, Path name) throws IOException { + assertTrue(fileSys.exists(name)); + assertTrue(fileSys.delete(name, true)); + assertTrue(!fileSys.exists(name)); } - /** - * Note: currently the default policy is set to defer to HDFS and this case is to verify the - * logic, will need to remove the check if the default policy is changed - */ - private void verifyNoHDFSApiInvocationForDefaultPolicy() { - FileSystem testFs = new AlwaysFailSetStoragePolicyFileSystem(); - // There should be no exception thrown when setting to default storage policy, which indicates - // the HDFS API hasn't been called - try { - FSUtils.setStoragePolicy(testFs, new Path("non-exist"), HConstants.DEFAULT_WAL_STORAGE_POLICY, - true); - } catch (IOException e) { - Assert.fail("Should have bypassed the FS API when setting default storage policy"); - } - // There should be exception thrown when given non-default storage policy, which indicates the - // HDFS API has been called - try { - FSUtils.setStoragePolicy(testFs, new Path("non-exist"), "HOT", true); - Assert.fail("Should have invoked the FS API but haven't"); - } catch (IOException e) { - // expected given an invalid path - } - } + private static final boolean STREAM_CAPABILITIES_IS_PRESENT; - class AlwaysFailSetStoragePolicyFileSystem extends DistributedFileSystem { - @Override - public void setStoragePolicy(final Path src, final String policyName) throws IOException { - throw new IOException("The setStoragePolicy method is invoked"); + static { + boolean tmp = false; + try { + Class.forName("org.apache.hadoop.fs.StreamCapabilities"); + tmp = true; + LOG.debug("Test thought StreamCapabilities class was present."); + } catch (ClassNotFoundException exception) { + LOG.debug("Test didn't think StreamCapabilities class was present."); + } finally { + STREAM_CAPABILITIES_IS_PRESENT = tmp; } } - /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */ + // Here instead of TestCommonFSUtils because we need a minicluster @Test - public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception { - verifyFileInDirWithStoragePolicy("ALL_SSD"); - } - - final String INVALID_STORAGE_POLICY = "1772"; - - /* should log a warning, but still work. 
(different warning on Hadoop < 2.6.0) */ - @Test - public void testSetStoragePolicyInvalid() throws Exception { - verifyFileInDirWithStoragePolicy(INVALID_STORAGE_POLICY); - } - - @Test - public void testSetWALRootDir() throws Exception { - HBaseTestingUtility htu = new HBaseTestingUtility(); - Configuration conf = htu.getConfiguration(); - Path p = new Path("file:///hbase/root"); - FSUtils.setWALRootDir(conf, p); - assertEquals(p.toString(), conf.get(HFileSystem.HBASE_WAL_DIR)); - } - - @Test - public void testGetWALRootDir() throws IOException { - HBaseTestingUtility htu = new HBaseTestingUtility(); - Configuration conf = htu.getConfiguration(); - Path root = new Path("file:///hbase/root"); - Path walRoot = new Path("file:///hbase/logroot"); - FSUtils.setRootDir(conf, root); - assertEquals(FSUtils.getRootDir(conf), root); - assertEquals(FSUtils.getWALRootDir(conf), root); - FSUtils.setWALRootDir(conf, walRoot); - assertEquals(FSUtils.getWALRootDir(conf), walRoot); - } - - @Test(expected=IllegalStateException.class) - public void testGetWALRootDirIllegalWALDir() throws IOException { - HBaseTestingUtility htu = new HBaseTestingUtility(); - Configuration conf = htu.getConfiguration(); - Path root = new Path("file:///hbase/root"); - Path invalidWALDir = new Path("file:///hbase/root/logroot"); - FSUtils.setRootDir(conf, root); - FSUtils.setWALRootDir(conf, invalidWALDir); - FSUtils.getWALRootDir(conf); - } - - @Test - public void testRemoveWALRootPath() throws Exception { - HBaseTestingUtility htu = new HBaseTestingUtility(); - Configuration conf = htu.getConfiguration(); - FSUtils.setRootDir(conf, new Path("file:///user/hbase")); - Path testFile = new Path(FSUtils.getRootDir(conf), "test/testfile"); - Path tmpFile = new Path("file:///test/testfile"); - assertEquals(FSUtils.removeWALRootPath(testFile, conf), "test/testfile"); - assertEquals(FSUtils.removeWALRootPath(tmpFile, conf), tmpFile.toString()); - FSUtils.setWALRootDir(conf, new Path("file:///user/hbaseLogDir")); - assertEquals(FSUtils.removeWALRootPath(testFile, conf), testFile.toString()); - Path logFile = new Path(FSUtils.getWALRootDir(conf), "test/testlog"); - assertEquals(FSUtils.removeWALRootPath(logFile, conf), "test/testlog"); + public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception { + MiniDFSCluster cluster = htu.startMiniDFSCluster(1); + try (FileSystem filesystem = cluster.getFileSystem()) { + FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar")); + assertTrue(FSUtils.hasCapability(stream, "hsync")); + assertTrue(FSUtils.hasCapability(stream, "hflush")); + assertNotEquals("We expect HdfsDataOutputStream to say it has a dummy capability iff the " + + "StreamCapabilities class is not defined.", + STREAM_CAPABILITIES_IS_PRESENT, + FSUtils.hasCapability(stream, "a capability that hopefully HDFS doesn't add.")); + } finally { + cluster.shutdown(); + } } - private void cleanupFile(FileSystem fileSys, Path name) throws IOException { - assertTrue(fileSys.exists(name)); - assertTrue(fileSys.delete(name, true)); - assertTrue(!fileSys.exists(name)); - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java index 97fa2c07bd..01d0ef410d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java @@ -30,14 +30,13 @@ import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; import static org.apache.hadoop.hbase.wal.DefaultWALProvider.DEFAULT_PROVIDER_ID; import static org.apache.hadoop.hbase.wal.DefaultWALProvider.META_WAL_PROVIDER_ID; import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER; - // imports for things that haven't moved from regionserver.wal yet. import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; @@ -102,7 +101,7 @@ public class IOTestProvider implements WALProvider { providerId = DEFAULT_PROVIDER_ID; } final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId; - log = new IOTestWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf), + log = new IOTestWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), DefaultWALProvider.getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); @@ -186,7 +185,12 @@ public class IOTestProvider implements WALProvider { if (!initialized || doFileRolls) { LOG.info("creating new writer instance."); final ProtobufLogWriter writer = new IOTestWriter(); - writer.init(fs, path, conf, false); + try { + writer.init(fs, path, conf, false); + } catch (CommonFSUtils.StreamLacksCapabilityException exception) { + throw new IOException("Can't create writer instance because underlying FileSystem " + + "doesn't support needed stream capabilities.", exception); + } if (!initialized) { LOG.info("storing initial writer instance in case file rolling isn't allowed."); noRollsWriter = writer; @@ -209,7 +213,8 @@ public class IOTestProvider implements WALProvider { private boolean doSyncs; @Override - public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) throws IOException { + public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) + throws IOException, CommonFSUtils.StreamLacksCapabilityException { Collection operations = conf.getStringCollection(ALLOWED_OPERATIONS); if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) { doAppends = doSyncs = true; -- 2.20.1
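
For readers following the capability check introduced above, here is a minimal, illustrative sketch of how a caller outside this diff might use the reflection-based lookup before trusting hflush. It relies only on names that appear in the patch (CommonFSUtils.getWALFileSystem, CommonFSUtils.getWALRootDir, CommonFSUtils.hasCapability, and CommonFSUtils.StreamLacksCapabilityException); the WriterExample class, the openDurableStream method, and its arguments are hypothetical and not part of the change.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;

public class WriterExample {
  /**
   * Opens an output stream under the WAL root directory and verifies the
   * stream can hflush before handing it back, failing fast on filesystems
   * that cannot provide that guarantee.
   */
  public static FSDataOutputStream openDurableStream(Configuration conf, String name)
      throws IOException, StreamLacksCapabilityException {
    FileSystem walFs = CommonFSUtils.getWALFileSystem(conf);
    Path path = new Path(CommonFSUtils.getWALRootDir(conf), name);
    FSDataOutputStream out = walFs.create(path, true);
    // hasCapability is the reflection-based lookup added by this patch: it
    // consults org.apache.hadoop.fs.StreamCapabilities when that class is
    // present and assumes the capability exists on older Hadoop versions.
    if (!CommonFSUtils.hasCapability(out, "hflush")) {
      out.close();
      throw new StreamLacksCapabilityException("hflush");
    }
    return out;
  }
}

As with ProtobufLogWriter and IOTestProvider in the diff, a caller that cannot propagate the checked StreamLacksCapabilityException can wrap it in an IOException instead.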