diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 91910d1c0c..bae0ffd295 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -22,10 +22,8 @@
 import java.io.DataInput;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.io.Serializable;
 import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -37,11 +35,10 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -53,11 +50,11 @@
 import org.apache.hadoop.hive.conf.HiveVariableSource;
 import org.apache.hadoop.hive.conf.VariableSubstitution;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
-import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
@@ -82,7 +79,6 @@
 import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.hooks.HookUtils;
 import org.apache.hadoop.hive.ql.hooks.PrivateHookContext;
-import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lock.CompileLock;
@@ -185,6 +181,7 @@
   // Transaction manager used for the query. This will be set at compile time based on
   // either initTxnMgr or from the SessionState, in that order.
   private HiveTxnManager queryTxnMgr;
+  private TxnType queryTxnType = TxnType.DEFAULT;
   private StatsSource statsSource;
 
   // Boolean to store information about whether valid txn list was generated
@@ -475,7 +472,9 @@ public void compile(String command, boolean resetTaskIds, boolean deferClose) th
         && queryState.getHiveOperation().equals(HiveOperation.REPLDUMP)) {
       setLastReplIdForDump(queryState.getConf());
     }
-    openTransaction();
+    queryTxnType = AcidUtils.getTxnType(tree);
+    openTransaction(queryTxnType);
+    generateValidTxnList();
   }
@@ -676,7 +675,7 @@ private boolean isValidTxnListState() throws LockException {
             lckCmp.getType() == LockType.SHARED_WRITE) && lckCmp.getTablename() != null) {
           nonSharedLocks.add(
-              Warehouse.getQualifiedName(
+              TableName.getDbTable(
                   lckCmp.getDbname(), lckCmp.getTablename()));
         }
       }
@@ -687,7 +686,7 @@ private boolean isValidTxnListState() throws LockException {
             lock.getHiveLockMode() == HiveLockMode.SEMI_SHARED) && lock.getHiveLockObject().getPaths().length == 2) {
           nonSharedLocks.add(
-              Warehouse.getQualifiedName(
+              TableName.getDbTable(
                   lock.getHiveLockObject().getPaths()[0], lock.getHiveLockObject().getPaths()[1]));
         }
       }
@@ -699,25 +698,27 @@ private boolean isValidTxnListState() throws LockException {
       return true;
     }
     ValidTxnWriteIdList txnWriteIdList = new ValidTxnWriteIdList(txnWriteIdListStr);
-    List<Pair<String, Table>> writtenTables = getWrittenTableList(plan);
+    Map<String, Table> writtenTables = getWrittenTables(plan);
+
     ValidTxnWriteIdList currentTxnWriteIds = queryTxnMgr.getValidWriteIds(
-        writtenTables.stream()
-            .filter(e -> AcidUtils.isTransactionalTable(e.getRight()))
-            .map(e -> e.getLeft())
+        writtenTables.entrySet().stream()
+            .filter(e -> AcidUtils.isTransactionalTable(e.getValue()))
+            .map(Map.Entry::getKey)
             .collect(Collectors.toList()),
         currentTxnString);
-    for (Pair<String, Table> tableInfo : writtenTables) {
-      String fullQNameForLock = Warehouse.getQualifiedName(
-          tableInfo.getRight().getDbName(),
-          MetaStoreUtils.encodeTableName(tableInfo.getRight().getTableName()));
+
+    for (Map.Entry<String, Table> tableInfo : writtenTables.entrySet()) {
+      String fullQNameForLock = TableName.getDbTable(
+          tableInfo.getValue().getDbName(),
+          MetaStoreUtils.encodeTableName(tableInfo.getValue().getTableName()));
       if (nonSharedLocks.contains(fullQNameForLock)) {
         // Check if table is transactional
-        if (AcidUtils.isTransactionalTable(tableInfo.getRight())) {
+        if (AcidUtils.isTransactionalTable(tableInfo.getValue())) {
          // Check that write id is still valid
           if (!TxnIdUtils.checkEquivalentWriteIds(
-              txnWriteIdList.getTableValidWriteIdList(tableInfo.getLeft()),
-              currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getLeft()))) {
+              txnWriteIdList.getTableValidWriteIdList(tableInfo.getKey()),
+              currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getKey()))) {
             // Write id has changed, it is not valid anymore,
             // we need to recompile
             return false;
@@ -729,7 +730,7 @@ private boolean isValidTxnListState() throws LockException {
     if (!nonSharedLocks.isEmpty()) {
       throw new LockException("Wrong state: non-shared locks contain information for tables that have not" +
           " been visited when trying to validate the locks from query tables.\n" +
-          "Tables: " + writtenTables.stream().map(e -> e.getLeft()).collect(Collectors.toList()) + "\n" +
+          "Tables: " + writtenTables.keySet() + "\n" +
           "Remaining locks after check: " + nonSharedLocks);
     }
     // It passes the test, it is valid
@@ -766,10 +767,10 @@ private void setLastReplIdForDump(HiveConf conf) throws HiveException, TExceptio
     LOG.debug("Setting " + ReplUtils.LAST_REPL_ID_KEY + " = " + lastReplId);
   }
 
-  private void openTransaction() throws LockException, CommandProcessorException {
+  private void openTransaction(TxnType txnType) throws LockException, CommandProcessorException {
     if (checkConcurrency() && startImplicitTxn(queryTxnMgr) && !queryTxnMgr.isTxnOpen()) {
       String userFromUGI = getUserFromUGI();
-      queryTxnMgr.openTxn(ctx, userFromUGI);
+      queryTxnMgr.openTxn(ctx, userFromUGI, txnType);
     }
   }
@@ -933,7 +934,7 @@ private ValidTxnWriteIdList recordValidWriteIds(HiveTxnManager txnMgr) throws Lo
       throw new IllegalStateException("calling recordValidWritsIdss() without initializing ValidTxnList " +
          JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
     }
-    List<String> txnTables = getTransactionalTableList(plan);
+    List<String> txnTables = getTransactionalTables(plan);
     ValidTxnWriteIdList txnWriteIds = null;
     if (compactionWriteIds != null) {
       /**
@@ -946,12 +947,17 @@ private ValidTxnWriteIdList recordValidWriteIds(HiveTxnManager txnMgr) throws Lo
       if (txnTables.size() != 1) {
         throw new LockException("Unexpected tables in compaction: " + txnTables);
       }
-      String fullTableName = txnTables.get(0);
       txnWriteIds = new ValidTxnWriteIdList(compactorTxnId);
       txnWriteIds.addTableValidWriteIdList(compactionWriteIds);
     } else {
       txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
     }
+    if (queryTxnType == TxnType.READ_ONLY && !getWrittenTables(plan).isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Inferred transaction type '%s' doesn't conform to the actual query string '%s'",
+          queryTxnType, queryState.getQueryString()));
+    }
+
     String writeIdStr = txnWriteIds.toString();
     conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, writeIdStr);
     if (plan.getFetchTask() != null) {
@@ -980,20 +986,31 @@ private ValidTxnWriteIdList recordValidWriteIds(HiveTxnManager txnMgr) throws Lo
     return txnWriteIds;
   }
 
-  // Make the list of transactional tables list which are getting read or written by current txn
-  private List<String> getTransactionalTableList(QueryPlan plan) {
-    Set<String> tableList = new HashSet<>();
+  // Make the list of transactional tables that are read or written by current txn
+  private List<String> getTransactionalTables(QueryPlan plan) {
+    Map<String, Table> tables = new HashMap<>();
+    plan.getInputs().forEach(
+        input -> addTableFromEntity(input, tables)
+    );
+    plan.getOutputs().forEach(
+        output -> addTableFromEntity(output, tables)
+    );
+    return tables.entrySet().stream()
+        .filter(entry -> AcidUtils.isTransactionalTable(entry.getValue()))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+  }
 
-    for (ReadEntity input : plan.getInputs()) {
-      addTableFromEntity(input, tableList);
-    }
-    for (WriteEntity output : plan.getOutputs()) {
-      addTableFromEntity(output, tableList);
-    }
-    return new ArrayList<>(tableList);
+  // Make the map of tables written by current txn
+  private Map<String, Table> getWrittenTables(QueryPlan plan) {
+    Map<String, Table> tables = new HashMap<>();
+    plan.getOutputs().forEach(
+        output -> addTableFromEntity(output, tables)
+    );
+    return tables;
   }
 
-  private void addTableFromEntity(Entity entity, Collection<String> tableList) {
+  private void addTableFromEntity(Entity entity, Map<String, Table> tables) {
     Table tbl;
     switch (entity.getType()) {
       case TABLE: {
@@ -1009,39 +1026,8 @@ private void addTableFromEntity(Entity entity, Collection<String> tableList) {
         return;
       }
     }
-    if (!AcidUtils.isTransactionalTable(tbl)) {
-      return;
-    }
     String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
-    tableList.add(fullTableName);
-  }
-
-  // Make the list of transactional tables list which are getting written by current txn
-  private List<Pair<String, Table>> getWrittenTableList(QueryPlan plan) {
-    List<Pair<String, Table>> result = new ArrayList<>();
-    Set<String> tableList = new HashSet<>();
-    for (WriteEntity output : plan.getOutputs()) {
-      Table tbl;
-      switch (output.getType()) {
-        case TABLE: {
-          tbl = output.getTable();
-          break;
-        }
-        case PARTITION:
-        case DUMMYPARTITION: {
-          tbl = output.getPartition().getTable();
-          break;
-        }
-        default: {
-          continue;
-        }
-      }
-      String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
-      if (tableList.add(fullTableName)) {
-        result.add(new ImmutablePair<>(fullTableName, tbl));
-      }
-    }
-    return result;
+    tables.put(fullTableName, tbl);
   }
 
   private String getUserFromUGI() throws CommandProcessorException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index fcf499d53a..67996c6db9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.hive.ql.io;
 
 import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
-import static org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator.UNION_SUDBIR_PREFIX;
+import static org.apache.hadoop.hive.ql.parse.CalcitePlanner.ASTSearcher;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -56,6 +56,7 @@
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.table.creation.CreateTableDesc;
@@ -72,6 +73,8 @@
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
 import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -90,6 +93,7 @@
 import javax.annotation.concurrent.Immutable;
 
 import java.nio.charset.Charset;
+import java.util.stream.Stream;
 
 /**
  * Utilities that are shared by all of the ACID input and output formats. They
@@ -2983,4 +2987,20 @@ public static void validateAcidPartitionLocation(String location, Configuration
       throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ex.getMessage()), ex);
     }
   }
+
+  /**
+   * Determines transaction type based on query AST.
+   * @param tree AST
+   */
+  public static TxnType getTxnType(ASTNode tree) {
+    final ASTSearcher astSearcher = new ASTSearcher();
+
+    return (tree.getToken().getType() == HiveParser.TOK_QUERY &&
+        Stream.of(
+            new int[]{HiveParser.TOK_INSERT_INTO},
+            new int[]{HiveParser.TOK_INSERT, HiveParser.TOK_TAB})
+          .noneMatch(pattern ->
+              astSearcher.simpleBreadthFirstSearch(tree, pattern) != null)) ?
+        TxnType.READ_ONLY : TxnType.DEFAULT;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 943aa383bb..76934bc203 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -35,6 +35,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -220,11 +221,16 @@ void setHiveConf(HiveConf conf) {
 
   @Override
   public long openTxn(Context ctx, String user) throws LockException {
-    return openTxn(ctx, user, 0);
+    return openTxn(ctx, user, TxnType.DEFAULT, 0);
+  }
+
+  @Override
+  public long openTxn(Context ctx, String user, TxnType txnType) throws LockException {
+    return openTxn(ctx, user, txnType, 0);
   }
 
   @VisibleForTesting
-  long openTxn(Context ctx, String user, long delay) throws LockException {
+  long openTxn(Context ctx, String user, TxnType txnType, long delay) throws LockException {
    /*Q: why don't we lock the snapshot here??? Instead of having client make an explicit call
    whenever it chooses
    A: If we want to rely on locks for transaction scheduling we must get the snapshot after lock
@@ -236,7 +242,7 @@ long openTxn(Context ctx, String user, long delay) throws LockException {
       throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
     }
     try {
-      txnId = getMS().openTxn(user);
+      txnId = getMS().openTxn(user, txnType);
       stmtId = 0;
       numStatements = 0;
       tableWriteIds.clear();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index ac813c8288..7820013ab0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -53,6 +54,12 @@
 
   private HiveLockManagerCtx lockManagerCtx;
 
+  @Override
+  public long openTxn(Context ctx, String user, TxnType txnType) throws LockException {
+    // No-op
+    return 0L;
+  }
+
   @Override
   public long openTxn(Context ctx, String user) throws LockException {
     // No-op
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 1c53426966..600289f837 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverState;
 import org.apache.hadoop.hive.ql.ddl.database.lock.LockDatabaseDesc;
@@ -50,6 +51,16 @@
    */
   long openTxn(Context ctx, String user) throws LockException;
 
+  /**
+   * Open a new transaction.
+   * @param ctx Context for this query
+   * @param user Hive user who is opening this transaction.
+   * @param txnType transaction type.
+   * @return The new transaction id
+   * @throws LockException if a transaction is already open.
+   */
+  long openTxn(Context ctx, String user, TxnType txnType) throws LockException;
+
   /**
    * Open a new transaction in target cluster.
    * @param replPolicy Replication policy to uniquely identify the source cluster.
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index cc86afedbf..cd91948519 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -210,7 +211,8 @@ private void runReaper() throws Exception {
   public void testExceptions() throws Exception {
     addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
     QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
-    ((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+    ((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", TxnType.DEFAULT,
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();
     LockException exception = null;
@@ -224,7 +226,8 @@ public void testExceptions() throws Exception {
     Assert.assertEquals("Wrong Exception1", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
 
     exception = null;
-    ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+    ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", TxnType.DEFAULT,
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();//this will abort the txn
     TxnStore txnHandler = TxnUtils.getTxnStore(conf);
@@ -441,7 +444,7 @@ public void testHeartbeater() throws Exception {
     // Case 2: If there's delay for the heartbeat, but the delay is within the reaper's tolerance,
     // then txt should be able to commit
     // Start the heartbeat after a delay, which is shorter than the HIVE_TXN_TIMEOUT
-    ((DbTxnManager) txnMgr).openTxn(ctx, "tom",
+    ((DbTxnManager) txnMgr).openTxn(ctx, "tom", TxnType.DEFAULT,
         HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2);
     txnMgr.acquireLocks(qp, ctx, "tom");
     runReaper();
@@ -457,7 +460,8 @@ public void testHeartbeater() throws Exception {
     // then the txn will time out and be aborted.
     // Here we just don't send the heartbeat at all - an infinite delay.
     // Start the heartbeat after a delay, which exceeds the HIVE_TXN_TIMEOUT
-    ((DbTxnManager) txnMgr).openTxn(ctx, "jerry", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+    ((DbTxnManager) txnMgr).openTxn(ctx, "jerry", TxnType.DEFAULT,
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
     txnMgr.acquireLocks(qp, ctx, "jerry");
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
new file mode 100644
index 0000000000..bf8a0284c3
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Tests that the transaction type is correctly derived from the original query.
+ */
+@RunWith(value = Parameterized.class)
+public class TestParseUtils {
+
+  private String query;
+  private TxnType txnType;
+
+  public TestParseUtils(String query, TxnType txnType) {
+    this.query = query;
+    this.txnType = txnType;
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(
+        new Object[][]{
+            {"SELECT current_timestamp()", TxnType.READ_ONLY},
+            {"SELECT count(*) FROM a", TxnType.READ_ONLY},
+            {"SELECT count(*) FROM a JOIN b ON a.id = b.id", TxnType.READ_ONLY},
+
+            {"WITH a AS (SELECT current_timestamp()) " +
+                "  SELECT * FROM a", TxnType.READ_ONLY},
+
+            {"INSERT INTO a VALUES (1, 2)", TxnType.DEFAULT},
+            {"INSERT INTO a SELECT * FROM b", TxnType.DEFAULT},
+            {"INSERT OVERWRITE TABLE a SELECT * FROM b", TxnType.DEFAULT},
+
+            {"FROM b INSERT OVERWRITE TABLE a SELECT *", TxnType.DEFAULT},
+
+            {"WITH a AS (SELECT current_timestamp()) " +
+                "  INSERT INTO b SELECT * FROM a", TxnType.DEFAULT},
+
+            {"UPDATE a SET col_b = 1", TxnType.DEFAULT},
+            {"DELETE FROM a WHERE col_b = 1", TxnType.DEFAULT},
+
+            {"CREATE TABLE a (col_b int)", TxnType.DEFAULT},
+            {"CREATE TABLE a AS SELECT * FROM b", TxnType.DEFAULT},
+            {"DROP TABLE a", TxnType.DEFAULT},
+
+            {"LOAD DATA LOCAL INPATH './examples/files/kv.txt' " +
+                "  OVERWRITE INTO TABLE a", TxnType.DEFAULT},
+
+            {"REPL LOAD a from './examples/files/kv.txt'", TxnType.DEFAULT},
+            {"REPL DUMP a", TxnType.DEFAULT},
+            {"REPL STATUS a", TxnType.DEFAULT},
+
+            {"MERGE INTO a trg using b src " +
+                "  ON src.col_a = trg.col_a " +
+                "WHEN MATCHED THEN " +
+                "  UPDATE SET col_b = src.col_b " +
+                "WHEN NOT MATCHED THEN " +
+                "  INSERT VALUES (src.col_a, src.col_b)",
+                TxnType.DEFAULT},
+        });
+  }
+
+  @Test
+  public void testTxnType() throws ParseException {
+    Assert.assertEquals(
+        AcidUtils.getTxnType(ParseUtils.parse(query)), txnType);
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 355c4f5374..6281208247 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -57,6 +57,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -1700,10 +1701,17 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds
       Collections.sort(txnIds); //easier to read logs and for assumption done in replication flow
     }
 
-    // Check if all the input txns are in open state. Write ID should be allocated only for open transactions.
-    if (!isTxnsInOpenState(txnIds, stmt)) {
-      ensureAllTxnsValid(dbName, tblName, txnIds, stmt);
-      throw new RuntimeException("This should never happen for txnIds: " + txnIds);
+    // Check if all the input txns are in valid state.
+    // Write IDs should be allocated only for open and not read-only transactions.
+    if (!isTxnsOpenAndNotReadOnly(txnIds, stmt)) {
+      String errorMsg = "Write ID allocation on " + TableName.getDbTable(dbName, tblName)
+          + " failed for input txns: "
+          + getAbortedAndReadOnlyTxns(txnIds, stmt)
+          + getCommittedTxns(txnIds, stmt);
+      LOG.error(errorMsg);
+
+      throw new IllegalStateException("Write ID allocation failed on " + TableName.getDbTable(dbName, tblName)
+          + " as not all input txns in open state or read-only");
     }
 
     List<String> queries = new ArrayList<>();
@@ -1716,7 +1724,6 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds
       // first write on a table will allocate write id and rest of the writes should re-use it.
       prefix.append("select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where"
          + " t2w_database = ? and t2w_table = ?" + " and ");
-      suffix.append("");
 
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
          txnIds, "t2w_txnid", false, false);
@@ -4669,112 +4676,94 @@ private TxnStatus findTxnState(long txnid, Statement stmt) throws SQLException,
   }
 
   /**
-   * Checks if all the txns in the list are in open state.
-   * @param txnIds list of txns to be evaluated for open state
+   * Checks if all the txns in the list are in open state and not read-only.
+   * @param txnIds list of txns to be evaluated for open state/read-only status
    * @param stmt db statement
-   * @return If all txns in open state, then return true else false
+   * @return true if all the txns are in open state and not read-only, else false
    */
-  private boolean isTxnsInOpenState(List<Long> txnIds, Statement stmt) throws SQLException {
+  private boolean isTxnsOpenAndNotReadOnly(List<Long> txnIds, Statement stmt) throws SQLException {
     List<String> queries = new ArrayList<>();
     StringBuilder prefix = new StringBuilder();
-    StringBuilder suffix = new StringBuilder();
 
-    // Get the count of txns from the given list are in open state. If the returned count is same as
-    // the input number of txns, then it means, all are in open state.
-    prefix.append("select count(*) from TXNS where txn_state = '" + TXN_OPEN + "' and ");
-    suffix.append("");
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
-        txnIds, "txn_id", false, false);
+    // Get the count of txns from the given list that are in open state and not read-only.
+    // If the returned count is the same as the input number of txns, then all txns are in open state and not read-only.
+    prefix.append("select count(*) from TXNS where txn_state = '" + TXN_OPEN +
+        "' and txn_type != " + TxnType.READ_ONLY.getValue() + " and ");
+
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(),
+        txnIds, "txn_id", false, false);
 
     long count = 0;
     for (String query : queries) {
       LOG.debug("Going to execute query <" + query + ">");
-      ResultSet rs = stmt.executeQuery(query);
-      if (rs.next()) {
-        count += rs.getLong(1);
+      try (ResultSet rs = stmt.executeQuery(query)) {
+        if (rs.next()) {
+          count += rs.getLong(1);
+        }
       }
     }
     return count == txnIds.size();
   }
 
   /**
-   * Checks if all the txns in the list are in open state.
-   * @param dbName Database name
-   * @param tblName Table on which we try to allocate write id
-   * @param txnIds list of txns to be evaluated for open state
+   * Get txns from the list that are either aborted or read-only.
+   * @param txnIds list of txns to be evaluated for aborted state/read-only status
    * @param stmt db statement
    */
-  private void ensureAllTxnsValid(String dbName, String tblName, List<Long> txnIds, Statement stmt)
-      throws SQLException {
+  private String getAbortedAndReadOnlyTxns(List<Long> txnIds, Statement stmt) throws SQLException {
     List<String> queries = new ArrayList<>();
     StringBuilder prefix = new StringBuilder();
-    StringBuilder suffix = new StringBuilder();
-
-    // Check if any of the txns in the list is aborted.
-    prefix.append("select txn_id, txn_state from TXNS where ");
-    suffix.append("");
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
-        txnIds, "txn_id", false, false);
-    Long txnId;
-    char txnState;
-    boolean isAborted = false;
-    StringBuilder errorMsg = new StringBuilder();
-    errorMsg.append("Write ID allocation on ")
-        .append(Warehouse.getQualifiedName(dbName, tblName))
-        .append(" failed for input txns: ");
+
+    // Check if any of the txns in the list are either aborted or read-only.
+    prefix.append("select txn_id, txn_state, txn_type from TXNS where ");
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(),
+        txnIds, "txn_id", false, false);
+    StringBuilder txnInfo = new StringBuilder();
+
     for (String query : queries) {
       LOG.debug("Going to execute query <" + query + ">");
-      ResultSet rs = stmt.executeQuery(query);
-      while (rs.next()) {
-        txnId = rs.getLong(1);
-        txnState = rs.getString(2).charAt(0);
-        if (txnState != TXN_OPEN) {
-          isAborted = true;
-          errorMsg.append("{").append(txnId).append(",").append(txnState).append("}");
+      try (ResultSet rs = stmt.executeQuery(query)) {
+        while (rs.next()) {
+          long txnId = rs.getLong(1);
+          char txnState = rs.getString(2).charAt(0);
+          TxnType txnType = TxnType.findByValue(rs.getInt(3));
+
+          if (txnState != TXN_OPEN) {
+            txnInfo.append("{").append(txnId).append(",").append(txnState).append("}");
+          } else if (txnType == TxnType.READ_ONLY) {
+            txnInfo.append("{").append(txnId).append(",read-only}");
+          }
         }
       }
     }
-    // Check if any of the txns in the list is committed.
-    boolean isCommitted = checkIfTxnsCommitted(txnIds, stmt, errorMsg);
-    if (isAborted || isCommitted) {
-      LOG.error(errorMsg.toString());
-      throw new IllegalStateException("Write ID allocation failed on "
-          + Warehouse.getQualifiedName(dbName, tblName)
-          + " as not all input txns in open state");
-    }
+    return txnInfo.toString();
   }
 
   /**
-   * Checks if all the txns in the list are in committed. If yes, throw eception.
-   * @param txnIds list of txns to be evaluated for committed
+   * Get txns from the list that are committed.
+   * @param txnIds list of txns to be evaluated for committed state
    * @param stmt db statement
-   * @return true if any input txn is committed, else false
    */
-  private boolean checkIfTxnsCommitted(List<Long> txnIds, Statement stmt, StringBuilder errorMsg)
-      throws SQLException {
+  private String getCommittedTxns(List<Long> txnIds, Statement stmt) throws SQLException {
     List<String> queries = new ArrayList<>();
     StringBuilder prefix = new StringBuilder();
-    StringBuilder suffix = new StringBuilder();
-    // Check if any of the txns in the list is committed. If yes, throw exception.
+    // Check if any of the txns in the list are committed.
prefix.append("select ctc_txnid from COMPLETED_TXN_COMPONENTS where "); - suffix.append(""); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, - txnIds, "ctc_txnid", false, false); - Long txnId; - boolean isCommitted = false; + TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(), + txnIds, "ctc_txnid", false, false); + StringBuilder txnInfo = new StringBuilder(); + for (String query : queries) { LOG.debug("Going to execute query <" + query + ">"); - ResultSet rs = stmt.executeQuery(query); - while (rs.next()) { - isCommitted = true; - txnId = rs.getLong(1); - if (errorMsg != null) { - errorMsg.append("{").append(txnId).append(",c}"); + try (ResultSet rs = stmt.executeQuery(query)) { + while (rs.next()) { + long txnId = rs.getLong(1); + txnInfo.append("{").append(txnId).append(",c}"); } } } - return isCommitted; + return txnInfo.toString(); } /** diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java index b5f22092a9..fc08dbcd57 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java @@ -23,10 +23,8 @@ import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.LockState; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder; @@ -39,12 +37,13 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.Rule; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; -import java.sql.SQLException; import java.sql.Statement; import java.util.List; @@ -67,6 +66,9 @@ private IMetaStoreClient client; private Connection conn; + @Rule + public ExpectedException thrown = ExpectedException.none(); + @Test public void testTxns() throws Exception { List tids = client.openTxns("me", 3).getTxn_ids(); @@ -312,11 +314,20 @@ public void testOpenTxnWithType() throws Exception { public void testTxnTypePersisted() throws Exception { long txnId = client.openTxn("me", TxnType.READ_ONLY); Statement stm = conn.createStatement(); - ResultSet rs = stm.executeQuery("SELECT txn_type FROM TXNS WHERE txn_id = " + txnId); + ResultSet rs = stm.executeQuery("SELECT txn_type FROM txns WHERE txn_id = " + txnId); Assert.assertTrue(rs.next()); Assert.assertEquals(TxnType.findByValue(rs.getInt(1)), TxnType.READ_ONLY); } + @Test + public void testAllocateTableWriteIdForReadOnlyTxn() throws Exception { + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Write ID allocation failed on db.tbl as not all input txns in open state or read-only"); + + long txnId = client.openTxn("me", TxnType.READ_ONLY); + client.allocateTableWriteId(txnId, "db", "tbl"); + } + @Before public void setUp() 
throws Exception { conf.setBoolean(ConfVars.HIVE_IN_TEST.getVarname(), true);