diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java new file mode 100644 index 0000000..4d09f73 --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security.access; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantRequest; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantResponse; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.RevokeRequest; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.RevokeResponse; + +import com.google.protobuf.ByteString; +import com.google.protobuf.ZeroCopyLiteralByteString; + +/** + * Utility client for doing access control admin operations. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class AccessControlClient { + /** + * Grants permission on the specified table for the specified user + * @param conf + * @param tableName + * @param userName + * @param family + * @param qual + * @param actions + * @return GrantResponse + * @throws Throwable + */ + public static GrantResponse grant(Configuration conf, final TableName tableName, + final String userName, final byte[] family, final byte[] qual, + final AccessControlProtos.Permission.Action... actions) throws Throwable { + HTable ht = null; + try { + TableName aclTableName = + TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl"); + ht = new HTable(conf, aclTableName.getName()); + Batch.Call callable = + new Batch.Call() { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + + @Override + public GrantResponse call(AccessControlService service) throws IOException { + GrantRequest.Builder builder = GrantRequest.newBuilder(); + AccessControlProtos.Permission.Builder ret = + AccessControlProtos.Permission.newBuilder(); + AccessControlProtos.TablePermission.Builder permissionBuilder = + AccessControlProtos.TablePermission + .newBuilder(); + for (AccessControlProtos.Permission.Action a : actions) { + permissionBuilder.addAction(a); + } + permissionBuilder.setTableName(ProtobufUtil.toProtoTableName(tableName)); + + if (family != null) { + permissionBuilder.setFamily(ZeroCopyLiteralByteString.wrap(family)); + } + if (qual != null) { + permissionBuilder.setQualifier(ZeroCopyLiteralByteString.wrap(qual)); + } + ret.setType(AccessControlProtos.Permission.Type.Table).setTablePermission( + permissionBuilder); + builder.setUserPermission(AccessControlProtos.UserPermission.newBuilder() + .setUser(ByteString.copyFromUtf8(userName)).setPermission(ret)); + service.grant(controller, builder.build(), rpcCallback); + return rpcCallback.get(); + } + }; + Map result = ht.coprocessorService(AccessControlService.class, + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, callable); + return result.values().iterator().next(); // There will be exactly one + // region for labels + // table and so one entry in + // result Map. + } finally { + if (ht != null) { + ht.close(); + } + } + } + + /** + * Revokes the permission on the table + * @param conf + * @param username + * @param tableName + * @param family + * @param qualifier + * @param actions + * @return RevokeResponse + * @throws Throwable + */ + public static RevokeResponse revoke(Configuration conf, final String username, + final TableName tableName, final byte[] family, final byte[] qualifier, + final AccessControlProtos.Permission.Action... actions) throws Throwable { + HTable ht = null; + try { + TableName aclTableName = TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, + "acl"); + ht = new HTable(conf, aclTableName.getName()); + Batch.Call callable = + new Batch.Call() { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + + @Override + public RevokeResponse call(AccessControlService service) throws IOException { + AccessControlProtos.Permission.Builder ret = + AccessControlProtos.Permission.newBuilder(); + AccessControlProtos.TablePermission.Builder permissionBuilder = + AccessControlProtos.TablePermission.newBuilder(); + for (AccessControlProtos.Permission.Action a : actions) { + permissionBuilder.addAction(a); + } + if (tableName != null) { + permissionBuilder.setTableName(ProtobufUtil.toProtoTableName(tableName)); + } + if (family != null) { + permissionBuilder.setFamily(ZeroCopyLiteralByteString.wrap(family)); + } + if (qualifier != null) { + permissionBuilder.setQualifier(ZeroCopyLiteralByteString.wrap(qualifier)); + } + ret.setType(AccessControlProtos.Permission.Type.Table).setTablePermission( + permissionBuilder); + RevokeRequest builder = AccessControlProtos.RevokeRequest + .newBuilder() + .setUserPermission( + AccessControlProtos.UserPermission.newBuilder() + .setUser(ByteString.copyFromUtf8(username)).setPermission(ret)).build(); + service.revoke(controller, builder, rpcCallback); + return rpcCallback.get(); + } + }; + Map result = ht.coprocessorService(AccessControlService.class, + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, callable); + return result.values().iterator().next(); + + } finally { + if (ht != null) { + ht.close(); + } + } + } +} diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithACL.java hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithACL.java new file mode 100644 index 0000000..77d81d5 --- /dev/null +++ hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithACL.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.security.access.AccessController; +import org.apache.hadoop.hbase.util.LoadTestTool; +import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL; +import org.apache.hadoop.util.ToolRunner; +import org.junit.experimental.categories.Category; +/** + * /** + * An Integration class for tests that does something with the cluster while running + * {@link LoadTestTool} to write and verify some data. + * Verifies whether cells for users with only WRITE permissions are not read back + * and cells with READ permissions are read back. + * Every operation happens in the user's specific context + */ +@Category(IntegrationTests.class) +public class IntegrationTestIngestWithACL extends IntegrationTestIngest { + + private static final char COLON = ':'; + public static final char HYPHEN = '-'; + private static final char COMMA = ','; + private static final int SPECIAL_PERM_CELL_INSERTION_FACTOR = 100; + public static final String[] userNames = { "user1", "user2", "user3", "user4" }; + + @Override + public void setUpCluster() throws Exception { + util = getTestingUtil(null); + Configuration conf = util.getConfiguration(); + conf.setInt(HFile.FORMAT_VERSION_KEY, 3); + conf.set("hbase.coprocessor.master.classes", AccessController.class.getName()); + conf.set("hbase.coprocessor.region.classes", AccessController.class.getName()); + // conf.set("hbase.superuser", "admin"); + super.setUpCluster(); + } + + @Override + protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey, + long numKeys) { + String[] args = super.getArgsForLoadTestTool(mode, modeSpecificArg, startKey, numKeys); + List tmp = new ArrayList(Arrays.asList(args)); + tmp.add(HYPHEN + LoadTestTool.OPT_GENERATOR); + StringBuilder sb = new StringBuilder(LoadTestDataGeneratorWithACL.class.getName()); + sb.append(COLON); + sb.append(asCommaSeperatedString(userNames)); + sb.append(COLON); + sb.append(Integer.toString(SPECIAL_PERM_CELL_INSERTION_FACTOR)); + tmp.add(sb.toString()); + return tmp.toArray(new String[tmp.size()]); + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int ret = ToolRunner.run(conf, new IntegrationTestIngestWithACL(), args); + System.exit(ret); + } + + private static String asCommaSeperatedString(String[] list) { + StringBuilder sb = new StringBuilder(); + for (String item : list) { + sb.append(item); + sb.append(COMMA); + } + if (sb.length() > 0) { + // Remove the trailing , + sb.deleteCharAt(sb.length() - 1); + } + return sb.toString(); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java index 82204d1..e37258b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java @@ -18,9 +18,12 @@ package org.apache.hadoop.hbase.security.access; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,11 +37,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; -import java.io.*; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentSkipListMap; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; /** * Performs authorization checks for a given user's assigned permissions diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/LoadTestDataGeneratorWithVisibilityLabels.java hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/LoadTestDataGeneratorWithVisibilityLabels.java index 417564c..aa74f2a 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/LoadTestDataGeneratorWithVisibilityLabels.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/LoadTestDataGeneratorWithVisibilityLabels.java @@ -38,6 +38,7 @@ public class LoadTestDataGeneratorWithVisibilityLabels extends DefaultDataGenera @Override public void initialize(String[] args) { + super.initialize(args); if (args.length < 1 || args.length > 2) { throw new IllegalArgumentException("LoadTestDataGeneratorWithVisibilityLabels can have " + "1 or 2 initialization arguments"); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestDataGeneratorWithTags.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestDataGeneratorWithTags.java index b7a5d30..5b06af8 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestDataGeneratorWithTags.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestDataGeneratorWithTags.java @@ -45,6 +45,7 @@ public class LoadTestDataGeneratorWithTags extends DefaultDataGenerator { @Override public void initialize(String[] args) { + super.initialize(args); if (args.length != 4) { throw new IllegalArgumentException("LoadTestDataGeneratorWithTags must have " + "4 initialization arguments. ie. minNumTags:maxNumTags:minTagLength:maxTagLength"); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java index 3116b93..f233c48 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -39,14 +39,18 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.PerformanceEvaluation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.crypto.Cipher; import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.access.AccessControlClient; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL; import org.apache.hadoop.util.ToolRunner; /** @@ -152,6 +156,8 @@ public class LoadTestTool extends AbstractHBaseTool { protected Compression.Algorithm compressAlgo; protected BloomType bloomType; private boolean inMemoryCF; + + private User userOwner; // Writer options protected int numWriterThreads = DEFAULT_NUM_THREADS; protected int minColsPerKey, maxColsPerKey; @@ -435,11 +441,14 @@ public class LoadTestTool extends AbstractHBaseTool { if (!isSkipInit) { initTestTable(); } - LoadTestDataGenerator dataGen = null; if (cmd.hasOption(OPT_GENERATOR)) { String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON); dataGen = getLoadGeneratorInstance(clazzAndArgs[0]); + if(dataGen instanceof LoadTestDataGeneratorWithACL) { + LOG.info("ACL is on"); + userOwner = User.createUserForTesting(conf, "owner", new String[0]); + } String[] args = clazzAndArgs.length == 1 ? new String[0] : Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length); dataGen.initialize(args); @@ -449,18 +458,50 @@ public class LoadTestTool extends AbstractHBaseTool { minColsPerKey, maxColsPerKey, COLUMN_FAMILY); } + if(userOwner != null) { + conf.set("hadoop.security.authorization", "false"); + conf.set("hadoop.security.authentication", "simple"); + LOG.info("Granting permission for the user " + userOwner.getShortName()); + HTable table = new HTable(conf, tableName); + AccessControlProtos.Permission.Action[] actions = { + AccessControlProtos.Permission.Action.ADMIN, + AccessControlProtos.Permission.Action.CREATE, AccessControlProtos.Permission.Action.READ, + AccessControlProtos.Permission.Action.WRITE }; + + try { + AccessControlClient.grant(conf, table.getName(), userOwner.getShortName(), COLUMN_FAMILY, + null, actions); + } catch (Throwable e) { + LOG.fatal("Error in granting permission for the user " + userOwner.getShortName(), e); + return EXIT_FAILURE; + } + } + if (isWrite) { - writerThreads = new MultiThreadedWriter(dataGen, conf, tableName); + if (userOwner != null) { + writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner); + } else { + writerThreads = new MultiThreadedWriter(dataGen, conf, tableName); + } writerThreads.setMultiPut(isMultiPut); } if (isUpdate) { - updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent); + if (userOwner != null) { + updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent, + userOwner); + } else { + updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent); + } updaterThreads.setBatchUpdate(isBatchUpdate); } if (isRead) { - readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent); + if (userOwner != null) { + readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent); + } else { + readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent); + } readerThreads.setMaxErrors(maxReadErrors); readerThreads.setKeyWindow(keyWindow); } @@ -533,7 +574,7 @@ public class LoadTestTool extends AbstractHBaseTool { } } - static byte[] generateData(final Random r, int length) { + public static byte[] generateData(final Random r, int length) { byte [] b = new byte [length]; int i = 0; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java index d6a2570..0edeea7 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java @@ -37,11 +37,11 @@ public class MultiThreadedReader extends MultiThreadedAction { private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class); - private Set readers = new HashSet(); + protected Set readers = new HashSet(); private final double verifyPercent; private volatile boolean aborted; - private MultiThreadedWriterBase writer = null; + protected MultiThreadedWriterBase writer = null; /** * The number of keys verified in a sequence. This will never be larger than @@ -65,8 +65,9 @@ public class MultiThreadedReader extends MultiThreadedAction public static final int DEFAULT_KEY_WINDOW = 0; protected AtomicLong numKeysVerified = new AtomicLong(0); - private AtomicLong numReadErrors = new AtomicLong(0); - private AtomicLong numReadFailures = new AtomicLong(0); + protected AtomicLong numReadErrors = new AtomicLong(0); + protected AtomicLong numReadFailures = new AtomicLong(0); + protected AtomicLong nullResult = new AtomicLong(0); private int maxErrors = DEFAULT_MAX_ERRORS; private int keyWindow = DEFAULT_KEY_WINDOW; @@ -97,22 +98,26 @@ public class MultiThreadedReader extends MultiThreadedAction LOG.debug("Reading keys [" + startKey + ", " + endKey + ")"); } + addReaderThreads(numThreads); + startThreads(readers); + } + + protected void addReaderThreads(int numThreads) throws IOException { for (int i = 0; i < numThreads; ++i) { HBaseReaderThread reader = new HBaseReaderThread(i); readers.add(reader); } - startThreads(readers); } public class HBaseReaderThread extends Thread { - private final int readerId; - private final HTable table; + protected final int readerId; + protected final HTable table; /** The "current" key being read. Increases from startKey to endKey. */ private long curKey; /** Time when the thread started */ - private long startTimeMs; + protected long startTimeMs; /** If we are ahead of the writer and reading a random key. */ private boolean readingRandomKey; @@ -123,21 +128,31 @@ public class MultiThreadedReader extends MultiThreadedAction */ public HBaseReaderThread(int readerId) throws IOException { this.readerId = readerId; - table = new HTable(conf, tableName); + table = createTable(); setName(getClass().getSimpleName() + "_" + readerId); } + protected HTable createTable() throws IOException { + return new HTable(conf, tableName); + } + @Override public void run() { try { runReader(); } finally { - try { + closeTable(); + numThreadsWorking.decrementAndGet(); + } + } + + protected void closeTable() { + try { + if (table != null) { table.close(); - } catch (IOException e) { - LOG.error("Error closing table", e); } - numThreadsWorking.decrementAndGet(); + } catch (IOException e) { + LOG.error("Error closing table", e); } } @@ -238,7 +253,7 @@ public class MultiThreadedReader extends MultiThreadedAction if (verbose) { LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString); } - queryKey(get, RandomUtils.nextInt(100) < verifyPercent); + queryKey(get, RandomUtils.nextInt(100) < verifyPercent, keyToRead); } catch (IOException e) { numReadFailures.addAndGet(1); LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") @@ -248,12 +263,18 @@ public class MultiThreadedReader extends MultiThreadedAction return get; } - public void queryKey(Get get, boolean verify) throws IOException { + public void queryKey(Get get, boolean verify, long keyToRead) throws IOException { String rowKey = Bytes.toString(get.getRow()); // read the data long start = System.currentTimeMillis(); Result result = table.get(get); + getResultMetricUpdation(verify, rowKey, start, result, table, false); + } + + protected void getResultMetricUpdation(boolean verify, String rowKey, long start, + Result result, HTable table, boolean isNullExpected) + throws IOException { totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); numKeys.addAndGet(1); if (!result.isEmpty()) { @@ -265,9 +286,13 @@ public class MultiThreadedReader extends MultiThreadedAction Bytes.toBytes(rowKey)); LOG.info("Key = " + rowKey + ", RegionServer: " + hloc.getHostname()); + if(isNullExpected) { + nullResult.incrementAndGet(); + LOG.debug("Null result obtained for the key ="+rowKey); + return; + } } - - boolean isOk = verifyResultAgainstDataGenerator(result, verify); + boolean isOk = verifyResultAgainstDataGenerator(result, verify, false); long numErrorsAfterThis = 0; if (isOk) { long cols = 0; @@ -306,12 +331,17 @@ public class MultiThreadedReader extends MultiThreadedAction return numUniqueKeysVerified.get(); } + public long getNullResultsCount() { + return nullResult.get(); + } + @Override protected String progressInfo() { StringBuilder sb = new StringBuilder(); appendToStatus(sb, "verified", numKeysVerified.get()); appendToStatus(sb, "READ FAILURES", numReadFailures.get()); appendToStatus(sb, "READ ERRORS", numReadErrors.get()); + appendToStatus(sb, "NULL RESULT", nullResult.get()); return sb.toString(); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java new file mode 100644 index 0000000..b71ff6d --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * A MultiThreadReader that helps to work with ACL + */ +public class MultiThreadedReaderWithACL extends MultiThreadedReader { + private static final Log LOG = LogFactory.getLog(MultiThreadedReaderWithACL.class); + + private static final String COMMA = ","; + /** + * Maps user with Table instance. Because the table instance has to be created + * per user inorder to work in that user's context + */ + private Map userVsTable = new HashMap(); + private Map users = new HashMap(); + private String[] userNames; + + public MultiThreadedReaderWithACL(LoadTestDataGenerator dataGen, Configuration conf, + TableName tableName, double verifyPercent) { + super(dataGen, conf, tableName, verifyPercent); + userNames = dataGenerator.getArgs()[0].split(COMMA); + } + + @Override + protected void addReaderThreads(int numThreads) throws IOException { + for (int i = 0; i < numThreads; ++i) { + HBaseReaderThread reader = new HBaseReaderThreadWithACL(i); + readers.add(reader); + } + } + + public class HBaseReaderThreadWithACL extends HBaseReaderThread { + + public HBaseReaderThreadWithACL(int readerId) throws IOException { + super(readerId); + } + + @Override + protected HTable createTable() throws IOException { + return null; + } + + @Override + protected void closeTable() { + for (HTable table : userVsTable.values()) { + try { + table.close(); + } catch (Exception e) { + LOG.error("Error while closing the table " + table.getName(), e); + } + } + } + + @Override + public void queryKey(final Get get, final boolean verify, final long keyToRead) + throws IOException { + final String rowKey = Bytes.toString(get.getRow()); + + // read the data + final long start = System.currentTimeMillis(); + PrivilegedExceptionAction action = new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + HTable localTable = null; + try { + get.setACLStrategy(true); + Result result = null; + int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[1]); + int mod = ((int) keyToRead % userNames.length); + if (userVsTable.get(userNames[mod]) == null) { + localTable = new HTable(conf, tableName); + userVsTable.put(userNames[mod], localTable); + result = localTable.get(get); + } else { + localTable = userVsTable.get(userNames[mod]); + result = localTable.get(get); + } + boolean isNullExpected = ((((int) keyToRead % specialPermCellInsertionFactor)) == 0); + LOG.info("Read happening from ACL " + isNullExpected); + getResultMetricUpdation(verify, rowKey, start, result, localTable, isNullExpected); + } catch (IOException e) { + recordFailure(keyToRead); + } + return null; + } + }; + if (userNames != null && userNames.length > 0) { + int mod = ((int) keyToRead % userNames.length); + User user; + if(!users.containsKey(userNames[mod])) { + UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]); + user = User.create(realUserUgi); + users.put(userNames[mod], user); + } else { + user = users.get(userNames[mod]); + } + try { + user.runAs(action); + } catch (Exception e) { + recordFailure(keyToRead); + } + } + } + + private void recordFailure(final long keyToRead) { + numReadFailures.addAndGet(1); + LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", " + + "time from start: " + (System.currentTimeMillis() - startTimeMs) + " ms"); + } + } + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java index 30a7116..ed19ccf 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java @@ -56,7 +56,7 @@ import com.google.common.base.Preconditions; public class MultiThreadedUpdater extends MultiThreadedWriterBase { private static final Log LOG = LogFactory.getLog(MultiThreadedUpdater.class); - private Set updaters = new HashSet(); + protected Set updaters = new HashSet(); private MultiThreadedWriterBase writer = null; private boolean isBatchUpdate = false; @@ -86,12 +86,16 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase { LOG.debug("Updating keys [" + startKey + ", " + endKey + ")"); } + addUpdaterThreads(numThreads); + + startThreads(updaters); + } + + protected void addUpdaterThreads(int numThreads) throws IOException { for (int i = 0; i < numThreads; ++i) { HBaseUpdaterThread updater = new HBaseUpdaterThread(i); updaters.add(updater); } - - startThreads(updaters); } private long getNextKeyToUpdate() { @@ -115,12 +119,16 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase { } } - private class HBaseUpdaterThread extends Thread { - private final HTable table; + protected class HBaseUpdaterThread extends Thread { + protected final HTable table; public HBaseUpdaterThread(int updaterId) throws IOException { setName(getClass().getSimpleName() + "_" + updaterId); - table = new HTable(conf, tableName); + table = createTable(); + } + + protected HTable createTable() throws IOException { + return new HTable(conf, tableName); } public void run() { @@ -151,67 +159,73 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase { numCols.addAndGet(1); app = new Append(rowKey); } - Result result = null; + Get get = new Get(rowKey); + get.addFamily(cf); try { - Get get = new Get(rowKey); - get.addFamily(cf); get = dataGenerator.beforeGet(rowKeyBase, get); - result = table.get(get); - } catch (IOException ie) { - LOG.warn("Failed to get the row for key = [" - + rowKey + "], column family = [" + Bytes.toString(cf) + "]", ie); + } catch (Exception e) { + // Ideally wont happen + LOG.warn("Failed to modify the get from the load generator = [" + get.getRow() + + "], column family = [" + Bytes.toString(cf) + "]", e); } + Result result = getRow(get, rowKeyBase, cf); Map columnValues = result != null ? result.getFamilyMap(cf) : null; if (columnValues == null) { - failedKeySet.add(rowKeyBase); - LOG.error("Failed to update the row with key = [" - + rowKey + "], since we could not get the original row"); - } - for (byte[] column : columnValues.keySet()) { - if (Bytes.equals(column, INCREMENT) - || Bytes.equals(column, MUTATE_INFO)) { - continue; - } - MutationType mt = MutationType.valueOf( - RandomUtils.nextInt(MutationType.values().length)); - long columnHash = Arrays.hashCode(column); - long hashCode = cfHash + columnHash; - byte[] hashCodeBytes = Bytes.toBytes(hashCode); - byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY; - if (hashCode % 2 == 0) { - Cell kv = result.getColumnLatestCell(cf, column); - checkedValue = kv != null ? CellUtil.cloneValue(kv) : null; - Preconditions.checkNotNull(checkedValue, - "Column value to be checked should not be null"); + int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[1]); + if (((int) rowKeyBase % specialPermCellInsertionFactor == 0)) { + LOG.info("Null result expected for the rowkey " + Bytes.toString(rowKey)); + } else { + failedKeySet.add(rowKeyBase); + LOG.error("Failed to update the row with key = [" + rowKey + + "], since we could not get the original row"); } - buf.setLength(0); // Clear the buffer - buf.append("#").append(Bytes.toString(column)).append(":"); - ++columnCount; - switch (mt) { - case PUT: - Put put = new Put(rowKey); - put.add(cf, column, hashCodeBytes); - mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue); - buf.append(MutationType.PUT.getNumber()); - break; - case DELETE: - Delete delete = new Delete(rowKey); - // Delete all versions since a put - // could be called multiple times if CM is used - delete.deleteColumns(cf, column); - mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue); - buf.append(MutationType.DELETE.getNumber()); - break; - default: - buf.append(MutationType.APPEND.getNumber()); - app.add(cf, column, hashCodeBytes); - } - app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString())); - if (!isBatchUpdate) { - mutate(table, app, rowKeyBase); - numCols.addAndGet(1); - app = new Append(rowKey); + } + if(columnValues != null) { + for (byte[] column : columnValues.keySet()) { + if (Bytes.equals(column, INCREMENT) || Bytes.equals(column, MUTATE_INFO)) { + continue; + } + MutationType mt = MutationType + .valueOf(RandomUtils.nextInt(MutationType.values().length)); + long columnHash = Arrays.hashCode(column); + long hashCode = cfHash + columnHash; + byte[] hashCodeBytes = Bytes.toBytes(hashCode); + byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY; + if (hashCode % 2 == 0) { + Cell kv = result.getColumnLatestCell(cf, column); + checkedValue = kv != null ? CellUtil.cloneValue(kv) : null; + Preconditions.checkNotNull(checkedValue, + "Column value to be checked should not be null"); + } + buf.setLength(0); // Clear the buffer + buf.append("#").append(Bytes.toString(column)).append(":"); + ++columnCount; + switch (mt) { + case PUT: + Put put = new Put(rowKey); + put.add(cf, column, hashCodeBytes); + mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue); + buf.append(MutationType.PUT.getNumber()); + break; + case DELETE: + Delete delete = new Delete(rowKey); + // Delete all versions since a put + // could be called multiple times if CM is used + delete.deleteColumns(cf, column); + mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue); + buf.append(MutationType.DELETE.getNumber()); + break; + default: + buf.append(MutationType.APPEND.getNumber()); + app.add(cf, column, hashCodeBytes); + } + app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString())); + if (!isBatchUpdate) { + mutate(table, app, rowKeyBase); + numCols.addAndGet(1); + app = new Append(rowKey); + } } } } @@ -230,12 +244,72 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase { } } } finally { - try { + closeHTable(); + numThreadsWorking.decrementAndGet(); + } + } + + protected void closeHTable() { + try { + if (table != null) { table.close(); - } catch (IOException e) { - LOG.error("Error closing table", e); } - numThreadsWorking.decrementAndGet(); + } catch (IOException e) { + LOG.error("Error closing table", e); + } + } + + protected Result getRow(Get get, long rowKeyBase, byte[] cf) { + Result result = null; + try { + result = table.get(get); + } catch (IOException ie) { + LOG.warn( + "Failed to get the row for key = [" + get.getRow() + "], column family = [" + + Bytes.toString(cf) + "]", ie); + } + return result; + } + + public void mutate(HTable table, Mutation m, long keyBase) { + mutate(table, m, keyBase, null, null, null, null); + } + + public void mutate(HTable table, Mutation m, + long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) { + long start = System.currentTimeMillis(); + try { + m = dataGenerator.beforeMutate(keyBase, m); + if (m instanceof Increment) { + table.increment((Increment)m); + } else if (m instanceof Append) { + table.append((Append)m); + } else if (m instanceof Put) { + table.checkAndPut(row, cf, q, v, (Put)m); + } else if (m instanceof Delete) { + table.checkAndDelete(row, cf, q, v, (Delete)m); + } else { + throw new IllegalArgumentException( + "unsupported mutation " + m.getClass().getSimpleName()); + } + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + } catch (IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + if (e instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; + exceptionInfo = aggEx.getExhaustiveDescription(); + } else { + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + } + LOG.error("Failed to mutate: " + keyBase + " after " + + (System.currentTimeMillis() - start) + + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + + exceptionInfo); } } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdaterWithACL.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdaterWithACL.java new file mode 100644 index 0000000..996bf55 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdaterWithACL.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; + +/** + * A MultiThreadUpdater that helps to work with ACL + */ +public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater { + private static final Log LOG = LogFactory.getLog(MultiThreadedUpdaterWithACL.class); + private final static String COMMA= ","; + private User userOwner; + /** + * Maps user with Table instance. Because the table instance has to be created + * per user inorder to work in that user's context + */ + private Map userVsTable = new HashMap(); + private Map users = new HashMap(); + private String[] userNames; + + public MultiThreadedUpdaterWithACL(LoadTestDataGenerator dataGen, Configuration conf, + TableName tableName, double updatePercent, User userOwner) { + super(dataGen, conf, tableName, updatePercent); + this.userOwner = userOwner; + userNames = dataGenerator.getArgs()[0].split(COMMA); + } + + @Override + protected void addUpdaterThreads(int numThreads) throws IOException { + for (int i = 0; i < numThreads; ++i) { + HBaseUpdaterThread updater = new HBaseUpdaterThreadWithACL(i); + updaters.add(updater); + } + } + + public class HBaseUpdaterThreadWithACL extends HBaseUpdaterThread { + + private HTable table; + private MutateAccessAction mutateAction = new MutateAccessAction(); + + public HBaseUpdaterThreadWithACL(int updaterId) throws IOException { + super(updaterId); + } + + @Override + protected HTable createTable() throws IOException { + return null; + } + + @Override + protected void closeHTable() { + try { + if (table != null) { + table.close(); + } + for (HTable table : userVsTable.values()) { + try { + table.close(); + } catch (Exception e) { + LOG.error("Error while closing the table " + table.getName(), e); + } + } + } catch (Exception e) { + LOG.error("Error while closing the HTable "+table.getName(), e); + } + } + + @Override + protected Result getRow(final Get get, final long rowKeyBase, final byte[] cf) { + PrivilegedExceptionAction action = new PrivilegedExceptionAction() { + + @Override + public Object run() throws Exception { + Result res = null; + HTable localTable = null; + try { + int mod = ((int) rowKeyBase % userNames.length); + if (userVsTable.get(userNames[mod]) == null) { + localTable = new HTable(conf, tableName); + userVsTable.put(userNames[mod], localTable); + res = localTable.get(get); + } else { + localTable = userVsTable.get(userNames[mod]); + res = localTable.get(get); + } + } catch (IOException ie) { + LOG.warn("Failed to get the row for key = [" + get.getRow() + "], column family = [" + + Bytes.toString(cf) + "]", ie); + } + return res; + } + }; + + if (userNames != null && userNames.length > 0) { + int mod = ((int) rowKeyBase % userNames.length); + User user; + if(!users.containsKey(userNames[mod])) { + UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]); + user = User.create(realUserUgi); + users.put(userNames[mod], user); + } else { + user = users.get(userNames[mod]); + } + try { + Result result = (Result) user.runAs(action); + return result; + } catch (Exception ie) { + LOG.warn("Failed to get the row for key = [" + get.getRow() + "], column family = [" + + Bytes.toString(cf) + "]", ie); + } + } + // This means that no users were present + return null; + } + + @Override + public void mutate(final HTable table, Mutation m, final long keyBase, final byte[] row, + final byte[] cf, final byte[] q, final byte[] v) { + final long start = System.currentTimeMillis(); + try { + m = dataGenerator.beforeMutate(keyBase, m); + mutateAction.setMutation(m); + mutateAction.setCF(cf); + mutateAction.setRow(row); + mutateAction.setQualifier(q); + mutateAction.setValue(v); + mutateAction.setStartTime(start); + mutateAction.setKeyBase(keyBase); + userOwner.runAs(mutateAction); + } catch (IOException e) { + recordFailure(m, keyBase, start, e); + } catch (InterruptedException e) { + failedKeySet.add(keyBase); + } + } + + class MutateAccessAction implements PrivilegedExceptionAction { + private HTable table; + private long start; + private Mutation m; + private long keyBase; + private byte[] row; + private byte[] cf; + private byte[] q; + private byte[] v; + + public MutateAccessAction() { + + } + + public void setStartTime(final long start) { + this.start = start; + } + + public void setMutation(final Mutation m) { + this.m = m; + } + + public void setRow(final byte[] row) { + this.row = row; + } + + public void setCF(final byte[] cf) { + this.cf = cf; + } + + public void setQualifier(final byte[] q) { + this.q = q; + } + + public void setValue(final byte[] v) { + this.v = v; + } + + public void setKeyBase(final long keyBase) { + this.keyBase = keyBase; + } + + @Override + public Object run() throws Exception { + try { + if (table == null) { + table = new HTable(conf, tableName); + } + if (m instanceof Increment) { + table.increment((Increment) m); + } else if (m instanceof Append) { + table.append((Append) m); + } else if (m instanceof Put) { + table.checkAndPut(row, cf, q, v, (Put) m); + } else if (m instanceof Delete) { + table.checkAndDelete(row, cf, q, v, (Delete) m); + } else { + throw new IllegalArgumentException("unsupported mutation " + + m.getClass().getSimpleName()); + } + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + } catch (IOException e) { + recordFailure(m, keyBase, start, e); + } + return null; + } + } + + private void recordFailure(final Mutation m, final long keyBase, + final long start, IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + if (e instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; + exceptionInfo = aggEx.getExhaustiveDescription(); + } else { + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + } + LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) + + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + + exceptionInfo); + } + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java index 9553f03..bfe3ebd 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java @@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT; import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -33,15 +35,17 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.util.StringUtils; /** Creates multiple threads that write key/values into the */ public class MultiThreadedWriter extends MultiThreadedWriterBase { private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class); - private Set writers = new HashSet(); + protected Set writers = new HashSet(); - private boolean isMultiPut = false; + protected boolean isMultiPut = false; public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName) { @@ -61,20 +65,28 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase { LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")"); } + createWriterThreads(numThreads); + + startThreads(writers); + } + + protected void createWriterThreads(int numThreads) throws IOException { for (int i = 0; i < numThreads; ++i) { HBaseWriterThread writer = new HBaseWriterThread(i); writers.add(writer); } - - startThreads(writers); } - private class HBaseWriterThread extends Thread { + public class HBaseWriterThread extends Thread { private final HTable table; public HBaseWriterThread(int writerId) throws IOException { setName(getClass().getSimpleName() + "_" + writerId); - table = new HTable(conf, tableName); + table = createTable(); + } + + protected HTable createTable() throws IOException { + return new HTable(conf, tableName); } public void run() { @@ -119,12 +131,42 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase { } } } finally { - try { + closeHTable(); + numThreadsWorking.decrementAndGet(); + } + } + + public void insert(HTable table, Put put, long keyBase) { + long start = System.currentTimeMillis(); + try { + put = (Put) dataGenerator.beforeMutate(keyBase, put); + table.put(put); + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + } catch (IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + if (e instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e; + exceptionInfo = aggEx.getExhaustiveDescription(); + } else { + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + } + LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) + + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + + "; errors: " + exceptionInfo); + } + } + protected void closeHTable() { + try { + if (table != null) { table.close(); - } catch (IOException e) { - LOG.error("Error closing table", e); } - numThreadsWorking.decrementAndGet(); + } catch (IOException e) { + LOG.error("Error closing table", e); } } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java index 4bb873f..9373e6f 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; @@ -36,10 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; -import org.apache.hadoop.util.StringUtils; /** Creates multiple threads that write key/values into the */ public abstract class MultiThreadedWriterBase extends MultiThreadedAction { @@ -171,31 +166,6 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction { return failedKeySet.size(); } - public void insert(HTable table, Put put, long keyBase) { - long start = System.currentTimeMillis(); - try { - put = (Put) dataGenerator.beforeMutate(keyBase, put); - table.put(put); - totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); - } catch (IOException e) { - failedKeySet.add(keyBase); - String exceptionInfo; - if (e instanceof RetriesExhaustedWithDetailsException) { - RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e; - exceptionInfo = aggEx.getExhaustiveDescription(); - } else { - StringWriter stackWriter = new StringWriter(); - PrintWriter pw = new PrintWriter(stackWriter); - e.printStackTrace(pw); - pw.flush(); - exceptionInfo = StringUtils.stringifyException(e); - } - LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) + - "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " - + exceptionInfo); - } - } - /** * The max key until which all keys have been inserted/updated (successfully or not). * @return the last key that we have inserted/updated all keys up to (inclusive) diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterWithACL.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterWithACL.java new file mode 100644 index 0000000..8ab3560 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterWithACL.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.security.PrivilegedExceptionAction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.util.StringUtils; + +/** + * MultiThreadedWriter that helps in testing ACL + */ +public class MultiThreadedWriterWithACL extends MultiThreadedWriter { + + private static final Log LOG = LogFactory.getLog(MultiThreadedWriterWithACL.class); + private User userOwner; + + public MultiThreadedWriterWithACL(LoadTestDataGenerator dataGen, Configuration conf, + TableName tableName, User userOwner) { + super(dataGen, conf, tableName); + this.userOwner = userOwner; + } + + @Override + public void start(long startKey, long endKey, int numThreads) throws IOException { + super.start(startKey, endKey, numThreads); + } + + @Override + protected void createWriterThreads(int numThreads) throws IOException { + for (int i = 0; i < numThreads; ++i) { + HBaseWriterThread writer = new HBaseWriterThreadWithACL(i); + writers.add(writer); + } + } + + public class HBaseWriterThreadWithACL extends HBaseWriterThread { + + private HTable table; + private WriteAccessAction writerAction = new WriteAccessAction(); + + public HBaseWriterThreadWithACL(int writerId) throws IOException { + super(writerId); + } + + @Override + protected HTable createTable() throws IOException { + return null; + } + + @Override + protected void closeHTable() { + if (table != null) { + try { + table.close(); + } catch (Exception e) { + LOG.error("Error in closing the table "+table.getName(), e); + } + } + } + + @Override + public void insert(final HTable table, Put put, final long keyBase) { + final long start = System.currentTimeMillis(); + try { + put = (Put) dataGenerator.beforeMutate(keyBase, put); + writerAction.setPut(put); + writerAction.setKeyBase(keyBase); + writerAction.setStartTime(start); + userOwner.runAs(writerAction); + } catch (IOException e) { + recordFailure(table, put, keyBase, start, e); + } catch (InterruptedException e) { + failedKeySet.add(keyBase); + } + } + + class WriteAccessAction implements PrivilegedExceptionAction { + private Put put; + private long keyBase; + private long start; + + public WriteAccessAction() { + } + + public void setPut(final Put put) { + this.put = put; + } + + public void setKeyBase(final long keyBase) { + this.keyBase = keyBase; + } + + public void setStartTime(final long start) { + this.start = start; + } + + @Override + public Object run() throws Exception { + try { + if (table == null) { + table = new HTable(conf, tableName); + } + table.put(put); + } catch (IOException e) { + recordFailure(table, put, keyBase, start, e); + } + return null; + } + } + } + + private void recordFailure(final HTable table, final Put put, final long keyBase, + final long start, IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + if (e instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; + exceptionInfo = aggEx.getExhaustiveDescription(); + } else { + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + } + LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) + + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + + exceptionInfo); + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java index d267647..51d2c4a 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java @@ -21,9 +21,9 @@ import java.io.IOException; import org.apache.commons.cli.CommandLine; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java index aed7edd..80948c7 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java @@ -25,8 +25,8 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java index a526e10..56c0f09 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java @@ -39,6 +39,8 @@ public abstract class LoadTestDataGenerator { // which can be incremented later on during updates. public final static byte[] INCREMENT = "increment".getBytes(); + protected String[] args; + public LoadTestDataGenerator() { } @@ -56,10 +58,12 @@ public abstract class LoadTestDataGenerator { /** * initialize the LoadTestDataGenerator - * @param args init args + * + * @param args + * init args */ public void initialize(String[] args) { - + this.args = args; } /** @@ -134,4 +138,12 @@ public abstract class LoadTestDataGenerator { public Get beforeGet(long rowkeyBase, Get get) throws IOException { return get; } + + /** + * Return the arguments passed to the generator as list of object + * @return + */ + public String[] getArgs() { + return this.args; + } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGeneratorWithACL.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGeneratorWithACL.java new file mode 100644 index 0000000..b2988cf --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGeneratorWithACL.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util.test; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.security.access.Permission; +import org.apache.hadoop.hbase.util.MultiThreadedAction.DefaultDataGenerator; + +@InterfaceAudience.Private +public class LoadTestDataGeneratorWithACL extends DefaultDataGenerator { + private static final Log LOG = LogFactory.getLog(LoadTestDataGeneratorWithACL.class); + private String[] userNames = null; + private static final String COMMA = ","; + private int specialPermCellInsertionFactor = 100; + + public LoadTestDataGeneratorWithACL(int minValueSize, int maxValueSize, int minColumnsPerKey, + int maxColumnsPerKey, byte[]... columnFamilies) { + super(minValueSize, maxValueSize, minColumnsPerKey, maxColumnsPerKey, columnFamilies); + } + + @Override + public void initialize(String[] args) { + super.initialize(args); + if (args.length != 2) { + throw new IllegalArgumentException( + "LoadTestDataGeneratorWithACL can have " + + "1st arguement which would be the user list and the 2nd argument " + + "should be the factor representing " + + "the row keys for which only write ACLs will be added."); + } + String temp = args[0]; + // This will be comma separated list of expressions. + this.userNames = temp.split(COMMA); + this.specialPermCellInsertionFactor = Integer.parseInt(args[1]); + } + + @Override + public Mutation beforeMutate(long rowkeyBase, Mutation m) throws IOException { + if (!(m instanceof Delete)) { + if (userNames != null && userNames.length > 0) { + int mod = ((int) rowkeyBase % this.userNames.length); + if (((int) rowkeyBase % specialPermCellInsertionFactor) == 0) { + // These cells cannot be read back when running as user userName[mod] + if (LOG.isTraceEnabled()) { + LOG.trace("Adding special perm " + rowkeyBase); + } + m.setACL(userNames[mod], new Permission(Permission.Action.WRITE)); + } else { + m.setACL(userNames[mod], new Permission(Permission.Action.READ)); + } + } + } + return m; + } +}