diff --git standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
index 2ab9388301..6fcfed615c 100644
--- standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
+++ standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.metastore.tools;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,15 +30,44 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Formatter;
+import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.metastore.tools.Constants.HMS_DEFAULT_PORT;
-import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.*;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkCreatePartition;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkCreatePartitions;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkDeleteCreate;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkDeleteWithPartitions;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkDropDatabase;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkDropPartition;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkDropPartitions;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkGetNotificationId;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkGetPartitionNames;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkGetPartitions;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkGetPartitionsByName;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkGetTable;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkListAllTables;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkListDatabases;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkListManyPartitions;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkListPartition;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkListTables;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkLocks;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkOpenTxns;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkRenameTable;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkTableCreate;
 import static org.apache.hadoop.hive.metastore.tools.Util.getServerUri;
+import static org.apache.hadoop.hive.metastore.tools.Util.throwingSupplierWrapper;
 import static picocli.CommandLine.Command;
 import static picocli.CommandLine.Option;
@@ -54,7 +84,7 @@
   private static final TimeUnit scale = TimeUnit.MILLISECONDS;
   private static final String CSV_SEPARATOR = "\t";
   private static final String TEST_TABLE = "bench_table";
-
+  private static final String THREAD_NAME_FORMAT = "hms_benchmark_suite_%d";
 
   @Option(names = {"-H", "--host"}, description = "HMS Host", paramLabel = "URI")
   private String host;
@@ -84,7 +114,7 @@
   @Option(names = {"-o", "--output"}, description = "output file")
   private String outputFile;
 
-  @Option(names = {"-T", "--threads"}, description = "number of concurrent threads")
+  @Option(names = {"-T", "--threads"}, description = "number of concurrent threads/clients")
   private int nThreads = 2;
 
   @Option(names = {"--confdir"}, description = "configuration directory")
@@ -147,11 +177,43 @@
   public void run() {
     LOG.info("Using warmup " + warmup + " spin " + spinCount + " nparams " + nParameters + " threads " + nThreads);
 
-    StringBuilder sb = new StringBuilder();
-    BenchData bData = new BenchData(dbName, tableName);
+    if (nThreads > 1) {
+      ThreadFactory tf = new ThreadFactoryBuilder()
+          .setNameFormat(THREAD_NAME_FORMAT)
+          .setDaemon(true)
+          .build();
+      ExecutorService executor = Executors.newFixedThreadPool(nThreads, tf);
+
+      try {
+        List<Future<BenchmarkSuite>> results = new ArrayList<>();
+        List<BenchmarkSuite> benchmarkSuites;
+
+        for (int i = 0; i < nThreads; i++) {
+          results.add(executor.submit(this::runSuite));
+        }
+
+        benchmarkSuites = results.stream().map(r -> throwingSupplierWrapper(r::get)).collect(Collectors.toList());
+        renderResults(benchmarkSuites);
+      } finally {
+        executor.shutdownNow();
+      }
+
+    } else {
+      try {
+        renderResult(runSuite());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  private BenchmarkSuite runSuite() {
+    String threadSafeDbName = this.dbName;
+    BenchData bData = new BenchData(threadSafeDbName, tableName);
     MicroBenchmark bench = new MicroBenchmark(warmup, spinCount);
     BenchmarkSuite suite = new BenchmarkSuite();
+    BenchmarkSuite result = null;
 
     suite
         .setScale(scale)
@@ -178,7 +240,9 @@ public void run() {
         .add("dropDatabase",
             () -> benchmarkDropDatabase(bench, bData, 1))
         .add("openTxn",
-            () -> benchmarkOpenTxns(bench, bData, 1));
+            () -> benchmarkOpenTxns(bench, bData, 1))
+        .add("createLock",
+            () -> benchmarkLocks(bench, bData, 1, nParameters));
 
     for (int howMany: instances) {
       suite.add("listTables" + '.' + howMany,
@@ -202,58 +266,89 @@ public void run() {
          .add("dropDatabase" + '.' + howMany,
              () -> benchmarkDropDatabase(bench, bData, howMany))
          .add("openTxns" + '.' + howMany,
-             () -> benchmarkOpenTxns(bench, bData, howMany));
+             () -> benchmarkOpenTxns(bench, bData, howMany))
+         .add("createLocks" + '.' + howMany,
+             () -> benchmarkLocks(bench, bData, howMany, nParameters));
     }
 
     if (doList) {
       suite.listMatching(matches, exclude).forEach(System.out::println);
-      return;
+      return suite;
     }
 
-    LOG.info("Using table '{}.{}", dbName, tableName);
+    LOG.info("Using table '{}.{}", threadSafeDbName, tableName);
 
     try (HMSClient client = new HMSClient(getServerUri(host, String.valueOf(port)), confDir)) {
       bData.setClient(client);
-      if (!client.dbExists(dbName)) {
-        client.createDatabase(dbName);
+      if (!client.dbExists(threadSafeDbName)) {
+        client.createDatabase(threadSafeDbName);
       }
 
-      if (client.tableExists(dbName, tableName)) {
-        client.dropTable(dbName, tableName);
+      if (client.tableExists(threadSafeDbName, tableName)) {
+        client.dropTable(threadSafeDbName, tableName);
       }
 
       // Arrange various benchmarks in a suite
-      BenchmarkSuite result = suite.runMatching(matches, exclude);
+      result = suite.runMatching(matches, exclude);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
 
-      Formatter fmt = new Formatter(sb);
-      if (doCSV) {
-        result.displayCSV(fmt, csvSeparator);
-      } else {
-        result.display(fmt);
-      }
-
-      PrintStream output = System.out;
-      if (outputFile != null) {
-        output = new PrintStream(outputFile);
-      }
+    return result;
+  }
 
-      if (outputFile != null) {
-        // Print results to stdout as well
-        StringBuilder s = new StringBuilder();
-        Formatter f = new Formatter(s);
-        result.display(f);
-        System.out.print(s);
-        f.close();
-      }
+  private void renderResults(List<BenchmarkSuite> benchmarkSuites) {
+    BenchmarkSuite summarisedBenchmarkSuite = new BenchmarkSuite();
+
+    benchmarkSuites.forEach(bs -> {
+      bs.getResult().forEach((name, stat) -> {
+        if (summarisedBenchmarkSuite.getResult().get(name) == null) {
+          summarisedBenchmarkSuite.getResult().put(name, new DescriptiveStatistics());
+        }
+        for (int i = 0; i < stat.getN(); i++) {
+          summarisedBenchmarkSuite.getResult().get(name).addValue(stat.getElement(i));
+        }
+      });
+    });
+
+    try {
+      renderResult(summarisedBenchmarkSuite);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
 
-      output.print(sb.toString());
-      fmt.close();
+  private void renderResult(BenchmarkSuite result) throws IOException {
+    StringBuilder sb = new StringBuilder();
 
-      if (dataSaveDir != null) {
-        saveData(result.getResult(), dataSaveDir, scale);
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
+    Formatter fmt = new Formatter(sb);
+    if (doCSV) {
+      result.displayCSV(fmt, csvSeparator);
+    } else {
+      result.display(fmt);
+    }
+
+    PrintStream output = System.out;
+    if (outputFile != null) {
+      outputFile += "_" + Thread.currentThread().getName();
+      output = new PrintStream(outputFile);
+    }
+
+    if (outputFile != null) {
+      // Print results to stdout as well
+      StringBuilder s = new StringBuilder();
+      Formatter f = new Formatter(s);
+      result.display(f);
+      System.out.print(s);
+      f.close();
+    }
+
+    output.print(sb.toString());
+    fmt.close();
+
+    if (dataSaveDir != null) {
+      saveData(result.getResult(), dataSaveDir, scale);
    }
  }
}
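Note on renderResults() above: DescriptiveStatistics has no bulk-merge operation, so the per-thread suites are folded into one summary series per benchmark name, sample by sample. A minimal self-contained sketch of that strategy (commons-math3 only; the class and method names here are illustrative and not part of the patch):

    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;

    // Hypothetical helper mirroring renderResults(): one summary series per
    // benchmark name, built from every thread's raw samples.
    public class SuiteMerger {
      static Map<String, DescriptiveStatistics> merge(List<Map<String, DescriptiveStatistics>> perThread) {
        // TreeMap keeps benchmark names sorted, like BenchmarkSuite's result map.
        Map<String, DescriptiveStatistics> summary = new TreeMap<>();
        for (Map<String, DescriptiveStatistics> results : perThread) {
          results.forEach((name, stats) -> {
            DescriptiveStatistics merged = summary.computeIfAbsent(name, n -> new DescriptiveStatistics());
            // Copy sample by sample, exactly as renderResults() does.
            for (int i = 0; i < stats.getN(); i++) {
              merged.addValue(stats.getElement(i));
            }
          });
        }
        return summary;
      }
    }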
diff --git standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
index d80c290b60..676eada4af 100644
--- standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
+++ standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
@@ -20,30 +20,30 @@
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
 import org.apache.thrift.TException;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.stream.IntStream;
 
 import static org.apache.hadoop.hive.metastore.tools.Util.addManyPartitions;
 import static org.apache.hadoop.hive.metastore.tools.Util.addManyPartitionsNoException;
 import static org.apache.hadoop.hive.metastore.tools.Util.createSchema;
-import static org.apache.hadoop.hive.metastore.tools.Util.generatePartitionNames;
-import static java.util.concurrent.Executors.newFixedThreadPool;
 import static org.apache.hadoop.hive.metastore.tools.Util.throwingSupplierWrapper;
 
 /**
@@ -417,6 +417,40 @@ static DescriptiveStatistics benchmarkOpenTxns(@NotNull MicroBenchmark bench,
         () -> throwingSupplierWrapper(() -> client.abortTxns(client.getOpenTxns())));
   }
 
+  static DescriptiveStatistics benchmarkLocks(@NotNull MicroBenchmark bench,
+                                              @NotNull BenchData data,
+                                              int howMany, int nTables) {
+    HMSClient client = data.getClient();
+    String dbName = data.dbName;
+    String tableNameFormat = "tmp_table_%d";
+    String user = "hclient";
+    List<LockComponent> lockComponents = new ArrayList<>();
+    List<Long> txnIds = new ArrayList<>();
+
+    try {
+      LOG.info("Beginning prep");
+      if (nTables < 0) nTables = 1;
+
+      for (int i = 0; i < nTables; i++) {
+        lockComponents.add(
+            new Util.LockComponentBuilder()
+                .setDbName(dbName)
+                .setTableName(String.format(tableNameFormat, i))
+                .setShared()
+                .setOperationType(DataOperationType.SELECT)
+                .build());
+      }
+
+      return bench.measure(() -> {
+        long txnId = executeOpenTxnAndGetTxnId(client, user);
+        txnIds.add(txnId);
+        executeLock(client, txnId, user, lockComponents);
+      });
+    } finally {
+      txnIds.forEach(txnId -> executeCommitTxn(client, txnId));
+    }
+  }
+
   private static void createManyTables(HMSClient client, int howMany, String dbName, String format) {
     List<FieldSchema> columns = createSchema(new ArrayList<>(Arrays.asList("name", "string")));
     List<FieldSchema> partitions = createSchema(new ArrayList<>(Arrays.asList("date", "string")));
@@ -446,6 +480,22 @@ private static void createPartitionedTable(HMSClient client, String dbName, Stri
             .build()));
   }
 
+  private static void executeLock(HMSClient client, long txnId, String user, List<LockComponent> lockComponents) {
+    LOG.debug("execute lock");
+
+    LockRequest req = new LockRequest(lockComponents, user, "localhost");
+
+    throwingSupplierWrapper(() -> client.lock(req));
+  }
+
+  private static long executeOpenTxnAndGetTxnId(HMSClient client, String user) {
+    return throwingSupplierWrapper(() -> client.openTxn(user));
+  }
+
+  private static void executeCommitTxn(HMSClient client, long txnId) {
+    throwingSupplierWrapper(() -> client.commitTxn(txnId));
+  }
+
   static DescriptiveStatistics benchmarkGetNotificationId(@NotNull MicroBenchmark benchmark,
                                                           @NotNull BenchData data) {
     HMSClient client = data.getClient();
@@ -453,5 +503,4 @@ static DescriptiveStatistics benchmarkGetNotificationId(@NotNull MicroBenchmark
         throwingSupplierWrapper(client::getCurrentNotificationId));
   }
-
 }
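For context, benchmarkLocks() measures one openTxn-plus-lock round trip per iteration and commits every opened transaction once measurement ends. A hedged sketch of driving it directly (a hypothetical driver, not part of the patch; it must live in the same package because the benchmark helpers are package-private, and the host, port, warmup/spin values, and database name are illustrative; raw samples are assumed to be nanoseconds, matching the tool's millisecond display scale):

    package org.apache.hadoop.hive.metastore.tools;

    import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;

    import static org.apache.hadoop.hive.metastore.tools.Util.getServerUri;

    // Hypothetical driver, not part of the patch.
    public class LockBenchDriver {
      public static void main(String[] args) throws Exception {
        try (HMSClient client = new HMSClient(getServerUri("localhost", "9083"), null)) {
          BenchData data = new BenchData("bench_db", "bench_table");
          data.setClient(client);
          MicroBenchmark bench = new MicroBenchmark(15, 100); // warmup, spin count
          // Ten lock components per request, as nParameters would supply.
          DescriptiveStatistics stats = HMSBenchmarks.benchmarkLocks(bench, data, 1, 10);
          System.out.printf("createLock mean: %.3f ms%n", stats.getMean() / 1_000_000);
        }
      }
    }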
diff --git standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkSuite.java standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkSuite.java
index 5211082a7d..d32e971eb2 100644
--- standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkSuite.java
+++ standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkSuite.java
@@ -75,7 +75,7 @@
   // List of benchmarks. All benchmarks are executed in the order
   // they are inserted
   private final List<String> benchmarks = new ArrayList<>();
-  // Once benchmarks are executed, results are stored in TreeMap to prserve the order.
+  // Once benchmarks are executed, results are stored in TreeMap to preserve the order.
   private final Map<String, DescriptiveStatistics> result = new TreeMap<>();
   // Whether sanitizing of results is requested
   private boolean doSanitize = false;
@@ -115,7 +115,7 @@ public BenchmarkSuite doSanitize(boolean sanitize) {
   }
 
   /**
-   * Run all benchmarks in the 'names' list. 
+   * Run all benchmarks in the 'names' list.
    * @param names list of benchmarks to run
    * @return this to allow chaining
    */
diff --git standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java
index 4e75edeae6..c2b75ac080 100644
--- standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java
+++ standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java
@@ -22,17 +22,23 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
 import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest;
 import org.apache.hadoop.hive.metastore.api.DropPartitionsResult;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
@@ -53,9 +59,11 @@
 import javax.security.auth.login.LoginException;
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.UnknownHostException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -350,16 +358,20 @@ boolean commitTxn(long txnId) throws TException {
     return true;
   }
 
-  boolean abortTxn(long txnId) throws TException {
-    client.abort_txn(new AbortTxnRequest(txnId));
-    return true;
-  }
-
   boolean abortTxns(List<Long> txnIds) throws TException {
     client.abort_txns(new AbortTxnsRequest(txnIds));
     return true;
   }
 
+  LockResponse lock(@NotNull LockRequest rqst) throws TException {
+    return client.lock(rqst);
+  }
+
+  long openTxn(String user) throws TException {
+    OpenTxnsResponse txns = openTxnsIntr(user, 1, null);
+    return txns.getTxn_ids().get(0);
+  }
+
   private TTransport open(Configuration conf, @NotNull URI uri) throws
       TException, IOException, LoginException {
     boolean useSSL = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.USE_SSL);
@@ -459,6 +471,21 @@ private TTransport open(Configuration conf, @NotNull URI uri) throws
     return transport;
   }
 
+  private OpenTxnsResponse openTxnsIntr(String user, int numTxns, TxnType txnType) throws TException {
+    String hostname;
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOG.error("Unable to resolve my host name " + e.getMessage());
+      throw new RuntimeException(e);
+    }
+    OpenTxnRequest rqst = new OpenTxnRequest(numTxns, user, hostname);
+    if (txnType != null) {
+      rqst.setTxn_type(txnType);
+    }
+    return client.open_txns(rqst);
+  }
+
   @Override
   public void close() throws Exception {
     if (transport != null && transport.isOpen()) {
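Taken together, the new openTxn() and lock() methods plus the existing commitTxn() give HMSClient a minimal transaction-and-lock lifecycle. A sketch of the intended call sequence (same-package access assumed, since these methods are package-private; the user and host strings mirror the benchmark's hardcoded values):

    import java.util.List;

    import org.apache.hadoop.hive.metastore.api.LockComponent;
    import org.apache.hadoop.hive.metastore.api.LockRequest;
    import org.apache.hadoop.hive.metastore.api.LockResponse;
    import org.apache.thrift.TException;

    // Hypothetical usage, mirroring executeOpenTxnAndGetTxnId()/executeLock()
    // in HMSBenchmarks above.
    class LockFlowExample {
      static void lockAndCommit(HMSClient client, List<LockComponent> components) throws TException {
        long txnId = client.openTxn("hclient");   // open_txns with numTxns = 1
        LockRequest rqst = new LockRequest(components, "hclient", "localhost");
        LockResponse resp = client.lock(rqst);    // shared-read locks normally come back ACQUIRED
        System.out.println("lock state: " + resp.getState());
        client.commitTxn(txnId);                  // committing the txn releases its locks
      }
    }

Note that, like executeLock() in the benchmark, this sketch never calls setTxnid() on the LockRequest, so the lock is not tied to the transaction just opened.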
diff --git standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/Util.java standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/Util.java
index 101d6759c5..f1f9129189 100644
--- standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/Util.java
+++ standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/Util.java
@@ -20,28 +20,22 @@
 import com.google.common.base.Joiner;
 import com.google.common.net.HostAndPort;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
 import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.PrincipalType;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.thrift.TException;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -383,6 +377,99 @@ Partition build() {
     }
   }
 
+  public static class LockComponentBuilder {
+    private LockComponent component;
+    private boolean tableNameSet;
+    private boolean partNameSet;
+
+    public LockComponentBuilder() {
+      component = new LockComponent();
+      tableNameSet = partNameSet = false;
+    }
+
+    /**
+     * Set the lock to be exclusive.
+     * @return reference to this builder
+     */
+    public LockComponentBuilder setExclusive() {
+      component.setType(LockType.EXCLUSIVE);
+      return this;
+    }
+
+    /**
+     * Set the lock to be semi-shared.
+     * @return reference to this builder
+     */
+    public LockComponentBuilder setSemiShared() {
+      component.setType(LockType.SHARED_WRITE);
+      return this;
+    }
+
+    /**
+     * Set the lock to be shared.
+     * @return reference to this builder
+     */
+    public LockComponentBuilder setShared() {
+      component.setType(LockType.SHARED_READ);
+      return this;
+    }
+
+    /**
+     * Set the database name.
+     * @param dbName database name
+     * @return reference to this builder
+     */
+    public LockComponentBuilder setDbName(String dbName) {
+      component.setDbname(dbName);
+      return this;
+    }
+
+    public LockComponentBuilder setIsTransactional(boolean t) {
+      component.setIsTransactional(t);
+      return this;
+    }
+
+    public LockComponentBuilder setOperationType(DataOperationType dop) {
+      component.setOperationType(dop);
+      return this;
+    }
+
+    /**
+     * Set the table name.
+     * @param tableName table name
+     * @return reference to this builder
+     */
+    public LockComponentBuilder setTableName(String tableName) {
+      component.setTablename(tableName);
+      tableNameSet = true;
+      return this;
+    }
+
+    /**
+     * Set the partition name.
+     * @param partitionName partition name
+     * @return reference to this builder
+     */
+    public LockComponentBuilder setPartitionName(String partitionName) {
+      component.setPartitionname(partitionName);
+      partNameSet = true;
+      return this;
+    }
+
+    public LockComponent build() {
+      LockLevel level = LockLevel.DB;
+      if (tableNameSet) level = LockLevel.TABLE;
+      if (partNameSet) level = LockLevel.PARTITION;
+      component.setLevel(level);
+      return component;
+    }
+
+    public LockComponent setLock(LockType type) {
+      component.setType(type);
+      return component;
+    }
+  }
+
   /**
    * Create table schema from parameters.
    *
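Example use of the Util.LockComponentBuilder added above (values are illustrative). Note how build() promotes the lock level from DB to TABLE once a table name has been set:

    import org.apache.hadoop.hive.metastore.api.DataOperationType;
    import org.apache.hadoop.hive.metastore.api.LockComponent;

    // Hypothetical snippet in the same package as Util; mirrors the prep loop
    // in benchmarkLocks.
    class LockComponentExample {
      static LockComponent sharedReadOn(String db, String table) {
        return new Util.LockComponentBuilder()
            .setDbName(db)          // LockLevel.DB unless a table is also set
            .setTableName(table)    // build() promotes the level to TABLE
            .setShared()            // LockType.SHARED_READ
            .setOperationType(DataOperationType.SELECT)
            .build();
      }
    }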