diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java deleted file mode 100644 index e19ce8e..0000000 --- hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.util.test; - -import java.util.Set; - -import org.apache.hadoop.classification.InterfaceAudience; - -/** - * A generator of random data (keys/cfs/columns/values) for load testing. - * Contains LoadTestKVGenerator as a matter of convenience... - */ -@InterfaceAudience.Private -public abstract class LoadTestDataGenerator { - protected final LoadTestKVGenerator kvGenerator; - - // The mutate info column stores information - // about update done to this column family this row. - public final static byte[] MUTATE_INFO = "mutate_info".getBytes(); - - // The increment column always has a long value, - // which can be incremented later on during updates. - public final static byte[] INCREMENT = "increment".getBytes(); - - /** - * Initializes the object. - * @param minValueSize minimum size of the value generated by - * {@link #generateValue(byte[], byte[], byte[])}. - * @param maxValueSize maximum size of the value generated by - * {@link #generateValue(byte[], byte[], byte[])}. - */ - public LoadTestDataGenerator(int minValueSize, int maxValueSize) { - this.kvGenerator = new LoadTestKVGenerator(minValueSize, maxValueSize); - } - - /** - * Generates a deterministic, unique hashed row key from a number. That way, the user can - * keep track of numbers, without messing with byte array and ensuring key distribution. - * @param keyBase Base number for a key, such as a loop counter. - */ - public abstract byte[] getDeterministicUniqueKey(long keyBase); - - /** - * Gets column families for the load test table. - * @return The array of byte[]s representing column family names. - */ - public abstract byte[][] getColumnFamilies(); - - /** - * Generates an applicable set of columns to be used for a particular key and family. - * @param rowKey The row key to generate for. - * @param cf The column family name to generate for. - * @return The array of byte[]s representing column names. - */ - public abstract byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf); - - /** - * Generates a value to be used for a particular row/cf/column. - * @param rowKey The row key to generate for. - * @param cf The column family name to generate for. - * @param column The column name to generate for. - * @return The value to use. 
- */ - public abstract byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column); - - /** - * Checks that columns for a rowKey and cf are valid if generated via - * {@link #generateColumnsForCf(byte[], byte[])} - * @param rowKey The row key to verify for. - * @param cf The column family name to verify for. - * @param columnSet The column set (for example, encountered by read). - * @return True iff valid. - */ - public abstract boolean verify(byte[] rowKey, byte[] cf, Set columnSet); - - /** - * Checks that value for a rowKey/cf/column is valid if generated via - * {@link #generateValue(byte[], byte[], byte[])} - * @param rowKey The row key to verify for. - * @param cf The column family name to verify for. - * @param column The column name to verify for. - * @param value The value (for example, encountered by read). - * @return True iff valid. - */ - public abstract boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value); -} diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java index 4e3a5f7..3a96651 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java @@ -19,9 +19,10 @@ package org.apache.hadoop.hbase; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Set; -import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -32,15 +33,19 @@ import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; +import com.google.common.collect.Sets; + /** * A base class for tests that do something with the cluster while running * {@link LoadTestTool} to write and verify some data. 
*/ @Category(IntegrationTests.class) public class IntegrationTestIngest extends IntegrationTestBase { - private static final int SERVER_COUNT = 4; // number of slaves for the smallest cluster + private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster private static final long DEFAULT_RUN_TIME = 20 * 60 * 1000; private static final long JUNIT_RUN_TIME = 10 * 60 * 1000; + private static final char COMMA = ','; + private static final char COLON = ':'; /** A soft limit on how long we should run */ private static final String RUN_TIME_KEY = "hbase.%s.runtime"; @@ -78,11 +83,11 @@ public class IntegrationTestIngest extends IntegrationTestBase { @Test public void testIngest() throws Exception { - runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10, false, 10); + runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10); } private void internalRunIngestTest(long runTime) throws Exception { - runIngestTest(runTime, 2500, 10, 1024, 10, false, 10); + runIngestTest(runTime, 2500, 10, 1024, 10); } @Override @@ -102,7 +107,7 @@ public class IntegrationTestIngest extends IntegrationTestBase { } protected void runIngestTest(long defaultRunTime, int keysPerServerPerIter, - int colsPerKey, int recordSize, int writeThreads, boolean useTags, int maxTagsPerKey) throws Exception { + int colsPerKey, int recordSize, int writeThreads) throws Exception { LOG.info("Running ingest"); LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize()); @@ -117,45 +122,23 @@ public class IntegrationTestIngest extends IntegrationTestBase { ((runtime - (System.currentTimeMillis() - start))/60000) + " min"); int ret = -1; - if (useTags) { - ret = loadTool.run(new String[] { "-tn", getTablename(), "-write", - String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), "-start_key", - String.valueOf(startKey), "-num_keys", String.valueOf(numKeys), "-skip_init", - "-usetags", "-num_tags", String.format("1:%d", maxTagsPerKey) }); - } else { - ret = loadTool.run(new String[] { "-tn", getTablename(), "-write", - String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), "-start_key", - String.valueOf(startKey), "-num_keys", String.valueOf(numKeys), "-skip_init" }); - } + + ret = loadTool.run(getArgsForLoadTestTool("-write", + String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys)); if (0 != ret) { String errorMsg = "Load failed with error code " + ret; LOG.error(errorMsg); Assert.fail(errorMsg); } - if (useTags) { - ret = loadTool.run(new String[] { "-tn", getTablename(), "-update", - String.format("60:%d", writeThreads), "-start_key", String.valueOf(startKey), - "-num_keys", String.valueOf(numKeys), "-skip_init", "-usetags", "-num_tags", - String.format("1:%d", maxTagsPerKey) }); - } else { - ret = loadTool.run(new String[] { "-tn", getTablename(), "-update", - String.format("60:%d", writeThreads), "-start_key", String.valueOf(startKey), - "-num_keys", String.valueOf(numKeys), "-skip_init" }); - } + ret = loadTool.run(getArgsForLoadTestTool("-update", String.format("60:%d", writeThreads), + + startKey, numKeys)); if (0 != ret) { String errorMsg = "Update failed with error code " + ret; LOG.error(errorMsg); Assert.fail(errorMsg); } - if (useTags) { - ret = loadTool.run(new String[] { "-tn", getTablename(), "-read", "100:20", "-start_key", - String.valueOf(startKey), "-num_keys", String.valueOf(numKeys), "-skip_init", - "-usetags", "-num_tags", String.format("1:%d", maxTagsPerKey) }); - } else { - ret = loadTool.run(new String[] { "-tn", 
getTablename(), "-read", "100:20", "-start_key", - String.valueOf(startKey), "-num_keys", String.valueOf(numKeys), "-skip_init" }); - } + ret = loadTool.run(getArgsForLoadTestTool("-read", "100:20", startKey, numKeys)); if (0 != ret) { String errorMsg = "Verification failed with error code " + ret; LOG.error(errorMsg); @@ -164,6 +147,22 @@ public class IntegrationTestIngest extends IntegrationTestBase { startKey += numKeys; } } + + protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey, + long numKeys) { + List args = new ArrayList(); + args.add("-tn"); + args.add(getTablename()); + args.add(mode); + args.add(modeSpecificArg); + args.add("-start_key"); + args.add(String.valueOf(startKey)); + args.add("-num_keys"); + args.add(String.valueOf(numKeys)); + args.add("-skip_init"); + return args.toArray(new String[args.size()]); + } + /** Estimates a data size based on the cluster size */ private long getNumKeys(int keysPerServer) diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithACL.java hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithACL.java new file mode 100644 index 0000000..ecb216d --- /dev/null +++ hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithACL.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.security.access.AccessController; +import org.apache.hadoop.hbase.util.LoadTestTool; +import org.apache.hadoop.hbase.util.test.LoadTestGeneratorWithACL; +import org.apache.hadoop.util.ToolRunner; +import org.junit.experimental.categories.Category; + +@Category(IntegrationTests.class) +public class IntegrationTestIngestWithACL extends IntegrationTestIngest { + + private static final char COLON = ':'; + public static final char HYPHEN = '-'; + private static final char COMMA = ','; + public static final String[] userNames = { "user1", "user2", "user3", "user4" }; + + @Override + public void setUpCluster() throws Exception { + util = getTestingUtil(null); + Configuration conf = util.getConfiguration(); + conf.setInt(HFile.FORMAT_VERSION_KEY, 3); + conf.set("hbase.coprocessor.master.classes", AccessController.class.getName()); + conf.set("hbase.coprocessor.region.classes", AccessController.class.getName()); + // conf.set("hbase.superuser", "admin"); + super.setUpCluster(); + } + + @Override + protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey, + long numKeys) { + String[] args = super.getArgsForLoadTestTool(mode, modeSpecificArg, startKey, numKeys); + List tmp = new ArrayList(Arrays.asList(args)); + tmp.add(HYPHEN + LoadTestTool.OPT_GENERATOR); + StringBuilder sb = new StringBuilder(LoadTestGeneratorWithACL.class.getName()); + sb.append(COLON); + sb.append(asCommaSeparatedString(userNames)); + tmp.add(sb.toString()); + return tmp.toArray(new String[tmp.size()]); + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int ret = ToolRunner.run(conf, new IntegrationTestIngestWithACL(), args); + System.exit(ret); + } + + private static String asCommaSeparatedString(String[] list) { + StringBuilder sb = new StringBuilder(); + for (String item : list) { + sb.append(item); + sb.append(COMMA); + } + if (sb.length() > 0) { + // Remove the trailing , + sb.deleteCharAt(sb.length() - 1); + } + return sb.toString(); + } +} diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithTags.java hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithTags.java index 58fa561..2adbe6e 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithTags.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithTags.java @@ -17,10 +17,21 @@ */ package org.apache.hadoop.hbase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.util.LoadTestTool; +import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithTags; import org.junit.experimental.categories.Category; @Category(IntegrationTests.class) public class IntegrationTestIngestWithTags extends IntegrationTestIngest { + private static final char COLON = ':'; + public static final char HYPHEN = '-'; + private int minTagsPerKey = 1, maxTagsPerKey = 10; + private int minTagLength = 16, maxTagLength = 512; @Override public void setUpCluster() throws Exception { getTestingUtil(conf).getConfiguration().setInt("hfile.format.version", 3); @@ -28,9 +39,22 @@ public class IntegrationTestIngestWithTags extends
IntegrationTestIngest { } @Override - protected void runIngestTest(long defaultRunTime, int keysPerServerPerIter, int colsPerKey, - int recordSize, int writeThreads, boolean useTags, int maxTagsPerKey) throws Exception { - super.runIngestTest(defaultRunTime, keysPerServerPerIter, colsPerKey, recordSize, writeThreads, - true, 10); + protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey, + long numKeys) { + String[] args = super.getArgsForLoadTestTool(mode, modeSpecificArg, startKey, numKeys); + List tmp = new ArrayList(Arrays.asList(args)); + // LoadTestDataGeneratorWithTags:minNumTags:maxNumTags:minTagLength:maxTagLength + tmp.add(HYPHEN + LoadTestTool.OPT_GENERATOR); + StringBuilder sb = new StringBuilder(LoadTestDataGeneratorWithTags.class.getName()); + sb.append(COLON); + sb.append(minTagsPerKey); + sb.append(COLON); + sb.append(maxTagsPerKey); + sb.append(COLON); + sb.append(minTagLength); + sb.append(COLON); + sb.append(maxTagLength); + tmp.add(sb.toString()); + return tmp.toArray(new String[tmp.size()]); } } diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java index 54ba8c8..0c9b508 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase; import java.security.InvalidParameterException; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -109,6 +110,12 @@ public class IntegrationTestLazyCfLoading { public long getTotalNumberOfKeys() { return totalNumberOfKeys.get(); } + + @Override + public List getArgs() { + // TODO Auto-generated method stub + return null; + } @Override public byte[] getDeterministicUniqueKey(long keyBase) { @@ -167,6 +174,12 @@ public class IntegrationTestLazyCfLoading { scf.setFilterIfMissing(true); return scf; } + + @Override + public void initialize(String[] args) { + // TODO Auto-generated method stub + + } } @Before @@ -232,7 +245,7 @@ public class IntegrationTestLazyCfLoading { LOG.info("Starting writer; the number of keys to write is " + keysToWrite); // TODO : Need to see if tag support has to be given here in the integration test suite - writer.start(1, keysToWrite, WRITER_THREADS, false, 0, 0); + writer.start(1, keysToWrite, WRITER_THREADS); // Now, do scans.
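As a rough illustration of what the getArgsForLoadTestTool overrides above produce (the table name and key range below are made up; the colon-separated tail is the minNumTags:maxNumTags:minTagLength:maxTagLength format noted in the comment), the write phase of IntegrationTestIngestWithTags ends up invoking LoadTestTool with an argument vector equivalent to:

  String[] writeArgs = {
      "-tn", "IntegrationTestIngestWithTags",   // illustrative table name
      "-write", "10:1024:10",                   // colsPerKey:recordSize:writeThreads
      "-start_key", "0", "-num_keys", "2500",   // illustrative key range
      "-skip_init",
      "-generator",
      "org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithTags:1:10:16:512"
  };

IntegrationTestIngestWithACL builds the same vector, but with a generator value of the form org.apache.hadoop.hbase.util.test.LoadTestGeneratorWithACL:user1,user2,user3,user4.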
long now = EnvironmentEdgeManager.currentTimeMillis(); @@ -287,4 +300,5 @@ public class IntegrationTestLazyCfLoading { Assert.assertTrue("Writer is not done", isWriterDone); // Assert.fail("Boom!"); } + } diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/StripeCompactionsPerformanceEvaluation.java hbase-it/src/test/java/org/apache/hadoop/hbase/StripeCompactionsPerformanceEvaluation.java index ec0f149..a1113e1 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/StripeCompactionsPerformanceEvaluation.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/StripeCompactionsPerformanceEvaluation.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase; import java.io.IOException; +import java.util.List; import java.util.Set; import org.apache.commons.cli.CommandLine; @@ -34,7 +35,6 @@ import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.LoadTestTool; import org.apache.hadoop.hbase.util.MultiThreadedAction; import org.apache.hadoop.hbase.util.MultiThreadedReader; import org.apache.hadoop.hbase.util.MultiThreadedWriter; @@ -204,7 +204,7 @@ public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool { if (preloadKeys > 0) { MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, tn); long time = System.currentTimeMillis(); - preloader.start(0, startKey, writeThreads, false, 0, 0); + preloader.start(0, startKey, writeThreads); preloader.waitForFinish(); if (preloader.getNumWriteFailures() > 0) { throw new IOException("Preload failed"); @@ -221,8 +221,8 @@ public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool { reader.linkToWriter(writer); long testStartTime = System.currentTimeMillis(); - writer.start(startKey, endKey, writeThreads, false, 0, 0); - reader.start(startKey, endKey, readThreads, /* rdmReadThreads, Long.MAX_VALUE, */ false, 0, 0); + writer.start(startKey, endKey, writeThreads); + reader.start(startKey, endKey, readThreads /* rdmReadThreads, Long.MAX_VALUE, */ ); writer.waitForFinish(); reader.waitForFinish(); // reader.waitForVerification(300000); @@ -347,5 +347,17 @@ public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool { public boolean verify(byte[] rowKey, byte[] cf, Set columnSet) { return true; } + + @Override + public void initialize(String[] args) { + // TODO Auto-generated method stub + + } + + @Override + public List getArgs() { + // TODO Auto-generated method stub + return null; + } }; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java new file mode 100644 index 0000000..62fbba2 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security.access; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantRequest; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantResponse; + +import com.google.protobuf.ByteString; +import com.google.protobuf.ZeroCopyLiteralByteString; + +/** + * Utility client for doing access control admin operations. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class AccessControlClient { + public static GrantResponse grant(Configuration conf, final HTable table, final String userName, + final byte[] family, final byte[] qual, + final AccessControlProtos.Permission.Action... 
actions) throws Throwable { + HTable ht = null; + try { + ht = new HTable(conf, AccessControlLists.ACL_TABLE_NAME.getName()); + Batch.Call callable = new Batch.Call() { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback rpcCallback = new BlockingRpcCallback(); + + @Override + public GrantResponse call(AccessControlService service) throws IOException { + GrantRequest.Builder builder = GrantRequest.newBuilder(); + AccessControlProtos.Permission.Builder ret = AccessControlProtos.Permission.newBuilder(); + AccessControlProtos.TablePermission.Builder permissionBuilder = AccessControlProtos.TablePermission + .newBuilder(); + for (AccessControlProtos.Permission.Action a : actions) { + permissionBuilder.addAction(a); + } + if (table.getTableName() == null) { + throw new NullPointerException("TableName cannot be null"); + } + permissionBuilder.setTableName(ProtobufUtil.toProtoTableName(table.getName())); + + if (family != null) { + permissionBuilder.setFamily(ZeroCopyLiteralByteString.wrap(family)); + } + if (qual != null) { + permissionBuilder.setQualifier(ZeroCopyLiteralByteString.wrap(qual)); + } + ret.setType(AccessControlProtos.Permission.Type.Table).setTablePermission( + permissionBuilder); + builder.setUserPermission(AccessControlProtos.UserPermission.newBuilder() + .setUser(ByteString.copyFromUtf8(userName)).setPermission(ret)); + service.grant(controller, builder.build(), rpcCallback); + return rpcCallback.get(); + } + }; + Map result = ht.coprocessorService(AccessControlService.class, + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, callable); + return result.values().iterator().next(); // There will be exactly one + // region for labels + // table and so one entry in + // result Map. + } finally { + if (ht != null) { + try { + ht.close(); + } catch (IOException e) { + throw e; + } + } + } + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java index 82204d1..804be3b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java @@ -350,8 +350,8 @@ public class TableAuthManager { private boolean checkCellPermissions(User user, Cell cell, Permission.Action action) { try { List perms = AccessControlLists.getCellPermissionsForUser(user, cell); - if (LOG.isTraceEnabled()) { - LOG.trace("Found perms for user " + user.getShortName() + " in cell " + + if (LOG.isDebugEnabled()) { + LOG.debug("Found perms for user " + user.getShortName() + " in cell " + cell + ": " + perms); } for (Permission p: perms) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java index f46402b..d707194 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import java.io.InterruptedIOException; +import java.lang.reflect.Constructor; import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; @@ -38,13 +39,16 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.PerformanceEvaluation; import org.apache.hadoop.hbase.TableName; import 
org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.crypto.Cipher; import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.access.AccessControlClient; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; import org.apache.hadoop.util.ToolRunner; @@ -55,6 +59,8 @@ import org.apache.hadoop.util.ToolRunner; */ public class LoadTestTool extends AbstractHBaseTool { + private static final String COLON = ":"; + private static final Log LOG = LogFactory.getLog(LoadTestTool.class); /** Table name for the test */ @@ -107,13 +113,21 @@ public class LoadTestTool extends AbstractHBaseTool { public static final String OPT_INMEMORY = "in_memory"; public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " + "inmemory as far as possible. Not guaranteed that reads are always served from inmemory"; - public static final String OPT_USETAGS = "usetags"; - public static final String OPT_USAGE_USETAG = "Adds tags with every KV. This option would be used" + - " only if the HFileV3 version is used"; - public static final String OPT_NUM_TAGS = "num_tags"; - public static final String OPT_USAGE_NUM_TAGS = "Specifies the minimum and number of tags to be" - + " added per KV"; + public static final String OPT_GENERATOR = "generator"; + public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool." + + " Any args for this class can be passed, colon separated, after the class name"; + + public static final String OPT_ACL_USERS = "acl_users"; + public static final String OPT_USAGE_ACL_USERS = "Specifies the users that will be associated with the permissions. Users will be" + + " picked up in a round robin fashion. In order to use this option, AccessController should be turned on."; + public static final String OPT_ACL_PERMS = "acl_perms"; + public static final String OPT_USAGE_ACL_PERMS = "Specifies the permissions that need to be added. Permissions will be " + + "picked up in a round robin fashion and mapped to the users specified. " + + "Eg: READ:WRITE, READ, ADMIN. If the acl_users are 'user1','user2','user3' then user1 will have READ and WRITE permissions, " + + "user2 will have READ permission and user3 will have ADMIN permissions. In order to use this option, AccessController should be turned on."; + + + protected static final String OPT_KEY_WINDOW = "key_window"; protected static final String OPT_WRITE = "write"; @@ -153,9 +167,8 @@ public class LoadTestTool extends AbstractHBaseTool { protected Compression.Algorithm compressAlgo; protected BloomType bloomType; private boolean inMemoryCF; - private boolean useTags; - private int minNumTags = 1; - private int maxNumTags = 1; + + private User userOwner; // Writer options protected int numWriterThreads = DEFAULT_NUM_THREADS; protected int minColsPerKey, maxColsPerKey; @@ -179,13 +192,16 @@ // console tool itself should only be used from console.
protected boolean isSkipInit = false; protected boolean isInitOnly = false; + + protected boolean isACL = false; + protected boolean grantPermission = false; protected Cipher cipher = null; protected String[] splitColonSeparated(String option, int minNumCols, int maxNumCols) { String optVal = cmd.getOptionValue(option); - String[] cols = optVal.split(":"); + String[] cols = optVal.split(COLON); if (cols.length < minNumCols || cols.length > maxNumCols) { throw new IllegalArgumentException("Expected at least " + minNumCols + " columns but no more than " + maxNumCols + @@ -269,8 +285,10 @@ public class LoadTestTool extends AbstractHBaseTool { addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " + "separate updates for every column in a row"); addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY); - addOptNoArg(OPT_USETAGS, OPT_USAGE_USETAG); - addOptWithArg(OPT_NUM_TAGS, OPT_USAGE_NUM_TAGS + " The default is 1:1"); + addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE); + + addOptWithArg(OPT_ACL_USERS, OPT_USAGE_ACL_USERS); + addOptWithArg(OPT_ACL_PERMS, OPT_USAGE_ACL_PERMS); addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write"); addOptWithArg(OPT_START_KEY, "The first key to read/write " + @@ -406,20 +424,6 @@ public class LoadTestTool extends AbstractHBaseTool { BloomType.valueOf(bloomStr); inMemoryCF = cmd.hasOption(OPT_INMEMORY); - useTags = cmd.hasOption(OPT_USETAGS); - if (useTags) { - if (cmd.hasOption(OPT_NUM_TAGS)) { - String[] readOpts = splitColonSeparated(OPT_NUM_TAGS, 1, 2); - int colIndex = 0; - minNumTags = parseInt(readOpts[colIndex++], 1, 100); - if (colIndex < readOpts.length) { - maxNumTags = parseInt(readOpts[colIndex++], 1, 100); - } - } - System.out.println("Using tags, number of tags per KV: min=" + minNumTags + ", max=" - + maxNumTags); - } - if (cmd.hasOption(OPT_ENCRYPTION)) { cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION)); } @@ -455,24 +459,74 @@ public class LoadTestTool extends AbstractHBaseTool { if (!isSkipInit) { initTestTable(); } - - LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator( - minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, COLUMN_FAMILY); + LoadTestDataGenerator dataGen = null; + if (cmd.hasOption(OPT_GENERATOR)) { + String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON); + userOwner = User.createUserForTesting(conf, "owner", new String[0]); + if(clazzAndArgs[0].equals("org.apache.hadoop.hbase.util.test.LoadTestGeneratorWithACL")) { + LOG.info("ACL is on"); + isACL = true; + } + dataGen = getLoadGeneratorInstance(clazzAndArgs[0]); + String[] args = clazzAndArgs.length == 1 ? 
new String[0] : Arrays.copyOfRange(clazzAndArgs, + 1, clazzAndArgs.length); + dataGen.initialize(args); + } else { + // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator + dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize, + minColsPerKey, maxColsPerKey, COLUMN_FAMILY); + } + + if(isACL) { + if (!grantPermission) { + conf.set("hadoop.security.authorization", "false"); + conf.set("hadoop.security.authentication", "simple"); + LOG.info("Granting permission for the user " + userOwner.getShortName()); + HTable table = new HTable(conf, tableName); + AccessControlProtos.Permission.Action[] actions = { + AccessControlProtos.Permission.Action.ADMIN, + AccessControlProtos.Permission.Action.CREATE, + AccessControlProtos.Permission.Action.READ, AccessControlProtos.Permission.Action.WRITE }; + + try { + AccessControlClient.grant(conf, table, userOwner.getShortName(), COLUMN_FAMILY, null, actions); + } catch (Throwable e) { + LOG.error("Error in granting permission for the user " + userOwner.getShortName(), e); + } + grantPermission = true; + } + } if (isWrite) { - writerThreads = new MultiThreadedWriter(dataGen, conf, tableName); - writerThreads.setMultiPut(isMultiPut); + if (isACL) { + writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner); + writerThreads.setMultiPut(isMultiPut); + } else { + writerThreads = new MultiThreadedWriter(dataGen, conf, tableName); + writerThreads.setMultiPut(isMultiPut); + } } if (isUpdate) { - updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent); - updaterThreads.setBatchUpdate(isBatchUpdate); + if (isACL) { + updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent, userOwner); + updaterThreads.setBatchUpdate(isBatchUpdate); + } else { + updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent); + updaterThreads.setBatchUpdate(isBatchUpdate); + } } if (isRead) { - readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent); - readerThreads.setMaxErrors(maxReadErrors); - readerThreads.setKeyWindow(keyWindow); + if (isACL) { + readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent); + readerThreads.setMaxErrors(maxReadErrors); + readerThreads.setKeyWindow(keyWindow); + } else { + readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent); + readerThreads.setMaxErrors(maxReadErrors); + readerThreads.setKeyWindow(keyWindow); + } } if (isUpdate && isWrite) { @@ -489,20 +543,20 @@ public class LoadTestTool extends AbstractHBaseTool { if (isWrite) { System.out.println("Starting to write data..."); - writerThreads.start(startKey, endKey, numWriterThreads, useTags, minNumTags, maxNumTags); + writerThreads.start(startKey, endKey, numWriterThreads); } if (isUpdate) { LOG.info("Starting to mutate data..."); System.out.println("Starting to mutate data..."); // TODO : currently append and increment operations not tested with tags - // Will update this aftet it is done - updaterThreads.start(startKey, endKey, numUpdaterThreads, true, minNumTags, maxNumTags); + // Will update this after it is done + updaterThreads.start(startKey, endKey, numUpdaterThreads); } if (isRead) { System.out.println("Starting to read data..."); - readerThreads.start(startKey, endKey, numReaderThreads, useTags, 0, 0); + readerThreads.start(startKey, endKey, numReaderThreads); } if (isWrite) { @@ -530,8 +584,20 @@ public class LoadTestTool extends AbstractHBaseTool { } 
return success ? EXIT_SUCCESS : EXIT_FAILURE; } - - static byte[] generateData(final Random r, int length) { + + private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException { + try { + Class clazz = Class.forName(clazzName); + Constructor constructor = clazz.getConstructor(int.class, int.class, int.class, int.class, + byte[][].class); + return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize, + minColsPerKey, maxColsPerKey, COLUMN_FAMILIES); + } catch (Exception e) { + throw new IOException(e); + } + } + + public static byte[] generateData(final Random r, int length) { byte [] b = new byte [length]; int i = 0; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java index 51f998f..992262c 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; @@ -68,9 +69,6 @@ public abstract class MultiThreadedAction { protected AtomicLong totalOpTimeMs = new AtomicLong(); protected boolean verbose = false; - protected boolean useTags = false; - protected int minNumTags = 1; - protected int maxNumTags = 1; protected LoadTestDataGenerator dataGenerator = null; @@ -99,6 +97,11 @@ public abstract class MultiThreadedAction { // Default values for tests that didn't care to provide theirs. this(256, 1024, 1, 10, columnFamilies); } + + @Override + public void initialize(String[] args) { + + } @Override public byte[] getDeterministicUniqueKey(long keyBase) { @@ -134,6 +137,11 @@ public abstract class MultiThreadedAction { public boolean verify(byte[] rowKey, byte[] cf, Set columnSet) { return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey); } + + @Override + public List getArgs() { + return null; + } } /** "R" or "W" */ @@ -153,14 +161,10 @@ public abstract class MultiThreadedAction { this.actionLetter = actionLetter; } - public void start(long startKey, long endKey, int numThreads, boolean useTags, int minNumTags, - int maxNumTags) throws IOException { + public void start(long startKey, long endKey, int numThreads) throws IOException { this.startKey = startKey; this.endKey = endKey; this.numThreads = numThreads; - this.useTags = useTags; - this.minNumTags = minNumTags; - this.maxNumTags = maxNumTags; (new Thread(new ProgressReporter(actionLetter))).start(); } @@ -310,6 +314,7 @@ public abstract class MultiThreadedAction { * @param verifyCfAndColumnIntegrity verify that cf/column set in the result is complete. Note * that to use this multiPut should be used, or verification * has to happen after writes, otherwise there can be races. 
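LoadTestTool#getLoadGeneratorInstance above fixes the contract a pluggable generator has to meet: a public (int, int, int, int, byte[][]) constructor, plus the initialize(String[]) hook that receives whatever colon-separated arguments followed the class name in the -generator value. A minimal sketch of a custom generator honouring that contract (the class and package names here are hypothetical, and it simply reuses DefaultDataGenerator's behaviour):

  package org.example.loadtest;

  import org.apache.hadoop.hbase.util.MultiThreadedAction;

  public class NoOpArgsDataGenerator extends MultiThreadedAction.DefaultDataGenerator {
    private String[] generatorArgs = new String[0];

    public NoOpArgsDataGenerator(int minValueSize, int maxValueSize, int minColumnsPerKey,
        int maxColumnsPerKey, byte[][] columnFamilies) {
      // Matches the constructor looked up reflectively by getLoadGeneratorInstance()
      super(minValueSize, maxValueSize, minColumnsPerKey, maxColumnsPerKey, columnFamilies);
    }

    @Override
    public void initialize(String[] args) {
      // args holds everything after the first colon in the -generator value
      this.generatorArgs = args;
    }
  }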
+ * @param isNullExpected * @return */ public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues, @@ -345,7 +350,7 @@ public abstract class MultiThreadedAction { } Map mutateInfo = null; - if (verifyCfAndColumnIntegrity || verifyValues) { + if ((verifyCfAndColumnIntegrity || verifyValues )) { if (!columnValues.containsKey(MUTATE_INFO)) { LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + cfStr + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found"); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java index e4953e8..aa1162f 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java @@ -37,11 +37,11 @@ public class MultiThreadedReader extends MultiThreadedAction { private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class); - private Set readers = new HashSet(); + protected Set readers = new HashSet(); private final double verifyPercent; private volatile boolean aborted; - private MultiThreadedWriterBase writer = null; + protected MultiThreadedWriterBase writer = null; /** * The number of keys verified in a sequence. This will never be larger than @@ -65,8 +65,9 @@ public class MultiThreadedReader extends MultiThreadedAction public static final int DEFAULT_KEY_WINDOW = 0; protected AtomicLong numKeysVerified = new AtomicLong(0); - private AtomicLong numReadErrors = new AtomicLong(0); - private AtomicLong numReadFailures = new AtomicLong(0); + protected AtomicLong numReadErrors = new AtomicLong(0); + protected AtomicLong numReadFailures = new AtomicLong(0); + protected AtomicLong nullResult = new AtomicLong(0); private int maxErrors = DEFAULT_MAX_ERRORS; private int keyWindow = DEFAULT_KEY_WINDOW; @@ -91,29 +92,32 @@ public class MultiThreadedReader extends MultiThreadedAction } @Override - public void start(long startKey, long endKey, int numThreads, boolean useTags, - int minNumTags, int maxNumTags) throws IOException { - super.start(startKey, endKey, numThreads, useTags, minNumTags, maxNumTags); + public void start(long startKey, long endKey, int numThreads) throws IOException { + super.start(startKey, endKey, numThreads); if (verbose) { LOG.debug("Reading keys [" + startKey + ", " + endKey + ")"); } + addReaderThreads(numThreads); + startThreads(readers); + } + + protected void addReaderThreads(int numThreads) throws IOException { for (int i = 0; i < numThreads; ++i) { HBaseReaderThread reader = new HBaseReaderThread(i); readers.add(reader); } - startThreads(readers); } public class HBaseReaderThread extends Thread { - private final int readerId; - private final HTable table; + protected final int readerId; + protected final HTable table; /** The "current" key being read. Increases from startKey to endKey. */ private long curKey; /** Time when the thread started */ - private long startTimeMs; + protected long startTimeMs; /** If we are ahead of the writer and reading a random key. 
*/ private boolean readingRandomKey; @@ -124,21 +128,31 @@ public class MultiThreadedReader extends MultiThreadedAction */ public HBaseReaderThread(int readerId) throws IOException { this.readerId = readerId; - table = new HTable(conf, tableName); + table = createTableInstance(); setName(getClass().getSimpleName() + "_" + readerId); } + protected HTable createTableInstance() throws IOException { + return new HTable(conf, tableName); + } + @Override public void run() { try { runReader(); } finally { - try { + closeTable(); + numThreadsWorking.decrementAndGet(); + } + } + + protected void closeTable() { + try { + if (table != null) { table.close(); - } catch (IOException e) { - LOG.error("Error closing table", e); } - numThreadsWorking.decrementAndGet(); + } catch (IOException e) { + LOG.error("Error closing table", e); } } @@ -235,10 +249,11 @@ public class MultiThreadedReader extends MultiThreadedAction } try { + get = dataGenerator.beforeGet(keyToRead, get); if (verbose) { LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString); } - queryKey(get, RandomUtils.nextInt(100) < verifyPercent); + queryKey(get, RandomUtils.nextInt(100) < verifyPercent, keyToRead); } catch (IOException e) { numReadFailures.addAndGet(1); LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") @@ -248,12 +263,18 @@ public class MultiThreadedReader extends MultiThreadedAction return get; } - public void queryKey(Get get, boolean verify) throws IOException { + public void queryKey(Get get, boolean verify, long keyToRead) throws IOException { String rowKey = Bytes.toString(get.getRow()); // read the data long start = System.currentTimeMillis(); Result result = table.get(get); + getResultMetricUpdation(verify, rowKey, start, result, table, false); + } + + protected void getResultMetricUpdation(boolean verify, String rowKey, long start, + Result result, HTable table, boolean isNullExpected) + throws IOException { totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); numKeys.addAndGet(1); if (!result.isEmpty()) { @@ -265,9 +286,13 @@ public class MultiThreadedReader extends MultiThreadedAction Bytes.toBytes(rowKey)); LOG.info("Key = " + rowKey + ", RegionServer: " + hloc.getHostname()); + if(isNullExpected) { + nullResult.incrementAndGet(); + LOG.debug("Null result obtained for the key ="+rowKey); + return; + } } - - boolean isOk = verifyResultAgainstDataGenerator(result, verify); + boolean isOk = verifyResultAgainstDataGenerator(result, verify, false); long numErrorsAfterThis = 0; if (isOk) { long cols = 0; @@ -305,6 +330,10 @@ public class MultiThreadedReader extends MultiThreadedAction public long getNumUniqueKeysVerified() { return numUniqueKeysVerified.get(); } + + public long getNullResultsCount() { + return nullResult.get(); + } @Override protected String progressInfo() { @@ -312,6 +341,7 @@ public class MultiThreadedReader extends MultiThreadedAction appendToStatus(sb, "verified", numKeysVerified.get()); appendToStatus(sb, "READ FAILURES", numReadFailures.get()); appendToStatus(sb, "READ ERRORS", numReadErrors.get()); + appendToStatus(sb, "NULL RESULT", nullResult.get()); return sb.toString(); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java new file mode 100644 index 0000000..646d77a --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java @@ -0,0 +1,148 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * A MultiThreadReader that helps to work with ACL + * + */ +public class MultiThreadedReaderWithACL extends MultiThreadedReader { + private static final Log LOG = LogFactory.getLog(MultiThreadedReaderWithACL.class); + + private Map userVsTable = new HashMap(); + private String[] userNames; + + public MultiThreadedReaderWithACL(LoadTestDataGenerator dataGen, Configuration conf, + TableName tableName, double verifyPercent) { + super(dataGen, conf, tableName, verifyPercent); + } + + @Override + protected void addReaderThreads(int numThreads) throws IOException { + for (int i = 0; i < numThreads; ++i) { + HBaseReaderThread reader = new HBaseReaderThreadWithACL(i); + readers.add(reader); + } + } + static interface AccessAction extends PrivilegedExceptionAction { + } + + public class HBaseReaderThreadWithACL extends HBaseReaderThread { + + public HBaseReaderThreadWithACL(int readerId) throws IOException { + super(readerId); + } + + @Override + protected HTable createTableInstance() throws IOException { + return null; + } + + @Override + protected void closeTable() { + Collection tables = userVsTable.values(); + Iterator iterator = tables.iterator(); + while(iterator.hasNext()) { + HTable table = null; + try { + table = iterator.next(); + table.close(); + } catch(Exception e) { + LOG.error("Error while closing the table "+table.getName().getNamespaceAsString() , e); + } + } + } + + + @Override + public void queryKey(final Get get, final boolean verify, final long keyToRead) + throws IOException { + final String rowKey = Bytes.toString(get.getRow()); + + // read the data + final long start = System.currentTimeMillis(); + AccessAction action = new AccessAction() { + @Override + public Object run() throws Exception { + HTable localTable = null; + try { + get.setACLStrategy(true); + Result result = null; + List args = dataGenerator.getArgs(); + Object object = args.get(0); + userNames = (String[]) object; + int mod = ((int) keyToRead % userNames.length); + if(userVsTable.get(userNames[mod]) 
== null) { + localTable = new HTable(conf, tableName); + userVsTable.put(userNames[mod], localTable); + result = localTable.get(get); + } else { + localTable = userVsTable.get(userNames[mod]); + result = localTable.get(get); + } + boolean isNullExpected = ((((int) keyToRead % 100)) == 0); + LOG.info("Read happening from ACL "+isNullExpected); + getResultMetricUpdation(verify, rowKey, start, result, localTable, isNullExpected); + } catch (IOException e) { + recordFailure(keyToRead); + } + return null; + } + }; + List args = dataGenerator.getArgs(); + Object object = args.get(0); + userNames = (String[]) object; + if (userNames != null && userNames.length > 0) { + int mod = ((int) keyToRead % userNames.length); + UserGroupInformation realUserUgi = + UserGroupInformation.createRemoteUser(userNames[mod]); + User user = User.create(realUserUgi); + try { + user.runAs(action); + } catch (Exception e) { + recordFailure(keyToRead); + } + } + } + + private void recordFailure(final long keyToRead) { + numReadFailures.addAndGet(1); + LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + + ", time from start: " + (System.currentTimeMillis() - startTimeMs) + " ms"); + } + } + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java index e0edeaf..8a8ad7b 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java @@ -56,7 +56,7 @@ import com.google.common.base.Preconditions; public class MultiThreadedUpdater extends MultiThreadedWriterBase { private static final Log LOG = LogFactory.getLog(MultiThreadedUpdater.class); - private Set updaters = new HashSet(); + protected Set updaters = new HashSet(); private MultiThreadedWriterBase writer = null; private boolean isBatchUpdate = false; @@ -79,20 +79,23 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase { } @Override - public void start(long startKey, long endKey, int numThreads, boolean useTags, int minNumTags, - int maxNumTags) throws IOException { - super.start(startKey, endKey, numThreads, useTags, minNumTags, maxNumTags); + public void start(long startKey, long endKey, int numThreads) throws IOException { + super.start(startKey, endKey, numThreads); if (verbose) { LOG.debug("Updating keys [" + startKey + ", " + endKey + ")"); } + addUpdaterThreads(numThreads); + + startThreads(updaters); + } + + protected void addUpdaterThreads(int numThreads) throws IOException { for (int i = 0; i < numThreads; ++i) { HBaseUpdaterThread updater = new HBaseUpdaterThread(i); updaters.add(updater); } - - startThreads(updaters); } private long getNextKeyToUpdate() { @@ -116,12 +119,16 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase { } } - private class HBaseUpdaterThread extends Thread { - private final HTable table; + protected class HBaseUpdaterThread extends Thread { + protected final HTable table; public HBaseUpdaterThread(int updaterId) throws IOException { setName(getClass().getSimpleName() + "_" + updaterId); - table = new HTable(conf, tableName); + table = createTableInstance(); + } + + protected HTable createTableInstance() throws IOException { + return new HTable(conf, tableName); } public void run() { @@ -152,66 +159,72 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase { numCols.addAndGet(1); app = new Append(rowKey); } - Result result = null; + Get 
get = new Get(rowKey); + get.addFamily(cf); try { - Get get = new Get(rowKey); - get.addFamily(cf); - result = table.get(get); - } catch (IOException ie) { - LOG.warn("Failed to get the row for key = [" - + rowKey + "], column family = [" + Bytes.toString(cf) + "]", ie); + get = dataGenerator.beforeGet(rowKeyBase, get); + } catch (Exception e) { + // Ideally wont happen + LOG.warn("Failed to get the row for key = [" + get.getRow() + + "], column family = [" + Bytes.toString(cf) + "]", e); } + Result result = getRowKey(get, rowKeyBase, cf); Map columnValues = result != null ? result.getFamilyMap(cf) : null; if (columnValues == null) { - failedKeySet.add(rowKeyBase); - LOG.error("Failed to update the row with key = [" - + rowKey + "], since we could not get the original row"); - } - for (byte[] column : columnValues.keySet()) { - if (Bytes.equals(column, INCREMENT) - || Bytes.equals(column, MUTATE_INFO)) { - continue; + if (((int) rowKeyBase % 100 == 0)) { + LOG.info("Null result expected for the rowkey " + Bytes.toString(rowKey)); + } else { + failedKeySet.add(rowKeyBase); + LOG.error("Failed to update the row with key = [" + rowKey + + "], since we could not get the original row"); } - MutationType mt = MutationType.valueOf( - RandomUtils.nextInt(MutationType.values().length)); - long columnHash = Arrays.hashCode(column); - long hashCode = cfHash + columnHash; - byte[] hashCodeBytes = Bytes.toBytes(hashCode); - byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY; - if (hashCode % 2 == 0) { - Cell kv = result.getColumnLatestCell(cf, column); - checkedValue = kv != null ? CellUtil.cloneValue(kv) : null; - Preconditions.checkNotNull(checkedValue, - "Column value to be checked should not be null"); - } - buf.setLength(0); // Clear the buffer - buf.append("#").append(Bytes.toString(column)).append(":"); - ++columnCount; - switch (mt) { - case PUT: - Put put = new Put(rowKey); - put.add(cf, column, hashCodeBytes); - mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue); - buf.append(MutationType.PUT.getNumber()); - break; - case DELETE: - Delete delete = new Delete(rowKey); - // Delete all versions since a put - // could be called multiple times if CM is used - delete.deleteColumns(cf, column); - mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue); - buf.append(MutationType.DELETE.getNumber()); - break; - default: - buf.append(MutationType.APPEND.getNumber()); - app.add(cf, column, hashCodeBytes); - } - app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString())); - if (!isBatchUpdate) { - mutate(table, app, rowKeyBase); - numCols.addAndGet(1); - app = new Append(rowKey); + } + if(columnValues != null) { + for (byte[] column : columnValues.keySet()) { + if (Bytes.equals(column, INCREMENT) || Bytes.equals(column, MUTATE_INFO)) { + continue; + } + MutationType mt = MutationType + .valueOf(RandomUtils.nextInt(MutationType.values().length)); + long columnHash = Arrays.hashCode(column); + long hashCode = cfHash + columnHash; + byte[] hashCodeBytes = Bytes.toBytes(hashCode); + byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY; + if (hashCode % 2 == 0) { + Cell kv = result.getColumnLatestCell(cf, column); + checkedValue = kv != null ? 
CellUtil.cloneValue(kv) : null; + Preconditions.checkNotNull(checkedValue, + "Column value to be checked should not be null"); + } + buf.setLength(0); // Clear the buffer + buf.append("#").append(Bytes.toString(column)).append(":"); + ++columnCount; + switch (mt) { + case PUT: + Put put = new Put(rowKey); + put.add(cf, column, hashCodeBytes); + mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue); + buf.append(MutationType.PUT.getNumber()); + break; + case DELETE: + Delete delete = new Delete(rowKey); + // Delete all versions since a put + // could be called multiple times if CM is used + delete.deleteColumns(cf, column); + mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue); + buf.append(MutationType.DELETE.getNumber()); + break; + default: + buf.append(MutationType.APPEND.getNumber()); + app.add(cf, column, hashCodeBytes); + } + app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString())); + if (!isBatchUpdate) { + mutate(table, app, rowKeyBase); + numCols.addAndGet(1); + app = new Append(rowKey); + } } } } @@ -230,12 +243,71 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase { } } } finally { - try { + closeHTable(); + numThreadsWorking.decrementAndGet(); + } + } + + protected void closeHTable() { + try { + if (table != null) { table.close(); - } catch (IOException e) { - LOG.error("Error closing table", e); } - numThreadsWorking.decrementAndGet(); + } catch (IOException e) { + LOG.error("Error closing table", e); + } + } + + protected Result getRowKey(Get get, long rowKeyBase, byte[] cf) { + Result result = null; + try { + result = table.get(get); + } catch (IOException ie) { + LOG.warn( + "Failed to get the row for key = [" + get.getRow() + "], column family = [" + + Bytes.toString(cf) + "]", ie); + } + return result; + } + + public void mutate(HTable table, Mutation m, long keyBase) { + mutate(table, m, keyBase, null, null, null, null); + } + + public void mutate(HTable table, Mutation m, + long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) { + long start = System.currentTimeMillis(); + try { + m = dataGenerator.beforeMutate(keyBase, m); + if (m instanceof Increment) { + table.increment((Increment)m); + } else if (m instanceof Append) { + table.append((Append)m); + } else if (m instanceof Put) { + table.checkAndPut(row, cf, q, v, (Put)m); + } else if (m instanceof Delete) { + table.checkAndDelete(row, cf, q, v, (Delete)m); + } else { + throw new IllegalArgumentException( + "unsupported mutation " + m.getClass().getSimpleName()); + } + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + } catch (IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + if (e instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e; + exceptionInfo = aggEx.getExhaustiveDescription(); + } else { + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + } + LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) + + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + + exceptionInfo); } } } @@ -248,44 +320,4 @@ public class MultiThreadedUpdater extends MultiThreadedWriterBase { System.out.println("Failed to update key: " + key); } } - - public void mutate(HTable table, Mutation m, long keyBase) { - mutate(table, m, keyBase, null, null, null, null); - } 
- - public void mutate(HTable table, Mutation m, - long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) { - long start = System.currentTimeMillis(); - try { - if (m instanceof Increment) { - table.increment((Increment)m); - } else if (m instanceof Append) { - table.append((Append)m); - } else if (m instanceof Put) { - table.checkAndPut(row, cf, q, v, (Put)m); - } else if (m instanceof Delete) { - table.checkAndDelete(row, cf, q, v, (Delete)m); - } else { - throw new IllegalArgumentException( - "unsupported mutation " + m.getClass().getSimpleName()); - } - totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); - } catch (IOException e) { - failedKeySet.add(keyBase); - String exceptionInfo; - if (e instanceof RetriesExhaustedWithDetailsException) { - RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e; - exceptionInfo = aggEx.getExhaustiveDescription(); - } else { - StringWriter stackWriter = new StringWriter(); - PrintWriter pw = new PrintWriter(stackWriter); - e.printStackTrace(pw); - pw.flush(); - exceptionInfo = StringUtils.stringifyException(e); - } - LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) + - "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " - + exceptionInfo); - } - } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdaterWithACL.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdaterWithACL.java new file mode 100644 index 0000000..a845b25 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdaterWithACL.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.security.PrivilegedExceptionAction; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; +/** + * A MultiThreadReader that helps to work with ACL + * + */ +public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater { + private static final Log LOG = LogFactory.getLog(MultiThreadedUpdaterWithACL.class); + + private User userOwner; + private Map userVsTable = new HashMap(); + private String[] userNames; + + public MultiThreadedUpdaterWithACL(LoadTestDataGenerator dataGen, Configuration conf, + TableName tableName, double updatePercent, + User userOwner) { + super(dataGen, conf, tableName, updatePercent); + this.userOwner = userOwner; + } + + static interface AccessAction extends PrivilegedExceptionAction { + } + + @Override + protected void addUpdaterThreads(int numThreads) throws IOException { + for (int i = 0; i < numThreads; ++i) { + HBaseUpdaterThread updater = new HBaseUpdaterThreadWithACL(i); + updaters.add(updater); + } + } + public class HBaseUpdaterThreadWithACL extends HBaseUpdaterThread { + + private HTable table; + private WriteAccessAction writerAction = new WriteAccessAction(); + public HBaseUpdaterThreadWithACL(int updaterId) throws IOException { + super(updaterId); + } + + @Override + protected HTable createTableInstance() throws IOException { + return null; + } + + @Override + protected void closeHTable() { + try { + if (table != null) { + table.close(); + } + Collection tables = userVsTable.values(); + Iterator iterator = tables.iterator(); + while (iterator.hasNext()) { + HTable table = null; + try { + table = iterator.next(); + table.close(); + } catch (Exception e) { + LOG.error("Error while closing the table " + table.getName().getNamespaceAsString(), e); + } + } + } catch (Exception e) { + LOG.error("Error while closing the HTable ", e); + } + } + @Override + protected Result getRowKey(final Get get, final long rowKeyBase, final byte[] cf) { + AccessAction action = new AccessAction() { + + @Override + public Object run() throws Exception { + Result res = null; + HTable localTable = null; + try { + List args = dataGenerator.getArgs(); + Object object = args.get(0); + userNames = (String[]) object; + int mod = ((int) rowKeyBase % userNames.length); + if(userVsTable.get(userNames[mod]) == null) { + localTable = new HTable(conf, tableName); + userVsTable.put(userNames[mod], localTable); + res = localTable.get(get); + } else { + localTable = userVsTable.get(userNames[mod]); + res = localTable.get(get); + } + } 
catch (IOException ie) { + LOG.warn( + "Failed to get the row for key = [" + get.getRow() + "], column family = [" + + Bytes.toString(cf) + "]", ie); + } + return res; + } + }; + + List args = dataGenerator.getArgs(); + Object object = args.get(0); + userNames = (String[]) object; + if (userNames != null && userNames.length > 0) { + int mod = ((int) rowKeyBase % userNames.length); + UserGroupInformation realUserUgi = + UserGroupInformation.createRemoteUser(userNames[mod]); + User user = User.create(realUserUgi); + try { + Result result = (Result) user.runAs(action); + return result; + } catch (Exception ie) { + LOG.warn( + "Failed to get the row for key = [" + get.getRow() + "], column family = [" + + Bytes.toString(cf) + "]", ie); + } + } + return null; + } + + @Override + public void mutate(final HTable table, Mutation m, final long keyBase, final byte[] row, + final byte[] cf, final byte[] q, final byte[] v) { + final long start = System.currentTimeMillis(); + try { + m = dataGenerator.beforeMutate(keyBase, m); + writerAction.setMutation(m); + writerAction.setCF(cf); + writerAction.setRow(row); + writerAction.setQualifier(q); + writerAction.setValue(v); + writerAction.setStartTime(start); + writerAction.setKeyBase(keyBase); + userOwner.runAs(writerAction); + } catch (IOException e) { + recordFailure(m, keyBase, start, e); + } catch (InterruptedException e) { + failedKeySet.add(keyBase); + } + } + class WriteAccessAction implements AccessAction { + private HTable table; + private long start; + private Mutation m; + private long keyBase; + private byte[] row; + private byte[] cf; + private byte[] q; + private byte[] v; + + public WriteAccessAction() { + + } + + public void setStartTime(final long start) { + this.start = start; + } + public void setMutation(final Mutation m) { + this.m = m; + } + public void setRow(final byte[] row) { + this.row = row; + } + public void setCF(final byte[] cf) { + this.cf = cf; + } + public void setQualifier(final byte[] q) { + this.q = q; + } + + public void setValue(final byte[] v) { + this.v = v; + } + + public void setKeyBase(final long keyBase) { + this.keyBase = keyBase; + } + + @Override + public Object run() throws Exception { + try { + if(table == null) { + table = new HTable(conf, tableName); + } + if (m instanceof Increment) { + table.increment((Increment) m); + } else if (m instanceof Append) { + table.append((Append) m); + } else if (m instanceof Put) { + table.checkAndPut(row, cf, q, v, (Put) m); + } else if (m instanceof Delete) { + table.checkAndDelete(row, cf, q, v, (Delete) m); + } else { + throw new IllegalArgumentException("unsupported mutation " + + m.getClass().getSimpleName()); + } + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + } catch (IOException e) { + recordFailure(m, keyBase, start, e); + } + return null; + } + + } + private void recordFailure(final Mutation m, final long keyBase, + final long start, IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + if (e instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; + exceptionInfo = aggEx.getExhaustiveDescription(); + } else { + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + } + LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) + + "ms; region information: " + 
getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + + exceptionInfo); + } + } + + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java index 324720f..4ca3178 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java @@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT; import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Arrays; import java.util.HashSet; import java.util.Random; @@ -32,23 +34,21 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.util.StringUtils; /** Creates multiple threads that write key/values into the */ public class MultiThreadedWriter extends MultiThreadedWriterBase { private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class); - private Set writers = new HashSet(); + protected Set writers = new HashSet(); - private boolean isMultiPut = false; + protected boolean isMultiPut = false; - private Random random = new Random(); - // TODO: Make this configurable - private int minTagLength = 16; - private int maxTagLength = 512; + protected Random random = new Random(); public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName) { @@ -61,28 +61,35 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase { } @Override - public void start(long startKey, long endKey, int numThreads, boolean useTags, - int minNumTags, int maxNumTags) throws IOException { - super.start(startKey, endKey, numThreads, useTags, minNumTags, maxNumTags); + public void start(long startKey, long endKey, int numThreads) throws IOException { + super.start(startKey, endKey, numThreads); if (verbose) { LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")"); } + createWriterThreads(numThreads); + + startThreads(writers); + } + + protected void createWriterThreads(int numThreads) throws IOException { for (int i = 0; i < numThreads; ++i) { HBaseWriterThread writer = new HBaseWriterThread(i); writers.add(writer); } - - startThreads(writers); } - private class HBaseWriterThread extends Thread { + public class HBaseWriterThread extends Thread { private final HTable table; public HBaseWriterThread(int writerId) throws IOException { setName(getClass().getSimpleName() + "_" + writerId); - table = new HTable(conf, tableName); + table = createTableInstance(); + } + + protected HTable createTableInstance() throws IOException { + return new HTable(conf, tableName); } public void run() { @@ -96,26 +103,9 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase { int columnCount = 0; for (byte[] cf : columnFamilies) { byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf); - int numTags; - if (minNumTags == maxNumTags) { - numTags = minNumTags; - } else { - numTags = minNumTags + 
random.nextInt(maxNumTags - minNumTags); - } - Tag[] tags = new Tag[numTags]; for (byte[] column : columns) { byte[] value = dataGenerator.generateValue(rowKey, cf, column); - byte[] tag = LoadTestTool.generateData(random, - minTagLength + random.nextInt(maxTagLength - minTagLength)); - if(useTags) { - for (int n = 0; n < numTags; n++) { - Tag t = new Tag((byte) n, tag); - tags[n] = t; - } - put.add(cf, column, value, tags); - } else { - put.add(cf, column, value); - } + put.add(cf, column, value); ++columnCount; if (!isMultiPut) { insert(table, put, rowKeyBase); @@ -144,12 +134,41 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase { } } } finally { - try { + closeHTable(); + numThreadsWorking.decrementAndGet(); + } + } + + public void insert(HTable table, Put put, long keyBase) { + long start = System.currentTimeMillis(); + try { + table.put(put); + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + } catch (IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + if (e instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e; + exceptionInfo = aggEx.getExhaustiveDescription(); + } else { + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + } + LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) + + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + + exceptionInfo); + } + } + protected void closeHTable() { + try { + if (table != null) { table.close(); - } catch (IOException e) { - LOG.error("Error closing table", e); } - numThreadsWorking.decrementAndGet(); + } catch (IOException e) { + LOG.error("Error closing table", e); } } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java index ffa1821..60e2824 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; @@ -36,10 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; -import org.apache.hadoop.util.StringUtils; /** Creates multiple threads that write key/values into the */ public abstract class MultiThreadedWriterBase extends MultiThreadedAction { @@ -83,9 +78,8 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction { } @Override - public void start(long startKey, long endKey, int numThreads, boolean useTags, int minNumTags, - int maxNumTags) throws IOException { - super.start(startKey, endKey, numThreads, useTags, minNumTags, maxNumTags); + public void start(long startKey, long endKey, int numThreads) throws IOException { + super.start(startKey, endKey, numThreads); 
nextKeyToWrite.set(startKey); wroteUpToKey.set(startKey - 1); @@ -173,7 +167,7 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction { return failedKeySet.size(); } - public void insert(HTable table, Put put, long keyBase) { +/* public void insert(HTable table, Put put, long keyBase) { long start = System.currentTimeMillis(); try { table.put(put); @@ -195,7 +189,7 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction { "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + exceptionInfo); } - } + }*/ /** * The max key until which all keys have been inserted/updated (successfully or not). diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterWithACL.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterWithACL.java new file mode 100644 index 0000000..a2a3dc3 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterWithACL.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.security.PrivilegedExceptionAction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.util.StringUtils; + +/** + * MultiThreadedWriter that helps in testing ACL + * + */ +public class MultiThreadedWriterWithACL extends MultiThreadedWriter { + + private static final Log LOG = LogFactory.getLog(MultiThreadedWriterWithACL.class); + private User userOwner; + public MultiThreadedWriterWithACL(LoadTestDataGenerator dataGen, Configuration conf, + TableName tableName, User userOwner) { + super(dataGen, conf, tableName); + this.userOwner = userOwner; + } + + static interface AccessAction extends PrivilegedExceptionAction { + } + + @Override + public void start(long startKey, long endKey, int numThreads) throws IOException { + super.start(startKey, endKey, numThreads); + } + + @Override + protected void createWriterThreads(int numThreads) throws IOException { + for (int i = 0; i < numThreads; ++i) { + HBaseWriterThread writer = new HBaseWriterThreadWithACL(i); + writers.add(writer); + } + } + + public class HBaseWriterThreadWithACL extends HBaseWriterThread { + + private HTable table; + private WriteAccessAction writerAction = new WriteAccessAction(); + public HBaseWriterThreadWithACL(int writerId) throws IOException { + super(writerId); + } + @Override + protected HTable createTableInstance() throws IOException { + return null; + } + + @Override + protected void closeHTable() { + if (table != null) { + try { + table.close(); + } catch (Exception e) { + LOG.error("Error in closing the table", e); + } + } + } + + @Override + public void insert(final HTable table, Put put, final long keyBase) { + final long start = System.currentTimeMillis(); + try { + try { + put = (Put) dataGenerator.beforeMutate(keyBase, put); + } catch (IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + LOG.error("Failed to insert: " + keyBase + ": region information: " + + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + exceptionInfo); + } + writerAction.setPut(put); + writerAction.setKeyBase(keyBase); + writerAction.setStartTime(start); + userOwner.runAs(writerAction); + } catch (IOException e) { + recordFailure(table, put, keyBase, start, e); + } catch (InterruptedException e) { + failedKeySet.add(keyBase); + } + } + class WriteAccessAction implements AccessAction { + private Put put; + private long keyBase; + private long start; + + public WriteAccessAction() { + } + + public void setPut(final Put put) { + this.put = put; + } + + public void setKeyBase(final long keyBase) { + this.keyBase = keyBase; + } + + public void setStartTime(final long start) { + this.start = start; + } + + @Override + public Object run() throws Exception { + try { + if (table == null) { + table = new HTable(conf, tableName); + } + table.put(put); + } catch 
(IOException e) { + recordFailure(table, put, keyBase, start, e); + } + return null; + } + + } + } + + + + private void recordFailure(final HTable table, final Put put, final long keyBase, + final long start, IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + if (e instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; + exceptionInfo = aggEx.getExhaustiveDescription(); + } else { + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + } + LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) + + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + + exceptionInfo); + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java index 45c5957..51d2c4a 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java @@ -21,9 +21,9 @@ import java.io.IOException; import org.apache.commons.cli.CommandLine; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -81,7 +81,7 @@ public class RestartMetaTest extends AbstractHBaseTool { minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, LoadTestTool.COLUMN_FAMILY); MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); writer.setMultiPut(true); - writer.start(startKey, endKey, numThreads, false, 0, 0); + writer.start(startKey, endKey, numThreads); System.out.printf("Started loading data..."); writer.waitForFinish(); System.out.printf("Finished loading data..."); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java index 86b7db7..eab23d5 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java @@ -45,8 +45,8 @@ public class TestMiniClusterLoadParallel readerThreads.linkToWriter(writerThreads); - writerThreads.start(0, numKeys, NUM_THREADS, false, 0, 0); - readerThreads.start(0, numKeys, NUM_THREADS, false, 0, 0); + writerThreads.start(0, numKeys, NUM_THREADS); + readerThreads.start(0, numKeys, NUM_THREADS); writerThreads.waitForFinish(); readerThreads.waitForFinish(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java index 032bc78..3fa8d13 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java @@ -26,12 +26,12 @@ import java.util.List; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.io.compress.Compression; @@ -129,11 +129,11 @@ public class TestMiniClusterLoadSequential { } protected void runLoadTestOnExistingTable() throws IOException { - writerThreads.start(0, numKeys, NUM_THREADS, false, 0, 0); + writerThreads.start(0, numKeys, NUM_THREADS); writerThreads.waitForFinish(); assertEquals(0, writerThreads.getNumWriteFailures()); - readerThreads.start(0, numKeys, NUM_THREADS, false, 0, 0); + readerThreads.start(0, numKeys, NUM_THREADS); readerThreads.waitForFinish(); assertEquals(0, readerThreads.getNumReadFailures()); assertEquals(0, readerThreads.getNumReadErrors()); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java new file mode 100644 index 0000000..4466c28 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util.test; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; + +/** + * A generator of random data (keys/cfs/columns/values) for load testing. + * Contains LoadTestKVGenerator as a matter of convenience... + */ +@InterfaceAudience.Private +public abstract class LoadTestDataGenerator { + protected LoadTestKVGenerator kvGenerator; + + // The mutate info column stores information + // about update done to this column family this row. + public final static byte[] MUTATE_INFO = "mutate_info".getBytes(); + + // The increment column always has a long value, + // which can be incremented later on during updates. + public final static byte[] INCREMENT = "increment".getBytes(); + + public LoadTestDataGenerator() { + + } + + /** + * Initializes the object. + * @param minValueSize minimum size of the value generated by + * {@link #generateValue(byte[], byte[], byte[])}. + * @param maxValueSize maximum size of the value generated by + * {@link #generateValue(byte[], byte[], byte[])}. 
+ */ + public LoadTestDataGenerator(int minValueSize, int maxValueSize) { + this.kvGenerator = new LoadTestKVGenerator(minValueSize, maxValueSize); + } + + /** + * initialize the LoadTestDataGenerator + * @param args init args + */ + public abstract void initialize(String[] args); + + /** + * Generates a deterministic, unique hashed row key from a number. That way, the user can + * keep track of numbers, without messing with byte array and ensuring key distribution. + * @param keyBase Base number for a key, such as a loop counter. + */ + public abstract byte[] getDeterministicUniqueKey(long keyBase); + + /** + * Gets column families for the load test table. + * @return The array of byte[]s representing column family names. + */ + public abstract byte[][] getColumnFamilies(); + + /** + * Generates an applicable set of columns to be used for a particular key and family. + * @param rowKey The row key to generate for. + * @param cf The column family name to generate for. + * @return The array of byte[]s representing column names. + */ + public abstract byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf); + + /** + * Generates a value to be used for a particular row/cf/column. + * @param rowKey The row key to generate for. + * @param cf The column family name to generate for. + * @param column The column name to generate for. + * @return The value to use. + */ + public abstract byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column); + + /** + * Checks that columns for a rowKey and cf are valid if generated via + * {@link #generateColumnsForCf(byte[], byte[])} + * @param rowKey The row key to verify for. + * @param cf The column family name to verify for. + * @param columnSet The column set (for example, encountered by read). + * @return True iff valid. + */ + public abstract boolean verify(byte[] rowKey, byte[] cf, Set columnSet); + + /** + * Checks that value for a rowKey/cf/column is valid if generated via + * {@link #generateValue(byte[], byte[], byte[])} + * @param rowKey The row key to verify for. + * @param cf The column family name to verify for. + * @param column The column name to verify for. + * @param value The value (for example, encountered by read). + * @return True iff valid. + */ + public abstract boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value); + + /** + * Giving a chance for the LoadTestDataGenerator to change the Mutation load. + * @param rowkeyBase + * @param m + * @return updated Mutation + * @throws IOException + */ + public Mutation beforeMutate(long rowkeyBase, Mutation m) throws IOException { + return m; + } + + /** + * Giving a chance for the LoadTestDataGenerator to change the Get load. + * @param rowkeyBase + * @param get + * @return updated Get + * @throws IOException + */ + public Get beforeGet(long rowkeyBase, Get get) throws IOException { + return get; + } + + public abstract List getArgs(); +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGeneratorWithTags.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGeneratorWithTags.java new file mode 100644 index 0000000..3115cad --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestDataGeneratorWithTags.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util.test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.LoadTestTool; +import org.apache.hadoop.hbase.util.MultiThreadedAction.DefaultDataGenerator; + +@InterfaceAudience.Private +public class LoadTestDataGeneratorWithTags extends DefaultDataGenerator { + + private int minNumTags, maxNumTags; + private int minTagLength, maxTagLength; + private Random random = new Random(); + + public LoadTestDataGeneratorWithTags(int minValueSize, int maxValueSize, int minColumnsPerKey, + int maxColumnsPerKey, byte[]... columnFamilies) { + super(minValueSize, maxValueSize, minColumnsPerKey, maxColumnsPerKey, columnFamilies); + } + + @Override + public void initialize(String[] args) { + if (args.length != 4) { + throw new IllegalArgumentException("LoadTestDataGeneratorWithTags must have " + + "4 initialization arguments"); + } + // 1st arg in args is the min number of tags to be used with every cell + this.minNumTags = Integer.parseInt(args[0]); + // 2nd arg in args is the max number of tags to be used with every cell + this.maxNumTags = Integer.parseInt(args[1]); + // 3rd arg in args is the min tag length + this.minTagLength = Integer.parseInt(args[2]); + // 4th arg in args is the max tag length + this.maxTagLength = Integer.parseInt(args[3]); + } + + @Override + public Mutation beforeMutate(long rowkeyBase, Mutation m) throws IOException { + if (m instanceof Put) { + List updatedCells = new ArrayList(); + int numTags; + if (minNumTags == maxNumTags) { + numTags = minNumTags; + } else { + numTags = minNumTags + random.nextInt(maxNumTags - minNumTags); + } + List tags; + for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { + Cell cell = cellScanner.current(); + byte[] tag = LoadTestTool.generateData(random, + minTagLength + random.nextInt(maxTagLength - minTagLength)); + tags = new ArrayList(); + for (int n = 0; n < numTags; n++) { + tags.add(new Tag((byte) 127, tag)); + } + Cell updatedCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength(), cell.getTimestamp(), Type.codeToType(cell.getTypeByte()), + cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), tags); + updatedCells.add(updatedCell); + } + m.getFamilyCellMap().clear(); + // Clear and add new Cells to the Mutation. 
+ for (Cell cell : updatedCells) { + ((Put) m).add(cell); + } + } + return m; + } +} \ No newline at end of file diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestGeneratorWithACL.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestGeneratorWithACL.java new file mode 100644 index 0000000..a988d3f --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/test/LoadTestGeneratorWithACL.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util.test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.security.access.Permission; +import org.apache.hadoop.hbase.util.MultiThreadedAction.DefaultDataGenerator; + +@InterfaceAudience.Private +public class LoadTestGeneratorWithACL extends DefaultDataGenerator { + private static final Log LOG = LogFactory.getLog(LoadTestGeneratorWithACL.class); + private String[] userNames = null; + private static final String COMMA = ","; + + public LoadTestGeneratorWithACL(int minValueSize, int maxValueSize, int minColumnsPerKey, + int maxColumnsPerKey, byte[]... columnFamilies) { + super(minValueSize, maxValueSize, minColumnsPerKey, maxColumnsPerKey, columnFamilies); + } + + @Override + public void initialize(String[] args) { + if (args.length < 1 || args.length > 2) { + throw new IllegalArgumentException("LoadTestGeneratorWithACL can have " + + "1 arguement which would be the user list"); + } + String temp = args[0]; + // This will be comma separated list of expressions. + this.userNames = temp.split(COMMA); + } + + @Override + public Mutation beforeMutate(long rowkeyBase, Mutation m) throws IOException { + if (!(m instanceof Delete)) { + if (userNames != null && userNames.length > 0) { + int mod = ((int) rowkeyBase % this.userNames.length); + if (((int) rowkeyBase % 100) == 0) { + // These cells cannot be read back when running as user userName[mod] + if (LOG.isTraceEnabled()) { + LOG.trace("Adding special perm " + rowkeyBase); + } + m.setACL(userNames[mod], new Permission(Permission.Action.WRITE)); + } else { + m.setACL(userNames[mod], new Permission(Permission.Action.READ)); + } + } + } + return m; + } + + @Override + public Get beforeGet(long rowkeyBase, Get get) { + // there is nothing to do here + return get; + } + + @Override + public List getArgs() { + List args = new ArrayList(); + args.add(userNames); + return args; + } +}
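For context on how the relocated tag handling is meant to be driven after this change, the following is a minimal, hypothetical sketch and is not part of the patch: it wires the new LoadTestDataGeneratorWithTags into MultiThreadedWriter through the slimmed-down start(startKey, endKey, numThreads) signature, and calls the beforeMutate() hook directly to show a Put being rewritten with tags. The column family "test_cf", the table name "load_test_tbl", and all numeric parameters are illustrative only; the writer portion assumes a running cluster with that table already created.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithTags;

public class LoadTestTagSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative column family; real tests would use the family configured in LoadTestTool.
    byte[] family = Bytes.toBytes("test_cf");

    // Value sizes and columns per key are illustrative; the constructor mirrors
    // DefaultDataGenerator(minValueSize, maxValueSize, minColsPerKey, maxColsPerKey, cf...).
    LoadTestDataGeneratorWithTags dataGen =
        new LoadTestDataGeneratorWithTags(256, 512, 1, 10, family);

    // initialize() takes min/max tags per cell and min/max tag length, per its comments.
    dataGen.initialize(new String[] { "1", "3", "16", "32" });

    // beforeMutate() rewrites each cell of the Put so that it carries the generated tags;
    // this is the logic that used to live inside MultiThreadedWriter itself.
    byte[] row = dataGen.getDeterministicUniqueKey(0L);
    byte[] qualifier = Bytes.toBytes("q");
    Put put = new Put(row);
    put.add(family, qualifier, dataGen.generateValue(row, family, qualifier));
    put = (Put) dataGen.beforeMutate(0L, put);

    // The writer is now started without the old useTags/minNumTags/maxNumTags arguments;
    // tag behaviour is supplied entirely by the data generator passed to it.
    Configuration conf = HBaseConfiguration.create();
    MultiThreadedWriter writer =
        new MultiThreadedWriter(dataGen, conf, TableName.valueOf("load_test_tbl"));
    writer.setMultiPut(true);
    writer.start(0, 1000, 8);
    writer.waitForFinish();
  }
}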