Index: src/main/java/org/apache/hadoop/hbase/HBaseFileSystem.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HBaseFileSystem.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/HBaseFileSystem.java (working copy) @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.regionserver.wal.HLogFileSystem; +import org.apache.hadoop.hbase.util.Threads; + +/** + * An abstraction of the underlying filesystem. This is used by other entities such as + * {@link HLogFileSystem}, to make calls to the underlying filesystem. + * + */ +public abstract class HBaseFileSystem { + + public static final Log LOG = LogFactory.getLog(HBaseFileSystem.class); + + /** + * In order to handle NN connectivity hiccups, one need to retry non-idempotent operation at the + * client level. + */ + protected static int hdfsClientRetriesNumber; + private static int baseSleepBeforeRetries; + private static final int DEFAULT_HDFS_CLIENT_RETRIES_NUMBER = 10; + private static final int DEFAULT_BASE_SLEEP_BEFORE_RETRIES = 1000; + + + /** + * Deletes a file. Assumes the user has already checked for this directory existence. + * @param dir + * @param fs + * @param conf + * @return true if the directory is deleted. + * @throws IOException + */ + public static boolean deleteFileFromFileSystem(FileSystem fs, Configuration conf, Path dir) + throws IOException { + IOException lastIOE = null; + int i = 0; + checkAndSetRetryCounts(conf); + do { + try { + return fs.delete(dir, false); + } catch (IOException ioe) { + lastIOE = ioe; + if (!fs.exists(dir)) return true; + // dir is there, retry deleting after some time. + sleepBeforeRetry("Delete File", i + 1); + } + } while (++i <= hdfsClientRetriesNumber); + throw new IOException("Exception in deleteFileFromFileSystem", lastIOE); + } + + + /** + * Deletes a directory. Assumes the user has already checked for this directory existence. + * @param dir + * @param fs + * @param conf + * @return true if the directory is deleted. 
+ * @throws IOException + */ + public static boolean deleteDirFromFileSystem(FileSystem fs, Configuration conf, Path dir) + throws IOException { + IOException lastIOE = null; + int i = 0; + checkAndSetRetryCounts(conf); + do { + try { + return fs.delete(dir, true); + } catch (IOException ioe) { + lastIOE = ioe; + if (!fs.exists(dir)) return true; + // dir is there, retry deleting after some time. + sleepBeforeRetry("Delete Dir", i + 1); + } + } while (++i <= hdfsClientRetriesNumber); + throw new IOException("Exception in deleteDirFromFileSystem", lastIOE); + } + + protected static void checkAndSetRetryCounts(Configuration conf) { + if (hdfsClientRetriesNumber == 0) { + hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number", + DEFAULT_HDFS_CLIENT_RETRIES_NUMBER); + baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries", + DEFAULT_BASE_SLEEP_BEFORE_RETRIES); + } + } + + /** + * Creates a directory for a filesystem and configuration object. Assumes the user has already + * checked for this directory existence. + * @param fs + * @param conf + * @param dir + * @return the result of fs.mkdirs(). In case underlying fs throws an IOException, it checks + * whether the directory exists or not, and returns true if it exists. + * @throws IOException + */ + public static boolean makeDirOnFileSystem(FileSystem fs, Configuration conf, Path dir) + throws IOException { + int i = 0; + IOException lastIOE = null; + checkAndSetRetryCounts(conf); + do { + try { + return fs.mkdirs(dir); + } catch (IOException ioe) { + lastIOE = ioe; + if (fs.exists(dir)) return true; // directory is present + sleepBeforeRetry("Create Directory", i+1); + } + } while (++i <= hdfsClientRetriesNumber); + throw new IOException("Exception in makeDirOnFileSystem", lastIOE); + } + + /** + * Renames a directory. Assumes the user has already checked for this directory existence. + * @param src + * @param fs + * @param dst + * @param conf + * @return true if the directory is renamed. + * @throws IOException + */ + public static boolean renameDirForFileSystem(FileSystem fs, Configuration conf, Path src, Path dst) + throws IOException { + IOException lastIOE = null; + int i = 0; + checkAndSetRetryCounts(conf); + do { + try { + return fs.rename(src, dst); + } catch (IOException ioe) { + lastIOE = ioe; + if (!fs.exists(src) && fs.exists(dst)) return true; + // src is there, retry renaming after some time. + sleepBeforeRetry("Rename Directory", i + 1); + } + } while (++i <= hdfsClientRetriesNumber); + throw new IOException("Exception in renameDirForFileSystem", lastIOE); + } + +/** + * Creates a path on the file system. Checks whether the path exists already or not, and use it + * for retrying in case underlying fs throws an exception. 
+ * @param fs + * @param conf + * @param dir + * @param overwrite + * @return + * @throws IOException + */ + public static FSDataOutputStream createPathOnFileSystem(FileSystem fs, Configuration conf, Path dir, + boolean overwrite) throws IOException { + int i = 0; + boolean existsBefore = fs.exists(dir); + IOException lastIOE = null; + checkAndSetRetryCounts(conf); + do { + try { + return fs.create(dir, overwrite); + } catch (IOException ioe) { + lastIOE = ioe; + // directory is present, don't overwrite + if (!existsBefore && fs.exists(dir)) return fs.create(dir, false); + sleepBeforeRetry("Create Path", i + 1); + } + } while (++i <= hdfsClientRetriesNumber); + throw new IOException("Exception in createPathOnFileSystem", lastIOE); + } + + /** + * Creates the specified file with the given permission. + * @param fs + * @param path + * @param perm + * @param overwrite + * @return + * @throws IOException + */ + public static FSDataOutputStream createPathWithPermsOnFileSystem(FileSystem fs, + Configuration conf, Path path, FsPermission perm, boolean overwrite) throws IOException { + int i = 0; + IOException lastIOE = null; + boolean existsBefore = fs.exists(path); + checkAndSetRetryCounts(conf); + do { + try { + return fs.create(path, perm, overwrite, fs.getConf().getInt("io.file.buffer.size", 4096), + fs.getDefaultReplication(), fs.getDefaultBlockSize(), null); + } catch (IOException ioe) { + lastIOE = ioe; + // path is present now, don't overwrite + if (!existsBefore && fs.exists(path)) return fs.create(path, false); + // if it existed before let's retry in case we get IOE, as we can't rely on fs.exists() + sleepBeforeRetry("Create Path with Perms", i + 1); + } + } while (++i <= hdfsClientRetriesNumber); + throw new IOException("Exception in createPathWithPermsOnFileSystem", lastIOE); + } + +/** + * Creates the file. Assumes the user has already checked for this file existence. + * @param fs + * @param conf + * @param dir + * @return result true if the file is created with this call, false otherwise. + * @throws IOException + */ + public static boolean createNewFileOnFileSystem(FileSystem fs, Configuration conf, Path file) + throws IOException { + int i = 0; + IOException lastIOE = null; + checkAndSetRetryCounts(conf); + do { + try { + return fs.createNewFile(file); + } catch (IOException ioe) { + lastIOE = ioe; + if (fs.exists(file)) return true; // file exists now, return true. + sleepBeforeRetry("Create NewFile", i + 1); + } + } while (++i <= hdfsClientRetriesNumber); + throw new IOException("Exception in createNewFileOnFileSystem", lastIOE); + } + + /** + * sleeping logic for static methods; handles the interrupt exception. Keeping a static version + * for this to avoid re-looking for the integer values. 
+ */ + protected static void sleepBeforeRetry(String msg, int sleepMultiplier) { + if (sleepMultiplier > hdfsClientRetriesNumber) { + LOG.warn(msg + ", retries exhausted"); + return; + } + LOG.info(msg + ", sleeping " + baseSleepBeforeRetries + " times " + sleepMultiplier); + Threads.sleep(baseSleepBeforeRetries * sleepMultiplier); + } +} Index: src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java (working copy) @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -216,7 +217,7 @@ Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, family); // make sure we don't archive if we can't and that the archive dir exists - if (!fs.mkdirs(storeArchiveDir)) { + if (!HBaseFileSystem.makeDirOnFileSystem(fs, conf, storeArchiveDir)) { throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:" + Bytes.toString(family) + ", deleting compacted files instead."); } @@ -250,7 +251,7 @@ Configuration conf, Path tableDir, byte[] family, Path storeFile) throws IOException { Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family); // make sure we don't archive if we can't and that the archive dir exists - if (!fs.mkdirs(storeArchiveDir)) { + if (!HBaseFileSystem.makeDirOnFileSystem(fs, conf, storeArchiveDir)) { throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:" + Bytes.toString(family) + ", deleting compacted files instead."); } @@ -318,7 +319,7 @@ // make sure the archive directory exists if (!fs.exists(baseArchiveDir)) { - if (!fs.mkdirs(baseArchiveDir)) { + if (!HBaseFileSystem.makeDirOnFileSystem(fs, fs.getConf(), baseArchiveDir)) { throw new IOException("Failed to create the archive directory:" + baseArchiveDir + ", quitting archive attempt."); } @@ -385,11 +386,12 @@ // move the archive file to the stamped backup Path backedupArchiveFile = new Path(archiveDir, filename + SEPARATOR + archiveStartTime); - if (!fs.rename(archiveFile, backedupArchiveFile)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, fs.getConf(), archiveFile, + backedupArchiveFile)) { LOG.error("Could not rename archive file to backup: " + backedupArchiveFile + ", deleting existing file in favor of newer."); // try to delete the exisiting file, if we can't rename it - if (!fs.delete(archiveFile, false)) { + if (!HBaseFileSystem.deleteFileFromFileSystem(fs, fs.getConf(), archiveFile)) { throw new IOException("Couldn't delete existing archive file (" + archiveFile + ") or rename it to the backup file (" + backedupArchiveFile + ") to make room for similarly named file."); @@ -410,10 +412,9 @@ // the cleaner has removed our archive directory (HBASE-7643). 
// (we're in a retry loop, so don't worry too much about the exception) try { - if (!fs.exists(archiveDir)) { - if (fs.mkdirs(archiveDir)) { - LOG.debug("Created archive directory:" + archiveDir); - } + if (!fs.exists(archiveDir) + && HBaseFileSystem.makeDirOnFileSystem(fs, fs.getConf(), archiveDir)) { + LOG.debug("Created archive directory:" + archiveDir); } } catch (IOException e) { LOG.warn("Failed to create the archive directory: " + archiveDir, e); @@ -476,7 +477,7 @@ */ private static boolean deleteRegionWithoutArchiving(FileSystem fs, Path regionDir) throws IOException { - if (fs.delete(regionDir, true)) { + if (HBaseFileSystem.deleteDirFromFileSystem(fs, fs.getConf(), regionDir)) { LOG.debug("Deleted all region files in: " + regionDir); return true; } @@ -610,7 +611,7 @@ public boolean moveAndClose(Path dest) throws IOException { this.close(); Path p = this.getPath(); - return fs.rename(p, dest); + return HBaseFileSystem.renameDirForFileSystem(fs, fs.getConf(), p, dest); } /** @@ -641,7 +642,8 @@ @Override public void delete() throws IOException { - if (!fs.delete(file, true)) throw new IOException("Failed to delete:" + this.file); + if (!HBaseFileSystem.deleteDirFromFileSystem(fs, fs.getConf(), file)) + throw new IOException("Failed to delete:" + this.file); } @Override Index: src/main/java/org/apache/hadoop/hbase/io/Reference.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/Reference.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/io/Reference.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -125,7 +126,7 @@ public Path write(final FileSystem fs, final Path p) throws IOException { - FSDataOutputStream out = fs.create(p, false); + FSDataOutputStream out = HBaseFileSystem.createPathOnFileSystem(fs, fs.getConf(), p, false); try { write(out); } finally { Index: src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (working copy) @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -152,7 +153,7 @@ // Make sure the region servers can archive their old logs if(!this.fs.exists(oldLogDir)) { - this.fs.mkdirs(oldLogDir); + HBaseFileSystem.makeDirOnFileSystem(fs, conf, oldLogDir); } return oldLogDir; @@ -322,7 +323,7 @@ Path splitDir = logDir.suffix(HLog.SPLITTING_EXT); // rename the directory so a rogue RS doesn't create more HLogs if (fs.exists(logDir)) { - if (!this.fs.rename(logDir, splitDir)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, conf, logDir, splitDir)) { throw new IOException("Failed fs.rename for log split: " + logDir); } logDir = splitDir; @@ -415,7 +416,7 @@ // Filesystem is good. Go ahead and check for hbase.rootdir. 
try { if (!fs.exists(rd)) { - fs.mkdirs(rd); + HBaseFileSystem.makeDirOnFileSystem(fs, c, rd); // DFS leaves safe mode with 0 DNs when there are 0 blocks. // We used to handle this by checking the current DN count and waiting until // it is nonzero. With security, the check for datanode count doesn't work -- @@ -479,13 +480,13 @@ HFileArchiver.archiveRegion(fs, this.rootdir, tabledir, regiondir); } } - if (!fs.delete(tmpdir, true)) { + if (!HBaseFileSystem.deleteDirFromFileSystem(fs, c, tmpdir)) { throw new IOException("Unable to clean the temp directory: " + tmpdir); } } // Create the temp directory - if (!fs.mkdirs(tmpdir)) { + if (!HBaseFileSystem.makeDirOnFileSystem(fs, c, tmpdir)) { throw new IOException("HBase temp directory '" + tmpdir + "' creation failure."); } } @@ -553,7 +554,7 @@ } public void deleteTable(byte[] tableName) throws IOException { - fs.delete(new Path(rootdir, Bytes.toString(tableName)), true); + HBaseFileSystem.deleteDirFromFileSystem(fs, conf, new Path(rootdir, Bytes.toString(tableName))); } /** @@ -566,11 +567,11 @@ Path tempPath = new Path(this.tempdir, path.getName()); // Ensure temp exists - if (!fs.exists(tempdir) && !fs.mkdirs(tempdir)) { + if (!fs.exists(tempdir) && !HBaseFileSystem.makeDirOnFileSystem(fs, conf, tempdir)) { throw new IOException("HBase temp directory '" + tempdir + "' creation failure."); } - if (!fs.rename(path, tempPath)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, conf, path, tempPath)) { throw new IOException("Unable to move '" + path + "' to temp '" + tempPath + "'"); } @@ -602,7 +603,7 @@ // delete the family folder Path familyDir = new Path(tableDir, new Path(region.getEncodedName(), Bytes.toString(familyName))); - if (fs.delete(familyDir, true) == false) { + if (!HBaseFileSystem.deleteDirFromFileSystem(fs, conf, familyDir)) { throw new IOException("Could not delete family " + Bytes.toString(familyName) + " from FileSystem for region " + region.getRegionNameAsString() + "(" + region.getEncodedName() Index: src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (working copy) @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status; @@ -295,7 +296,7 @@ for(Path logDir: logDirs){ status.setStatus("Cleaning up log directory..."); try { - if (fs.exists(logDir) && !fs.delete(logDir, false)) { + if (fs.exists(logDir) && !HBaseFileSystem.deleteFileFromFileSystem(fs, conf, logDir)) { LOG.warn("Unable to delete log src dir. Ignoring. 
" + logDir); } } catch (IOException ioe) { Index: src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java (working copy) @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.util.FSUtils; @@ -153,7 +154,7 @@ // if the directory doesn't exist, then we are done if (children == null) { try { - return fs.delete(toCheck, false); + return HBaseFileSystem.deleteFileFromFileSystem(fs, conf, toCheck); } catch (IOException e) { if (LOG.isTraceEnabled()) { LOG.trace("Couldn't delete directory: " + toCheck, e); @@ -185,7 +186,7 @@ // delete this directory. However, don't do so recursively so we don't delete files that have // been added since we last checked. try { - return fs.delete(toCheck, false); + return HBaseFileSystem.deleteFileFromFileSystem(fs, conf, toCheck); } catch (IOException e) { if (LOG.isTraceEnabled()) { LOG.trace("Couldn't delete directory: " + toCheck, e); @@ -207,7 +208,7 @@ // first check to see if the path is valid if (!validate(filePath)) { LOG.warn("Found a wrongly formatted file: " + filePath.getName() + " deleting it."); - boolean success = this.fs.delete(filePath, true); + boolean success = HBaseFileSystem.deleteDirFromFileSystem(fs, conf, filePath); if (!success) LOG.warn("Attempted to delete:" + filePath + ", but couldn't. Run cleaner chain and attempt to delete on next pass."); @@ -233,7 +234,7 @@ if (LOG.isTraceEnabled()) { LOG.trace("Removing:" + filePath + " from archive"); } - boolean success = this.fs.delete(filePath, false); + boolean success = HBaseFileSystem.deleteFileFromFileSystem(fs, conf, filePath); if (!success) { LOG.warn("Attempted to delete:" + filePath + ", but couldn't. Run cleaner chain and attempt to delete on next pass."); Index: src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java (working copy) @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; @@ -174,7 +175,7 @@ List regionInfos = handleCreateHdfsRegions(tempdir, tableName); // 3. 
Move Table temp directory to the hbase root location - if (!fs.rename(tempTableDir, tableDir)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, conf, tempTableDir, tableDir)) { throw new IOException("Unable to move table from temp=" + tempTableDir + " to hbase root=" + tableDir); } Index: src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java (working copy) @@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.backup.HFileArchiver; @@ -89,7 +90,7 @@ } // 5. Delete table from FS (temp directory) - if (!fs.delete(tempTableDir, true)) { + if (!HBaseFileSystem.deleteDirFromFileSystem(fs, fs.getConf(), tempTableDir)) { LOG.error("Couldn't delete " + tempTableDir); } } finally { Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; @@ -680,7 +681,7 @@ final Path initialFiles, final Path regiondir) throws IOException { if (initialFiles != null && fs.exists(initialFiles)) { - if (!fs.rename(initialFiles, regiondir)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, fs.getConf(), initialFiles, regiondir)) { LOG.warn("Unable to rename " + initialFiles + " to " + regiondir); } } @@ -842,7 +843,7 @@ } finally { out.close(); } - if (!fs.rename(tmpPath, regioninfoPath)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, conf, tmpPath, regioninfoPath)) { throw new IOException("Unable to rename " + tmpPath + " to " + regioninfoPath); } @@ -2697,7 +2698,7 @@ // open and read the files as well). LOG.debug("Creating reference for file (" + (i+1) + "/" + sz + ") : " + file); Path referenceFile = new Path(dstStoreDir, file.getName()); - boolean success = fs.createNewFile(referenceFile); + boolean success = HBaseFileSystem.createNewFileOnFileSystem(fs, conf, referenceFile); if (!success) { throw new IOException("Failed to create reference file:" + referenceFile); } @@ -3106,7 +3107,7 @@ } // Now delete the content of recovered edits. We're done w/ them. 
for (Path file: files) { - if (!this.fs.delete(file, false)) { + if (!HBaseFileSystem.deleteFileFromFileSystem(fs, conf, file)) { LOG.error("Failed delete of " + file); } else { LOG.debug("Deleted recovered.edits file=" + file); @@ -3298,7 +3299,7 @@ FileStatus stat = fs.getFileStatus(p); if (stat.getLen() > 0) return false; LOG.warn("File " + p + " is zero-length, deleting."); - fs.delete(p, false); + HBaseFileSystem.deleteFileFromFileSystem(fs, fs.getConf(), p); return true; } @@ -4221,7 +4222,7 @@ HTableDescriptor.getTableDir(rootDir, info.getTableName()); Path regionDir = HRegion.getRegionDir(tableDir, info.getEncodedName()); FileSystem fs = FileSystem.get(conf); - fs.mkdirs(regionDir); + HBaseFileSystem.makeDirOnFileSystem(fs, conf, regionDir); // Write HRI to a file in case we need to recover .META. writeRegioninfoOnFilesystem(info, regionDir, fs, conf); HLog effectiveHLog = hlog; @@ -4417,7 +4418,7 @@ if (LOG.isDebugEnabled()) { LOG.debug("DELETING region " + regiondir.toString()); } - if (!fs.delete(regiondir, true)) { + if (!HBaseFileSystem.deleteDirFromFileSystem(fs, fs.getConf(), regiondir)) { LOG.warn("Failed delete of " + regiondir); } } @@ -4463,7 +4464,7 @@ final HRegionInfo hri, byte [] colFamily) throws IOException { Path dir = Store.getStoreHomedir(tabledir, hri.getEncodedName(), colFamily); - if (!fs.mkdirs(dir)) { + if (!HBaseFileSystem.makeDirOnFileSystem(fs, fs.getConf(), dir)) { LOG.warn("Failed to create " + dir); } } @@ -4573,7 +4574,7 @@ throw new IOException("Cannot merge; target file collision at " + newRegionDir); } - fs.mkdirs(newRegionDir); + HBaseFileSystem.makeDirOnFileSystem(fs, conf, newRegionDir); LOG.info("starting merge of regions: " + a + " and " + b + " into new region " + newRegionInfo.toString() + Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java (working copy) @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseFileSystem; + +/** + * Acts as an abstraction layer b/w HBase and underlying fs. Used for making non-idempotent calls. + * This is useful as it can have a retry logic for such operations, as they are not retried at + * hdfs level. + * Region specific methods that access fs should be added here. 
+ * + */ +public class HRegionFileSystem extends HBaseFileSystem { + public static final Log LOG = LogFactory.getLog(HRegionFileSystem.class); + + public HRegionFileSystem(Configuration conf) { + checkAndSetRetryCounts(conf); + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (working copy) @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; @@ -551,12 +552,13 @@ if (fs.exists(splitdir)) { LOG.info("The " + splitdir + " directory exists. Hence deleting it to recreate it"); - if (!fs.delete(splitdir, true)) { + if (!HBaseFileSystem.deleteDirFromFileSystem(fs, fs.getConf(), splitdir)) { throw new IOException("Failed deletion of " + splitdir + " before creating them again."); } } - if (!fs.mkdirs(splitdir)) throw new IOException("Failed create of " + splitdir); + if (!HBaseFileSystem.makeDirOnFileSystem(fs, fs.getConf(), splitdir)) + throw new IOException("Failed create of " + splitdir); } private static void cleanupSplitDir(final FileSystem fs, final Path splitdir) @@ -577,7 +579,7 @@ throws IOException { if (!fs.exists(dir)) { if (mustPreExist) throw new IOException(dir.toString() + " does not exist!"); - } else if (!fs.delete(dir, true)) { + } else if (!HBaseFileSystem.deleteDirFromFileSystem(fs, fs.getConf(), dir)) { throw new IOException("Failed delete of " + dir); } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -288,8 +289,7 @@ */ Path createStoreHomeDir(final FileSystem fs, final Path homedir) throws IOException { - if (!fs.exists(homedir)) { - if (!fs.mkdirs(homedir)) + if (!fs.exists(homedir) && !HBaseFileSystem.makeDirOnFileSystem(fs, fs.getConf(), homedir)) { throw new IOException("Failed create of: " + homedir.toString()); } return homedir; @@ -879,7 +879,7 @@ String msg = "Renaming flushed file at " + path + " to " + dstPath; LOG.debug(msg); status.setStatus("Flushing " + this + ": " + msg); - if (!fs.rename(path, dstPath)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, conf, path, dstPath)) { LOG.warn("Unable to rename " + path + " to " + dstPath); } @@ -1649,7 +1649,7 @@ Path origPath = compactedFile.getPath(); Path destPath = new Path(homedir, origPath.getName()); LOG.info("Renaming compacted file at " + origPath + " to " + destPath); - if (!fs.rename(origPath, destPath)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, conf, origPath, destPath)) { LOG.error("Failed move of compacted file " + origPath + " to " + destPath); throw new IOException("Failed move of compacted file " + 
origPath + Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy) @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.KeyValue; @@ -674,7 +675,7 @@ */ public void deleteReader() throws IOException { closeReader(true); - this.fs.delete(getPath(), true); + HBaseFileSystem.deleteDirFromFileSystem(fs, fs.getConf(), getPath()); } @Override @@ -717,7 +718,7 @@ if (!fs.exists(src)) { throw new FileNotFoundException(src.toString()); } - if (!fs.rename(src, tgt)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, fs.getConf(), src, tgt)) { throw new IOException("Failed rename of " + src + " to " + tgt); } return tgt; @@ -850,7 +851,7 @@ } if (!fs.exists(dir)) { - fs.mkdirs(dir); + HBaseFileSystem.makeDirOnFileSystem(fs, conf, dir); } if (filePath == null) { Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -59,6 +59,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; @@ -135,6 +136,7 @@ private final FileSystem fs; private final Path dir; private final Configuration conf; + private final HLogFileSystem hlogFs; // Listeners that are called on WAL events. 
private List listeners = new CopyOnWriteArrayList(); @@ -397,6 +399,7 @@ this.fs = fs; this.dir = dir; this.conf = conf; + this.hlogFs = new HLogFileSystem(conf); if (listeners != null) { for (WALActionsListener i: listeners) { registerWALActionsListener(i); @@ -413,14 +416,12 @@ if (failIfLogDirExists && (dirExists = this.fs.exists(dir))) { throw new IOException("Target HLog directory already exists: " + dir); } - if (!dirExists && !fs.mkdirs(dir)) { + if (!dirExists && !HBaseFileSystem.makeDirOnFileSystem(fs, conf, dir)) { throw new IOException("Unable to mkdir " + dir); } this.oldLogDir = oldLogDir; - if (!fs.exists(oldLogDir)) { - if (!fs.mkdirs(this.oldLogDir)) { - throw new IOException("Unable to mkdir " + this.oldLogDir); - } + if (!fs.exists(oldLogDir) && !HBaseFileSystem.makeDirOnFileSystem(fs, conf, oldLogDir)) { + throw new IOException("Unable to mkdir " + this.oldLogDir); } this.forMeta = forMeta; this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); @@ -711,7 +712,7 @@ if (forMeta) { //TODO: set a higher replication for the hlog files (HBASE-6773) } - return createWriter(fs, path, conf); + return this.hlogFs.createWriter(fs, conf, path); } /** @@ -930,7 +931,7 @@ i.preLogArchive(p, newPath); } } - if (!this.fs.rename(p, newPath)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, conf, p, newPath)) { throw new IOException("Unable to rename " + p + " to " + newPath); } // Tell our listeners that a log has been archived. @@ -993,7 +994,7 @@ } } - if (!fs.rename(file.getPath(),p)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, conf, file.getPath(),p)) { throw new IOException("Unable to rename " + file.getPath() + " to " + p); } // Tell our listeners that a log was archived. @@ -1005,7 +1006,7 @@ } LOG.debug("Moved " + files.length + " log files to " + FSUtils.getPath(this.oldLogDir)); - if (!fs.delete(dir, true)) { + if (!HBaseFileSystem.deleteDirFromFileSystem(fs, conf, dir)) { LOG.info("Unable to delete " + dir); } } @@ -1887,7 +1888,7 @@ throws IOException { Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis()); - if (!fs.rename(edits, moveAsideName)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, fs.getConf(), edits, moveAsideName)) { LOG.warn("Rename failed from " + edits + " to " + moveAsideName); } return moveAsideName; Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFileSystem.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFileSystem.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFileSystem.java (working copy) @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseFileSystem; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer; + +/** + * Acts as an abstraction between the HLog and the underlying filesystem. This is analogous to the + * {@link HRegionFileSystem} class. + */ +public class HLogFileSystem extends HBaseFileSystem { + public static final Log LOG = LogFactory.getLog(HLogFileSystem.class); + + /** + * In order to handle NN connectivity hiccups, one need to retry non-idempotent operation at the + * client level. + */ + + public HLogFileSystem(Configuration conf) { + checkAndSetRetryCounts(conf); + } + + /** + * Creates writer for the given path. + * @param fs + * @param conf + * @param hlogFile + * @return an init'ed writer for the given path. + * @throws IOException + */ + public Writer createWriter(FileSystem fs, Configuration conf, Path hlogFile) throws IOException { + int i = 0; + IOException lastIOE = null; + do { + try { + return HLog.createWriter(fs, hlogFile, conf); + } catch (IOException ioe) { + lastIOE = ioe; + sleepBeforeRetry("Create Writer", i+1); + } + } while (++i <= hdfsClientRetriesNumber); + throw new IOException("Exception in createWriter", lastIOE); + + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (working copy) @@ -41,7 +41,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.RemoteExceptionHandler; @@ -91,6 +91,7 @@ protected final Path oldLogDir; protected final FileSystem fs; protected final Configuration conf; + private final HLogFileSystem hlogFs; // Major subcomponents of the split process. // These are separated into inner classes to make testing easier. @@ -163,6 +164,7 @@ conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128*1024*1024)); outputSink = new OutputSink(); + this.hlogFs = new HLogFileSystem(conf); } /** @@ -482,7 +484,7 @@ LOG.warn("Found existing old edits file. It could be the " + "result of a previous failed split attempt. Deleting " + dst + ", length=" + fs.getFileStatus(dst).getLen()); - if (!fs.delete(dst, false)) { + if (!HBaseFileSystem.deleteFileFromFileSystem(fs, conf, dst)) { LOG.warn("Failed deleting of old " + dst); throw new IOException("Failed deleting of old " + dst); } @@ -491,7 +493,7 @@ // data without touching disk. TestHLogSplit#testThreading is an // example. 
if (fs.exists(wap.p)) { - if (!fs.rename(wap.p, dst)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, conf, wap.p, dst)) { throw new IOException("Failed renaming " + wap.p + " to " + dst); } LOG.debug("Rename " + wap.p + " to " + dst); @@ -558,7 +560,7 @@ } archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf); Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName()); - fs.delete(stagingDir, true); + HBaseFileSystem.deleteDirFromFileSystem(fs, conf, stagingDir); } /** @@ -581,17 +583,17 @@ final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get( "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME)); - if (!fs.mkdirs(corruptDir)) { + if (!HBaseFileSystem.makeDirOnFileSystem(fs, conf, corruptDir)) { LOG.info("Unable to mkdir " + corruptDir); } - fs.mkdirs(oldLogDir); + HBaseFileSystem.makeDirOnFileSystem(fs, conf, oldLogDir); // this method can get restarted or called multiple times for archiving // the same log files. for (Path corrupted : corruptedLogs) { Path p = new Path(corruptDir, corrupted.getName()); if (fs.exists(corrupted)) { - if (!fs.rename(corrupted, p)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, conf, corrupted, p)) { LOG.warn("Unable to move corrupted log " + corrupted + " to " + p); } else { LOG.warn("Moving corrupted log " + corrupted + " to " + p); @@ -602,7 +604,7 @@ for (Path p : processedLogs) { Path newPath = HLog.getHLogArchivePath(oldLogDir, p); if (fs.exists(p)) { - if (!fs.rename(p, newPath)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, conf, p, newPath)) { LOG.warn("Unable to move " + p + " to " + newPath); } else { LOG.debug("Archived processed log " + p + " to " + newPath); @@ -612,7 +614,7 @@ // distributed log splitting removes the srcDir (region's log dir) later // when all the log files in that srcDir have been successfully processed - if (srcDir != null && !fs.delete(srcDir, true)) { + if (srcDir != null && !HBaseFileSystem.deleteDirFromFileSystem(fs, conf, srcDir)) { throw new IOException("Unable to delete src dir: " + srcDir); } } @@ -644,8 +646,9 @@ " already split so it's safe to discard those edits."); return null; } - if (isCreate && !fs.exists(dir)) { - if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir); + if (isCreate && !fs.exists(dir) && + !HBaseFileSystem.makeDirOnFileSystem(fs, fs.getConf(), dir)) { + LOG.warn("mkdir failed on " + dir); } // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure // region's replayRecoveredEdits will not delete it @@ -816,7 +819,7 @@ */ protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException { - return HLog.createWriter(fs, logfile, conf); + return hlogFs.createWriter(fs, conf, logfile); } /** @@ -1063,7 +1066,7 @@ + "result of a previous failed split attempt. Deleting " + regionedits + ", length=" + fs.getFileStatus(regionedits).getLen()); - if (!fs.delete(regionedits, false)) { + if (!HBaseFileSystem.deleteFileFromFileSystem(fs, conf, regionedits)) { LOG.warn("Failed delete of old " + regionedits); } } @@ -1089,13 +1092,13 @@ + "result of a previous failed split attempt. 
Deleting " + ret + ", length=" + fs.getFileStatus(ret).getLen()); - if (!fs.delete(ret, false)) { + if (!HBaseFileSystem.deleteFileFromFileSystem(fs, conf, ret)) { LOG.warn("Failed delete of old " + ret); } } Path dir = ret.getParent(); - if (!fs.exists(dir)) { - if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir); + if (!fs.exists(dir) && !HBaseFileSystem.makeDirOnFileSystem(fs, conf, dir)) { + LOG.warn("mkdir failed on " + dir); } } catch (IOException e) { LOG.warn("Could not prepare temp staging area ", e); @@ -1188,7 +1191,7 @@ LOG.warn("Found existing old edits file. It could be the " + "result of a previous failed split attempt. Deleting " + dst + ", length=" + fs.getFileStatus(dst).getLen()); - if (!fs.delete(dst, false)) { + if (!HBaseFileSystem.deleteFileFromFileSystem(fs, conf, dst)) { LOG.warn("Failed deleting of old " + dst); throw new IOException("Failed deleting of old " + dst); } @@ -1197,7 +1200,7 @@ // the data without touching disk. TestHLogSplit#testThreading is an // example. if (fs.exists(wap.p)) { - if (!fs.rename(wap.p, dst)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, conf, wap.p, dst)) { throw new IOException("Failed renaming " + wap.p + " to " + dst); } LOG.debug("Rename " + wap.p + " to " + dst); Index: src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableDescriptors; @@ -224,7 +225,7 @@ if (!this.fsreadonly) { Path tabledir = FSUtils.getTablePath(this.rootdir, tablename); if (this.fs.exists(tabledir)) { - if (!this.fs.delete(tabledir, true)) { + if (!HBaseFileSystem.deleteDirFromFileSystem(fs, fs.getConf(), tabledir)) { throw new IOException("Failed delete of " + tabledir.toString()); } } @@ -280,7 +281,7 @@ for (int i = 1; i < status.length; i++) { Path p = status[i].getPath(); // Clean up old versions - if (!fs.delete(p, false)) { + if (!HBaseFileSystem.deleteFileFromFileSystem(fs, fs.getConf(), p)) { LOG.warn("Failed cleanup of " + status); } else { LOG.debug("Cleaned up old tableinfo file " + p); @@ -504,7 +505,7 @@ try { writeHTD(fs, p, hTableDescriptor); tableInfoPath = getTableInfoFileName(tableDir, sequenceid); - if (!fs.rename(p, tableInfoPath)) { + if (!HBaseFileSystem.renameDirForFileSystem(fs, fs.getConf(), p, tableInfoPath)) { throw new IOException("Failed rename of " + p + " to " + tableInfoPath); } } catch (IOException ioe) { @@ -530,7 +531,7 @@ private static void writeHTD(final FileSystem fs, final Path p, final HTableDescriptor htd) throws IOException { - FSDataOutputStream out = fs.create(p, false); + FSDataOutputStream out = HBaseFileSystem.createPathOnFileSystem(fs, fs.getConf(), p, false); try { htd.write(out); out.write('\n'); Index: src/main/java/org/apache/hadoop/hbase/util/FSUtils.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (working copy) @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.PathFilter; import 
org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; @@ -106,7 +107,7 @@ */ public Path checkdir(final FileSystem fs, final Path dir) throws IOException { if (!fs.exists(dir)) { - fs.mkdirs(dir); + HBaseFileSystem.makeDirOnFileSystem(fs, fs.getConf(), dir); } return dir; } @@ -151,13 +152,10 @@ * @return output stream to the created file * @throws IOException if the file cannot be created */ - public static FSDataOutputStream create(FileSystem fs, Path path, - FsPermission perm, boolean overwrite) throws IOException { + public static FSDataOutputStream create(FileSystem fs, Path path, FsPermission perm, + boolean overwrite) throws IOException { LOG.debug("Creating file=" + path + " with permission=" + perm); - - return fs.create(path, perm, overwrite, - fs.getConf().getInt("io.file.buffer.size", 4096), - fs.getDefaultReplication(), fs.getDefaultBlockSize(), null); + return HBaseFileSystem.createPathWithPermsOnFileSystem(fs, fs.getConf(), path, perm, overwrite); } /** Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (revision 1464765) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (working copy) @@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseFileSystem; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.regionserver.SplitLogWorker; @@ -164,7 +165,7 @@ FileSystem fs) { Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt"); try { - fs.createNewFile(file); + HBaseFileSystem.createNewFileOnFileSystem(fs, fs.getConf(), file); } catch (IOException e) { LOG.warn("Could not flag a log file as corrupted. Failed to create " + file, e); Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1464765) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -197,8 +197,13 @@ // a hbase checksum verification failure will cause unit tests to fail ChecksumUtil.generateExceptionForChecksumFailureForTest(true); + setHDFSClientRetryProperty(); } + private void setHDFSClientRetryProperty() { + this.conf.setInt("hdfs.client.retries.number", 1); + } + /** * Returns this classes's instance of {@link Configuration}. Be careful how * you use the returned Configuration since {@link HConnection} instances Index: src/test/java/org/apache/hadoop/hbase/TestHBaseFileSystem.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/TestHBaseFileSystem.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/TestHBaseFileSystem.java (working copy) @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.URI; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestHBaseFileSystem { + public static final Log LOG = LogFactory.getLog(TestHBaseFileSystem.class); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static Configuration conf; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = TEST_UTIL.getConfiguration(); + conf.setBoolean("dfs.support.append", true); + // The below config supported by 0.20-append and CDH3b2 + conf.setInt("dfs.client.block.recovery.retries", 2); + TEST_UTIL.startMiniDFSCluster(3); + Path hbaseRootDir = + TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); + LOG.info("hbase.rootdir=" + hbaseRootDir); + conf.set(HConstants.HBASE_DIR, hbaseRootDir.toString()); + conf.setInt("hdfs.client.retries.number", 10); + } + + + @Test + public void testNonIdempotentOpsWithRetries() throws IOException { + LOG.info("testNonIdempotentOpsWithRetries"); + + Path rootDir = new Path(TestHBaseFileSystem.conf.get(HConstants.HBASE_DIR)); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + // Create a Region + assertTrue(HBaseFileSystem.createPathOnFileSystem(fs, TestHBaseFileSystem.conf, rootDir, true) != null); + + boolean result = HBaseFileSystem.makeDirOnFileSystem(new MockFileSystemForCreate(), TestHBaseFileSystem.conf, new Path("/a")); + assertTrue("Couldn't create the directory", result); + + + result = HBaseFileSystem.renameDirForFileSystem(new MockFileSystem(), TestHBaseFileSystem.conf, new Path("/a"), new Path("/b")); + assertTrue("Couldn't rename the directory", result); + + result = HBaseFileSystem.deleteDirFromFileSystem(new MockFileSystem(), TestHBaseFileSystem.conf, new Path("/a")); + + assertTrue("Couldn't delete the directory", result); + fs.delete(rootDir, true); + } + + static class MockFileSystemForCreate extends MockFileSystem { + @Override + public boolean exists(Path path) { + return false; + } + } + + /** + * a mock fs which throws exception for first 3 times, and then process the call (returns the + * excepted result). 
+ */ + static class MockFileSystem extends FileSystem { + int retryCount; + final static int successRetryCount = 3; + + public MockFileSystem() { + retryCount = 0; + } + + @Override + public FSDataOutputStream append(Path arg0, int arg1, Progressable arg2) throws IOException { + throw new IOException(""); + } + + @Override + public FSDataOutputStream create(Path arg0, FsPermission arg1, boolean arg2, int arg3, + short arg4, long arg5, Progressable arg6) throws IOException { + LOG.debug("Create, " + retryCount); + if (retryCount++ < successRetryCount) throw new IOException("Something bad happen"); + return null; + } + + @Override + public boolean delete(Path arg0) throws IOException { + if (retryCount++ < successRetryCount) throw new IOException("Something bad happen"); + return true; + } + + @Override + public boolean delete(Path arg0, boolean arg1) throws IOException { + if (retryCount++ < successRetryCount) throw new IOException("Something bad happen"); + return true; + } + + @Override + public FileStatus getFileStatus(Path arg0) throws IOException { + FileStatus fs = new FileStatus(); + return fs; + } + + @Override + public boolean exists(Path path) { + return true; + } + + @Override + public URI getUri() { + throw new RuntimeException("Something bad happen"); + } + + @Override + public Path getWorkingDirectory() { + throw new RuntimeException("Something bad happen"); + } + + @Override + public FileStatus[] listStatus(Path arg0) throws IOException { + throw new IOException("Something bad happen"); + } + + @Override + public boolean mkdirs(Path arg0, FsPermission arg1) throws IOException { + LOG.debug("mkdirs, " + retryCount); + if (retryCount++ < successRetryCount) throw new IOException("Something bad happen"); + return true; + } + + @Override + public FSDataInputStream open(Path arg0, int arg1) throws IOException { + throw new IOException("Something bad happen"); + } + + @Override + public boolean rename(Path arg0, Path arg1) throws IOException { + LOG.debug("rename, " + retryCount); + if (retryCount++ < successRetryCount) throw new IOException("Something bad happen"); + return true; + } + + @Override + public void setWorkingDirectory(Path arg0) { + throw new RuntimeException("Something bad happen"); + } + } + +} Index: src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java (revision 1464765) +++ src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java (working copy) @@ -73,7 +73,6 @@ public void tearDown() throws Exception { // Stop the cluster TEST_UTIL.shutdownMiniCluster(); - TEST_UTIL = new HBaseTestingUtility(resetConf); } @Test
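
Note on the conversion pattern above: the patch replaces direct, non-idempotent FileSystem calls (mkdirs, rename, delete, create) with static HBaseFileSystem wrappers that retry on IOException. The following is a minimal caller-side sketch, not part of the patch; it uses only the wrapper signatures defined in HBaseFileSystem.java above, and the paths shown are hypothetical.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseFileSystem;

    public class RetryingFsUsageExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Retry knobs read by HBaseFileSystem.checkAndSetRetryCounts(conf);
        // the patch defaults are 10 retries and a 1000 ms base sleep.
        conf.setInt("hdfs.client.retries.number", 10);
        conf.setInt("hdfs.client.sleep.before.retries", 1000);

        FileSystem fs = FileSystem.get(conf);
        Path dir = new Path("/hbase/example-dir");           // hypothetical path
        Path moved = new Path("/hbase/example-dir-moved");   // hypothetical path

        // Before the patch: if (!fs.mkdirs(dir)) { ... }
        // After the patch the non-idempotent call is retried across NN hiccups:
        if (!HBaseFileSystem.makeDirOnFileSystem(fs, conf, dir)) {
          throw new IOException("Could not create " + dir);
        }
        if (!HBaseFileSystem.renameDirForFileSystem(fs, conf, dir, moved)) {
          throw new IOException("Could not rename " + dir + " to " + moved);
        }
        if (!HBaseFileSystem.deleteDirFromFileSystem(fs, conf, moved)) {
          throw new IOException("Could not delete " + moved);
        }
      }
    }

Each wrapper checks the relevant post-condition inside the catch block (for example, delete returns true if the path is already gone), so a retry after a half-completed operation does not turn into a spurious failure.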
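
The worst-case latency of one wrapped operation follows directly from sleepBeforeRetry() in HBaseFileSystem.java: after attempt i the wrapper sleeps baseSleepBeforeRetries * (i + 1), and it stops sleeping once the multiplier exceeds hdfsClientRetriesNumber. The sketch below (not part of the patch) works out that bound for the defaults.

    public class RetryBackoffEstimate {
      public static void main(String[] args) {
        int retries = 10;       // hdfs.client.retries.number (patch default)
        int baseSleepMs = 1000; // hdfs.client.sleep.before.retries (patch default)

        // The do/while loop makes retries + 1 attempts (i = 0..retries); after
        // attempt i it calls sleepBeforeRetry(msg, i + 1), which only sleeps
        // while the multiplier i + 1 is still <= retries.
        long totalSleepMs = 0;
        for (int multiplier = 1; multiplier <= retries; multiplier++) {
          totalSleepMs += (long) baseSleepMs * multiplier;
        }
        // 1000 * (1 + 2 + ... + 10) = 55,000 ms, i.e. about 55 seconds of sleep
        // before the wrapper gives up and rethrows the last IOException.
        System.out.println("Worst-case sleep per operation: " + totalSleepMs + " ms");
      }
    }

This is why the test setup in HBaseTestingUtility lowers hdfs.client.retries.number to 1: with the production default a single persistently failing call could block a unit test for close to a minute.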