diff --git security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index b82ad5d..0b00479 100644 --- security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -18,7 +18,9 @@ import java.io.IOException; import java.net.InetAddress; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.Permission.Action; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; @@ -442,7 +445,7 @@ public class AccessController extends BaseRegionObserver * @param families The map of column families-qualifiers. * @throws AccessDeniedException if the authorization check failed */ - private void requirePermission(Permission.Action perm, + public void requirePermission(Permission.Action perm, RegionCoprocessorEnvironment env, Map> families) throws IOException { @@ -986,6 +989,73 @@ public class AccessController extends BaseRegionObserver } } + /** + * Verifies user has WRITE privileges on + * the Column Families invovled in the bulkLoadHFile + * request. Specific Column Write privileges are presently + * ignored. + */ + @Override + public void preBulkLoadHFile(ObserverContext ctx, + List> familyPaths) throws IOException { + List cfs = new LinkedList(); + for(Pair el : familyPaths) { + cfs.add(el.getFirst()); + } + requirePermission(Permission.Action.WRITE, ctx.getEnvironment(), cfs); + } + + private AuthResult hasSomeAccess(RegionCoprocessorEnvironment e, Action action) throws IOException { + User requestUser = getActiveUser(); + byte[] tableName = e.getRegion().getTableDesc().getName(); + AuthResult authResult = permissionGranted(requestUser, + action, e, Collections.EMPTY_MAP); + if (!authResult.isAllowed()) { + for(UserPermission userPerm: + AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), tableName)) { + for(Permission.Action userAction: userPerm.getActions()) { + if(userAction.equals(action)) { + return AuthResult.allow("Access allowed", requestUser, + action, tableName); + } + } + } + } + return authResult; + } + + /** + * Authorization check for + * SecureBulkLoadProtocol.prepareBulkLoad() + * @param e + * @throws IOException + */ + //TODO this should end up as a coprocessor hook + public void prePrepareBulkLoad(RegionCoprocessorEnvironment e) throws IOException { + AuthResult authResult = hasSomeAccess(e, Action.WRITE); + logResult(authResult); + if (!authResult.isAllowed()) { + throw new AccessDeniedException("Insufficient permissions (table=" + + e.getRegion().getTableDesc().getNameAsString() + ", action=WRITE)"); + } + } + + /** + * Authorization security check for + * SecureBulkLoadProtocol.cleanupBulkLoad() + * @param e + * @throws IOException + */ + //TODO this should end up as a coprocessor hook + public void preCleanupBulkLoad(RegionCoprocessorEnvironment e) throws IOException { + AuthResult authResult = hasSomeAccess(e, Action.WRITE); + logResult(authResult); + if (!authResult.isAllowed()) { + throw new 
AccessDeniedException("Insufficient permissions (table=" + + e.getRegion().getTableDesc().getNameAsString() + ", action=WRITE)"); + } + } + /* ---- AccessControllerProtocol implementation ---- */ /* * These methods are only allowed to be called against the _acl_ region(s). diff --git security/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndPoint.java security/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndPoint.java new file mode 100644 index 0000000..efb5e98 --- /dev/null +++ security/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndPoint.java @@ -0,0 +1,254 @@ +package org.apache.hadoop.hbase.security.access; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.ipc.RequestContext; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Methods; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; + +import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; + +import java.io.IOException; +import java.math.BigInteger; +import java.security.PrivilegedAction; +import java.security.SecureRandom; +import java.util.List; + + +public class SecureBulkLoadEndPoint extends BaseEndpointCoprocessor + implements SecureBulkLoadProtocol { + + public static final long VERSION = 0L; + + //320/5 = 64 characters + private static final int RANDOM_WIDTH = 320; + private static final int RANDOM_RADIX = 32; + + private static Log LOG = LogFactory.getLog(SecureBulkLoadEndPoint.class); + + private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx"); + private final static FsPermission PERM_HIDDEN = FsPermission.valueOf("-rwx--x--x"); + private final static String BULKLOAD_STAGING_DIR = "hbase.bulkload.staging.dir"; + + private SecureRandom random; + private FileSystem fs; + private Configuration conf; + + //TODO make this configurable + //two levels so it doesn't get deleted accidentally + //no sticky bit in Hadoop 1.0 + private Path baseStagingDir; + + private RegionCoprocessorEnvironment env; + + + @Override + public void start(CoprocessorEnvironment env) { + super.start(env); + + this.env = (RegionCoprocessorEnvironment)env; + random = new SecureRandom(); + conf = env.getConfiguration(); + baseStagingDir= new Path(conf.get(BULKLOAD_STAGING_DIR, "/tmp/hbase-staging")); + + try { + fs = FileSystem.get(conf); + fs.mkdirs(baseStagingDir, PERM_HIDDEN); + fs.setPermission(baseStagingDir, PERM_HIDDEN); + //no sticky bit in hadoop-1.0, making directory nonempty so it never gets erased + fs.mkdirs(new 
Path(baseStagingDir,"DONOTERASE"), PERM_HIDDEN); + FileStatus status = fs.getFileStatus(baseStagingDir); + if(status == null) + throw new IllegalStateException("Failed to create staging directory"); + if(!status.getOwner().equals(User.getCurrent().getShortName())) + throw new IllegalStateException("Directory already exists and is not owned by system user: "+ + status.getOwner()+"!="+User.getCurrent().getShortName()); + } catch (IOException e) { + throw new IllegalStateException("Failed to get FileSystem instance",e); + } + } + + @Override + public String prepareBulkLoad(byte[] tableName) throws IOException { + getAccessController().prePrepareBulkLoad(env); + return createStagingDir(baseStagingDir, getActiveUser(), tableName).toString(); + } + + @Override + public void cleanupBulkLoad(String bulkToken) throws IOException { + getAccessController().preCleanupBulkLoad(env); + fs.delete(createStagingDir(baseStagingDir, getActiveUser(), + env.getRegion().getTableDesc().getName(), new Path(bulkToken).getName()), true); + } + + @Override + public boolean bulkLoadHFiles(final List> familyPaths, + final Token userToken, final String bulkToken) throws IOException { + User user = getActiveUser(); + final UserGroupInformation ugi = user.getUGI(); + ugi.addToken(userToken); + ugi.addToken(getToken(user)); + + HRegion region = env.getRegion(); + boolean bypass = false; + if (region.getCoprocessorHost() != null) { + bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); + } + boolean loaded = false; + if (!bypass) { + loaded = ugi.doAs(new PrivilegedAction() { + @Override + public Boolean run() { + FileSystem fs = null; + try { + Configuration conf = HBaseConfiguration.create(); + fs = FileSystem.get(conf); + for(Pair el: familyPaths) { + Path p = new Path(el.getSecond()); + LOG.debug("Setting permission for: " + p); + fs.setPermission(p, PERM_ALL_ACCESS); + Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst())); + if(!fs.exists(stageFamily)) { + fs.mkdirs(stageFamily); + fs.setPermission(stageFamily, PERM_ALL_ACCESS); + } + } + //We call bulkLoadHFiles as requesting user + //To enable access prior to staging + return env.getRegion().bulkLoadHFiles(familyPaths, + new SecureBulkLoadListener(fs, bulkToken)); + } catch (IOException e) { + LOG.error("Failed to secure bulk load",e); + } catch (Exception e) { + LOG.error("Failed to complete bulk load",e); + } + return false; + } + }); + } + if (region.getCoprocessorHost() != null) { + loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); + } + return loaded; + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + if (SecureBulkLoadProtocol.class.getName().equals(protocol)) { + return SecureBulkLoadEndPoint.VERSION; + } + LOG.warn("Unknown protocol requested: " + protocol); + return -1; + } + + private AccessController getAccessController() { + return (AccessController) this.env.getRegion() + .getCoprocessorHost().findCoprocessor(AccessController.class.getName()); + } + + private Path createStagingDir(Path baseDir, User user, byte[] tableName) throws IOException { + String randomDir = user.getShortName()+"__"+Bytes.toString(tableName)+"__"+ + (new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX)); + return createStagingDir(baseDir, user, tableName, randomDir); + } + + private Path createStagingDir(Path baseDir, User user, byte[] tableName, String randomDir) throws IOException { + Path p = new Path(baseDir, randomDir); + fs.mkdirs(p, 
PERM_ALL_ACCESS); + fs.setPermission(p, PERM_ALL_ACCESS); + return p; + } + + private User getActiveUser() throws IOException { + User user = RequestContext.getRequestUser(); + if (!RequestContext.isInRequestContext()) { + throw new DoNotRetryIOException("Failed to get requesting user"); + } + return user; + } + + public Token getToken(User user) { + RegionCoprocessorEnvironment regionEnv = env; + RpcServer server = regionEnv.getRegionServerServices().getRpcServer(); + SecretManager mgr = ((org.apache.hadoop.hbase.ipc.SecureServer)server).getSecretManager(); + AuthenticationTokenSecretManager secretManager = (AuthenticationTokenSecretManager)mgr; + return secretManager.generateToken(user.getName()); + } + + public static class SecureBulkLoadListener implements HRegion.InternalBulkLoadListener { + private FileSystem fs; + private String stagingDir; + + public SecureBulkLoadListener(FileSystem fs, String stagingDir) { + this.fs = fs; + this.stagingDir = stagingDir; + } + + @Override + public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException { + Path p = new Path(srcPath); + Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName())); + + if(!isFile(p)) { + throw new IOException("Path does not reference a file: " + p); + } + + LOG.debug("Moving " + p + " to " + stageP); + if(!fs.rename(p, stageP)) { + throw new IOException("Failed to move HFile: " + p + " to " + stageP); + } + return stageP.toString(); + } + + @Override + public void doneBulkLoad(byte[] family, String srcPath) throws IOException { + LOG.debug("Bulk Load done for: " + srcPath); + } + + @Override + public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException { + Path p = new Path(srcPath); + Path stageP = new Path(stagingDir, + new Path(Bytes.toString(family), p.getName())); + LOG.debug("Moving Back " + stageP + " to " + p); + if(!fs.rename(stageP, p)) + throw new IOException("Failed to move HFile: " + stageP + " to " + p); + } + + /** + * Check if the path is referencing a file. + * This is mainly needed to avoid symlinks. + * @param p + * @return true if the p is a file + * @throws IOException + */ + private boolean isFile(Path p) throws IOException { + FileStatus status = fs.getFileStatus(p); + boolean isFile = !status.isDir(); + try { + isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null); + } catch (Exception e) { + } + return isFile; + } + } +} diff --git security/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadProtocol.java security/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadProtocol.java new file mode 100644 index 0000000..0ca28b9 --- /dev/null +++ security/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadProtocol.java @@ -0,0 +1,48 @@ +package org.apache.hadoop.hbase.security.access; + +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.security.TokenInfo; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.security.token.Token; + +import java.io.IOException; +import java.util.List; + +/** + * Provides a secure way to bulk load data onto HBase + * These are internal API. Bulk load should be initiated + * via {@link org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles} + * with security enabled. + */ +@TokenInfo("HBASE_AUTH_TOKEN") +public interface SecureBulkLoadProtocol extends CoprocessorProtocol { + + /** + * Prepare for bulk load. 
+ * Will be called before bulkLoadHFiles() + * @param tableName + * @return a bulkToken which uniquely identifies the bulk session + * @throws IOException + */ + String prepareBulkLoad(byte[] tableName) throws IOException; + + /** + * Cleanup after bulk load. + * Will be called after bulkLoadHFiles(). + * @param bulkToken + * @throws IOException + */ + void cleanupBulkLoad(String bulkToken) throws IOException; + + /** + * Secure version of HRegionServer.bulkLoadHFiles(). + * @param familyPaths column family to HFile path pairs + * @param userToken requesting user's HDFS delegation token + * @param bulkToken + * @return + * @throws IOException + */ + boolean bulkLoadHFiles(List> familyPaths, + Token userToken, String bulkToken) throws IOException; + +} diff --git security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java index 1a087b6..679e1fc 100644 --- security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java +++ security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java @@ -33,7 +33,8 @@ public class SecureTestUtil { conf.set("hadoop.security.authentication", "simple"); conf.set("hbase.rpc.engine", SecureRpcEngine.class.getName()); conf.set("hbase.coprocessor.master.classes", AccessController.class.getName()); - conf.set("hbase.coprocessor.region.classes", AccessController.class.getName()); + conf.set("hbase.coprocessor.region.classes", AccessController.class.getName()+ + ","+SecureBulkLoadEndPoint.class.getName()); // add the process running user to superusers String currentUser = User.getCurrent().getName(); conf.set("hbase.superuser", "admin,"+currentUser); diff --git security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index 765f0af..43da4df 100644 --- security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -28,12 +28,17 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Append; @@ -51,6 +56,9 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorException; import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; @@ -176,22 +184,35 @@ 
public class TestAccessController { try { user.runAs(action); fail("Expected AccessDeniedException for user '" + user.getShortName() + "'"); - } catch (RetriesExhaustedWithDetailsException e) { - // in case of batch operations, and put, the client assembles a - // RetriesExhaustedWithDetailsException instead of throwing an - // AccessDeniedException + } catch (AccessDeniedException ade) { + // expected result + } catch (IOException e) { boolean isAccessDeniedException = false; - for (Throwable ex : e.getCauses()) { - if (ex instanceof AccessDeniedException) { - isAccessDeniedException = true; - break; + if(e instanceof RetriesExhaustedWithDetailsException) { + // in case of batch operations, and put, the client assembles a + // RetriesExhaustedWithDetailsException instead of throwing an + // AccessDeniedException + for(Throwable ex : ((RetriesExhaustedWithDetailsException) e).getCauses()) { + if (ex instanceof AccessDeniedException) { + isAccessDeniedException = true; + break; + } } } + else { + // For doBulkLoad calls AccessDeniedException + // is buried in the stack trace + Throwable ex = e; + do { + if (ex instanceof AccessDeniedException) { + isAccessDeniedException = true; + break; + } + } while((ex = ex.getCause()) != null); + } if (!isAccessDeniedException) { fail("Not receiving AccessDeniedException for user '" + user.getShortName() + "'"); } - } catch (AccessDeniedException ade) { - // expected result } } } @@ -616,6 +637,104 @@ public class TestAccessController { } @Test + public void testBulkLoad() throws Exception { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + final Path dir = TEST_UTIL.getDataTestDir("testBulkLoad"); + fs.mkdirs(dir); + //need to make it globally writable + //so users creating HFiles have write permissions + fs.setPermission(dir, FsPermission.valueOf("-rwxrwxrwx")); + + PrivilegedExceptionAction bulkLoadAction = new PrivilegedExceptionAction() { + public Object run() throws Exception { + int numRows = 3; + + //Making the assumption that the test table won't split between the range + byte[][][] hfileRanges = {{{(byte)0}, {(byte)9}}}; + + Path bulkLoadBasePath = new Path(dir, new Path(User.getCurrent().getName())); + new BulkLoadHelper(bulkLoadBasePath) + .bulkLoadHFile(TEST_TABLE, TEST_FAMILY, Bytes.toBytes("q"), hfileRanges, numRows); + + return null; + } + }; + verifyWrite(bulkLoadAction); + } + + public class BulkLoadHelper { + private final FileSystem fs; + private final Path loadPath; + private final Configuration conf; + + public BulkLoadHelper(Path loadPath) throws IOException { + fs = TEST_UTIL.getTestFileSystem(); + conf = TEST_UTIL.getConfiguration(); + loadPath = loadPath.makeQualified(fs); + this.loadPath = loadPath; + } + + private void createHFile(Path path, + byte[] family, byte[] qualifier, + byte[] startKey, byte[] endKey, int numRows) throws IOException { + + HFile.Writer writer = null; + long now = System.currentTimeMillis(); + try { + writer = HFile.getWriterFactory(conf, new CacheConfig(conf)) + .withPath(fs, path) + .withComparator(KeyValue.KEY_COMPARATOR) + .create(); + // subtract 2 since numRows doesn't include boundary keys + for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, true, numRows-2)) { + KeyValue kv = new KeyValue(key, family, qualifier, now, key); + writer.append(kv); + } + } finally { + if(writer != null) + writer.close(); + } + } + + private void bulkLoadHFile( + byte[] tableName, + byte[] family, + byte[] qualifier, + byte[][][] hfileRanges, + int numRowsPerRange) throws Exception { + + Path familyDir = new 
Path(loadPath, Bytes.toString(family)); + fs.mkdirs(familyDir); + int hfileIdx = 0; + for (byte[][] range : hfileRanges) { + byte[] from = range[0]; + byte[] to = range[1]; + createHFile(new Path(familyDir, "hfile_"+(hfileIdx++)), + family, qualifier, from, to, numRowsPerRange); + } + //set global read so RegionServer can move it + setPermission(loadPath, FsPermission.valueOf("-rwxrwxrwx")); + + HTable table = new HTable(conf, tableName); + TEST_UTIL.waitTableAvailable(tableName, 30000); + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); + loader.doBulkLoad(loadPath, table); + } + + public void setPermission(Path dir, FsPermission perm) throws IOException { + if(!fs.getFileStatus(dir).isDir()) { + fs.setPermission(dir,perm); + } + else { + for(FileStatus el : fs.listStatus(dir)) { + fs.setPermission(el.getPath(), perm); + setPermission(el.getPath() , perm); + } + } + } + } + + @Test public void testAppend() throws Exception { PrivilegedExceptionAction appendAction = new PrivilegedExceptionAction() { diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java index 4fa1a14..2870e0c 100644 --- src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java +++ src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java @@ -292,6 +292,7 @@ public abstract class CoprocessorHost { public Coprocessor findCoprocessor(String className) { // initialize the coprocessors for (E env: coprocessors) { + LOG.error("-->"+env.getInstance().getClass().getName()); if (env.getInstance().getClass().getName().equals(className) || env.getInstance().getClass().getSimpleName().equals(className)) { return env.getInstance(); diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 922e621..a0b2b88 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -73,8 +73,11 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.access.SecureBulkLoadProtocol; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -98,10 +101,15 @@ public class LoadIncrementalHFiles extends Configured implements Tool { public static String NAME = "completebulkload"; + private boolean useSecure; + private Token userToken; + private String bulkToken; + public LoadIncrementalHFiles(Configuration conf) throws Exception { super(conf); this.cfg = conf; this.hbAdmin = new HBaseAdmin(conf); + this.useSecure = User.isHBaseSecurityEnabled(conf); } private void usage() { @@ -212,10 +220,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool { return; } - if (queue.isEmpty()) { - LOG.warn("Bulk load operation did not find any files to load in " + - "directory " + hfofDir.toUri() + ". 
Does it contain files in " + - "subdirectories that correspond to column family names?"); + //If using secure bulk load + //prepare staging directory and token + if(useSecure) { + FileSystem fs = FileSystem.get(cfg); + userToken = fs.getDelegationToken("renewer"); + SecureBulkLoadProtocol proxy = table.coprocessorProxy(SecureBulkLoadProtocol.class, + HConstants.EMPTY_START_ROW); + bulkToken = proxy.prepareBulkLoad(table.getTableName()); } // Assumes that region splits can happen while this occurs. @@ -246,6 +258,20 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } } finally { + if(useSecure) { + if(userToken != null) { + try { + userToken.cancel(cfg); + } catch (InterruptedException e) { + LOG.warn("Failed to cancel HDFS delegation token.", e); + } + } + if(bulkToken != null) { + SecureBulkLoadProtocol proxy = table.coprocessorProxy(SecureBulkLoadProtocol.class, + HConstants.EMPTY_START_ROW); + proxy.cleanupBulkLoad(bulkToken); + } + } pool.shutdown(); if (queue != null && !queue.isEmpty()) { StringBuilder err = new StringBuilder(); @@ -482,7 +508,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool { LOG.debug("Going to connect to server " + location + " for row " + Bytes.toStringBinary(row)); byte[] regionName = location.getRegionInfo().getRegionName(); - return server.bulkLoadHFiles(famPaths, regionName); + if(!useSecure) { + return server.bulkLoadHFiles(famPaths, regionName); + } + else { + HTable table = new HTable(conn.getConfiguration(), tableName); + SecureBulkLoadProtocol proxy = table.coprocessorProxy(SecureBulkLoadProtocol.class, row); + return proxy.bulkLoadHFiles(famPaths, userToken, bulkToken); + } } }; diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e8d2c26..e0ac767 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3209,8 +3209,22 @@ public class HRegion implements HeapSize { // , Writable{ * @return true if successful, false if failed recoverably * @throws IOException if failed unrecoverably. */ - public boolean bulkLoadHFiles(List> familyPaths) - throws IOException { + public boolean bulkLoadHFiles(List> familyPaths) throws IOException { + return bulkLoadHFiles(familyPaths, null); + } + + /** + * Attempts to atomically load a group of hfiles. This is critical for loading + * rows with multiple column families atomically. + * + * @param familyPaths List of Pair + * @param bulkLoadListener Internal hooks enabling massaging/preparation of a + * file about to be bulk loaded + * @return true if successful, false if failed recoverably + * @throws IOException if failed unrecoverably. 
+ */ + public boolean bulkLoadHFiles(List> familyPaths, + InternalBulkLoadListener bulkLoadListener) throws IOException { Preconditions.checkNotNull(familyPaths); // we need writeLock for multi-family bulk load startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths)); @@ -3270,7 +3284,14 @@ public class HRegion implements HeapSize { // , Writable{ String path = p.getSecond(); Store store = getStore(familyName); try { - store.bulkLoadHFile(path); + String finalPath = path; + if(bulkLoadListener != null) { + finalPath = bulkLoadListener.prepareBulkLoad(familyName, path); + } + store.bulkLoadHFile(finalPath); + if(bulkLoadListener != null) { + bulkLoadListener.doneBulkLoad(familyName, path); + } } catch (IOException ioe) { // a failure here causes an atomicity violation that we currently // cannot recover from since it is likely a failed hdfs operation. @@ -3278,6 +3299,13 @@ public class HRegion implements HeapSize { // , Writable{ // TODO Need a better story for reverting partial failures due to HDFS. LOG.error("There was a partial failure due to IO when attempting to" + " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond()); + if(bulkLoadListener != null) { + try { + bulkLoadListener.failedBulkLoad(familyName, path); + } catch (Exception ex) { + LOG.error("Error while calling faildBulkLoad", ex); + } + } throw ioe; } } @@ -5226,4 +5254,14 @@ public class HRegion implements HeapSize { // , Writable{ if (bc != null) bc.shutdown(); } } + + public static interface InternalBulkLoadListener { + + String prepareBulkLoad(byte[] family, String srcPath) throws IOException; + + void doneBulkLoad(byte[] family, String srcPath) throws IOException; + + void failedBulkLoad(byte[] family, String srcPath) throws IOException; + + } } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index e0a7c24..2232af9 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -150,6 +150,7 @@ import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Strings; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.hadoop.hbase.zookeeper.ClusterId; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; @@ -595,6 +596,16 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); catalogTracker.start(); + + // Retrieve clusterId + // Since cluster status is now up + // ID should have already been set by HMaster + ClusterId clusterId = new ClusterId(this.zooKeeper,this); + if (!clusterId.hasId()) { + this.abort("Cluster ID has not been set"); + } + this.conf.set(HConstants.CLUSTER_ID, clusterId.getId()); + LOG.info("ClusterId : "+clusterId); } /** diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 0d52a3e..a43b4f3 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -536,7 +536,11 @@ public class Store extends SchemaConfigured implements HeapSize { // Move the file if 
it's on another filesystem FileSystem srcFs = srcPath.getFileSystem(conf); FileSystem desFs = fs instanceof HFileSystem ? ((HFileSystem)fs).getBackingFs() : fs; - if (!srcFs.equals(desFs)) { + //We can't compare FileSystem instances as + //equals() includes UGI instance as part of the comparison + //and won't work when doing SecureBulkLoad + //TODO deal with viewFS + if (!srcFs.getUri().equals(desFs.getUri())) { LOG.info("File " + srcPath + " on different filesystem than " + "destination store - moving to this filesystem."); Path tmpPath = getTmpPath(); diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestClusterId.java src/test/java/org/apache/hadoop/hbase/regionserver/TestClusterId.java new file mode 100644 index 0000000..1f792a7 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestClusterId.java @@ -0,0 +1,90 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.zookeeper.ClusterId; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +/** + * Test metrics incremented on region server operations. 
+ */ +@Category(MediumTests.class) +public class TestClusterId { + + private static final Log LOG = + LogFactory.getLog(TestClusterId.class.getName()); + + private final HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + private JVMClusterUtil.RegionServerThread rst; + private JVMClusterUtil.MasterThread mst; + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + if(rst != null && rst.getRegionServer() != null) { + rst.getRegionServer().stop("end of test"); + rst.join(); + } + } + + @Test + public void testClusterId() throws Exception { + TEST_UTIL.startMiniZKCluster(); + TEST_UTIL.startMiniDFSCluster(1); + + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + //start region server, needs to be separate + //so we get an unset clusterId + rst = JVMClusterUtil.createRegionServerThread(conf, + HRegionServer.class, 0); + rst.start(); + //Make sure RS is in blocking state + Thread.sleep(10000); + + TEST_UTIL.startMiniHBaseCluster(1, 0); + + rst.waitForServerOnline(); + + ClusterId clusterId = new ClusterId(TEST_UTIL.getMiniHBaseCluster().getMaster().getZooKeeperWatcher(), null); + assertTrue(clusterId.hasId()); + assertEquals(clusterId.getId(), rst.getRegionServer().getConfiguration().get(HConstants.CLUSTER_ID)); + } +} +
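A few notes on how the pieces above fit together. From the client side, the sequence this patch wires up -- prepareBulkLoad(), bulkLoadHFiles() and cleanupBulkLoad() on SecureBulkLoadEndPoint -- is roughly the one LoadIncrementalHFiles now follows when User.isHBaseSecurityEnabled(conf) is true. The sketch below shows that sequence called directly against the coprocessor proxy; the table name, column family, HFile path and delegation-token renewer are made-up values for illustration.

    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.security.access.SecureBulkLoadProtocol;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.hadoop.security.token.Token;

    public class SecureBulkLoadClientSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "mytable");          // hypothetical table
        FileSystem fs = FileSystem.get(conf);

        // 1. Ask the endpoint for a per-load staging directory; the returned
        //    "bulk token" is the staging path used by the later calls.
        SecureBulkLoadProtocol proxy = table.coprocessorProxy(
            SecureBulkLoadProtocol.class, HConstants.EMPTY_START_ROW);
        String bulkToken = proxy.prepareBulkLoad(table.getTableName());

        // 2. Obtain an HDFS delegation token so the region server can read and
        //    move the HFiles on behalf of the requesting user.
        Token<?> userToken = fs.getDelegationToken("renewer");

        // 3. Load one HFile per column family; family and path are hypothetical.
        List<Pair<byte[], String>> familyPaths = Collections.singletonList(
            Pair.newPair(Bytes.toBytes("f1"), "/user/alice/hfiles/f1/hfile_0"));
        boolean loaded = proxy.bulkLoadHFiles(familyPaths, userToken, bulkToken);

        // 4. Clean up the staging directory and cancel the delegation token.
        proxy.cleanupBulkLoad(bulkToken);
        userToken.cancel(conf);
        table.close();
        System.out.println("bulk load " + (loaded ? "succeeded" : "failed"));
      }
    }

Keeping prepareBulkLoad() and cleanupBulkLoad() as separate calls, rather than folding them into bulkLoadHFiles(), lets a single staging directory and bulk token be reused across the per-region retries that LoadIncrementalHFiles performs, with cleanup done once in the finally block.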
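On the server side, SecureBulkLoadEndPoint.start() creates the base staging directory (hbase.bulkload.staging.dir, default /tmp/hbase-staging) with "-rwx--x--x" so that users can traverse into their own load directories without being able to list anyone else's, and each prepareBulkLoad() call creates a world-writable child named user__table__random, where the random part is 320 bits rendered in radix 32 (320/5 = 64 characters). A condensed sketch of that convention, with illustrative user and table names:

    import java.math.BigInteger;
    import java.security.SecureRandom;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class StagingDirSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // Base staging directory: traversable but not listable by other users.
        Path base = new Path("/tmp/hbase-staging");
        fs.mkdirs(base, FsPermission.valueOf("-rwx--x--x"));
        // setPermission again because mkdirs() applies the filesystem umask.
        fs.setPermission(base, FsPermission.valueOf("-rwx--x--x"));

        // Per-load directory: <user>__<table>__<64-char random>, world-writable
        // so both the end user and the region server can move files in and out.
        // "alice" and "mytable" are illustrative values.
        String randomDir = "alice" + "__" + "mytable" + "__"
            + new BigInteger(320, new SecureRandom()).toString(32);
        Path stagingDir = new Path(base, randomDir);
        fs.mkdirs(stagingDir, FsPermission.valueOf("-rwxrwxrwx"));
        fs.setPermission(stagingDir, FsPermission.valueOf("-rwxrwxrwx"));
        System.out.println("staging dir: " + stagingDir);
      }
    }

The world-writable per-load directory is what lets the requesting user drop HFiles in while still allowing the region server to rename them out again during the load.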
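bulkLoadHFiles() on the endpoint attaches the client-supplied HDFS delegation token (plus a freshly generated HBase authentication token) to the requesting user's UGI and performs all filesystem work inside doAs(), so HDFS authorizes the reads and renames as the end user rather than as the region server principal. A minimal sketch of that pattern, reduced to a single existence check purely to isolate the doAs/token mechanics; the class and method names are hypothetical:

    import java.io.IOException;
    import java.security.PrivilegedAction;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;

    public class DoAsSketch {
      // Attach the client's HDFS delegation token to its UGI, then do the
      // filesystem work inside doAs() so HDFS sees the end user's identity.
      static boolean stagingDirExists(UserGroupInformation ugi, Token<?> userToken,
          final String bulkToken) {
        ugi.addToken(userToken);
        return ugi.doAs(new PrivilegedAction<Boolean>() {
          @Override
          public Boolean run() {
            try {
              FileSystem fs = FileSystem.get(HBaseConfiguration.create());
              return fs.exists(new Path(bulkToken));  // authenticated via the token
            } catch (IOException e) {
              return Boolean.FALSE;
            }
          }
        });
      }
    }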
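HRegion.bulkLoadHFiles() now takes an optional InternalBulkLoadListener. The listener sees each file before the Store loads it and may substitute a different path, and is told afterwards whether the load succeeded or failed; SecureBulkLoadListener uses these hooks to rename the user's HFile into the secure staging directory before the load and to move it back if the load fails. A trivial listener that only logs, shown to make the contract explicit (the class name is hypothetical):

    import java.io.IOException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.hadoop.hbase.util.Bytes;

    public class LoggingBulkLoadListener implements HRegion.InternalBulkLoadListener {
      private static final Log LOG = LogFactory.getLog(LoggingBulkLoadListener.class);

      @Override
      public String prepareBulkLoad(byte[] family, String srcPath) throws IOException {
        // Return the path the Store should actually load; here it is unchanged.
        LOG.info("About to load " + srcPath + " into " + Bytes.toString(family));
        return srcPath;
      }

      @Override
      public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
        LOG.info("Loaded " + srcPath);
      }

      @Override
      public void failedBulkLoad(byte[] family, String srcPath) throws IOException {
        LOG.warn("Failed to load " + srcPath + "; nothing to roll back in this example");
      }
    }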
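The Store change swaps instance equality for URI equality when deciding whether an incoming HFile lives on a foreign filesystem. FileSystem.get() caches instances per scheme, authority and UGI, and FileSystem does not define value-based equals(), so during a secure bulk load -- where the source file was opened under the requesting user's UGI -- the source and destination FileSystem objects differ even though they point at the same HDFS, and the old check would trigger a needless copy. A sketch of the distinction (helper name is hypothetical):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    public class FsIdentitySketch {
      // Two UGIs get two distinct cached FileSystem instances for the same HDFS,
      // so "same object?" is the wrong question; "same URI?" is the one that
      // matters for deciding whether a cross-filesystem copy is needed.
      static boolean sameFileSystem(final Configuration conf, UserGroupInformation otherUser)
          throws IOException, InterruptedException {
        FileSystem asSelf = FileSystem.get(conf);
        FileSystem asOther = otherUser.doAs(new PrivilegedExceptionAction<FileSystem>() {
          @Override
          public FileSystem run() throws IOException {
            return FileSystem.get(conf);
          }
        });
        // asSelf.equals(asOther) is false here even when both point at the same HDFS.
        return asSelf.getUri().equals(asOther.getUri());
      }
    }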
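In TestAccessController, the denied-path assertion now accepts an AccessDeniedException that arrives directly, wrapped in a RetriesExhaustedWithDetailsException (batch puts), or buried somewhere in the cause chain of the IOException that LoadIncrementalHFiles.doBulkLoad() throws. The unwrapping boils down to a walk over getCause(), as in this small helper (name hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.hbase.security.AccessDeniedException;

    public class CauseChainSketch {
      // doBulkLoad() surfaces the server-side AccessDeniedException wrapped in
      // other IOExceptions, so walk the cause chain rather than checking the
      // top-level exception type.
      static boolean isAccessDenied(IOException thrown) {
        for (Throwable t = thrown; t != null; t = t.getCause()) {
          if (t instanceof AccessDeniedException) {
            return true;
          }
        }
        return false;
      }
    }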