diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 2995afad23..ada608052c 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -628,31 +628,44 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { } // Need to register minimum open txnid for current transactions into MIN_HISTORY table. - s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new IllegalStateException("Scalar query returned no rows?!?!!"); - } + // For a single txn we can do it in a single insert. With multiple txns calculating the + // minOpenTxnId for every insert is not cost effective, so caching the value + if (txnIds.size() == 1) { + s = "INSERT INTO \"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\",\"MHL_MIN_OPEN_TXNID\") " + + "SELECT ?, MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); + LOG.debug("Going to execute query <" + s + ">"); + try (PreparedStatement pstmt = dbConn.prepareStatement(s)) { + pstmt.setLong(1, txnIds.get(0)); + pstmt.execute(); + } + LOG.info("Added entries to MIN_HISTORY_LEVEL with a single query for current txn: " + txnIds); + } else { + s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new IllegalStateException("Scalar query returned no rows?!?!!"); + } - // TXNS table should have atleast one entry because we just inserted the newly opened txns. - // So, min(txn_id) would be a non-zero txnid. - long minOpenTxnId = rs.getLong(1); - assert (minOpenTxnId > 0); - rows.clear(); - for (long txnId = first; txnId < first + numTxns; txnId++) { - rows.add(txnId + ", " + minOpenTxnId); - } + // TXNS table should have atleast one entry because we just inserted the newly opened txns. + // So, min(txn_id) would be a non-zero txnid. + long minOpenTxnId = rs.getLong(1); + assert (minOpenTxnId > 0); + rows.clear(); + for (long txnId = first; txnId < first + numTxns; txnId++) { + rows.add(txnId + ", " + minOpenTxnId); + } - // Insert transaction entries into MIN_HISTORY_LEVEL. - List inserts = sqlGenerator.createInsertValuesStmt( - "\"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\", \"MHL_MIN_OPEN_TXNID\")", rows); - for (String insert : inserts) { - LOG.debug("Going to execute insert <" + insert + ">"); - stmt.execute(insert); + // Insert transaction entries into MIN_HISTORY_LEVEL. + List inserts = sqlGenerator.createInsertValuesStmt( + "\"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\", \"MHL_MIN_OPEN_TXNID\")", rows); + for (String insert : inserts) { + LOG.debug("Going to execute insert <" + insert + ">"); + stmt.execute(insert); + } + LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds + + ") with min_open_txn: " + minOpenTxnId); } - LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds - + ") with min_open_txn: " + minOpenTxnId); if (rqst.isSetReplPolicy()) { List rowsRepl = new ArrayList<>(); @@ -1057,25 +1070,25 @@ private void updateReplId(Connection dbConn, ReplLastIdInfo replLastIdInfo) thro /** * Concurrency/isolation notes: * This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)} - * operations using select4update on NEXT_TXN_ID. Also, mutexes on TXNX table for specific txnid:X + * operations using select4update on NEXT_TXN_ID. Also, mutexes on TXNS table for specific txnid:X * see more notes below. * In order to prevent lost updates, we need to determine if any 2 transactions overlap. Each txn * is viewed as an interval [M,N]. M is the txnid and N is taken from the same NEXT_TXN_ID sequence * so that we can compare commit time of txn T with start time of txn S. This sequence can be thought of - * as a logical time counter. If S.commitTime < T.startTime, T and S do NOT overlap. + * as a logical time counter. If S.commitTime < T.startTime, T and S do NOT overlap. * * Motivating example: * Suppose we have multi-statment transactions T and S both of which are attempting x = x + 1 - * In order to prevent lost update problem, the the non-overlapping txns must lock in the snapshot - * that they read appropriately. In particular, if txns do not overlap, then one follows the other - * (assumig they write the same entity), and thus the 2nd must see changes of the 1st. We ensure + * In order to prevent lost update problem, then the non-overlapping txns must lock in the snapshot + * that they read appropriately. In particular, if txns do not overlap, then one follows the other + * (assuming they write the same entity), and thus the 2nd must see changes of the 1st. We ensure * this by locking in snapshot after * {@link #openTxns(OpenTxnRequest)} call is made (see org.apache.hadoop.hive.ql.Driver.acquireLocksAndOpenTxn) - * and mutexing openTxn() with commit(). In other words, once a S.commit() starts we must ensure + * and mutexing openTxn() with commit(). In other words, once a S.commit() starts we must ensure * that txn T which will be considered a later txn, locks in a snapshot that includes the result * of S's commit (assuming no other txns). * As a counter example, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions - * were running in parallel). If T and S both locked in the same snapshot (for example commit of + * were running in parallel). If T and S both locked in the same snapshot (for example commit of * txnid:2, which is possible if commitTxn() and openTxnx() is not mutexed) * 'x' would be updated to the same value by both, i.e. lost update. */ diff --git standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java index 041cd76234..2ab9388301 100644 --- standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java +++ standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java @@ -69,7 +69,7 @@ private String tableName = TEST_TABLE + "_" + System.getProperty("user.name"); - @Option(names = {"-N", "--number"}, description = "umber of object instances") + @Option(names = {"-N", "--number"}, description = "number of object instances") private int[] instances = {100}; @Option(names = {"-L", "--spin"}, description = "spin count") @@ -176,7 +176,9 @@ public void run() { .add("renameTable", () -> benchmarkRenameTable(bench, bData, 1)) .add("dropDatabase", - () -> benchmarkDropDatabase(bench, bData, 1)); + () -> benchmarkDropDatabase(bench, bData, 1)) + .add("openTxn", + () -> benchmarkOpenTxns(bench, bData, 1)); for (int howMany: instances) { suite.add("listTables" + '.' + howMany, @@ -198,7 +200,9 @@ public void run() { .add("renameTable" + '.' + howMany, () -> benchmarkRenameTable(bench, bData, howMany)) .add("dropDatabase" + '.' + howMany, - () -> benchmarkDropDatabase(bench, bData, howMany)); + () -> benchmarkDropDatabase(bench, bData, howMany)) + .add("openTxns" + '.' + howMany, + () -> benchmarkOpenTxns(bench, bData, howMany)); } if (doList) { diff --git standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java index f53f2ef43b..d80c290b60 100644 --- standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java +++ standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java @@ -408,6 +408,15 @@ static DescriptiveStatistics benchmarkDropDatabase(@NotNull MicroBenchmark bench } } + static DescriptiveStatistics benchmarkOpenTxns(@NotNull MicroBenchmark bench, + @NotNull BenchData data, + int howMany) { + final HMSClient client = data.getClient(); + return bench.measure(null, + () -> throwingSupplierWrapper(() -> client.openTxn(howMany)), + () -> throwingSupplierWrapper(() -> client.abortTxns(client.getOpenTxns()))); + } + private static void createManyTables(HMSClient client, int howMany, String dbName, String format) { List columns = createSchema(new ArrayList<>(Arrays.asList("name", "string"))); List partitions = createSchema(new ArrayList<>(Arrays.asList("date", "string"))); @@ -444,4 +453,5 @@ static DescriptiveStatistics benchmarkGetNotificationId(@NotNull MicroBenchmark throwingSupplierWrapper(client::getCurrentNotificationId)); } + } diff --git standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java index 7cc1e42a8b..2d844caa77 100644 --- standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java +++ standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java @@ -20,16 +20,22 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest; import org.apache.hadoop.hive.metastore.api.DropPartitionsResult; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.RequestPartsSpec; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -52,7 +58,9 @@ import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -319,6 +327,40 @@ void appendPartition(@NotNull String dbName, @NotNull String tableName, client.append_partition_with_environment_context(dbName, tableName, partitionValues, null); } + boolean openTxn(int num_txns) throws TException { + client.open_txns(new OpenTxnRequest(num_txns, "Test", "Host")).getTxn_ids(); + return true; + } + + List getOpenTxns() throws TException { + GetOpenTxnsResponse txns = client.get_open_txns(); + List openTxns = new ArrayList<>(); + BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits()); + int i = 0; + for(long txnId : txns.getOpen_txns()) { + if(!abortedBits.get(i)) { + openTxns.add(txnId); + } + ++i; + } + return openTxns; + } + + boolean commitTxn(long txnId) throws TException { + client.commit_txn(new CommitTxnRequest(txnId)); + return true; + } + + boolean abortTxn(long txnId) throws TException { + client.abort_txn(new AbortTxnRequest(txnId)); + return true; + } + + boolean abortTxns(List txnIds) throws TException { + client.abort_txns(new AbortTxnsRequest(txnIds)); + return true; + } + private TTransport open(Configuration conf, @NotNull URI uri) throws TException, IOException, LoginException { boolean useSSL = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.USE_SSL);