diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 82ac573..b1c4ed6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -262,7 +262,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       //If using secure bulk load
       //prepare staging directory and token
       if (userProvider.isHBaseSecurityEnabled()) {
-        FileSystem fs = FileSystem.get(getConf());
+        FileSystem fs = FileSystem.get(hfofDir.toUri(), getConf());
         fsDelegationToken.acquireDelegationToken(fs);
 
         bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
index 8b8c9d7..1b7cd02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.Coprocessor;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Methods;
 import org.apache.hadoop.hbase.util.Pair;
@@ -107,7 +109,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
 
   private SecureRandom random;
   private FileSystem fs;
-  private Configuration conf;
+  private static Configuration conf;
 
   //two levels so it doesn't get deleted accidentally
   //no sticky bit in Hadoop 1.0
@@ -220,6 +222,21 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
     }
     boolean loaded = false;
     if (!bypass) {
+      // Get the target fs (current server fs) delegation token
+      // Since we have checked the permission via 'preBulkLoadHFile', now let's give
+      // the 'request user' necessary token to operate on target (server) fs.
+      // After this point the 'doAs' user will hold two tokens, one for source fs
+      // (for request user), another for target fs (for server principal)
+      FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+      try {
+        targetfsDelegationToken.acquireDelegationToken(fs);
+      } catch (IOException e) {
+        ResponseConverter.setControllerException(controller, e);
+        done.run(null);
+        return;
+      }
+      ugi.addToken(targetfsDelegationToken.getUserToken());
+
       loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
         @Override
         public Boolean run() {
@@ -229,8 +246,8 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
             fs = FileSystem.get(conf);
             for(Pair<byte[], String> el: familyPaths) {
               Path p = new Path(el.getSecond());
-              LOG.trace("Setting permission for: " + p);
-              fs.setPermission(p, PERM_ALL_ACCESS);
+              // LOG.trace("Setting permission for: " + p);
+              // fs.setPermission(p, PERM_ALL_ACCESS);
               Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
               if(!fs.exists(stageFamily)) {
                 fs.mkdirs(stageFamily);
@@ -303,8 +320,11 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
   }
 
   private static class SecureBulkLoadListener implements HRegion.BulkLoadListener {
+    // Target (current server) filesystem
     private FileSystem fs;
     private String stagingDir;
+    // Source filesystem
+    private FileSystem srcFs = null;
 
     public SecureBulkLoadListener(FileSystem fs, String stagingDir) {
       this.fs = fs;
@@ -315,14 +335,26 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
     public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
       Path p = new Path(srcPath);
       Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
+      if (srcFs == null) {
+        srcFs = FileSystem.get(p.toUri(), conf);
+      }
       if(!isFile(p)) {
         throw new IOException("Path does not reference a file: " + p);
       }
 
-      LOG.debug("Moving " + p + " to " + stageP);
-      if(!fs.rename(p, stageP)) {
-        throw new IOException("Failed to move HFile: " + p + " to " + stageP);
+      // Check to see if the source and target filesystems are the same
+      // TODO will improve this using latest JIRA
+      if (!srcFs.getUri().equals(fs.getUri())) {
+        LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +
+            "the destination filesystem. Copying file over to destination staging dir.");
+        FileUtil.copy(srcFs, p, fs, stageP, false, conf);
+      }
+      else {
+        LOG.debug("Moving " + p + " to " + stageP);
+        if(!fs.rename(p, stageP)) {
+          throw new IOException("Failed to move HFile: " + p + " to " + stageP);
+        }
       }
 
       return stageP.toString();
@@ -350,7 +382,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
      * @throws IOException
      */
     private boolean isFile(Path p) throws IOException {
-      FileStatus status = fs.getFileStatus(p);
+      FileStatus status = srcFs.getFileStatus(p);
       boolean isFile = !status.isDir();
       try {
         isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);