diff --git security/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndPoint.java security/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndPoint.java
new file mode 100644
index 0000000..1a6d86c
--- /dev/null
+++ security/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndPoint.java
@@ -0,0 +1,187 @@
+package org.apache.hadoop.hbase.security.access;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.RequestContext;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.security.TokenInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.PrivilegedAction;
+import java.security.SecureRandom;
+
+
+public class SecureBulkLoadEndPoint extends BaseEndpointCoprocessor
+    implements SecureBulkLoadProtocol {
+
+  public static final long VERSION = 0L;
+
+  //320 bits / 5 bits per base-32 character = 64 characters
+  private static final int RANDOM_WIDTH = 320;
+  private static final int RANDOM_RADIX = 32;
+
+  private static Log LOG = LogFactory.getLog(SecureBulkLoadEndPoint.class);
+
+  private SecureRandom random;
+  private FileSystem fs;
+  private Configuration conf;
+
+  //TODO make this configurable
+  //two levels so it doesn't get deleted accidentally
+  //no sticky bit in Hadoop 1.0
+  private Path stagingDir = new Path("/tmp/hbase-staging");
+
+  private CoprocessorEnvironment env;
+
+
+  @Override
+  public void start(CoprocessorEnvironment env) {
+    super.start(env);
+    this.env = env;
+    random = new SecureRandom();
+    conf = env.getConfiguration();
+    try {
+      FileStatus status = null;
+      fs = FileSystem.get(env.getConfiguration());
+      fs.mkdirs(stagingDir, new FsPermission((short) 0711));
+      fs.setPermission(stagingDir, new FsPermission((short) 0711));
+      //no sticky bit in 1.0, making directory nonempty so it never gets erased
+      fs.mkdirs(new Path(stagingDir,"DONOTERASE"), new FsPermission((short)0711));
+      status = fs.getFileStatus(stagingDir);
+      if(status == null)
+        throw new IllegalStateException("Failed to create staging directory");
+      if(!status.getOwner().equals(User.getCurrent().getShortName()))
+        throw new IllegalStateException("Directory already exists and is not owned by system user: "+
+            status.getOwner()+"!="+User.getCurrent().getShortName());
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to get FileSystem instance", e);
+    }
+  }
+
+  @Override
+  public boolean secureCompleteBulkLoad(final String tableName, String srcDir, Token userToken) throws IOException {
+    final Path srcPath = new Path(srcDir);
+    final Path basePath = createRandomDir(stagingDir);
+    final Path contentPath = new Path(basePath, "content");
+
+    User user = getActiveUser();
+    UserGroupInformation ugi = user.getUGI();
+    ugi.addToken(userToken);
+    ugi.addToken(getToken(user));
+
+    //TODO handle failure scenarios
+    //- bulk load process fails
+    //- the regionserver doing secureCompleteBulkLoad dies
+    boolean success = ugi.doAs(new PrivilegedAction<Boolean>() {
+      @Override
+      public Boolean run() {
+        FileSystem fs = null;
+        try {
+          Configuration conf = HBaseConfiguration.create();
+          fs = FileSystem.get(conf);
+          LOG.debug("Setting permission for: "+srcPath);
+          setPermission(fs, srcPath, new FsPermission((short) 0777));
+          LOG.debug("Moving " + srcPath + " to " + contentPath);
+          fs.rename(srcPath, contentPath);
+          LOG.debug("Performing Bulk Load for: "+tableName);
+          completeBulkLoad(tableName, contentPath, conf);
+          LOG.debug("Bulk Load Complete");
+          return true;
+        } catch (IOException e) {
+          LOG.error("Failed to secure bulk load", e);
+        } catch (Exception e) {
+          LOG.error("Failed to complete bulk load", e);
+        }
+        return false;
+      }
+    });
+    return success;
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    if (SecureBulkLoadProtocol.class.getName().equals(protocol)) {
+      return SecureBulkLoadEndPoint.VERSION;
+    }
+    LOG.warn("Unknown protocol requested: "+protocol);
+    return -1;
+  }
+
+  private void completeBulkLoad(String tableName, Path srcPath, Configuration conf) throws Exception {
+    new LoadIncrementalHFiles(conf).doBulkLoad(srcPath, new HTable(conf, tableName));
+  }
+
+  private Path createRandomDir(Path parentDir) throws IOException {
+    Path p = new Path(parentDir, new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX));
+    fs.mkdirs(p, new FsPermission((short) 0777));
+    fs.setPermission(p, new FsPermission((short) 0777));
+    return p;
+  }
+
+  private User getActiveUser() throws IOException {
+    User user = RequestContext.getRequestUser();
+    if (!RequestContext.isInRequestContext()) {
+      throw new DoNotRetryIOException("Failed to get requesting user");
+    }
+    return user;
+  }
+
+  //TODO protect against doing chmod 777 on symbolic links
+  //this will be a security issue with Hadoop 2.0 since it'll support symbolic links
+  //we should use reflection to fix this now
+  private void setPermission(FileSystem userFS, Path path, FsPermission perm) throws IOException {
+    userFS.setPermission(path, perm);
+    if(userFS.getFileStatus(path).isDir()) {
+      for(FileStatus stat: userFS.listStatus(path)) {
+        if(stat.isDir())
+          setPermission(userFS, stat.getPath(), perm);
+        else
+          userFS.setPermission(stat.getPath(), perm);
+      }
+    }
+  }
+
+  public Token getToken(User user) {
+    RegionCoprocessorEnvironment regionEnv =
+        (RegionCoprocessorEnvironment)env;
+    RpcServer server = regionEnv.getRegionServerServices().getRpcServer();
+    SecretManager mgr = ((org.apache.hadoop.hbase.ipc.SecureServer)server).getSecretManager();
+    AuthenticationTokenSecretManager secretManager = (AuthenticationTokenSecretManager)mgr;
+    return secretManager.generateToken(user.getName());
+  }
+
+  public static void main(String args[]) throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    HTable table = new HTable(conf, args[0]);
+    FileSystem fs = FileSystem.get(conf);
+    Token token = fs.getDelegationToken("renewer");
+    SecureBulkLoadProtocol proxy = table.coprocessorProxy(SecureBulkLoadProtocol.class,
+        HConstants.EMPTY_START_ROW);
+    proxy.secureCompleteBulkLoad(args[0], args[1], token);
+  }
+
+}
diff --git security/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadProtocol.java security/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadProtocol.java
new file mode 100644
index 0000000..ce394ea
--- /dev/null
+++ security/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadProtocol.java
@@ -0,0 +1,14 @@
+package org.apache.hadoop.hbase.security.access;
+
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.security.TokenInfo;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+
+@TokenInfo("HBASE_AUTH_TOKEN")
+public interface SecureBulkLoadProtocol extends CoprocessorProtocol {
+
+  public boolean secureCompleteBulkLoad(String tableName, String src, Token userToken) throws IOException;
+
+}
diff --git security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
index 1a087b6..679e1fc 100644
--- security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
+++ security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
@@ -33,7 +33,8 @@ public class SecureTestUtil {
     conf.set("hadoop.security.authentication", "simple");
     conf.set("hbase.rpc.engine", SecureRpcEngine.class.getName());
     conf.set("hbase.coprocessor.master.classes", AccessController.class.getName());
-    conf.set("hbase.coprocessor.region.classes", AccessController.class.getName());
+    conf.set("hbase.coprocessor.region.classes", AccessController.class.getName()+
+        ","+SecureBulkLoadEndPoint.class.getName());
     // add the process running user to superusers
     String currentUser = User.getCurrent().getName();
     conf.set("hbase.superuser", "admin,"+currentUser);
diff --git security/src/test/java/org/apache/hadoop/hbase/security/access/TestSecureBulkLoadEndPoint.java security/src/test/java/org/apache/hadoop/hbase/security/access/TestSecureBulkLoadEndPoint.java
new file mode 100644
index 0000000..728e48b
--- /dev/null
+++ security/src/test/java/org/apache/hadoop/hbase/security/access/TestSecureBulkLoadEndPoint.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.security.token.Token;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestSecureBulkLoadEndPoint {
+  private static Log LOG = LogFactory.getLog(TestSecureBulkLoadEndPoint.class);
+  private static HBaseTestingUtility TEST_UTIL;
+
+  private static User ADMIN;
+  private static User READER;
+  private static User LIMITED;
+  private static User DENIED;
+
+  private static byte[] TABLE = Bytes.toBytes("testtable");
+  private static byte[] FAMILY = Bytes.toBytes("f1");
+  private static byte[] PRIVATE_COL = Bytes.toBytes("private");
+  private static byte[] PUBLIC_COL = Bytes.toBytes("public");
+
+  private static final byte[][] SPLIT_KEYS = new byte[][] {
+      Bytes.toBytes("ddd"),
+      Bytes.toBytes("ppp")
+  };
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    SecureTestUtil.enableSecurity(conf);
+    String baseuser = User.getCurrent().getShortName();
+    conf.set("hbase.superuser", conf.get("hbase.superuser", "")
+        + String.format(",%s.hfs.0,%s.hfs.1,%s.hfs.2", baseuser, baseuser, baseuser));
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL.waitTableAvailable(AccessControlLists.ACL_TABLE_NAME, 5000);
+
+    ADMIN = User.createUserForTesting(conf, "admin", new String[]{"supergroup"});
+    READER = User.createUserForTesting(conf, "reader", new String[0]);
+    LIMITED = User.createUserForTesting(conf, "limited", new String[0]);
+    DENIED = User.createUserForTesting(conf, "denied", new String[0]);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testSimpleLoad() throws Exception {
+    runTest(this.getClass().getName()+"testSimpleLoad", StoreFile.BloomType.NONE,
+        new byte[][][] {
+            new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+            new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
+        });
+  }
+
+  private void runTest(String testName, StoreFile.BloomType bloomType,
+      byte[][][] hfileRanges) throws Exception {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    final Path dir = TEST_UTIL.getDataTestDir(testName).makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      createHFile(TEST_UTIL.getConfiguration(), fs, new Path(familyDir, ""
+          + hfileIdx++), FAMILY, PUBLIC_COL, from, to, 1000);
+    }
+    int expectedRows = hfileIdx * 1000;
+
+    final byte[] TABLE = Bytes.toBytes("mytable_"+testName);
+
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    HTableDescriptor htd = new HTableDescriptor(TABLE);
+    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
+    familyDesc.setBloomFilterType(bloomType);
+    htd.addFamily(familyDesc);
+    admin.createTable(htd, SPLIT_KEYS);
+
+    final HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLE);
+    TEST_UTIL.waitTableAvailable(TABLE, 30000);
+
+    //Bulk load
+    final HTable rootTable = new HTable(TEST_UTIL.getConfiguration(), "-ROOT-");
+    final Token token = new Token();
+    assertTrue(
+        READER.runAs(new PrivilegedExceptionAction<Boolean>() {
+          @Override
+          public Boolean run() throws Exception {
+            SecureBulkLoadProtocol prot = rootTable.coprocessorProxy(SecureBulkLoadProtocol.class,
+                HConstants.EMPTY_START_ROW);
+            return prot.secureCompleteBulkLoad(Bytes.toString(table.getTableName()), dir.toString(), token);
+          }
+        })
+    );
+
+    assertEquals(expectedRows, TEST_UTIL.countRows(table));
+  }
+
+  /**
+   * Create an HFile with the given number of rows between a given
+   * start key and end key.
+   * TODO put me in an HFileTestUtil or something?
+   */
+  private static void createHFile(
+      Configuration conf,
+      FileSystem fs, Path path,
+      byte[] family, byte[] qualifier,
+      byte[] startKey, byte[] endKey, int numRows) throws IOException {
+    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
+        .withPath(fs, path)
+        .withComparator(KeyValue.KEY_COMPARATOR)
+        .create();
+    long now = System.currentTimeMillis();
+    try {
+      // subtract 2 since iterateOnSplits doesn't include boundary keys
+      for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
+        KeyValue kv = new KeyValue(key, family, qualifier, now, key);
+        writer.append(kv);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+}
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 230d6b1..4b56e11 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -150,6 +150,7 @@ import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.hbase.zookeeper.ClusterId;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
@@ -356,6 +357,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
   private ObjectName mxBean = null;
 
   /**
+   * ClusterId
+   */
+  private ClusterId clusterId = null;
+
+  /**
    * Starts a HRegionServer at the default location
    *
    * @param conf
@@ -557,6 +563,12 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
   private void preRegistrationInitialization(){
     try {
       initializeZooKeeper();
+
+      clusterId = new ClusterId(zooKeeper, this);
+      if(clusterId.hasId()) {
+        conf.set(HConstants.CLUSTER_ID, clusterId.getId());
+      }
+
       initializeThreads();
       int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4);
       for (int i = 0; i < nbBlocks; i++) {
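
For reviewers who want to exercise the patch outside the unit test, here is a minimal client-side sketch of driving the new endpoint. It assumes the HFiles have already been written under a family subdirectory on HDFS and that SecureBulkLoadEndPoint is registered in hbase.coprocessor.region.classes (as SecureTestUtil does above); the class name, table name, and directory arguments are placeholders, and it mirrors the main() included in SecureBulkLoadEndPoint rather than adding any new API.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.security.access.SecureBulkLoadProtocol;
import org.apache.hadoop.security.token.Token;

// Hypothetical driver class; args[0] = table name, args[1] = HDFS dir of prepared HFiles.
public class SecureBulkLoadDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String tableName = args[0];
    String hfileDir = args[1];

    // HDFS delegation token so the regionserver can chmod/rename the user's files.
    FileSystem fs = FileSystem.get(conf);
    Token<?> userToken = fs.getDelegationToken("renewer");

    // Proxy to the endpoint on the region hosting the table's first row.
    HTable table = new HTable(conf, tableName);
    SecureBulkLoadProtocol proxy =
        table.coprocessorProxy(SecureBulkLoadProtocol.class, HConstants.EMPTY_START_ROW);
    boolean ok = proxy.secureCompleteBulkLoad(tableName, hfileDir, userToken);
    System.out.println("Secure bulk load " + (ok ? "succeeded" : "failed"));
    table.close();
  }
}
```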