diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 91910d1c0c..478ae199cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -22,7 +22,6 @@
 import java.io.DataInput;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.io.Serializable;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -37,6 +36,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configurable;
@@ -58,6 +59,7 @@
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
@@ -185,6 +187,7 @@
   // Transaction manager used for the query. This will be set at compile time based on
   // either initTxnMgr or from the SessionState, in that order.
   private HiveTxnManager queryTxnMgr;
+  private TxnType queryTxnType = TxnType.DEFAULT;
   private StatsSource statsSource;
 
   // Boolean to store information about whether valid txn list was generated
@@ -475,7 +478,9 @@ public void compile(String command, boolean resetTaskIds, boolean deferClose) th
         && queryState.getHiveOperation().equals(HiveOperation.REPLDUMP)) {
       setLastReplIdForDump(queryState.getConf());
     }
-    openTransaction();
+    queryTxnType = AcidUtils.getTxnType(tree);
+    openTransaction(queryTxnType);
+
     generateValidTxnList();
   }
 
@@ -766,10 +771,10 @@ private void setLastReplIdForDump(HiveConf conf) throws HiveException, TExceptio
     LOG.debug("Setting " + ReplUtils.LAST_REPL_ID_KEY + " = " + lastReplId);
   }
 
-  private void openTransaction() throws LockException, CommandProcessorException {
+  private void openTransaction(TxnType txnType) throws LockException, CommandProcessorException {
     if (checkConcurrency() && startImplicitTxn(queryTxnMgr) && !queryTxnMgr.isTxnOpen()) {
       String userFromUGI = getUserFromUGI();
-      queryTxnMgr.openTxn(ctx, userFromUGI);
+      queryTxnMgr.openTxn(ctx, userFromUGI, txnType);
     }
   }
 
@@ -946,12 +951,15 @@ private ValidTxnWriteIdList recordValidWriteIds(HiveTxnManager txnMgr) throws Lo
       if (txnTables.size() != 1) {
         throw new LockException("Unexpected tables in compaction: " + txnTables);
       }
-      String fullTableName = txnTables.get(0);
       txnWriteIds = new ValidTxnWriteIdList(compactorTxnId);
       txnWriteIds.addTableValidWriteIdList(compactionWriteIds);
     } else {
       txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
    }
+    assert queryTxnType != TxnType.READ_ONLY || plan.getAcidSinks().isEmpty() : String.format(
+        "Inferred transaction type '%s' doesn't conform to the actual query string '%s'",
+        queryTxnType, queryState.getQueryString());
+
     String writeIdStr = txnWriteIds.toString();
     conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, writeIdStr);
     if (plan.getFetchTask() != null) {
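The Driver change splits implicit-transaction setup into two steps: infer the transaction type from the already-parsed AST, then open the transaction with that type. The assert added to recordValidWriteIds() is a sanity check that a READ_ONLY inference never coexists with ACID write sinks in the plan. A condensed, hypothetical sketch of that flow (not part of the patch; `command`, `ctx`, `user`, and `txnMgr` stand in for the corresponding Driver state):

    // Sketch: what Driver.compile() now does for transaction setup.
    ASTNode tree = ParseUtils.parse(command);      // query is already parsed earlier in compile()
    TxnType txnType = AcidUtils.getTxnType(tree);  // READ_ONLY iff the AST has no insert target
    if (!txnMgr.isTxnOpen()) {
      txnMgr.openTxn(ctx, user, txnType);          // type travels to the metastore with the open request
    }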
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index fcf499d53a..b6f30110a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.io;
 
 import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
-import static org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator.UNION_SUDBIR_PREFIX;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -56,6 +55,7 @@
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.table.creation.CreateTableDesc;
@@ -72,6 +72,9 @@
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.CalcitePlanner;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
 import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -90,6 +93,7 @@
 import javax.annotation.concurrent.Immutable;
 
 import java.nio.charset.Charset;
+import java.util.stream.Stream;
 
 /**
  * Utilities that are shared by all of the ACID input and output formats. They
@@ -2983,4 +2987,19 @@ public static void validateAcidPartitionLocation(String location, Configuration
       throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ex.getMessage()), ex);
     }
   }
+
+  /**
+   * Determines the transaction type based on the query AST.
+   * @param tree the root AST node of the parsed query
+   * @return TxnType.READ_ONLY for pure SELECT queries, TxnType.DEFAULT otherwise
+   */
+  public static TxnType getTxnType(ASTNode tree) {
+    return (tree.getToken().getType() == HiveParser.TOK_QUERY &&
+        Stream.of(
+            new int[]{HiveParser.TOK_INSERT_INTO},
+            new int[]{HiveParser.TOK_INSERT, HiveParser.TOK_TAB})
+          .noneMatch(pattern ->
+            new CalcitePlanner.ASTSearcher().simpleBreadthFirstSearch(tree, pattern) != null)) ?
+      TxnType.READ_ONLY : TxnType.DEFAULT;
+  }
 }
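getTxnType() works purely on token patterns: the root must be TOK_QUERY (so DDL such as CREATE or DROP never qualifies), and the tree must contain neither a TOK_INSERT_INTO node nor a TOK_INSERT targeting a real table (TOK_TAB). A plain SELECT still parses into a TOK_INSERT subtree, but its destination is a synthetic temp-file token rather than TOK_TAB, so neither pattern fires. A minimal, self-contained sketch of the observable behavior (assumes getTxnType can be driven from a bare ParseUtils.parse, as the new TestParseUtils below does):

    import org.apache.hadoop.hive.metastore.api.TxnType;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.hive.ql.parse.ParseException;
    import org.apache.hadoop.hive.ql.parse.ParseUtils;

    public class TxnTypeSketch {
      public static void main(String[] args) throws ParseException {
        // No insert target anywhere in the AST -> read-only transaction.
        System.out.println(AcidUtils.getTxnType(
            ParseUtils.parse("SELECT count(*) FROM t")));    // READ_ONLY
        // An explicit INSERT target (TOK_INSERT_INTO) -> default read-write transaction.
        System.out.println(AcidUtils.getTxnType(
            ParseUtils.parse("INSERT INTO t VALUES (1)")));  // DEFAULT
      }
    }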
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 943aa383bb..76934bc203 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -35,6 +35,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -220,11 +221,16 @@ void setHiveConf(HiveConf conf) {
 
   @Override
   public long openTxn(Context ctx, String user) throws LockException {
-    return openTxn(ctx, user, 0);
+    return openTxn(ctx, user, TxnType.DEFAULT, 0);
+  }
+
+  @Override
+  public long openTxn(Context ctx, String user, TxnType txnType) throws LockException {
+    return openTxn(ctx, user, txnType, 0);
   }
 
   @VisibleForTesting
-  long openTxn(Context ctx, String user, long delay) throws LockException {
+  long openTxn(Context ctx, String user, TxnType txnType, long delay) throws LockException {
     /*Q: why don't we lock the snapshot here??? Instead of having client make an explicit call
     whenever it chooses
     A: If we want to rely on locks for transaction scheduling we must get the snapshot after lock
@@ -236,7 +242,7 @@ long openTxn(Context ctx, String user, long delay) throws LockException {
       throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
     }
     try {
-      txnId = getMS().openTxn(user);
+      txnId = getMS().openTxn(user, txnType);
       stmtId = 0;
       numStatements = 0;
       tableWriteIds.clear();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index ac813c8288..7820013ab0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -53,6 +54,12 @@
 
   private HiveLockManagerCtx lockManagerCtx;
 
+  @Override
+  public long openTxn(Context ctx, String user, TxnType txnType) throws LockException {
+    // No-op
+    return 0L;
+  }
+
   @Override
   public long openTxn(Context ctx, String user) throws LockException {
     // No-op
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 1c53426966..600289f837 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverState;
 import org.apache.hadoop.hive.ql.ddl.database.lock.LockDatabaseDesc;
@@ -50,6 +51,16 @@
    */
   long openTxn(Context ctx, String user) throws LockException;
 
+  /**
+   * Open a new transaction of the given type.
+   * @param ctx Context for this query.
+   * @param user Hive user who is opening this transaction.
+   * @param txnType type of the transaction, e.g. read-only or default.
+   * @return the new transaction id.
+   * @throws LockException if a transaction is already open.
+   */
+  long openTxn(Context ctx, String user, TxnType txnType) throws LockException;
+
   /**
    * Open a new transaction in target cluster.
    * @param replPolicy Replication policy to uniquely identify the source cluster.
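Existing callers are unaffected: the legacy two-argument openTxn() now forwards TxnType.DEFAULT, and only the Driver passes an inferred type. A caller-side sketch (hypothetical names; assumes the session's transaction manager is obtained via SessionState, as elsewhere in Hive):

    // Open an explicitly read-only transaction through the new overload.
    HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
    long txnId = txnMgr.openTxn(ctx, "alice", TxnType.READ_ONLY);
    // The old overload is now equivalent to openTxn(ctx, user, TxnType.DEFAULT).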
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index cc86afedbf..cd91948519 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -210,7 +211,8 @@ private void runReaper() throws Exception {
   public void testExceptions() throws Exception {
     addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
     QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
-    ((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+    ((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", TxnType.DEFAULT,
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();
     LockException exception = null;
@@ -224,7 +226,8 @@ public void testExceptions() throws Exception {
     Assert.assertEquals("Wrong Exception1", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
 
     exception = null;
-    ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+    ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", TxnType.DEFAULT,
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();//this will abort the txn
     TxnStore txnHandler = TxnUtils.getTxnStore(conf);
@@ -441,7 +444,7 @@ public void testHeartbeater() throws Exception {
     // Case 2: If there's delay for the heartbeat, but the delay is within the reaper's tolerance,
     // then txn should be able to commit
     // Start the heartbeat after a delay, which is shorter than the HIVE_TXN_TIMEOUT
-    ((DbTxnManager) txnMgr).openTxn(ctx, "tom",
+    ((DbTxnManager) txnMgr).openTxn(ctx, "tom", TxnType.DEFAULT,
         HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2);
     txnMgr.acquireLocks(qp, ctx, "tom");
     runReaper();
@@ -457,7 +460,8 @@ public void testHeartbeater() throws Exception {
     // then the txn will time out and be aborted.
     // Here we just don't send the heartbeat at all - an infinite delay.
     // Start the heartbeat after a delay, which exceeds the HIVE_TXN_TIMEOUT
-    ((DbTxnManager) txnMgr).openTxn(ctx, "jerry", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+    ((DbTxnManager) txnMgr).openTxn(ctx, "jerry", TxnType.DEFAULT,
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
     txnMgr.acquireLocks(qp, ctx, "jerry");
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
new file mode 100644
index 0000000000..6e8c96f66d
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Tests that the transaction type is correctly inferred from the query AST.
+ */
+@RunWith(value = Parameterized.class)
+public class TestParseUtils {
+
+  private String query;
+  private TxnType txnType;
+
+  public TestParseUtils(String query, TxnType txnType) {
+    this.query = query;
+    this.txnType = txnType;
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(
+      new Object[][]{
+        {"SELECT current_timestamp()", TxnType.READ_ONLY},
+        {"SELECT count(*) FROM a", TxnType.READ_ONLY},
+        {"SELECT count(*) FROM a JOIN b ON a.id = b.id", TxnType.READ_ONLY},
+
+        {"WITH a AS (SELECT current_timestamp()) " +
+         "  SELECT * FROM a", TxnType.READ_ONLY},
+
+        {"INSERT into a values (1, 2)", TxnType.DEFAULT},
+        {"INSERT into a select * from b", TxnType.DEFAULT},
+        {"INSERT overwrite table a SELECT * from b", TxnType.DEFAULT},
+
+        {"WITH a AS (SELECT current_timestamp()) " +
+         "  INSERT into b SELECT * FROM a", TxnType.DEFAULT},
+
+        {"UPDATE a set col_b = 1", TxnType.DEFAULT},
+        {"DELETE from a WHERE col_b = 1", TxnType.DEFAULT},
+
+        {"CREATE table a (col_b int)", TxnType.DEFAULT},
+        {"CREATE table a AS SELECT * FROM b", TxnType.DEFAULT},
+        {"DROP table a", TxnType.DEFAULT},
+
+        {"MERGE into a trg using b src " +
+         "  on src.col_a = trg.col_a " +
+         "when matched then " +
+         "  update set col_b = src.col_b " +
+         "when not matched then " +
+         "  insert values (src.col_a, src.col_b)",
+         TxnType.DEFAULT},
+      });
+  }
+
+  @Test
+  public void testTxnType() throws ParseException {
+    Assert.assertEquals(txnType,
+        AcidUtils.getTxnType(ParseUtils.parse(query)));
+  }
+}
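The parameterized cases cover the boundary the AST check has to get right: WITH-wrapped selects stay READ_ONLY, while WITH-wrapped inserts, plain DML, DDL, and MERGE all fall back to DEFAULT. Assuming the usual Surefire setup in the ql module, the suite should be runnable in isolation with something like:

    cd ql
    mvn test -Dtest=TestParseUtils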