diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithACL.java hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithACL.java new file mode 100644 index 0000000..ecb216d --- /dev/null +++ hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithACL.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.security.access.AccessController; +import org.apache.hadoop.hbase.util.LoadTestTool; +import org.apache.hadoop.hbase.util.test.LoadTestGeneratorWithACL; +import org.apache.hadoop.util.ToolRunner; +import org.junit.experimental.categories.Category; + +@Category(IntegrationTests.class) +public class IntegrationTestIngestWithACL extends IntegrationTestIngest { + + private static final char COLON = ':'; + public static final char HYPHEN = '-'; + private static final char COMMA = ','; + public static final String[] userNames = { "user1", "user2", "user3", "user4" }; + + @Override + public void setUpCluster() throws Exception { + util = getTestingUtil(null); + Configuration conf = util.getConfiguration(); + conf.setInt(HFile.FORMAT_VERSION_KEY, 3); + conf.set("hbase.coprocessor.master.classes", AccessController.class.getName()); + conf.set("hbase.coprocessor.region.classes", AccessController.class.getName()); + // conf.set("hbase.superuser", "admin"); + super.setUpCluster(); + } + + @Override + protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey, + long numKeys) { + String[] args = super.getArgsForLoadTestTool(mode, modeSpecificArg, startKey, numKeys); + List tmp = new ArrayList(Arrays.asList(args)); + tmp.add(HYPHEN + LoadTestTool.OPT_GENERATOR); + StringBuilder sb = new StringBuilder(LoadTestGeneratorWithACL.class.getName()); + sb.append(COLON); + sb.append(asCommaSeperatedString(userNames)); + tmp.add(sb.toString()); + return tmp.toArray(new String[tmp.size()]); + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int ret = ToolRunner.run(conf, new IntegrationTestIngestWithACL(), args); + System.exit(ret); + } + + private static String asCommaSeperatedString(String[] list) { + StringBuilder sb = new StringBuilder(); + for (String item : list) { + sb.append(item); + sb.append(COMMA); + } + if (sb.length() > 0) { + // Remove the trailing , + sb.deleteCharAt(sb.length() - 1); + } + return sb.toString(); + } +} diff --git 
hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithTags.java hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithTags.java index e428cb2..4cbe5da 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithTags.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithTags.java @@ -28,6 +28,7 @@ import org.junit.experimental.categories.Category; @Category(IntegrationTests.class) public class IntegrationTestIngestWithTags extends IntegrationTestIngest { + public static final char HIPHEN = '-'; private static final char COLON = ':'; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java new file mode 100644 index 0000000..62fbba2 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security.access; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantRequest; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantResponse; + +import com.google.protobuf.ByteString; +import com.google.protobuf.ZeroCopyLiteralByteString; + +/** + * Utility client for doing access control admin operations. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class AccessControlClient { + public static GrantResponse grant(Configuration conf, final HTable table, final String userName, + final byte[] family, final byte[] qual, + final AccessControlProtos.Permission.Action... 
actions) throws Throwable { + HTable ht = null; + try { + ht = new HTable(conf, AccessControlLists.ACL_TABLE_NAME.getName()); + Batch.Call<AccessControlService, GrantResponse> callable = new Batch.Call<AccessControlService, GrantResponse>() { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<GrantResponse> rpcCallback = new BlockingRpcCallback<GrantResponse>(); + + @Override + public GrantResponse call(AccessControlService service) throws IOException { + GrantRequest.Builder builder = GrantRequest.newBuilder(); + AccessControlProtos.Permission.Builder ret = AccessControlProtos.Permission.newBuilder(); + AccessControlProtos.TablePermission.Builder permissionBuilder = AccessControlProtos.TablePermission + .newBuilder(); + for (AccessControlProtos.Permission.Action a : actions) { + permissionBuilder.addAction(a); + } + if (table.getTableName() == null) { + throw new NullPointerException("TableName cannot be null"); + } + permissionBuilder.setTableName(ProtobufUtil.toProtoTableName(table.getName())); + + if (family != null) { + permissionBuilder.setFamily(ZeroCopyLiteralByteString.wrap(family)); + } + if (qual != null) { + permissionBuilder.setQualifier(ZeroCopyLiteralByteString.wrap(qual)); + } + ret.setType(AccessControlProtos.Permission.Type.Table).setTablePermission( + permissionBuilder); + builder.setUserPermission(AccessControlProtos.UserPermission.newBuilder() + .setUser(ByteString.copyFromUtf8(userName)).setPermission(ret)); + service.grant(controller, builder.build(), rpcCallback); + return rpcCallback.get(); + } + }; + Map<byte[], GrantResponse> result = ht.coprocessorService(AccessControlService.class, + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, callable); + return result.values().iterator().next(); // There will be exactly one + // region for the acl + // table and so one entry in + // result Map. + } finally { + if (ht != null) { + try { + ht.close(); + } catch (IOException e) { + throw e; + } + } + } + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java index 82204d1..e37258b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java @@ -18,9 +18,12 @@ package org.apache.hadoop.hbase.security.access; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,11 +37,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; -import java.io.*; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentSkipListMap; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; /** * Performs authorization checks for a given user's assigned permissions diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java index 3116b93..420c337 100644 ---
hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -39,14 +39,18 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.PerformanceEvaluation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.crypto.Cipher; import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.access.AccessControlClient; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.hbase.util.test.LoadTestGeneratorWithACL; import org.apache.hadoop.util.ToolRunner; /** @@ -152,6 +156,8 @@ public class LoadTestTool extends AbstractHBaseTool { protected Compression.Algorithm compressAlgo; protected BloomType bloomType; private boolean inMemoryCF; + + private User userOwner; // Writer options protected int numWriterThreads = DEFAULT_NUM_THREADS; protected int minColsPerKey, maxColsPerKey; @@ -176,6 +182,9 @@ public class LoadTestTool extends AbstractHBaseTool { protected boolean isSkipInit = false; protected boolean isInitOnly = false; + protected boolean isACL = false; + protected boolean grantPermission = false; + protected Cipher cipher = null; protected String[] splitColonSeparated(String option, @@ -435,11 +444,15 @@ public class LoadTestTool extends AbstractHBaseTool { if (!isSkipInit) { initTestTable(); } - LoadTestDataGenerator dataGen = null; if (cmd.hasOption(OPT_GENERATOR)) { String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON); dataGen = getLoadGeneratorInstance(clazzAndArgs[0]); + if(dataGen instanceof LoadTestGeneratorWithACL) { + LOG.info("ACL is on"); + userOwner = User.createUserForTesting(conf, "owner", new String[0]); + isACL = true; + } String[] args = clazzAndArgs.length == 1 ? 
new String[0] : Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length); dataGen.initialize(args); @@ -449,20 +462,56 @@ public class LoadTestTool extends AbstractHBaseTool { minColsPerKey, maxColsPerKey, COLUMN_FAMILY); } + if(isACL) { + if (!grantPermission) { + conf.set("hadoop.security.authorization", "false"); + conf.set("hadoop.security.authentication", "simple"); + LOG.info("Granting permission for the user " + userOwner.getShortName()); + HTable table = new HTable(conf, tableName); + AccessControlProtos.Permission.Action[] actions = { + AccessControlProtos.Permission.Action.ADMIN, + AccessControlProtos.Permission.Action.CREATE, + AccessControlProtos.Permission.Action.READ, AccessControlProtos.Permission.Action.WRITE }; + + try { + AccessControlClient.grant(conf, table, userOwner.getShortName(), COLUMN_FAMILY, null, actions); + } catch (Throwable e) { + LOG.error("Error in granting permission for the user " + userOwner.getShortName(), e); + } + grantPermission = true; + } + } + if (isWrite) { - writerThreads = new MultiThreadedWriter(dataGen, conf, tableName); - writerThreads.setMultiPut(isMultiPut); + if (isACL) { + writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner); + writerThreads.setMultiPut(isMultiPut); + } else { + writerThreads = new MultiThreadedWriter(dataGen, conf, tableName); + writerThreads.setMultiPut(isMultiPut); + } } if (isUpdate) { - updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent); - updaterThreads.setBatchUpdate(isBatchUpdate); + if (isACL) { + updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent, userOwner); + updaterThreads.setBatchUpdate(isBatchUpdate); + } else { + updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent); + updaterThreads.setBatchUpdate(isBatchUpdate); + } } if (isRead) { - readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent); - readerThreads.setMaxErrors(maxReadErrors); - readerThreads.setKeyWindow(keyWindow); + if (isACL) { + readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent); + readerThreads.setMaxErrors(maxReadErrors); + readerThreads.setKeyWindow(keyWindow); + } else { + readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent); + readerThreads.setMaxErrors(maxReadErrors); + readerThreads.setKeyWindow(keyWindow); + } } if (isUpdate && isWrite) { @@ -533,7 +582,7 @@ public class LoadTestTool extends AbstractHBaseTool { } } - static byte[] generateData(final Random r, int length) { + public static byte[] generateData(final Random r, int length) { byte [] b = new byte [length]; int i = 0; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java index d6a2570..a7bfe97 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java @@ -37,11 +37,11 @@ public class MultiThreadedReader extends MultiThreadedAction { private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class); - private Set readers = new HashSet(); + protected Set readers = new HashSet(); private final double verifyPercent; private volatile boolean aborted; - private MultiThreadedWriterBase writer = null; + protected MultiThreadedWriterBase writer = null; /** * The number of keys verified in a sequence. 
This will never be larger than @@ -65,8 +65,9 @@ public class MultiThreadedReader extends MultiThreadedAction public static final int DEFAULT_KEY_WINDOW = 0; protected AtomicLong numKeysVerified = new AtomicLong(0); - private AtomicLong numReadErrors = new AtomicLong(0); - private AtomicLong numReadFailures = new AtomicLong(0); + protected AtomicLong numReadErrors = new AtomicLong(0); + protected AtomicLong numReadFailures = new AtomicLong(0); + protected AtomicLong nullResult = new AtomicLong(0); private int maxErrors = DEFAULT_MAX_ERRORS; private int keyWindow = DEFAULT_KEY_WINDOW; @@ -97,22 +98,26 @@ public class MultiThreadedReader extends MultiThreadedAction LOG.debug("Reading keys [" + startKey + ", " + endKey + ")"); } + addReaderThreads(numThreads); + startThreads(readers); + } + + protected void addReaderThreads(int numThreads) throws IOException { for (int i = 0; i < numThreads; ++i) { HBaseReaderThread reader = new HBaseReaderThread(i); readers.add(reader); } - startThreads(readers); } public class HBaseReaderThread extends Thread { - private final int readerId; - private final HTable table; + protected final int readerId; + protected final HTable table; /** The "current" key being read. Increases from startKey to endKey. */ private long curKey; /** Time when the thread started */ - private long startTimeMs; + protected long startTimeMs; /** If we are ahead of the writer and reading a random key. */ private boolean readingRandomKey; @@ -123,21 +128,31 @@ public class MultiThreadedReader extends MultiThreadedAction */ public HBaseReaderThread(int readerId) throws IOException { this.readerId = readerId; - table = new HTable(conf, tableName); + table = createTableInstance(); setName(getClass().getSimpleName() + "_" + readerId); } + protected HTable createTableInstance() throws IOException { + return new HTable(conf, tableName); + } + @Override public void run() { try { runReader(); } finally { - try { + closeTable(); + numThreadsWorking.decrementAndGet(); + } + } + + protected void closeTable() { + try { + if (table != null) { table.close(); - } catch (IOException e) { - LOG.error("Error closing table", e); } - numThreadsWorking.decrementAndGet(); + } catch (IOException e) { + LOG.error("Error closing table", e); } } @@ -238,7 +253,7 @@ public class MultiThreadedReader extends MultiThreadedAction if (verbose) { LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString); } - queryKey(get, RandomUtils.nextInt(100) < verifyPercent); + queryKey(get, RandomUtils.nextInt(100) < verifyPercent, keyToRead); } catch (IOException e) { numReadFailures.addAndGet(1); LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") @@ -248,12 +263,18 @@ public class MultiThreadedReader extends MultiThreadedAction return get; } - public void queryKey(Get get, boolean verify) throws IOException { + public void queryKey(Get get, boolean verify, long keyToRead) throws IOException { String rowKey = Bytes.toString(get.getRow()); // read the data long start = System.currentTimeMillis(); Result result = table.get(get); + getResultMetricUpdation(verify, rowKey, start, result, table, false); + } + + protected void getResultMetricUpdation(boolean verify, String rowKey, long start, + Result result, HTable table, boolean isNullExpected) + throws IOException { totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); numKeys.addAndGet(1); if (!result.isEmpty()) { @@ -265,9 +286,13 @@ public class MultiThreadedReader extends MultiThreadedAction Bytes.toBytes(rowKey)); 
LOG.info("Key = " + rowKey + ", RegionServer: " + hloc.getHostname()); + if(isNullExpected) { + nullResult.incrementAndGet(); + LOG.debug("Null result obtained for the key ="+rowKey); + return; + } } - - boolean isOk = verifyResultAgainstDataGenerator(result, verify); + boolean isOk = verifyResultAgainstDataGenerator(result, verify, false); long numErrorsAfterThis = 0; if (isOk) { long cols = 0; @@ -306,12 +331,17 @@ public class MultiThreadedReader extends MultiThreadedAction return numUniqueKeysVerified.get(); } + public long getNullResultsCount() { + return nullResult.get(); + } + @Override protected String progressInfo() { StringBuilder sb = new StringBuilder(); appendToStatus(sb, "verified", numKeysVerified.get()); appendToStatus(sb, "READ FAILURES", numReadFailures.get()); appendToStatus(sb, "READ ERRORS", numReadErrors.get()); + appendToStatus(sb, "NULL RESULT", nullResult.get()); return sb.toString(); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java new file mode 100644 index 0000000..39e9ff2 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * A MultiThreadReader that helps to work with ACL + * + */ +public class MultiThreadedReaderWithACL extends MultiThreadedReader { + private static final Log LOG = LogFactory.getLog(MultiThreadedReaderWithACL.class); + + private Map userVsTable = new HashMap(); + private String[] userNames; + + public MultiThreadedReaderWithACL(LoadTestDataGenerator dataGen, Configuration conf, + TableName tableName, double verifyPercent) { + super(dataGen, conf, tableName, verifyPercent); + } + + @Override + protected void addReaderThreads(int numThreads) throws IOException { + for (int i = 0; i < numThreads; ++i) { + HBaseReaderThread reader = new HBaseReaderThreadWithACL(i); + readers.add(reader); + } + } + + static interface AccessAction extends PrivilegedExceptionAction { + } + + public class HBaseReaderThreadWithACL extends HBaseReaderThread { + + public HBaseReaderThreadWithACL(int readerId) throws IOException { + super(readerId); + } + + @Override + protected HTable createTableInstance() throws IOException { + return null; + } + + @Override + protected void closeTable() { + Collection tables = userVsTable.values(); + Iterator iterator = tables.iterator(); + while (iterator.hasNext()) { + HTable table = null; + try { + table = iterator.next(); + table.close(); + } catch (Exception e) { + LOG.error("Error while closing the table " + table.getName().getNamespaceAsString(), e); + } + } + } + + @Override + public void queryKey(final Get get, final boolean verify, final long keyToRead) + throws IOException { + final String rowKey = Bytes.toString(get.getRow()); + + // read the data + final long start = System.currentTimeMillis(); + AccessAction action = new AccessAction() { + @Override + public Object run() throws Exception { + HTable localTable = null; + try { + get.setACLStrategy(true); + Result result = null; + List args = dataGenerator.getArgs(); + Object object = args.get(0); + userNames = (String[]) object; + int mod = ((int) keyToRead % userNames.length); + if (userVsTable.get(userNames[mod]) == null) { + localTable = new HTable(conf, tableName); + userVsTable.put(userNames[mod], localTable); + result = localTable.get(get); + } else { + localTable = userVsTable.get(userNames[mod]); + result = localTable.get(get); + } + boolean isNullExpected = ((((int) keyToRead % 100)) == 0); + LOG.info("Read happening from ACL " + isNullExpected); + getResultMetricUpdation(verify, rowKey, start, result, localTable, isNullExpected); + } catch (IOException e) { + recordFailure(keyToRead); + } + return null; + } + }; + List args = dataGenerator.getArgs(); + Object object = args.get(0); + userNames = (String[]) object; + if (userNames != null && userNames.length > 0) { + int mod = ((int) keyToRead % userNames.length); + UserGroupInformation realUserUgi = 
UserGroupInformation.createRemoteUser(userNames[mod]); + User user = User.create(realUserUgi); + try { + user.runAs(action); + } catch (Exception e) { + recordFailure(keyToRead); + } + } + } + + private void recordFailure(final long keyToRead) { + numReadFailures.addAndGet(1); + LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", time from start: " + + (System.currentTimeMillis() - startTimeMs) + " ms"); + } + } + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java index 30a7116..5049ff7 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java @@ -56,7 +56,7 @@ import com.google.common.base.Preconditions; public class MultiThreadedUpdater extends MultiThreadedWriterBase { private static final Log LOG = LogFactory.getLog(MultiThreadedUpdater.class); - private Set updaters = new HashSet(); + protected Set updaters = new HashSet(); private MultiThreadedWriterBase writer = null; private boolean isBatchUpdate = false; @@ -86,12 +86,16 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase { LOG.debug("Updating keys [" + startKey + ", " + endKey + ")"); } + addUpdaterThreads(numThreads); + + startThreads(updaters); + } + + protected void addUpdaterThreads(int numThreads) throws IOException { for (int i = 0; i < numThreads; ++i) { HBaseUpdaterThread updater = new HBaseUpdaterThread(i); updaters.add(updater); } - - startThreads(updaters); } private long getNextKeyToUpdate() { @@ -115,12 +119,16 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase { } } - private class HBaseUpdaterThread extends Thread { - private final HTable table; + protected class HBaseUpdaterThread extends Thread { + protected final HTable table; public HBaseUpdaterThread(int updaterId) throws IOException { setName(getClass().getSimpleName() + "_" + updaterId); - table = new HTable(conf, tableName); + table = createTableInstance(); + } + + protected HTable createTableInstance() throws IOException { + return new HTable(conf, tableName); } public void run() { @@ -151,67 +159,72 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase { numCols.addAndGet(1); app = new Append(rowKey); } - Result result = null; + Get get = new Get(rowKey); + get.addFamily(cf); try { - Get get = new Get(rowKey); - get.addFamily(cf); get = dataGenerator.beforeGet(rowKeyBase, get); - result = table.get(get); - } catch (IOException ie) { - LOG.warn("Failed to get the row for key = [" - + rowKey + "], column family = [" + Bytes.toString(cf) + "]", ie); + } catch (Exception e) { + // Ideally wont happen + LOG.warn("Failed to get the row for key = [" + get.getRow() + + "], column family = [" + Bytes.toString(cf) + "]", e); } + Result result = getRowKey(get, rowKeyBase, cf); Map columnValues = result != null ? 
result.getFamilyMap(cf) : null; if (columnValues == null) { - failedKeySet.add(rowKeyBase); - LOG.error("Failed to update the row with key = [" - + rowKey + "], since we could not get the original row"); - } - for (byte[] column : columnValues.keySet()) { - if (Bytes.equals(column, INCREMENT) - || Bytes.equals(column, MUTATE_INFO)) { - continue; - } - MutationType mt = MutationType.valueOf( - RandomUtils.nextInt(MutationType.values().length)); - long columnHash = Arrays.hashCode(column); - long hashCode = cfHash + columnHash; - byte[] hashCodeBytes = Bytes.toBytes(hashCode); - byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY; - if (hashCode % 2 == 0) { - Cell kv = result.getColumnLatestCell(cf, column); - checkedValue = kv != null ? CellUtil.cloneValue(kv) : null; - Preconditions.checkNotNull(checkedValue, - "Column value to be checked should not be null"); + if (((int) rowKeyBase % 100 == 0)) { + LOG.info("Null result expected for the rowkey " + Bytes.toString(rowKey)); + } else { + failedKeySet.add(rowKeyBase); + LOG.error("Failed to update the row with key = [" + rowKey + + "], since we could not get the original row"); } - buf.setLength(0); // Clear the buffer - buf.append("#").append(Bytes.toString(column)).append(":"); - ++columnCount; - switch (mt) { - case PUT: - Put put = new Put(rowKey); - put.add(cf, column, hashCodeBytes); - mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue); - buf.append(MutationType.PUT.getNumber()); - break; - case DELETE: - Delete delete = new Delete(rowKey); - // Delete all versions since a put - // could be called multiple times if CM is used - delete.deleteColumns(cf, column); - mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue); - buf.append(MutationType.DELETE.getNumber()); - break; - default: - buf.append(MutationType.APPEND.getNumber()); - app.add(cf, column, hashCodeBytes); - } - app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString())); - if (!isBatchUpdate) { - mutate(table, app, rowKeyBase); - numCols.addAndGet(1); - app = new Append(rowKey); + } + if(columnValues != null) { + for (byte[] column : columnValues.keySet()) { + if (Bytes.equals(column, INCREMENT) || Bytes.equals(column, MUTATE_INFO)) { + continue; + } + MutationType mt = MutationType + .valueOf(RandomUtils.nextInt(MutationType.values().length)); + long columnHash = Arrays.hashCode(column); + long hashCode = cfHash + columnHash; + byte[] hashCodeBytes = Bytes.toBytes(hashCode); + byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY; + if (hashCode % 2 == 0) { + Cell kv = result.getColumnLatestCell(cf, column); + checkedValue = kv != null ? 
CellUtil.cloneValue(kv) : null; + Preconditions.checkNotNull(checkedValue, + "Column value to be checked should not be null"); + } + buf.setLength(0); // Clear the buffer + buf.append("#").append(Bytes.toString(column)).append(":"); + ++columnCount; + switch (mt) { + case PUT: + Put put = new Put(rowKey); + put.add(cf, column, hashCodeBytes); + mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue); + buf.append(MutationType.PUT.getNumber()); + break; + case DELETE: + Delete delete = new Delete(rowKey); + // Delete all versions since a put + // could be called multiple times if CM is used + delete.deleteColumns(cf, column); + mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue); + buf.append(MutationType.DELETE.getNumber()); + break; + default: + buf.append(MutationType.APPEND.getNumber()); + app.add(cf, column, hashCodeBytes); + } + app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString())); + if (!isBatchUpdate) { + mutate(table, app, rowKeyBase); + numCols.addAndGet(1); + app = new Append(rowKey); + } } } } @@ -230,12 +243,71 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase { } } } finally { - try { + closeHTable(); + numThreadsWorking.decrementAndGet(); + } + } + + protected void closeHTable() { + try { + if (table != null) { table.close(); - } catch (IOException e) { - LOG.error("Error closing table", e); } - numThreadsWorking.decrementAndGet(); + } catch (IOException e) { + LOG.error("Error closing table", e); + } + } + + protected Result getRowKey(Get get, long rowKeyBase, byte[] cf) { + Result result = null; + try { + result = table.get(get); + } catch (IOException ie) { + LOG.warn( + "Failed to get the row for key = [" + get.getRow() + "], column family = [" + + Bytes.toString(cf) + "]", ie); + } + return result; + } + + public void mutate(HTable table, Mutation m, long keyBase) { + mutate(table, m, keyBase, null, null, null, null); + } + + public void mutate(HTable table, Mutation m, + long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) { + long start = System.currentTimeMillis(); + try { + m = dataGenerator.beforeMutate(keyBase, m); + if (m instanceof Increment) { + table.increment((Increment)m); + } else if (m instanceof Append) { + table.append((Append)m); + } else if (m instanceof Put) { + table.checkAndPut(row, cf, q, v, (Put)m); + } else if (m instanceof Delete) { + table.checkAndDelete(row, cf, q, v, (Delete)m); + } else { + throw new IllegalArgumentException( + "unsupported mutation " + m.getClass().getSimpleName()); + } + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + } catch (IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + if (e instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e; + exceptionInfo = aggEx.getExhaustiveDescription(); + } else { + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + } + LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) + + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + + exceptionInfo); } } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdaterWithACL.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdaterWithACL.java new file mode 100644 index 0000000..9ac2b29 --- /dev/null +++ 
hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdaterWithACL.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.security.PrivilegedExceptionAction; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; + +/** + * A MultiThreadReader that helps to work with ACL + * + */ +public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater { + private static final Log LOG = LogFactory.getLog(MultiThreadedUpdaterWithACL.class); + + private User userOwner; + private Map userVsTable = new HashMap(); + private String[] userNames; + + public MultiThreadedUpdaterWithACL(LoadTestDataGenerator dataGen, Configuration conf, + TableName tableName, double updatePercent, User userOwner) { + super(dataGen, conf, tableName, updatePercent); + this.userOwner = userOwner; + } + + static interface AccessAction extends PrivilegedExceptionAction { + } + + @Override + protected void addUpdaterThreads(int numThreads) throws IOException { + for (int i = 0; i < numThreads; ++i) { + HBaseUpdaterThread updater = new HBaseUpdaterThreadWithACL(i); + updaters.add(updater); + } + } + + public class HBaseUpdaterThreadWithACL extends HBaseUpdaterThread { + + private HTable table; + private WriteAccessAction writerAction = new WriteAccessAction(); + + public HBaseUpdaterThreadWithACL(int updaterId) throws IOException { + super(updaterId); + } + + @Override + protected HTable createTableInstance() throws IOException { + return null; + } + + @Override + protected void closeHTable() { + try { + if (table != null) { + table.close(); + } + Collection tables = userVsTable.values(); + Iterator iterator = tables.iterator(); + while (iterator.hasNext()) { + HTable table = null; + try { + table 
= iterator.next(); + table.close(); + } catch (Exception e) { + LOG.error("Error while closing the table " + table.getName().getNamespaceAsString(), e); + } + } + } catch (Exception e) { + LOG.error("Error while closing the HTable ", e); + } + } + + @Override + protected Result getRowKey(final Get get, final long rowKeyBase, final byte[] cf) { + AccessAction action = new AccessAction() { + + @Override + public Object run() throws Exception { + Result res = null; + HTable localTable = null; + try { + List args = dataGenerator.getArgs(); + Object object = args.get(0); + userNames = (String[]) object; + int mod = ((int) rowKeyBase % userNames.length); + if (userVsTable.get(userNames[mod]) == null) { + localTable = new HTable(conf, tableName); + userVsTable.put(userNames[mod], localTable); + res = localTable.get(get); + } else { + localTable = userVsTable.get(userNames[mod]); + res = localTable.get(get); + } + } catch (IOException ie) { + LOG.warn("Failed to get the row for key = [" + get.getRow() + "], column family = [" + + Bytes.toString(cf) + "]", ie); + } + return res; + } + }; + + List args = dataGenerator.getArgs(); + Object object = args.get(0); + userNames = (String[]) object; + if (userNames != null && userNames.length > 0) { + int mod = ((int) rowKeyBase % userNames.length); + UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]); + User user = User.create(realUserUgi); + try { + Result result = (Result) user.runAs(action); + return result; + } catch (Exception ie) { + LOG.warn("Failed to get the row for key = [" + get.getRow() + "], column family = [" + + Bytes.toString(cf) + "]", ie); + } + } + return null; + } + + @Override + public void mutate(final HTable table, Mutation m, final long keyBase, final byte[] row, + final byte[] cf, final byte[] q, final byte[] v) { + final long start = System.currentTimeMillis(); + try { + m = dataGenerator.beforeMutate(keyBase, m); + writerAction.setMutation(m); + writerAction.setCF(cf); + writerAction.setRow(row); + writerAction.setQualifier(q); + writerAction.setValue(v); + writerAction.setStartTime(start); + writerAction.setKeyBase(keyBase); + userOwner.runAs(writerAction); + } catch (IOException e) { + recordFailure(m, keyBase, start, e); + } catch (InterruptedException e) { + failedKeySet.add(keyBase); + } + } + + class WriteAccessAction implements AccessAction { + private HTable table; + private long start; + private Mutation m; + private long keyBase; + private byte[] row; + private byte[] cf; + private byte[] q; + private byte[] v; + + public WriteAccessAction() { + + } + + public void setStartTime(final long start) { + this.start = start; + } + + public void setMutation(final Mutation m) { + this.m = m; + } + + public void setRow(final byte[] row) { + this.row = row; + } + + public void setCF(final byte[] cf) { + this.cf = cf; + } + + public void setQualifier(final byte[] q) { + this.q = q; + } + + public void setValue(final byte[] v) { + this.v = v; + } + + public void setKeyBase(final long keyBase) { + this.keyBase = keyBase; + } + + @Override + public Object run() throws Exception { + try { + if (table == null) { + table = new HTable(conf, tableName); + } + if (m instanceof Increment) { + table.increment((Increment) m); + } else if (m instanceof Append) { + table.append((Append) m); + } else if (m instanceof Put) { + table.checkAndPut(row, cf, q, v, (Put) m); + } else if (m instanceof Delete) { + table.checkAndDelete(row, cf, q, v, (Delete) m); + } else { + throw new 
IllegalArgumentException("unsupported mutation " + + m.getClass().getSimpleName()); + } + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + } catch (IOException e) { + recordFailure(m, keyBase, start, e); + } + return null; + } + } + + private void recordFailure(final Mutation m, final long keyBase, final long start, IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + if (e instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; + exceptionInfo = aggEx.getExhaustiveDescription(); + } else { + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + } + LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) + + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + + exceptionInfo); + } + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java index 9553f03..17fe6fc 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java @@ -22,8 +22,11 @@ import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT; import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Arrays; import java.util.HashSet; +import java.util.Random; import java.util.Set; import org.apache.commons.logging.Log; @@ -33,15 +36,19 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.util.StringUtils; /** Creates multiple threads that write key/values into the */ public class MultiThreadedWriter extends MultiThreadedWriterBase { private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class); - private Set writers = new HashSet(); + protected Set writers = new HashSet(); - private boolean isMultiPut = false; + protected boolean isMultiPut = false; + + protected Random random = new Random(); public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName) { @@ -61,20 +68,28 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase { LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")"); } + createWriterThreads(numThreads); + + startThreads(writers); + } + + protected void createWriterThreads(int numThreads) throws IOException { for (int i = 0; i < numThreads; ++i) { HBaseWriterThread writer = new HBaseWriterThread(i); writers.add(writer); } - - startThreads(writers); } - private class HBaseWriterThread extends Thread { + public class HBaseWriterThread extends Thread { private final HTable table; public HBaseWriterThread(int writerId) throws IOException { setName(getClass().getSimpleName() + "_" + writerId); - table = new HTable(conf, tableName); + table = createTableInstance(); + } + + protected HTable createTableInstance() throws IOException { + return new HTable(conf, tableName); } 
public void run() { @@ -119,12 +134,42 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase { } } } finally { - try { + closeHTable(); + numThreadsWorking.decrementAndGet(); + } + } + + public void insert(HTable table, Put put, long keyBase) { + long start = System.currentTimeMillis(); + try { + put = (Put) dataGenerator.beforeMutate(keyBase, put); + table.put(put); + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + } catch (IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + if (e instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e; + exceptionInfo = aggEx.getExhaustiveDescription(); + } else { + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + } + LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) + + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + + exceptionInfo); + } + } + protected void closeHTable() { + try { + if (table != null) { table.close(); - } catch (IOException e) { - LOG.error("Error closing table", e); } - numThreadsWorking.decrementAndGet(); + } catch (IOException e) { + LOG.error("Error closing table", e); } } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java index 4bb873f..9373e6f 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; @@ -36,10 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; -import org.apache.hadoop.util.StringUtils; /** Creates multiple threads that write key/values into the */ public abstract class MultiThreadedWriterBase extends MultiThreadedAction { @@ -171,31 +166,6 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction { return failedKeySet.size(); } - public void insert(HTable table, Put put, long keyBase) { - long start = System.currentTimeMillis(); - try { - put = (Put) dataGenerator.beforeMutate(keyBase, put); - table.put(put); - totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); - } catch (IOException e) { - failedKeySet.add(keyBase); - String exceptionInfo; - if (e instanceof RetriesExhaustedWithDetailsException) { - RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e; - exceptionInfo = aggEx.getExhaustiveDescription(); - } else { - StringWriter stackWriter = new StringWriter(); - PrintWriter pw = new PrintWriter(stackWriter); - e.printStackTrace(pw); - pw.flush(); - exceptionInfo = StringUtils.stringifyException(e); - } - LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) + - "ms; region 
information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " - + exceptionInfo); - } - } - /** * The max key until which all keys have been inserted/updated (successfully or not). * @return the last key that we have inserted/updated all keys up to (inclusive) diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterWithACL.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterWithACL.java new file mode 100644 index 0000000..0fe13a5 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterWithACL.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.security.PrivilegedExceptionAction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.util.StringUtils; + +/** + * MultiThreadedWriter that helps in testing ACL + * + */ +public class MultiThreadedWriterWithACL extends MultiThreadedWriter { + + private static final Log LOG = LogFactory.getLog(MultiThreadedWriterWithACL.class); + private User userOwner; + + public MultiThreadedWriterWithACL(LoadTestDataGenerator dataGen, Configuration conf, + TableName tableName, User userOwner) { + super(dataGen, conf, tableName); + this.userOwner = userOwner; + } + + static interface AccessAction extends PrivilegedExceptionAction { + } + + @Override + public void start(long startKey, long endKey, int numThreads) throws IOException { + super.start(startKey, endKey, numThreads); + } + + @Override + protected void createWriterThreads(int numThreads) throws IOException { + for (int i = 0; i < numThreads; ++i) { + HBaseWriterThread writer = new HBaseWriterThreadWithACL(i); + writers.add(writer); + } + } + + public class HBaseWriterThreadWithACL extends HBaseWriterThread { + + private HTable table; + private WriteAccessAction writerAction = new WriteAccessAction(); + + public HBaseWriterThreadWithACL(int writerId) throws IOException { + super(writerId); + } + + @Override + protected HTable createTableInstance() throws IOException { + return null; + } + + @Override + protected void closeHTable() { + if (table != null) { + try { + table.close(); + } catch (Exception e) { + LOG.error("Error in closing the table", e); + } + } + } + 
+ @Override + public void insert(final HTable table, Put put, final long keyBase) { + final long start = System.currentTimeMillis(); + try { + put = (Put) dataGenerator.beforeMutate(keyBase, put); + writerAction.setPut(put); + writerAction.setKeyBase(keyBase); + writerAction.setStartTime(start); + userOwner.runAs(writerAction); + } catch (IOException e) { + recordFailure(table, put, keyBase, start, e); + } catch (InterruptedException e) { + failedKeySet.add(keyBase); + } + } + + class WriteAccessAction implements AccessAction { + private Put put; + private long keyBase; + private long start; + + public WriteAccessAction() { + } + + public void setPut(final Put put) { + this.put = put; + } + + public void setKeyBase(final long keyBase) { + this.keyBase = keyBase; + } + + public void setStartTime(final long start) { + this.start = start; + } + + @Override + public Object run() throws Exception { + try { + if (table == null) { + table = new HTable(conf, tableName); + } + table.put(put); + } catch (IOException e) { + recordFailure(table, put, keyBase, start, e); + } + return null; + } + } + } + + private void recordFailure(final HTable table, final Put put, final long keyBase, + final long start, IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + if (e instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; + exceptionInfo = aggEx.getExhaustiveDescription(); + } else { + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + } + LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) + + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + + exceptionInfo); + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java index d267647..51d2c4a 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java @@ -21,9 +21,9 @@ import java.io.IOException; import org.apache.commons.cli.CommandLine; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java index aed7edd..80948c7 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java @@ -25,8 +25,8 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import 
org.apache.hadoop.hbase.HConstants; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java index a526e10..2fb2544 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.util.test; import java.io.IOException; +import java.util.List; import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience; @@ -59,7 +60,6 @@ public abstract class LoadTestDataGenerator { * @param args init args */ public void initialize(String[] args) { - } /** @@ -134,4 +134,12 @@ public abstract class LoadTestDataGenerator { public Get beforeGet(long rowkeyBase, Get get) throws IOException { return get; } + + /** + * Return the arguments passed to the generator as a list of objects. + * @return the initialization arguments as a list, or null if the generator takes none + */ + public List getArgs() { + return null; + } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestGeneratorWithACL.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestGeneratorWithACL.java new file mode 100644 index 0000000..0429ec7 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestGeneratorWithACL.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util.test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.security.access.Permission; +import org.apache.hadoop.hbase.util.MultiThreadedAction.DefaultDataGenerator; + +@InterfaceAudience.Private +public class LoadTestGeneratorWithACL extends DefaultDataGenerator { + private static final Log LOG = LogFactory.getLog(LoadTestGeneratorWithACL.class); + private String[] userNames = null; + private static final String COMMA = ","; + + public LoadTestGeneratorWithACL(int minValueSize, int maxValueSize, int minColumnsPerKey, + int maxColumnsPerKey, byte[]... columnFamilies) { + super(minValueSize, maxValueSize, minColumnsPerKey, maxColumnsPerKey, columnFamilies); + } + + @Override + public void initialize(String[] args) { + if (args.length < 1 || args.length > 2) { + throw new IllegalArgumentException("LoadTestGeneratorWithACL can have " + + "1 argument, which would be the user list"); + } + String temp = args[0]; + // This will be a comma separated list of user names.
+ this.userNames = temp.split(COMMA); + } + + @Override + public Mutation beforeMutate(long rowkeyBase, Mutation m) throws IOException { + if (!(m instanceof Delete)) { + if (userNames != null && userNames.length > 0) { + int mod = ((int) rowkeyBase % this.userNames.length); + if (((int) rowkeyBase % 100) == 0) { + // These cells cannot be read back when running as user userName[mod] + if (LOG.isTraceEnabled()) { + LOG.trace("Adding special perm " + rowkeyBase); + } + m.setACL(userNames[mod], new Permission(Permission.Action.WRITE)); + } else { + m.setACL(userNames[mod], new Permission(Permission.Action.READ)); + } + } + } + return m; + } + + @Override + public List getArgs() { + List args = new ArrayList(); + args.add(userNames); + return args; + } +}
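Usage note (illustration, not part of the patch): the LoadTestGeneratorWithACL added above is what IntegrationTestIngestWithACL plugs into LoadTestTool through "-generator <class>:user1,user2,user3,user4". A minimal sketch of that wiring is below; the value sizes, column counts, and column family name are made up for illustration and would normally be supplied by LoadTestTool itself.

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.test.LoadTestGeneratorWithACL;

public class LoadTestGeneratorWithACLSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative value sizes and column counts (hypothetical, not from the patch).
    LoadTestGeneratorWithACL gen =
        new LoadTestGeneratorWithACL(256, 1024, 1, 10, Bytes.toBytes("test_cf"));
    // LoadTestTool splits the "-generator <class>:<args>" option on ':' and passes the rest
    // here; IntegrationTestIngestWithACL builds exactly this comma separated user list.
    gen.initialize(new String[] { "user1,user2,user3,user4" });
    // Rows whose key base is a multiple of 100 get a WRITE-only cell ACL (reads as that user
    // come back null); all other rows get a READ ACL for userNames[keyBase % userNames.length].
    Mutation m = gen.beforeMutate(42L, new Put(Bytes.toBytes("row-42")));
    System.out.println("Prepared " + m.getClass().getSimpleName() + " with a per-cell ACL for row-42");
  }
}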
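Similarly, the AccessControlClient.grant() utility introduced in this patch could be exercised on its own roughly as sketched below; the table name, user name, and column family are hypothetical. Since grant() is declared to throw Throwable, the caller has to handle that explicitly, which is also what LoadTestTool does in this patch before starting the writer/updater/reader threads.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.util.Bytes;

public class AccessControlClientGrantSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Hypothetical target table and family; grant() records the permission against this table.
    HTable table = new HTable(conf, TableName.valueOf("load_test_tbl"));
    try {
      AccessControlClient.grant(conf, table, "user1", Bytes.toBytes("test_cf"), null,
          AccessControlProtos.Permission.Action.READ,
          AccessControlProtos.Permission.Action.WRITE);
    } catch (Throwable t) {
      // grant() is declared to throw Throwable, so it has to be caught or wrapped here.
      throw new RuntimeException("grant failed for user1", t);
    } finally {
      table.close();
    }
  }
}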