diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 91910d1c0c..0ff4125507 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -22,10 +22,8 @@
 import java.io.DataInput;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.io.Serializable;
 import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -37,11 +35,10 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -53,11 +50,11 @@
 import org.apache.hadoop.hive.conf.HiveVariableSource;
 import org.apache.hadoop.hive.conf.VariableSubstitution;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
-import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
@@ -82,7 +79,6 @@
 import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.hooks.HookUtils;
 import org.apache.hadoop.hive.ql.hooks.PrivateHookContext;
-import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lock.CompileLock;
@@ -185,6 +181,7 @@
   // Transaction manager used for the query. This will be set at compile time based on
   // either initTxnMgr or from the SessionState, in that order.
   private HiveTxnManager queryTxnMgr;
+  private TxnType queryTxnType = TxnType.DEFAULT;
   private StatsSource statsSource;
 
   // Boolean to store information about whether valid txn list was generated
@@ -475,7 +472,9 @@ public void compile(String command, boolean resetTaskIds, boolean deferClose) th
         && queryState.getHiveOperation().equals(HiveOperation.REPLDUMP)) {
       setLastReplIdForDump(queryState.getConf());
     }
-    openTransaction();
+    queryTxnType = AcidUtils.getTxnType(tree);
+    openTransaction(queryTxnType);
+
     generateValidTxnList();
   }
 
@@ -676,7 +675,7 @@ private boolean isValidTxnListState() throws LockException {
           lckCmp.getType() == LockType.SHARED_WRITE) &&
           lckCmp.getTablename() != null) {
         nonSharedLocks.add(
-            Warehouse.getQualifiedName(
+            TableName.getDbTable(
                 lckCmp.getDbname(), lckCmp.getTablename()));
       }
     }
@@ -687,7 +686,7 @@ private boolean isValidTxnListState() throws LockException {
             lock.getHiveLockMode() == HiveLockMode.SEMI_SHARED) &&
             lock.getHiveLockObject().getPaths().length == 2) {
           nonSharedLocks.add(
-              Warehouse.getQualifiedName(
+              TableName.getDbTable(
                   lock.getHiveLockObject().getPaths()[0],
                   lock.getHiveLockObject().getPaths()[1]));
         }
       }
@@ -699,25 +698,27 @@ private boolean isValidTxnListState() throws LockException {
       return true;
     }
     ValidTxnWriteIdList txnWriteIdList = new ValidTxnWriteIdList(txnWriteIdListStr);
-    List<Pair<String, Table>> writtenTables = getWrittenTableList(plan);
+    Map<String, Table> writtenTables = getWrittenTables(plan);
+
     ValidTxnWriteIdList currentTxnWriteIds = queryTxnMgr.getValidWriteIds(
-        writtenTables.stream()
-            .filter(e -> AcidUtils.isTransactionalTable(e.getRight()))
-            .map(e -> e.getLeft())
+        writtenTables.entrySet().stream()
+            .filter(e -> AcidUtils.isTransactionalTable(e.getValue()))
+            .map(Map.Entry::getKey)
             .collect(Collectors.toList()),
         currentTxnString);
-    for (Pair<String, Table> tableInfo : writtenTables) {
-      String fullQNameForLock = Warehouse.getQualifiedName(
-          tableInfo.getRight().getDbName(),
-          MetaStoreUtils.encodeTableName(tableInfo.getRight().getTableName()));
+
+    for (Map.Entry<String, Table> tableInfo : writtenTables.entrySet()) {
+      String fullQNameForLock = TableName.getDbTable(
+          tableInfo.getValue().getDbName(),
+          MetaStoreUtils.encodeTableName(tableInfo.getValue().getTableName()));
       if (nonSharedLocks.contains(fullQNameForLock)) {
         // Check if table is transactional
-        if (AcidUtils.isTransactionalTable(tableInfo.getRight())) {
+        if (AcidUtils.isTransactionalTable(tableInfo.getValue())) {
           // Check that write id is still valid
           if (!TxnIdUtils.checkEquivalentWriteIds(
-              txnWriteIdList.getTableValidWriteIdList(tableInfo.getLeft()),
-              currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getLeft()))) {
+              txnWriteIdList.getTableValidWriteIdList(tableInfo.getKey()),
+              currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getKey()))) {
             // Write id has changed, it is not valid anymore,
             // we need to recompile
             return false;
@@ -729,7 +730,7 @@ private boolean isValidTxnListState() throws LockException {
     if (!nonSharedLocks.isEmpty()) {
       throw new LockException("Wrong state: non-shared locks contain information for tables that have not" +
           " been visited when trying to validate the locks from query tables.\n" +
-          "Tables: " + writtenTables.stream().map(e -> e.getLeft()).collect(Collectors.toList()) + "\n" +
+          "Tables: " + writtenTables.keySet() + "\n" +
           "Remaining locks after check: " + nonSharedLocks);
     }
     // It passes the test, it is valid
@@ -766,10 +767,10 @@ private void setLastReplIdForDump(HiveConf conf) throws HiveException, TExceptio
     LOG.debug("Setting " + ReplUtils.LAST_REPL_ID_KEY + " = " +
         lastReplId);
   }
 
-  private void openTransaction() throws LockException, CommandProcessorException {
+  private void openTransaction(TxnType txnType) throws LockException, CommandProcessorException {
     if (checkConcurrency() && startImplicitTxn(queryTxnMgr) && !queryTxnMgr.isTxnOpen()) {
       String userFromUGI = getUserFromUGI();
-      queryTxnMgr.openTxn(ctx, userFromUGI);
+      queryTxnMgr.openTxn(ctx, userFromUGI, txnType);
     }
   }
 
@@ -933,7 +934,7 @@ private ValidTxnWriteIdList recordValidWriteIds(HiveTxnManager txnMgr) throws Lo
       throw new IllegalStateException("calling recordValidWritsIdss() without initializing ValidTxnList " +
          JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
     }
-    List<String> txnTables = getTransactionalTableList(plan);
+    List<String> txnTables = getTransactionalTables(plan);
    ValidTxnWriteIdList txnWriteIds = null;
    if (compactionWriteIds != null) {
      /**
@@ -946,12 +947,15 @@ private ValidTxnWriteIdList recordValidWriteIds(HiveTxnManager txnMgr) throws Lo
       if (txnTables.size() != 1) {
         throw new LockException("Unexpected tables in compaction: " + txnTables);
       }
-      String fullTableName = txnTables.get(0);
       txnWriteIds = new ValidTxnWriteIdList(compactorTxnId);
       txnWriteIds.addTableValidWriteIdList(compactionWriteIds);
     } else {
       txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
     }
+    assert queryTxnType != TxnType.READ_ONLY || getWrittenTables(plan).isEmpty() : String.format(
+        "Inferred transaction type '%s' doesn't conform to the actual query string '%s'",
+        queryTxnType, queryState.getQueryString());
+
     String writeIdStr = txnWriteIds.toString();
     conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, writeIdStr);
     if (plan.getFetchTask() != null) {
@@ -980,20 +984,31 @@ private ValidTxnWriteIdList recordValidWriteIds(HiveTxnManager txnMgr) throws Lo
     return txnWriteIds;
   }
 
-  // Make the list of transactional tables list which are getting read or written by current txn
-  private List<String> getTransactionalTableList(QueryPlan plan) {
-    Set<String> tableList = new HashSet<>();
+  // Make the list of transactional tables that are read or written by current txn
+  private List<String> getTransactionalTables(QueryPlan plan) {
+    Map<String, Table> tables = new HashMap<>();
+    plan.getInputs().forEach(
+        input -> addTableFromEntity(input, tables)
+    );
+    plan.getOutputs().forEach(
+        output -> addTableFromEntity(output, tables)
+    );
+    return tables.entrySet().stream()
+        .filter(entry -> AcidUtils.isTransactionalTable(entry.getValue()))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+  }
 
-    for (ReadEntity input : plan.getInputs()) {
-      addTableFromEntity(input, tableList);
-    }
-    for (WriteEntity output : plan.getOutputs()) {
-      addTableFromEntity(output, tableList);
-    }
-    return new ArrayList<>(tableList);
+  // Make the map of tables written by current txn
+  private Map<String, Table> getWrittenTables(QueryPlan plan) {
+    Map<String, Table> tables = new HashMap<>();
+    plan.getOutputs().forEach(
+        output -> addTableFromEntity(output, tables)
+    );
+    return tables;
   }
 
-  private void addTableFromEntity(Entity entity, Collection<String> tableList) {
+  private void addTableFromEntity(Entity entity, Map<String, Table> tables) {
     Table tbl;
     switch (entity.getType()) {
       case TABLE: {
@@ -1009,39 +1024,8 @@ private void addTableFromEntity(Entity entity, Collection<String> tableList) {
         return;
       }
     }
-    if (!AcidUtils.isTransactionalTable(tbl)) {
-      return;
-    }
     String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
-    tableList.add(fullTableName);
-  }
-
-  // Make the list of transactional tables list which are getting written by current txn
-  private List<Pair<String, Table>> getWrittenTableList(QueryPlan plan) {
-    List<Pair<String, Table>> result = new ArrayList<>();
-    Set<String> tableList = new HashSet<>();
-    for (WriteEntity output : plan.getOutputs()) {
-      Table tbl;
-      switch (output.getType()) {
-        case TABLE: {
-          tbl = output.getTable();
-          break;
-        }
-        case PARTITION:
-        case DUMMYPARTITION: {
-          tbl = output.getPartition().getTable();
-          break;
-        }
-        default: {
-          continue;
-        }
-      }
-      String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
-      if (tableList.add(fullTableName)) {
-        result.add(new ImmutablePair<>(fullTableName, tbl));
-      }
-    }
-    return result;
+    tables.put(fullTableName, tbl);
   }
 
   private String getUserFromUGI() throws CommandProcessorException {
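A note on the Driver.java changes above: the removed getWrittenTableList built a List<Pair<String, Table>> and deduplicated it through a side HashSet, while the new getWrittenTables returns a Map keyed by the qualified table name, so repeated write entities (for example, several partitions of one table) collapse into a single entry and isValidTxnListState can look tables up by key. A minimal, self-contained sketch of that dedup behaviour; the Table class here is an illustrative stand-in, not the real org.apache.hadoop.hive.ql.metadata.Table:

```java
import java.util.HashMap;
import java.util.Map;

public class WrittenTablesSketch {
  // Illustrative stand-in for org.apache.hadoop.hive.ql.metadata.Table.
  static class Table {
    final String db;
    final String name;
    Table(String db, String name) { this.db = db; this.name = name; }
  }

  public static void main(String[] args) {
    Map<String, Table> tables = new HashMap<>();
    // Two outputs against db.t1 (say, two partitions) plus one against db.t2:
    // the map keeps one entry per qualified name, which is what the removed
    // getWrittenTableList emulated with an extra HashSet.
    Table[] outputs = {new Table("db", "t1"), new Table("db", "t1"), new Table("db", "t2")};
    for (Table t : outputs) {
      tables.put(t.db + "." + t.name, t);
    }
    System.out.println(tables.keySet()); // e.g. [db.t1, db.t2]
  }
}
```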
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index fcf499d53a..9b021b42ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.hive.ql.io;
 
 import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
-import static org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator.UNION_SUDBIR_PREFIX;
+import static org.apache.hadoop.hive.ql.parse.CalcitePlanner.*;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -56,6 +56,7 @@
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.table.creation.CreateTableDesc;
@@ -72,6 +73,8 @@
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
 import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -90,6 +93,7 @@
 import javax.annotation.concurrent.Immutable;
 
 import java.nio.charset.Charset;
+import java.util.stream.Stream;
 
 /**
  * Utilities that are shared by all of the ACID input and output formats. They
@@ -2983,4 +2987,20 @@ public static void validateAcidPartitionLocation(String location, Configuration
       throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ex.getMessage()), ex);
     }
   }
+
+  /**
+   * Determines the transaction type based on the query AST.
+   * @param tree AST
+   */
+  public static TxnType getTxnType(ASTNode tree) {
+    final ASTSearcher astSearcher = new ASTSearcher();
+
+    return (tree.getToken().getType() == HiveParser.TOK_QUERY &&
+        Stream.of(
+            new int[]{HiveParser.TOK_INSERT_INTO},
+            new int[]{HiveParser.TOK_INSERT, HiveParser.TOK_TAB})
+          .noneMatch(pattern ->
+              astSearcher.simpleBreadthFirstSearch(tree, pattern) != null)) ?
+        TxnType.READ_ONLY : TxnType.DEFAULT;
+  }
 }
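The new AcidUtils.getTxnType above returns READ_ONLY only when the root token is TOK_QUERY and a breadth-first search finds neither a TOK_INSERT_INTO node nor a TOK_INSERT node with a TOK_TAB destination; everything else stays DEFAULT. A usage sketch, assuming the ql module (ParseUtils, AcidUtils) is on the classpath; it mirrors the TestParseUtils cases added at the end of this patch:

```java
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseException;
import org.apache.hadoop.hive.ql.parse.ParseUtils;

public class TxnTypeProbe {
  public static void main(String[] args) throws ParseException {
    // TOK_QUERY root with no insert branch anywhere -> READ_ONLY.
    classify("SELECT count(*) FROM a");
    // TOK_INSERT_INTO is found by the breadth-first search -> DEFAULT.
    classify("INSERT INTO a VALUES (1, 2)");
    // TOK_INSERT with a TOK_TAB destination -> DEFAULT.
    classify("INSERT OVERWRITE TABLE a SELECT * FROM b");
  }

  static void classify(String query) throws ParseException {
    ASTNode tree = ParseUtils.parse(query);
    TxnType type = AcidUtils.getTxnType(tree);
    System.out.println(type + "  <-  " + query);
  }
}
```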
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 943aa383bb..76934bc203 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -35,6 +35,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -220,11 +221,16 @@ void setHiveConf(HiveConf conf) {
 
   @Override
   public long openTxn(Context ctx, String user) throws LockException {
-    return openTxn(ctx, user, 0);
+    return openTxn(ctx, user, TxnType.DEFAULT, 0);
+  }
+
+  @Override
+  public long openTxn(Context ctx, String user, TxnType txnType) throws LockException {
+    return openTxn(ctx, user, txnType, 0);
   }
 
   @VisibleForTesting
-  long openTxn(Context ctx, String user, long delay) throws LockException {
+  long openTxn(Context ctx, String user, TxnType txnType, long delay) throws LockException {
     /*Q: why don't we lock the snapshot here??? Instead of having client make an explicit call
     whenever it chooses
     A: If we want to rely on locks for transaction scheduling we must get the snapshot after lock
@@ -236,7 +242,7 @@ long openTxn(Context ctx, String user, long delay) throws LockException {
       throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
     }
     try {
-      txnId = getMS().openTxn(user);
+      txnId = getMS().openTxn(user, txnType);
       stmtId = 0;
       numStatements = 0;
       tableWriteIds.clear();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index ac813c8288..7820013ab0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -53,6 +54,12 @@
 
   private HiveLockManagerCtx lockManagerCtx;
 
+  @Override
+  public long openTxn(Context ctx, String user, TxnType txnType) throws LockException {
+    // No-op
+    return 0L;
+  }
+
   @Override
   public long openTxn(Context ctx, String user) throws LockException {
     // No-op
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 1c53426966..600289f837 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverState;
 import org.apache.hadoop.hive.ql.ddl.database.lock.LockDatabaseDesc;
@@ -50,6 +51,16 @@
    */
   long openTxn(Context ctx, String user) throws LockException;
 
+  /**
+   * Open a new transaction.
+   * @param ctx Context for this query.
+   * @param user Hive user who is opening this transaction.
+   * @param txnType transaction type.
+   * @return The new transaction id.
+   * @throws LockException if a transaction is already open.
+   */
+  long openTxn(Context ctx, String user, TxnType txnType) throws LockException;
+
   /**
    * Open a new transaction in target cluster.
    * @param replPolicy Replication policy to uniquely identify the source cluster.
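Taken together, the three lockmgr changes above keep the legacy two-argument openTxn for existing callers: DbTxnManager funnels both overloads into its package-private delayed variant, and DummyTxnManager no-ops both. Any other HiveTxnManager implementation now has to provide the new overload; a hedged sketch of the same delegation pattern, with signatures simplified (Object stands in for Context, and the checked LockException is dropped) so the snippet compiles on its own:

```java
import org.apache.hadoop.hive.metastore.api.TxnType;

abstract class TxnManagerDelegationSketch {
  // Legacy entry point: unchanged observable behaviour for existing callers.
  public long openTxn(Object ctx, String user) {
    return openTxn(ctx, user, TxnType.DEFAULT);
  }

  // New entry point: the inferred type travels to the metastore unchanged,
  // as in DbTxnManager.openTxn -> getMS().openTxn(user, txnType) above.
  public abstract long openTxn(Object ctx, String user, TxnType txnType);
}
```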
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index cc86afedbf..cd91948519 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -210,7 +211,8 @@ private void runReaper() throws Exception {
   public void testExceptions() throws Exception {
     addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
     QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
-    ((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+    ((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", TxnType.DEFAULT,
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();
     LockException exception = null;
@@ -224,7 +226,8 @@ public void testExceptions() throws Exception {
     Assert.assertEquals("Wrong Exception1", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
 
     exception = null;
-    ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+    ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", TxnType.DEFAULT,
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();//this will abort the txn
     TxnStore txnHandler = TxnUtils.getTxnStore(conf);
@@ -441,7 +444,7 @@ public void testHeartbeater() throws Exception {
     // Case 2: If there's delay for the heartbeat, but the delay is within the reaper's tolerance,
     // then txn should be able to commit
     // Start the heartbeat after a delay, which is shorter than the HIVE_TXN_TIMEOUT
-    ((DbTxnManager) txnMgr).openTxn(ctx, "tom",
+    ((DbTxnManager) txnMgr).openTxn(ctx, "tom", TxnType.DEFAULT,
         HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2);
     txnMgr.acquireLocks(qp, ctx, "tom");
     runReaper();
@@ -457,7 +460,8 @@
     // then the txn will time out and be aborted.
     // Here we just don't send the heartbeat at all - an infinite delay.
     // Start the heartbeat after a delay, which exceeds the HIVE_TXN_TIMEOUT
-    ((DbTxnManager) txnMgr).openTxn(ctx, "jerry", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+    ((DbTxnManager) txnMgr).openTxn(ctx, "jerry", TxnType.DEFAULT,
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
     txnMgr.acquireLocks(qp, ctx, "jerry");
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
new file mode 100644
index 0000000000..d6aad87d33
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Tests that the transaction type is correctly derived from the query.
+ */
+@RunWith(value = Parameterized.class)
+public class TestParseUtils {
+
+  private String query;
+  private TxnType txnType;
+
+  public TestParseUtils(String query, TxnType txnType) {
+    this.query = query;
+    this.txnType = txnType;
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(
+        new Object[][]{
+            {"SELECT current_timestamp()", TxnType.READ_ONLY},
+            {"SELECT count(*) FROM a", TxnType.READ_ONLY},
+            {"SELECT count(*) FROM a JOIN b ON a.id = b.id", TxnType.READ_ONLY},
+
+            {"WITH a AS (SELECT current_timestamp()) " +
+                "  SELECT * FROM a", TxnType.READ_ONLY},
+
+            {"INSERT into a values (1, 2)", TxnType.DEFAULT},
+            {"INSERT into a select * from b", TxnType.DEFAULT},
+            {"INSERT overwrite table a SELECT * from b", TxnType.DEFAULT},
+
+            {"WITH a AS (SELECT current_timestamp()) " +
+                "  INSERT into b SELECT * FROM a", TxnType.DEFAULT},
+
+            {"UPDATE a set col_b = 1", TxnType.DEFAULT},
+            {"DELETE from a WHERE col_b = 1", TxnType.DEFAULT},
+
+            {"CREATE table a (col_b int)", TxnType.DEFAULT},
+            {"CREATE table a AS SELECT * FROM b", TxnType.DEFAULT},
+            {"DROP table a", TxnType.DEFAULT},
+
+            {"MERGE into a trg using b src " +
+                "  on src.col_a = trg.col_a " +
+                "when matched then " +
+                "  update set col_b = src.col_b " +
+                "when not matched then " +
+                "  insert values (src.col_a, src.col_b)",
+                TxnType.DEFAULT},
+        });
+  }
+
+  @Test
+  public void testTxnType() throws ParseException {
+    Assert.assertEquals(txnType,
+        AcidUtils.getTxnType(ParseUtils.parse(query)));
+  }
+}
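One caveat the new test does not pin down: getTxnType returns READ_ONLY only for a TOK_QUERY root, so statements that are read-only in spirit but have other root tokens (EXPLAIN, SHOW TABLES, and the like) should still open DEFAULT transactions. A small probe of that assumption, runnable against the ql test classpath:

```java
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.parse.ParseException;
import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.junit.Assert;

public class NonQueryRootProbe {
  public static void main(String[] args) throws ParseException {
    // Expected DEFAULT: the root token should be TOK_EXPLAIN, not TOK_QUERY.
    Assert.assertEquals(TxnType.DEFAULT,
        AcidUtils.getTxnType(ParseUtils.parse("EXPLAIN SELECT count(*) FROM a")));
    // Expected DEFAULT: the root token should be TOK_SHOWTABLES.
    Assert.assertEquals(TxnType.DEFAULT,
        AcidUtils.getTxnType(ParseUtils.parse("SHOW TABLES")));
  }
}
```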