diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java index 73f3542..b94afa8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java @@ -84,6 +84,9 @@ public final class ExportSnapshot extends Configured implements Tool { private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify"; private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root"; private static final String CONF_INPUT_ROOT = "snapshot.export.input.root"; + private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size"; + private static final String CONF_COPY_RETRY = "snapshot.export.copy.retry"; + private static final String CONF_COPY_RETRY_WAIT = "snapshot.export.copy.retry.wait"; static final String CONF_TEST_FAILURE = "test.snapshot.export.failure"; static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry"; @@ -104,6 +107,9 @@ public final class ExportSnapshot extends Configured implements Tool { private String filesGroup; private String filesUser; private short filesMode; + private int bufferSize; + private int copyRetry; + private int copyRetryWait; private FileSystem outputFs; private Path outputArchive; @@ -114,7 +120,7 @@ public final class ExportSnapshot extends Configured implements Tool { private Path inputRoot; @Override - public void setup(Context context) { + public void setup(Context context) throws IOException { Configuration conf = context.getConfiguration(); verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true); @@ -123,6 +129,8 @@ public final class ExportSnapshot extends Configured implements Tool { filesMode = (short)conf.getInt(CONF_FILES_MODE, 0); outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT)); inputRoot = new Path(conf.get(CONF_INPUT_ROOT)); + copyRetry = 
conf.getInt(CONF_COPY_RETRY, 1); + copyRetryWait = conf.getInt(CONF_COPY_RETRY_WAIT, 1000); inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); @@ -132,14 +140,18 @@ public final class ExportSnapshot extends Configured implements Tool { try { inputFs = FileSystem.get(inputRoot.toUri(), conf); } catch (IOException e) { - throw new RuntimeException("Could not get the input FileSystem with root=" + inputRoot, e); + throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e); } try { outputFs = FileSystem.get(outputRoot.toUri(), conf); } catch (IOException e) { - throw new RuntimeException("Could not get the output FileSystem with root="+ outputRoot, e); + throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e); } + + int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(), BUFFER_SIZE); + bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize); + LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize)); } @Override @@ -148,9 +160,22 @@ public final class ExportSnapshot extends Configured implements Tool { Path inputPath = new Path(key.toString()); Path outputPath = getOutputPath(inputPath); - LOG.info("copy file input=" + inputPath + " output=" + outputPath); - copyFile(context, inputPath, outputPath); - LOG.info("copy completed for input=" + inputPath + " output=" + outputPath); + // Additional retry for a single file (we have also the retry at map task level) + String msg = "copy file input=" + inputPath + " output=" + outputPath; + LOG.info(msg); + int nattempts = 0; + while (true) { + try { + copyFile(context, inputPath, outputPath); + break; + } catch (IOException e) { + if (++nattempts >= copyRetry) { + throw e; + } + LOG.warn("Failed copy attempt " + nattempts + ". 
retrying...", e); + Thread.sleep(nattempts * copyRetryWait); + } + } } /** @@ -206,24 +231,15 @@ public final class ExportSnapshot extends Configured implements Tool { throws IOException { injectTestFailure(context, inputPath); - FSDataInputStream in = openSourceFile(inputPath); - if (in == null) { - context.getCounter(Counter.MISSING_FILES).increment(1); - throw new FileNotFoundException(inputPath.toString()); - } - + FSDataInputStream in = openSourceFile(context, inputPath); try { - // Verify if the input file exists - FileStatus inputStat = getFileStatus(inputFs, inputPath); - if (inputStat == null) { - context.getCounter(Counter.MISSING_FILES).increment(1); - throw new FileNotFoundException(inputPath.toString()); - } + // Get the file information + FileStatus inputStat = getSourceFileStatus(context, inputPath); // Verify if the output file exists and is the same that we want to copy if (outputFs.exists(outputPath)) { FileStatus outputStat = outputFs.getFileStatus(outputPath); - if (sameFile(inputStat, outputStat)) { + if (outputStat != null && sameFile(inputStat, outputStat)) { LOG.info("Skip copy " + inputPath + " to " + outputPath + ", same file."); return; } @@ -240,15 +256,22 @@ public final class ExportSnapshot extends Configured implements Tool { out.close(); } - // Preserve attributes - preserveAttributes(outputPath, inputStat); + // Try to Preserve attributes + if (!preserveAttributes(outputPath, inputStat)) { + LOG.warn("You may have to run manually chown on: " + outputPath); + } } finally { in.close(); } } /** - * Preserve the files attribute selected by the user copying them from the source file + * Try to Preserve the files attribute selected by the user copying them from the source file + * This is only required when you are exporting as a different user than "hbase" or on a system + * that doesn't have the "hbase" user. 
+ * + * This is not considered a blocking failure since the user can force a chmod with a user + * that is known to be available on the system. */ private boolean preserveAttributes(final Path path, final FileStatus refStat) { FileStatus stat; @@ -266,38 +289,47 @@ public final class ExportSnapshot extends Configured implements Tool { outputFs.setPermission(path, refStat.getPermission()); } } catch (IOException e) { - LOG.error("Unable to set the permission for file=" + path, e); + LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage()); return false; } - try { - String user = (filesUser != null) ? filesUser : refStat.getOwner(); - String group = (filesGroup != null) ? filesGroup : refStat.getGroup(); - if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { - outputFs.setOwner(path, user, group); + String user = stringIsNotEmpty(filesUser) ? filesUser : refStat.getOwner(); + String group = stringIsNotEmpty(filesGroup) ? filesGroup : refStat.getGroup(); + if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) { + try { + if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { + outputFs.setOwner(path, user, group); + } + } catch (IOException e) { + LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage()); + LOG.warn("The user/group may not exists on the destination cluster: user=" + + user + " group=" + group); + return false; } - } catch (IOException e) { - LOG.error("Unable to set the owner/group for file=" + path, e); - return false; } return true; } + private boolean stringIsNotEmpty(final String str) { + return str != null && str.length() > 0; + } + private void copyData(final Context context, final Path inputPath, final FSDataInputStream in, final Path outputPath, final FSDataOutputStream out, final long inputFileSize) throws IOException { final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + - " (%.3f%%)"; + " (%.1f%%) from " + 
inputPath + " to " + outputPath; try { - byte[] buffer = new byte[BUFFER_SIZE]; + byte[] buffer = new byte[bufferSize]; long totalBytesWritten = 0; int reportBytes = 0; int bytesRead; + long stime = System.currentTimeMillis(); while ((bytesRead = in.read(buffer)) > 0) { out.write(buffer, 0, bytesRead); totalBytesWritten += bytesRead; @@ -307,17 +339,16 @@ public final class ExportSnapshot extends Configured implements Tool { context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); context.setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten), - totalBytesWritten/(float)inputFileSize) + - " from " + inputPath + " to " + outputPath); + (totalBytesWritten/(float)inputFileSize) * 100.0f)); reportBytes = 0; } } + long etime = System.currentTimeMillis(); context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); context.setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten), - totalBytesWritten/(float)inputFileSize) + - " from " + inputPath + " to " + outputPath); + (totalBytesWritten/(float)inputFileSize) * 100.0f)); // Verify that the written size match if (totalBytesWritten != inputFileSize) { @@ -325,6 +356,12 @@ public final class ExportSnapshot extends Configured implements Tool { " expected=" + inputFileSize + " for file=" + inputPath; throw new IOException(msg); } + + LOG.info("copy completed for input=" + inputPath + " output=" + outputPath); + LOG.info("size=" + totalBytesWritten + + " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" + + " time=" + StringUtils.formatTimeDiff(etime, stime) + + String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0)); } catch (IOException e) { LOG.error("Error copying " + inputPath + " to " + outputPath, e); context.getCounter(Counter.COPY_FAILED).increment(1); @@ -332,7 +369,12 @@ public final class ExportSnapshot extends Configured implements Tool { } } - private FSDataInputStream openSourceFile(final Path 
path) { + /** + * Try to open the "source" file. + * Throws an IOException if the communication with the inputFs fails or + * if the file is not found. + */ + private FSDataInputStream openSourceFile(Context context, final Path path) throws IOException { try { if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) { return new HFileLink(inputRoot, inputArchive, path).open(inputFs); @@ -342,26 +384,34 @@ public final class ExportSnapshot extends Configured implements Tool { return new HLogLink(inputRoot, serverName, logName).open(inputFs); } return inputFs.open(path); + } catch (FileNotFoundException e) { + context.getCounter(Counter.MISSING_FILES).increment(1); + LOG.error("Unable to open source file=" + path, e); + throw e; } catch (IOException e) { LOG.error("Unable to open source file=" + path, e); - return null; + throw e; } } - private FileStatus getFileStatus(final FileSystem fs, final Path path) { + private FileStatus getSourceFileStatus(Context context, final Path path) throws IOException { try { if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) { HFileLink link = new HFileLink(inputRoot, inputArchive, path); - return link.getFileStatus(fs); + return link.getFileStatus(inputFs); } else if (isHLogLinkPath(path)) { String serverName = path.getParent().getName(); String logName = path.getName(); - return new HLogLink(inputRoot, serverName, logName).getFileStatus(fs); + return new HLogLink(inputRoot, serverName, logName).getFileStatus(inputFs); } - return fs.getFileStatus(path); + return inputFs.getFileStatus(path); + } catch (FileNotFoundException e) { + context.getCounter(Counter.MISSING_FILES).increment(1); + LOG.error("Unable to get the status for source file=" + path, e); + throw e; } catch (IOException e) { - LOG.warn("Unable to get the status for file=" + path); - return null; + LOG.error("Unable to get the status for source file=" + path, e); + throw e; } } @@ -676,11 +726,18 @@ public final class ExportSnapshot extends 
Configured implements Tool { } // Check if the snapshot already in-progress - if (!overwrite && outputFs.exists(snapshotTmpDir)) { - System.err.println("A snapshot with the same name '" + snapshotName + "' may be in-progress"); - System.err.println("Please check " + snapshotTmpDir + ". If the snapshot has completed, "); - System.err.println("consider removing " + snapshotTmpDir + " before retrying export"); - return 1; + if (outputFs.exists(snapshotTmpDir)) { + if (overwrite) { + if (!outputFs.delete(snapshotTmpDir, true)) { + System.err.println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir); + return 1; + } + } else { + System.err.println("A snapshot with the same name '" + snapshotName + "' may be in-progress"); + System.err.println("Please check " + snapshotTmpDir + ". If the snapshot has completed, "); + System.err.println("consider removing " + snapshotTmpDir + " before retrying export"); + return 1; + } } // Step 0 - Extract snapshot files to copy @@ -690,12 +747,10 @@ public final class ExportSnapshot extends Configured implements Tool { // The snapshot references must be copied before the hfiles otherwise the cleaner // will remove them because they are unreferenced. 
try { - FileUtil.copy(inputFs, snapshotDir, outputFs, snapshotTmpDir, false, overwrite, conf); + FileUtil.copy(inputFs, snapshotDir, outputFs, snapshotTmpDir, false, false, conf); } catch (IOException e) { - System.err.println("Failed to copy the snapshot directory: from=" + snapshotDir + - " to=" + snapshotTmpDir); - e.printStackTrace(System.err); - return 1; + throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + + snapshotDir + " to=" + snapshotTmpDir); } // Step 2 - Start MR Job to copy files @@ -721,8 +776,7 @@ public final class ExportSnapshot extends Configured implements Tool { return 0; } catch (Exception e) { LOG.error("Snapshot export failed", e); - System.err.println("Snapshot export failed!"); - e.printStackTrace(System.err); + outputFs.delete(snapshotTmpDir, true); outputFs.delete(outputSnapshotDir, true); return 1; }