diff --git a/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
index f14cd4e..b1c4beb 100644
--- a/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ b/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
@@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.snapshot.ExportSnapshotException;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -519,7 +521,8 @@ public final class ExportSnapshot extends Configured implements Tool {
   /**
    * Run Map-Reduce Job to perform the files copy.
    */
-  private boolean runCopyJob(final Path inputRoot, final Path outputRoot,
+  private boolean runCopyJob(final FileSystem inputFs, final Path inputRoot,
+      final FileSystem outputFs, final Path outputRoot,
       final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
       final String filesUser, final String filesGroup, final int filesMode,
       final int mappers) throws IOException, InterruptedException, ClassNotFoundException {
@@ -550,7 +553,66 @@ public final class ExportSnapshot extends Configured implements Tool {
       SequenceFileInputFormat.addInputPath(job, path);
     }

-    return job.waitForCompletion(true);
+    FsDelegationToken inputFsToken = new FsDelegationToken("irenewer");
+    FsDelegationToken outputFsToken = new FsDelegationToken("orenewer");
+    try {
+      // Acquire the delegation Tokens
+      LOG.info("Acquire input-fs delegation token");
+      inputFsToken.acquireDelegationToken(inputFs);
+      LOG.info("Acquire output-fs delegation token");
+      outputFsToken.acquireDelegationToken(outputFs);
+
+      // Run the MR Job
+      return job.waitForCompletion(true);
+    } finally {
+      inputFsToken.releaseDelegationToken();
+      outputFsToken.releaseDelegationToken();
+    }
+  }
+
+  /**
+   * Helper class to obtain a filesystem delegation token.
+   * Mainly used by Map-Reduce jobs that require reading/writing data on
+   * a remote file-system (e.g. BulkLoad, ExportSnapshot).
+   */
+  static class FsDelegationToken {
+    private final String renewer;
+
+    private boolean hasForwardedToken = false;
+    private Token<?> userToken = null;
+    private FileSystem fs = null;
+
+    public FsDelegationToken(final String renewer) {
+      this.renewer = renewer;
+    }
+
+    public void acquireDelegationToken(final FileSystem fs)
+        throws IOException {
+      if (User.isSecurityEnabled()) {
+        this.fs = fs;
+        userToken = User.getCurrent().getToken("HDFS_DELEGATION_TOKEN",
+            fs.getCanonicalServiceName());
+        if (userToken == null) {
+          hasForwardedToken = false;
+          userToken = fs.getDelegationToken(renewer);
+        } else {
+          hasForwardedToken = true;
+          LOG.info("Use the existing token: " + userToken);
+        }
+      }
+    }
+
+    public void releaseDelegationToken() {
+      if (User.isSecurityEnabled()) {
+        if (userToken != null && !hasForwardedToken) {
+          try {
+            userToken.cancel(this.fs.getConf());
+          } catch (Exception e) {
+            LOG.warn("Failed to cancel HDFS delegation token.", e);
+          }
+        }
+      }
+    }
   }

   /**
@@ -653,7 +715,7 @@ public final class ExportSnapshot extends Configured implements Tool {
     if (files.size() == 0) {
       LOG.warn("There are 0 store files to be copied. There may be no data in the table.");
     } else {
-      if (!runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
+      if (!runCopyJob(inputFs, inputRoot, outputFs, outputRoot, files, verifyChecksum,
           filesUser, filesGroup, filesMode, mappers)) {
         throw new ExportSnapshotException("Snapshot export failed!");
       }
diff --git a/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java b/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
index a666c5d..3de2b3b 100644
--- a/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
+++ b/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
@@ -72,7 +72,7 @@ import org.junit.experimental.categories.Category;
 public class TestExportSnapshot {
   private final Log LOG = LogFactory.getLog(getClass());

-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

   private final static byte[] FAMILY = Bytes.toBytes("cf");

@@ -81,13 +81,19 @@ public class TestExportSnapshot {
   private byte[] tableName;
   private HBaseAdmin admin;

+  public static void setUpBaseConf(Configuration conf) {
+    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+    conf.setInt("hbase.regionserver.msginterval", 100);
+    conf.setInt("hbase.client.pause", 250);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+    conf.setBoolean("hbase.master.enabletable.roundrobin", true);
+    conf.setInt("mapreduce.map.max.attempts", 10);
+    conf.setInt("mapred.map.max.attempts", 10);
+  }
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
-    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
-    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
-    TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
-    TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
+    setUpBaseConf(TEST_UTIL.getConfiguration());
     TEST_UTIL.startMiniCluster(3);
   }

@@ -237,7 +243,7 @@ public class TestExportSnapshot {
    */
   private void testExportFileSystemState(final byte[] tableName, final byte[] snapshotName,
       int filesExpected) throws Exception {
-    Path copyDir = TEST_UTIL.getDataTestDir("export-" + System.currentTimeMillis());
+    Path copyDir = getHdfsDestinationDir();
     URI hdfsUri = FileSystem.get(TEST_UTIL.getConfiguration()).getUri();
     FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration());
     copyDir = copyDir.makeQualified(fs);
@@ -314,6 +320,13 @@ public class TestExportSnapshot {
     });
   }

+  private Path getHdfsDestinationDir() {
+    Path rootDir = TEST_UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+    Path path = new Path(new Path(rootDir, "export-test"), "export-" + System.currentTimeMillis());
+    LOG.info("HDFS export destination path: " + path);
+    return path;
+  }
+
   private Set<String> listFiles(final FileSystem fs, final Path root, final Path dir)
       throws IOException {
     Set<String> files = new HashSet<String>();
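
Note on the test refactoring above: widening TEST_UTIL to protected and extracting the shared settings into the static setUpBaseConf(Configuration) hook lets a subclass reuse every test in TestExportSnapshot against a mini cluster started with extra configuration (for example, a security-enabled cluster, which is what the new delegation-token path needs exercising against). A minimal sketch of such a subclass, under stated assumptions: the class name TestExportSnapshotWithCustomConf and the extra handler-count setting are illustrative, not part of this patch.

package org.apache.hadoop.hbase.snapshot;

import org.apache.hadoop.conf.Configuration;
import org.junit.BeforeClass;

// Hypothetical subclass: inherits all of TestExportSnapshot's test methods
// and runs them against a mini cluster brought up with extra configuration.
public class TestExportSnapshotWithCustomConf extends TestExportSnapshot {

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // Reuse the shared baseline extracted by this patch.
    setUpBaseConf(conf);
    // Illustrative extra setting; a real secure variant would configure
    // Kerberos/security properties here before starting the cluster.
    conf.setInt("hbase.regionserver.handler.count", 10);
    TEST_UTIL.startMiniCluster(3);
  }
}

Because this static setUpBeforeClass hides the parent's method of the same signature, JUnit 4 runs only the subclass version, so the cluster is started exactly once with the combined configuration.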