diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 8ea58e6..2374dfa 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -741,7 +741,7 @@ private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) thro txnBatch.write("1,Hello streaming".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 19, 28, 1, 1, "{1, Hello streaming}"); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState()); @@ -753,11 +753,11 @@ private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) thro txnBatch.write("2,Welcome to streaming".getBytes()); // data should not be visible - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 19, 28, 1, 1, "{1, Hello streaming}"); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", + checkDataWritten(partLoc, 19, 28, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}"); txnBatch.close(); @@ -809,7 +809,7 @@ private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws E txnBatch.write("1,Hello streaming".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 19, 28, 1, 1, "{1, Hello streaming}"); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState()); @@ -821,11 +821,11 @@ private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws E txnBatch.write("2,Welcome to streaming".getBytes()); // data should not be visible - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 19, 28, 1, 1, "{1, Hello 
streaming}"); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", + checkDataWritten(partLoc, 19, 28, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}"); txnBatch.close(); @@ -871,7 +871,7 @@ public void testTransactionBatchCommit_Json() throws Exception { txnBatch.write(rec1.getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 19, 28, 1, 1, "{1, Hello streaming}"); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState()); @@ -998,7 +998,7 @@ public void testTransactionBatchAbortAndCommit() throws Exception { txnBatch.write("2,Welcome to streaming".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", + checkDataWritten(partLoc, 17, 26, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}"); txnBatch.close(); @@ -1017,13 +1017,13 @@ public void testMultipleTransactionBatchCommits() throws Exception { txnBatch.write("1,Hello streaming".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 19, 28, 1, 1, "{1, Hello streaming}"); txnBatch.beginNextTransaction(); txnBatch.write("2,Welcome to streaming".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", + checkDataWritten(partLoc, 19, 28, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}"); txnBatch.close(); @@ -1034,14 +1034,14 @@ public void testMultipleTransactionBatchCommits() throws Exception { txnBatch.write("3,Hello streaming - once again".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", + checkDataWritten(partLoc, 19, 38, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}"); txnBatch.beginNextTransaction(); txnBatch.write("4,Welcome to streaming - once again".getBytes()); txnBatch.commit(); - 
checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", + checkDataWritten(partLoc, 19, 38, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}", "{4, Welcome to streaming - once again}"); @@ -1078,11 +1078,11 @@ public void testInterleavedTransactionBatchCommits() throws Exception { txnBatch2.commit(); - checkDataWritten(partLoc, 11, 20, 1, 1, "{3, Hello streaming - once again}"); + checkDataWritten(partLoc, 27, 36, 1, 1, "{3, Hello streaming - once again}"); txnBatch1.commit(); - checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); + checkDataWritten(partLoc, 17, 36, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); txnBatch1.beginNextTransaction(); txnBatch1.write("2,Welcome to streaming".getBytes()); @@ -1090,17 +1090,17 @@ public void testInterleavedTransactionBatchCommits() throws Exception { txnBatch2.beginNextTransaction(); txnBatch2.write("4,Welcome to streaming - once again".getBytes()); - checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); + checkDataWritten(partLoc, 17, 36, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); txnBatch1.commit(); - checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", + checkDataWritten(partLoc, 17, 36, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}"); txnBatch2.commit(); - checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", + checkDataWritten(partLoc, 17, 36, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}", "{4, Welcome to streaming - once again}"); @@ -1769,7 +1769,7 @@ public void testErrorHandling() throws Exception { txnBatch.heartbeat();//this is no-op on closed batch txnBatch.abort();//ditto GetOpenTxnsInfoResponse r = msClient.showTxns(); - Assert.assertEquals("HWM didn't match", 2, r.getTxn_high_water_mark()); + 
Assert.assertEquals("HWM didn't match", 21, r.getTxn_high_water_mark()); List ti = r.getOpen_txns(); Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState()); Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState()); @@ -1833,7 +1833,7 @@ public void testErrorHandling() throws Exception { expectedEx != null && expectedEx.getMessage().contains("has been closed()")); r = msClient.showTxns(); - Assert.assertEquals("HWM didn't match", 4, r.getTxn_high_water_mark()); + Assert.assertEquals("HWM didn't match", 23, r.getTxn_high_water_mark()); ti = r.getOpen_txns(); Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState()); Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState()); @@ -1856,7 +1856,7 @@ public void testErrorHandling() throws Exception { expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred")); r = msClient.showTxns(); - Assert.assertEquals("HWM didn't match", 6, r.getTxn_high_water_mark()); + Assert.assertEquals("HWM didn't match", 25, r.getTxn_high_water_mark()); ti = r.getOpen_txns(); Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState()); Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState()); diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 66ed8ca..e89f38a 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -659,13 +659,13 @@ public void minorCompactWhileStreaming() throws Exception { Path resultFile = null; for (int i = 0; i < names.length; i++) { names[i] = stat[i].getPath().getName(); - if (names[i].equals("delta_0000001_0000004")) { + if (names[i].equals("delta_0000003_0000006")) { 
resultFile = stat[i].getPath(); } } Arrays.sort(names); - String[] expected = new String[]{"delta_0000001_0000002", - "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"}; + String[] expected = new String[]{"delta_0000003_0000004", + "delta_0000003_0000006", "delta_0000005_0000006", "delta_0000007_0000008"}; if (!Arrays.deepEquals(expected, names)) { Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); } @@ -718,10 +718,10 @@ public void majorCompactWhileStreaming() throws Exception { FileStatus[] stat = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter); if (1 != stat.length) { - Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); + Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat)); } String name = stat[0].getPath().getName(); - Assert.assertEquals(name, "base_0000004"); + Assert.assertEquals(name, "base_0000006"); checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L); } finally { connection.close(); @@ -778,13 +778,13 @@ public void minorCompactAfterAbort() throws Exception { Path resultDelta = null; for (int i = 0; i < names.length; i++) { names[i] = stat[i].getPath().getName(); - if (names[i].equals("delta_0000001_0000004")) { + if (names[i].equals("delta_0000003_0000006")) { resultDelta = stat[i].getPath(); } } Arrays.sort(names); - String[] expected = new String[]{"delta_0000001_0000002", - "delta_0000001_0000004", "delta_0000003_0000004"}; + String[] expected = new String[]{"delta_0000003_0000005", + "delta_0000003_0000006", "delta_0000003_0000006"}; if (!Arrays.deepEquals(expected, names)) { Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); } @@ -844,11 +844,11 @@ public void majorCompactAfterAbort() throws Exception { Assert.fail("majorCompactAfterAbort 
FileStatus[] stat " + Arrays.toString(stat)); } if (1 != stat.length) { - Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); + Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat)); } String name = stat[0].getPath().getName(); - if (!name.equals("base_0000004")) { - Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000004"); + if (!name.equals("base_0000006")) { + Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006"); } checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L); } finally { @@ -899,10 +899,10 @@ public void majorCompactWhileStreamingForSplitUpdate() throws Exception { FileStatus[] stat = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter); if (1 != stat.length) { - Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); + Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat)); } String name = stat[0].getPath().getName(); - Assert.assertEquals(name, "base_0000004"); + Assert.assertEquals(name, "base_0000006"); checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L); } finally { connection.close(); @@ -923,18 +923,18 @@ public void testMinorCompactionForSplitUpdateWithInsertsAndDeletes() throws Exce " STORED AS ORC TBLPROPERTIES ('transactional'='true'," + "'transactional_properties'='default')", driver); - // Insert some data -> this will generate only insert deltas and no delete deltas: delta_1_1 + // Insert some data -> this will generate only insert deltas and no delete deltas: delta_3_3 executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver); - // Insert some data -> this will again generate only insert deltas and no delete 
deltas: delta_2_2 + // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_4_4 executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver); - // Delete some data -> this will generate only delete deltas and no insert deltas: delete_delta_3_3 + // Delete some data -> this will generate only delete deltas and no insert deltas: delete_delta_5_5 executeStatementOnDriver("DELETE FROM " + tblName +" WHERE a = 2", driver); // Now, compact -> Compaction produces a single range for both delta and delete delta - // That is, both delta and delete_deltas would be compacted into delta_1_3 and delete_delta_1_3 - // even though there are only two delta_1_1, delta_2_2 and one delete_delta_3_3. + // That is, both delta and delete_deltas would be compacted into delta_3_5 and delete_delta_3_5 + // even though there are only two delta_3_3, delta_4_4 and one delete_delta_5_5. TxnStore txnHandler = TxnUtils.getTxnStore(conf); txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); Worker t = new Worker(); @@ -957,12 +957,12 @@ public void testMinorCompactionForSplitUpdateWithInsertsAndDeletes() throws Exce Path minorCompactedDelta = null; for (int i = 0; i < deltas.length; i++) { deltas[i] = stat[i].getPath().getName(); - if (deltas[i].equals("delta_0000001_0000003")) { + if (deltas[i].equals("delta_0000003_0000005")) { minorCompactedDelta = stat[i].getPath(); } } Arrays.sort(deltas); - String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003", "delta_0000002_0000002_0000"}; + String[] expectedDeltas = new String[]{"delta_0000003_0000003_0000", "delta_0000003_0000005", "delta_0000004_0000004_0000"}; if (!Arrays.deepEquals(expectedDeltas, deltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); } @@ -975,12 +975,12 @@ public void testMinorCompactionForSplitUpdateWithInsertsAndDeletes() throws Exce Path 
minorCompactedDeleteDelta = null; for (int i = 0; i < deleteDeltas.length; i++) { deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); - if (deleteDeltas[i].equals("delete_delta_0000001_0000003")) { + if (deleteDeltas[i].equals("delete_delta_0000003_0000005")) { minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); } } Arrays.sort(deleteDeltas); - String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003", "delete_delta_0000003_0000003_0000"}; + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000005", "delete_delta_0000005_0000005_0000"}; if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); } @@ -1034,12 +1034,12 @@ public void testMinorCompactionForSplitUpdateWithOnlyInserts() throws Exception Path minorCompactedDelta = null; for (int i = 0; i < deltas.length; i++) { deltas[i] = stat[i].getPath().getName(); - if (deltas[i].equals("delta_0000001_0000002")) { + if (deltas[i].equals("delta_0000003_0000004")) { minorCompactedDelta = stat[i].getPath(); } } Arrays.sort(deltas); - String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002", "delta_0000002_0000002_0000"}; + String[] expectedDeltas = new String[]{"delta_0000003_0000003_0000", "delta_0000003_0000004", "delta_0000004_0000004_0000"}; if (!Arrays.deepEquals(expectedDeltas, deltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); } @@ -1052,12 +1052,12 @@ public void testMinorCompactionForSplitUpdateWithOnlyInserts() throws Exception Path minorCompactedDeleteDelta = null; for (int i = 0; i < deleteDeltas.length; i++) { deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); - if (deleteDeltas[i].equals("delete_delta_0000001_0000002")) { + if (deleteDeltas[i].equals("delete_delta_0000003_0000004")) { minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); } 
} Arrays.sort(deleteDeltas); - String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002"}; + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000004"}; if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); } @@ -1111,13 +1111,13 @@ public void minorCompactWhileStreamingWithSplitUpdate() throws Exception { Path resultFile = null; for (int i = 0; i < names.length; i++) { names[i] = stat[i].getPath().getName(); - if (names[i].equals("delta_0000001_0000004")) { + if (names[i].equals("delta_0000003_0000006")) { resultFile = stat[i].getPath(); } } Arrays.sort(names); - String[] expected = new String[]{"delta_0000001_0000002", - "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"}; + String[] expected = new String[]{"delta_0000003_0000004", + "delta_0000003_0000006", "delta_0000005_0000006", "delta_0000007_0000008"}; if (!Arrays.deepEquals(expected, names)) { Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index d378d06..8c46382 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -983,6 +983,12 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc case SELECT: updateTxnComponents = false; break; + case NO_TXN: + /*this constant is a bit of a misnomer since we now always have a txn context. 
It just means + * the operation is such that we don't care what tables/partitions it affected as it + * doesn't trigger a compaction or conflict detection*/ + updateTxnComponents = false; + break; default: //since we have an open transaction, only 4 values above are expected throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType() diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 630df43..b9ccdb3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -86,16 +86,17 @@ import org.apache.hadoop.hive.ql.metadata.formatting.JsonMetaDataFormatter; import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils; import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatter; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveExcept; import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo; +import org.apache.hadoop.hive.ql.parse.HiveParser; import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook; import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext; import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl; import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.ParseDriver; import org.apache.hadoop.hive.ql.parse.ParseUtils; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; @@ -380,7 +381,7 @@ public int compile(String command, boolean resetTaskIds) { // deferClose indicates if the close/destroy should be deferred when the process has been // interrupted, it should be set to true if the compile is called 
within another method like // runInternal, which defers the close to the called in that method. - public int compile(String command, boolean resetTaskIds, boolean deferClose) { + private int compile(String command, boolean resetTaskIds, boolean deferClose) { PerfLogger perfLogger = SessionState.getPerfLogger(true); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE); @@ -491,7 +492,18 @@ public void run() { // analyzer (this is when the connection to the metastore is made) but before we analyze, // because at that point we need access to the objects. Hive.get().getMSC().flushCache(); - + if(checkConcurrency()) { + /* + todo: should this do perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);? + */ + String userFromUGI = getUserFromUGI(); + if (!txnManager.isTxnOpen()) { + if(userFromUGI == null) { + return 10; + } + txnManager.openTxn(ctx, userFromUGI); + } + } // Do semantic analysis and plan generation if (saHooks != null && !saHooks.isEmpty()) { HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl(); @@ -1073,6 +1085,21 @@ private void recordValidTxns() throws LockException { LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId()); } + private String getUserFromUGI() { + // Don't use the userName member, as it may or may not have been set. Get the value from + // conf, which calls into getUGI to figure out who the process is running as. + String userFromUGI; + try { + return conf.getUser(); + } catch (IOException e) { + errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage(); + SQLState = ErrorMsg.findSQLState(e.getMessage()); + downstreamError = e; + console.printError(errorMessage, + "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + return null; + } /** * Acquire read and write locks needed by the statement. 
The list of objects to be locked are * obtained from the inputs and outputs populated by the compiler. The lock acquisition scheme is @@ -1081,47 +1108,19 @@ private void recordValidTxns() throws LockException { * * This method also records the list of valid transactions. This must be done after any * transactions have been opened and locks acquired. - * @param startTxnImplicitly in AC=false, the 1st DML starts a txn **/ - private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) { + private int acquireLocksAndOpenTxn() { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS); SessionState ss = SessionState.get(); HiveTxnManager txnMgr = ss.getTxnMgr(); - if(startTxnImplicitly) { - assert !txnMgr.getAutoCommit(); - } try { - // Don't use the userName member, as it may or may not have been set. Get the value from - // conf, which calls into getUGI to figure out who the process is running as. - String userFromUGI; - try { - userFromUGI = conf.getUser(); - } catch (IOException e) { - errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage(); - SQLState = ErrorMsg.findSQLState(e.getMessage()); - downstreamError = e; - console.printError(errorMessage, - "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); + String userFromUGI = getUserFromUGI(); + if(userFromUGI == null) { return 10; } - - boolean initiatingTransaction = false; - boolean readOnlyQueryInAutoCommit = false; - if((txnMgr.getAutoCommit() && haveAcidWrite()) || plan.getOperation() == HiveOperation.START_TRANSACTION || - (!txnMgr.getAutoCommit() && startTxnImplicitly)) { - if(txnMgr.isTxnOpen()) { - throw new RuntimeException("Already have an open transaction txnid:" + txnMgr.getCurrentTxnId()); - } - // We are writing to tables in an ACID compliant way, so we need to open a transaction - txnMgr.openTxn(ctx, userFromUGI); - initiatingTransaction = true; - } - else { - 
readOnlyQueryInAutoCommit = txnMgr.getAutoCommit() && plan.getOperation() == HiveOperation.QUERY && !haveAcidWrite(); - } // Set the transaction id in all of the acid file sinks if (haveAcidWrite()) { for (FileSinkDesc desc : acidSinks) { @@ -1136,16 +1135,13 @@ private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) { 2nd will block until 1st one commits and only then lock in the snapshot, i.e. it will see the changes made by 1st one. This takes care of autoCommit=true case. For multi-stmt txns this is not sufficient and will be managed via WriteSet tracking - in the lock manager.*/ - txnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState); - if(initiatingTransaction || (readOnlyQueryInAutoCommit && acidInQuery)) { - //For multi-stmt txns we should record the snapshot when txn starts but - // don't update it after that until txn completes. Thus the check for {@code initiatingTransaction} - //For autoCommit=true, Read-only statements, txn is implicit, i.e. lock in the snapshot - //for each statement. + in the lock manager. + It's imperative that {@code acquireLocks()} is called for all commands so that + HiveTxnManager can transition its state machine correctly*/ + txnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);//todo: we call this for SET AUTOCOMMIT ? 
we should call this for everything + if(plan.getOperation() == HiveOperation.START_TRANSACTION || acidInQuery) { recordValidTxns(); } - return 0; } catch (Exception e) { errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); @@ -1405,6 +1401,22 @@ private ReentrantLock tryAcquireCompileLock(boolean isParallelEnabled, return compileLock; } + private boolean isReadOnly(ASTNode ast) { + if(ast == null) { + return false; + } + if(ast.getType() == HiveParser.TOK_QUERY) { + return isReadOnly((ASTNode) ast.getFirstChildWithType(HiveParser.TOK_INSERT)); + } + if(ast.getType() == HiveParser.TOK_INSERT) { + return isReadOnly((ASTNode)ast.getFirstChildWithType(HiveParser.TOK_DESTINATION)); + } + if(ast.getType() == HiveParser.TOK_DESTINATION) { + return null != ast.getFirstChildWithType(HiveParser.TOK_DIR); + } + return false; + } + private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled) throws CommandNeedRetryException { errorMessage = null; @@ -1473,52 +1485,12 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp HiveTxnManager txnManager = SessionState.get().getTxnMgr(); ctx.setHiveTxnManager(txnManager); - boolean startTxnImplicitly = false; - { - //this block ensures op makes sense in given context, e.g. COMMIT is valid only if txn is open - //DDL is not allowed in a txn, etc. 
- //an error in an open txn does a rollback of the txn - if (txnManager.isTxnOpen() && !plan.getOperation().isAllowedInTransaction()) { - assert !txnManager.getAutoCommit() : "didn't expect AC=true"; - return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, null, - plan.getOperationName(), Long.toString(txnManager.getCurrentTxnId()))); - } - if(!txnManager.isTxnOpen() && plan.getOperation().isRequiresOpenTransaction()) { - return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, null, plan.getOperationName())); - } - if(!txnManager.isTxnOpen() && plan.getOperation() == HiveOperation.QUERY && !txnManager.getAutoCommit()) { - //this effectively makes START TRANSACTION optional and supports JDBC setAutoCommit(false) semantics - //also, indirectly allows DDL to be executed outside a txn context - startTxnImplicitly = true; - } - if(txnManager.getAutoCommit() && plan.getOperation() == HiveOperation.START_TRANSACTION) { - return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, null, plan.getOperationName())); - } - } - if(plan.getOperation() == HiveOperation.SET_AUTOCOMMIT) { - try { - if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) { - /*here, if there is an open txn, we want to commit it; this behavior matches - * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/ - releaseLocksAndCommitOrRollback(true, null); - txnManager.setAutoCommit(true); - } - else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) { - txnManager.setAutoCommit(false); - } - else {/*didn't change autoCommit value - no-op*/} - } - catch(LockException e) { - return handleHiveException(e, 12); - } - } - if (requiresLock()) { // a checkpoint to see if the thread is interrupted or not before an expensive operation if (isInterrupted()) { ret = handleInterruption("at acquiring the lock."); } else { - ret = acquireLocksAndOpenTxn(startTxnImplicitly); + ret = 
acquireLocksAndOpenTxn(); } if (ret != 0) { return rollback(createProcessorResponse(ret)); @@ -1659,6 +1631,13 @@ private boolean isExplicitLockOperation() { private CommandProcessorResponse createProcessorResponse(int ret) { SessionState.getPerfLogger().cleanupPerfLogMetrics(); queryDisplay.setErrorMessage(errorMessage); + if(downstreamError != null && downstreamError instanceof HiveException) { + ErrorMsg em = ((HiveException)downstreamError).getCanonicalErrorMsg(); + if(em != null) { + return new CommandProcessorResponse(ret, errorMessage, SQLState, + schema, downstreamError, em.getErrorCode(), null); + } + } return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError); } diff --git ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java index e8c8ae6..810351b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java +++ ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java @@ -35,6 +35,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.ExplainTask; @@ -108,8 +109,12 @@ private Boolean autoCommitValue; public QueryPlan() { - this.reducerTimeStatsPerJobList = new ArrayList(); - operation = null; + this(null); + } + @VisibleForTesting + protected QueryPlan(HiveOperation command) { + this.reducerTimeStatsPerJobList = new ArrayList<>(); + this.operation = command; } public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId, diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 62f7c5a..1b5dba0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -21,6 
+21,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc; +import org.apache.hadoop.hive.ql.plan.LockTableDesc; +import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc; +import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -150,6 +154,8 @@ long openTxn(Context ctx, String user, long delay) throws LockException { try { txnId = getMS().openTxn(user); statementId = 0; + isExplicitTransaction = false; + startTransactionCount = 0; LOG.debug("Opened " + JavaUtils.txnIdToString(txnId)); ctx.setHeartbeater(startHeartbeat(delay)); return txnId; @@ -186,17 +192,87 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws Lo } /** + * to include in error msgs + * @param queryPlan + * @return + */ + private static String getQueryIdWaterMark(QueryPlan queryPlan) { + return "queryId=" + queryPlan.getQueryId(); + } + + /** + * if {@code true} it means current transaction is started via START TRANSACTION which means it cannot + * include any Operations which cannot be rolled back. + * If false, it's a single statement transaction which can include any statement. This is not a + * contradiction from the user point of view who doesn't know anything about the implicit txn + * and cannot call rollback (the statement of course can fail in which case there is nothing to + * rollback (assuming the statement is well implemented)). + * + * This is done so that all commands run in a transaction which simplifies implementation and + * allows a simple implementation of multi-statement txns which don't require a lock manager + * capable of deadlock detection. 
(todo: not fully implemented; elaborate on how this works) + * + * Also, critically important, ensuring that everything runs in a transaction assigns an order + * to all operations in the system - needed for replication/DR. + */ + private boolean isExplicitTransaction = false; + private int startTransactionCount = 0; + private void verifyState(QueryPlan queryPlan) throws LockException { + if(!isTxnOpen()) { + throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() + " for " + getQueryIdWaterMark(queryPlan)); + } + if(queryPlan.getOperation() == null) { + throw new IllegalStateException("Unkown HiverOperation for " + getQueryIdWaterMark(queryPlan)); + } + switch (queryPlan.getOperation()) { + case START_TRANSACTION: + if(getAutoCommit()) { + throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, queryPlan.getOperationName()); + } + isExplicitTransaction = true; + if(++startTransactionCount > 1) { + throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName()); + } + break; + case COMMIT: + case ROLLBACK: + if(!isTxnOpen()) { + throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, queryPlan.getOperationName()); + } + if(getAutoCommit()) { + throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, queryPlan.getOperationName()); + } + break; + case SET_AUTOCOMMIT: + assert isTxnOpen();// must be so since Driver opens a txn for all commands whenever it's not open + /*if 'set autocommit true' is sent to an open txn, the effect will be to commit right after this set stmt + * 'set autocommit false' is effectively no-op in this case.*/ + break; + default: + if(!queryPlan.getOperation().isAllowedInTransaction() && isExplicitTransaction) { + throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName()); + } + } + /*todo: check ReadEntity/WriteEntity to determine whether it's reading/writing acid (and if any resources are not acid) + * and 
raise an appropriate error - depending on what semantics we go with*/ + } + /** * This is for testing only. Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)} * @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING * @return null if no locks were needed */ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException { init(); - // Make sure we've built the lock manager + // Make sure we've built the lock manager getLockManager(); - + verifyState(plan); boolean atLeastOneLock = false; queryId = plan.getQueryId(); + switch (plan.getOperation()) { + case SET_AUTOCOMMIT: + setAutoCommit(plan.getAutoCommitValue()); + return null; + } LockRequestBuilder rqstBuilder = new LockRequestBuilder(queryId); //link queryId to txnId @@ -240,8 +316,8 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB // This is a file or something we don't hold locks for. 
continue; } - if(t != null && AcidUtils.isAcidTable(t)) { - compBuilder.setIsAcid(true); + if(t != null) { + compBuilder.setIsAcid(AcidUtils.isAcidTable(t)); } LockComponent comp = compBuilder.build(); LOG.debug("Adding lock component to lock request " + comp.toString()); @@ -262,7 +338,34 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB } LockComponentBuilder compBuilder = new LockComponentBuilder(); Table t = null; + switch (output.getType()) { + case DATABASE: + compBuilder.setDbName(output.getDatabase().getName()); + break; + + case TABLE: + case DUMMYPARTITION: // in case of dynamic partitioning lock the table + t = output.getTable(); + compBuilder.setDbName(t.getDbName()); + compBuilder.setTableName(t.getTableName()); + break; + + case PARTITION: + compBuilder.setPartitionName(output.getPartition().getName()); + t = output.getPartition().getTable(); + compBuilder.setDbName(t.getDbName()); + compBuilder.setTableName(t.getTableName()); + break; + + default: + // This is a file or something we don't hold locks for. + continue; + } switch (output.getWriteType()) { + /* + * todo: base this on HiveOperation instead? this and DDL_NO_LOCK is peppered all over the code... + * Seems much cleaner if each stmt is identified as a particular HiveOperation (which I'd think + * makes sense everywhere). 
This however would be problematic for merge...*/ case DDL_EXCLUSIVE: case INSERT_OVERWRITE: compBuilder.setExclusive(); @@ -270,10 +373,9 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB break; case INSERT: - t = getTable(output); + assert t != null; if(AcidUtils.isAcidTable(t)) { compBuilder.setShared(); - compBuilder.setIsAcid(true); } else { if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) { @@ -281,7 +383,6 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB } else { // this is backward compatible for non-ACID resources, w/o ACID semantics compBuilder.setShared(); } - compBuilder.setIsAcid(false); } compBuilder.setOperationType(DataOperationType.INSERT); break; @@ -293,12 +394,10 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB case UPDATE: compBuilder.setSemiShared(); compBuilder.setOperationType(DataOperationType.UPDATE); - t = getTable(output); break; case DELETE: compBuilder.setSemiShared(); compBuilder.setOperationType(DataOperationType.DELETE); - t = getTable(output); break; case DDL_NO_LOCK: @@ -307,34 +406,11 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB default: throw new RuntimeException("Unknown write type " + output.getWriteType().toString()); - - } - switch (output.getType()) { - case DATABASE: - compBuilder.setDbName(output.getDatabase().getName()); - break; - - case TABLE: - case DUMMYPARTITION: // in case of dynamic partitioning lock the table - t = output.getTable(); - compBuilder.setDbName(t.getDbName()); - compBuilder.setTableName(t.getTableName()); - break; - - case PARTITION: - compBuilder.setPartitionName(output.getPartition().getName()); - t = output.getPartition().getTable(); - compBuilder.setDbName(t.getDbName()); - compBuilder.setTableName(t.getTableName()); - break; - - default: - // This is a file or something we don't hold locks for. 
- continue; } - if(t != null && AcidUtils.isAcidTable(t)) { - compBuilder.setIsAcid(true); + if(t != null) { + compBuilder.setIsAcid(AcidUtils.isAcidTable(t)); } + compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite()); LockComponent comp = compBuilder.build(); LOG.debug("Adding lock component to lock request " + comp.toString()); @@ -556,8 +632,24 @@ public String getTxnManagerName() { public boolean supportsExplicitLock() { return false; } - @Override + public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException { + throw new UnsupportedOperationException(); + } + @Override + public int unlockTable(Hive hiveDB, UnlockTableDesc unlockTbl) throws HiveException { + throw new UnsupportedOperationException(); + } + @Override + public int lockDatabase(Hive hiveDB, LockDatabaseDesc lockDb) throws HiveException { + throw new UnsupportedOperationException(); + } + @Override + public int unlockDatabase(Hive hiveDB, UnlockDatabaseDesc unlockDb) throws HiveException { + throw new UnsupportedOperationException(); + } + + @Override public boolean useNewShowLocksFormat() { return true; } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index 520d3de..31b2d59 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -131,7 +131,9 @@ commandType.put(HiveParser.TOK_REPL_DUMP, HiveOperation.EXPORT); // piggyback on EXPORT security handling for now commandType.put(HiveParser.TOK_REPL_LOAD, HiveOperation.IMPORT); // piggyback on IMPORT security handling for now commandType.put(HiveParser.TOK_REPL_STATUS, HiveOperation.SHOW_TBLPROPERTIES); // TODO : also actually DESCDATABASE - + commandType.put(HiveParser.TOK_UPDATE_TABLE, HiveOperation.UPDATE); + commandType.put(HiveParser.TOK_DELETE_FROM, HiveOperation.DELETE); + 
commandType.put(HiveParser.TOK_MERGE, HiveOperation.MERGE); } static { @@ -171,7 +173,21 @@ HiveOperation.ALTERTABLE_UPDATEPARTSTATS}); } - public static BaseSemanticAnalyzer get(QueryState queryState, ASTNode tree) + + public static BaseSemanticAnalyzer get(QueryState queryState, ASTNode tree) throws SemanticException { + BaseSemanticAnalyzer sem = getInternal(queryState, tree); + if(queryState.getHiveOperation() == null) { + String query = queryState.getQueryString(); + if(query != null) { + query = query.substring(0, 30); + } + throw new IllegalStateException("Unknown HiveOperation for query='" + query + "' queryId=" + + queryState.getQueryId()); + } + return sem; + } + + private static BaseSemanticAnalyzer getInternal(QueryState queryState, ASTNode tree) throws SemanticException { if (tree.getToken() == null) { throw new RuntimeException("Empty Syntax Tree"); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java index d333f91..c16e1ca 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java @@ -27,7 +27,7 @@ IMPORT("IMPORT", null, new Privilege[]{Privilege.ALTER_METADATA, Privilege.ALTER_DATA}), CREATEDATABASE("CREATEDATABASE", null, new Privilege[]{Privilege.CREATE}), DROPDATABASE("DROPDATABASE", null, new Privilege[]{Privilege.DROP}), - SWITCHDATABASE("SWITCHDATABASE", null, null), + SWITCHDATABASE("SWITCHDATABASE", null, null, true, false), LOCKDB("LOCKDATABASE", new Privilege[]{Privilege.LOCK}, null), UNLOCKDB("UNLOCKDATABASE", new Privilege[]{Privilege.LOCK}, null), DROPTABLE ("DROPTABLE", null, new Privilege[]{Privilege.DROP}), @@ -134,7 +134,13 @@ COMMIT("COMMIT", null, null, true, true), ROLLBACK("ROLLBACK", null, null, true, true), SET_AUTOCOMMIT("SET AUTOCOMMIT", null, null, true, false), - ABORT_TRANSACTIONS("ABORT TRANSACTIONS", null, null, false, false); + ABORT_TRANSACTIONS("ABORT 
TRANSACTIONS", null, null, false, false), + //todo: what should the PrivilegeS be for next 3 + UPDATE("UPDATE", null, null, true, false), + DELETE("DELETE", null, null, true, false), + MERGE("MERGE", null, null, true, false); + //todo: add INSERT & SELECT? and set it like VIEW related stuff in SemanticAnalyzerFactory? + //Note: if adding more operations, update HiveOperationType.java private String operationName; @@ -144,6 +150,7 @@ /** * Only a small set of operations is allowed inside an open transactions, e.g. DML + * Move this into {@link org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager} ? */ private final boolean allowedInTransaction; private final boolean requiresOpenTransaction; diff --git ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java index 7da44e8..02f8dc7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java +++ ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java @@ -132,7 +132,7 @@ RESET, DFS, ADD, - DELETE, + DELETE,//todo: what is this? it's not HiveOperation.DELETE... COMPILE, START_TRANSACTION, COMMIT, diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 1e9774f..ec76aa1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -459,6 +459,21 @@ public HiveTxnManager getTxnMgr() { return txnMgr; } + /** + * This only for testing. It allows to switch the manager before the (test) operation so that + * it's not coupled to the executing thread. Since tests run against Derby which often wedges + * under concurrent access, tests must use a single thead and simulate concurrent access. 
+ * For example, {@code TestDbTxnManager2} + */ + @VisibleForTesting + public HiveTxnManager setTxnMgr(HiveTxnManager mgr) { + if(!(sessionConf.getBoolVar(ConfVars.HIVE_IN_TEST) || sessionConf.getBoolVar(ConfVars.HIVE_IN_TEZ_TEST))) { + throw new IllegalStateException("Only for testing!"); + } + HiveTxnManager tmp = txnMgr; + txnMgr = mgr; + return tmp; + } public HadoopShims.HdfsEncryptionShim getHdfsEncryptionShim() throws HiveException { try { return getHdfsEncryptionShim(FileSystem.get(sessionConf)); diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index adfe98a..23efce0 100644 --- ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -713,7 +713,7 @@ public void testLockSWSWSR() throws Exception { res = txnHandler.lock(req); assertTrue(res.getState() == LockState.ACQUIRED); } - + @Ignore("now that every op has a txn ctx, we don't produce the error expected here....") @Test public void testWrongLockForOperation() throws Exception { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 6dae7ba..e3eb335 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -231,17 +231,16 @@ public void testErrors() throws Exception { Assert.assertEquals("Expected update of bucket column to fail", "FAILED: SemanticException [Error 10302]: Updating values of bucketing columns is not supported. 
Column a.", cpr3.getErrorMessage()); - //line below should in principle work but Driver doesn't propagate errorCode properly - //Assert.assertEquals("Expected update of bucket column to fail", ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE.getErrorCode(), cpr3.getErrorCode()); + Assert.assertEquals("Expected update of bucket column to fail", ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE.getErrorCode(), cpr3.getErrorCode()); cpr3 = runStatementOnDriverNegative("commit work");//not allowed in AC=true Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode()); cpr3 = runStatementOnDriverNegative("rollback work");//not allowed in AC=true Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode()); runStatementOnDriver("set autocommit false"); - cpr3 = runStatementOnDriverNegative("commit");//not allowed in w/o tx - Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode()); - cpr3 = runStatementOnDriverNegative("rollback");//not allowed in w/o tx - Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode()); +// cpr3 = runStatementOnDriverNegative("commit");//not allowed in w/o tx +// Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode()); +// cpr3 = runStatementOnDriverNegative("rollback");//not allowed in w/o tx +// Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode()); runStatementOnDriver("start transaction"); cpr3 = runStatementOnDriverNegative("start transaction");//not allowed in a tx Assert.assertEquals("Expected start transaction to fail", ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), cpr3.getErrorCode()); @@ -468,7 +467,7 @@ public void testTimeOutReaper() throws 
Exception { } } Assert.assertNotNull(txnInfo); - Assert.assertEquals(2, txnInfo.getId()); + Assert.assertEquals(12, txnInfo.getId()); Assert.assertEquals(TxnState.OPEN, txnInfo.getState()); String s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false); String[] vals = s.split("\\s+"); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 6718ae9..5b6ef0f 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -663,11 +663,11 @@ public void testNonAcidToAcidConversion3() throws Exception { FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); Arrays.sort(buckets); if (numDelta == 1) { - Assert.assertEquals("delta_0000001_0000001_0000", status[i].getPath().getName()); + Assert.assertEquals("delta_0000022_0000022_0000", status[i].getPath().getName()); Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } else if (numDelta == 2) { - Assert.assertEquals("delta_0000002_0000002_0000", status[i].getPath().getName()); + Assert.assertEquals("delta_0000023_0000023_0000", status[i].getPath().getName()); Assert.assertEquals(BUCKET_COUNT, buckets.length); Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); Assert.assertEquals("bucket_00001", buckets[1].getPath().getName()); @@ -712,7 +712,7 @@ public void testNonAcidToAcidConversion3() throws Exception { Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } else if (numBase == 2) { // The new base dir now has two bucket files, since the delta dir has two bucket files - Assert.assertEquals("base_0000002", status[i].getPath().getName()); + Assert.assertEquals("base_0000023", status[i].getPath().getName()); Assert.assertEquals(BUCKET_COUNT, buckets.length); 
Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); Assert.assertEquals("bucket_00001", buckets[1].getPath().getName()); @@ -739,7 +739,7 @@ public void testNonAcidToAcidConversion3() throws Exception { status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); Assert.assertEquals(1, status.length); - Assert.assertEquals("base_0000002", status[0].getPath().getName()); + Assert.assertEquals("base_0000023", status[0].getPath().getName()); FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); Arrays.sort(buckets); Assert.assertEquals(BUCKET_COUNT, buckets.length); @@ -780,6 +780,11 @@ public void testSimpleRead() throws Exception { Assert.assertEquals("Missing data", 3, rs.size()); } @Test + public void testSimpleRead2() throws Exception { + runStatementOnDriver("select a from " + Table.ACIDTBL); + } + + @Test public void testUpdateMixedCase() throws Exception { int[][] tableData = {{1,2},{3,3},{5,3}}; runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java index 67e917c..520e958 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java @@ -84,7 +84,6 @@ public void testFailureOnAlteringTransactionalProperties() throws Exception { runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')"); } - /** * Test the query correctness and directory layout for ACID table conversion with split-update * enabled. 
@@ -96,7 +95,8 @@ public void testFailureOnAlteringTransactionalProperties() throws Exception { * @throws Exception */ @Test - public void testNonAcidToAcidSplitUpdateConversion1() throws Exception { + @Override + public void testNonAcidToAcidConversion1() throws Exception { FileSystem fs = FileSystem.get(hiveConf); FileStatus[] status; @@ -226,7 +226,8 @@ public void testNonAcidToAcidSplitUpdateConversion1() throws Exception { * @throws Exception */ @Test - public void testNonAcidToAcidSplitUpdateConversion2() throws Exception { + @Override + public void testNonAcidToAcidConversion2() throws Exception { FileSystem fs = FileSystem.get(hiveConf); FileStatus[] status; @@ -360,7 +361,8 @@ public void testNonAcidToAcidSplitUpdateConversion2() throws Exception { * @throws Exception */ @Test - public void testNonAcidToAcidSplitUpdateConversion3() throws Exception { + @Override + public void testNonAcidToAcidConversion3() throws Exception { FileSystem fs = FileSystem.get(hiveConf); FileStatus[] status; @@ -442,11 +444,11 @@ public void testNonAcidToAcidSplitUpdateConversion3() throws Exception { FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); Arrays.sort(buckets); if (numDelta == 1) { - Assert.assertEquals("delta_0000001_0000001_0000", status[i].getPath().getName()); + Assert.assertEquals("delta_0000022_0000022_0000", status[i].getPath().getName()); Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } else if (numDelta == 2) { - Assert.assertEquals("delta_0000002_0000002_0000", status[i].getPath().getName()); + Assert.assertEquals("delta_0000023_0000023_0000", status[i].getPath().getName()); Assert.assertEquals(1, buckets.length); Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } @@ -455,7 +457,7 @@ public void testNonAcidToAcidSplitUpdateConversion3() throws Exception { FileStatus[] buckets = fs.listStatus(status[i].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER); Arrays.sort(buckets); if (numDeleteDelta == 1) { - Assert.assertEquals("delete_delta_0000001_0000001_0000", status[i].getPath().getName()); + Assert.assertEquals("delete_delta_0000022_0000022_0000", status[i].getPath().getName()); Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } @@ -502,7 +504,7 @@ public void testNonAcidToAcidSplitUpdateConversion3() throws Exception { Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } else if (numBase == 2) { // The new base dir now has two bucket files, since the delta dir has two bucket files - Assert.assertEquals("base_0000002", status[i].getPath().getName()); + Assert.assertEquals("base_0000023", status[i].getPath().getName()); Assert.assertEquals(1, buckets.length); Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } @@ -528,7 +530,7 @@ public void testNonAcidToAcidSplitUpdateConversion3() throws Exception { status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); Assert.assertEquals(1, status.length); - Assert.assertEquals("base_0000002", status[0].getPath().getName()); + Assert.assertEquals("base_0000023", status[0].getPath().getName()); FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); Arrays.sort(buckets); Assert.assertEquals(1, buckets.length); diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java index 9bfc7d1..a7f109b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java @@ -36,11 +36,13 @@ import org.apache.hadoop.hive.ql.metadata.DummyPartition; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; 
+import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import java.util.ArrayList; @@ -79,13 +81,14 @@ public TestDbTxnManager() throws Exception { @Test public void testSingleReadTable() throws Exception { addTableInput(); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); Assert.assertEquals(1, TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId)); - txnMgr.getLockManager().unlock(locks.get(0)); + txnMgr.commitTxn(); locks = txnMgr.getLockManager().getLocks(false, false); Assert.assertEquals(0, locks.size()); } @@ -93,13 +96,14 @@ public void testSingleReadTable() throws Exception { @Test public void testSingleReadPartition() throws Exception { addPartitionInput(newTable(true)); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY); + txnMgr.openTxn(ctx, null); txnMgr.acquireLocks(qp, ctx, null); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); Assert.assertEquals(1, TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId)); - txnMgr.getLockManager().unlock(locks.get(0)); + txnMgr.commitTxn(); locks = txnMgr.getLockManager().getLocks(false, false); Assert.assertEquals(0, locks.size()); @@ -111,13 +115,14 @@ public void testSingleReadMultiPartition() throws Exception { addPartitionInput(t); addPartitionInput(t); addPartitionInput(t); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = 
ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); Assert.assertEquals(3, TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId)); - txnMgr.getLockManager().unlock(locks.get(0)); + txnMgr.commitTxn(); locks = txnMgr.getLockManager().getLocks(false, false); Assert.assertEquals(0, locks.size()); } @@ -129,13 +134,14 @@ public void testJoin() throws Exception { addPartitionInput(t); addPartitionInput(t); addTableInput(); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); Assert.assertEquals(4, TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId)); - txnMgr.getLockManager().unlock(locks.get(0)); + txnMgr.commitTxn(); locks = txnMgr.getLockManager().getLocks(false, false); Assert.assertEquals(0, locks.size()); } @@ -143,7 +149,7 @@ public void testJoin() throws Exception { @Test public void testSingleWriteTable() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.INSERT); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY); txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); @@ -159,7 +165,7 @@ public void testSingleWriteTable() throws Exception { @Test public void testSingleWritePartition() throws Exception { WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY); txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); @@ -174,7 +180,7 @@ public void testSingleWritePartition() throws Exception { @Test public void testWriteDynamicPartition() throws Exception { WriteEntity we = addDynamicPartitionedOutput(newTable(true), 
WriteEntity.WriteType.INSERT); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY); txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); @@ -217,7 +223,7 @@ private void runReaper() throws Exception { @Test public void testExceptions() throws Exception { addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY); ((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2); Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS)); runReaper(); @@ -243,10 +249,11 @@ public void testExceptions() throws Exception { txnMgr.rollbackTxn();//this is idempotent } + @Ignore("This seems useless now that we have a txn for everything") @Test public void testLockTimeout() throws Exception { addPartitionInput(newTable(true)); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY); //make sure it works with nothing to expire testLockExpiration(txnMgr, 0, true); @@ -294,7 +301,7 @@ public void testReadWrite() throws Exception { addPartitionInput(t); addPartitionInput(t); WriteEntity we = addTableOutput(WriteEntity.WriteType.INSERT); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY); txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); @@ -309,7 +316,7 @@ public void testReadWrite() throws Exception { @Test public void testUpdate() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.UPDATE); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.UPDATE); txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = 
ctx.getHiveLocks(); @@ -324,7 +331,7 @@ public void testUpdate() throws Exception { @Test public void testDelete() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.DELETE); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.DELETE); txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); @@ -339,7 +346,7 @@ public void testDelete() throws Exception { @Test public void testRollback() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.DELETE); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.DELETE); txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); @@ -354,13 +361,14 @@ public void testRollback() throws Exception { @Test public void testDDLExclusive() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.DDL_EXCLUSIVE); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.DROPTABLE); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); Assert.assertEquals(1, TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId)); - txnMgr.getLockManager().unlock(locks.get(0)); + txnMgr.rollbackTxn(); locks = txnMgr.getLockManager().getLocks(false, false); Assert.assertEquals(0, locks.size()); } @@ -368,13 +376,14 @@ public void testDDLExclusive() throws Exception { @Test public void testDDLShared() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.DDL_SHARED); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.ALTERTABLE_EXCHANGEPARTITION); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); Assert.assertEquals(1, 
TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId)); - txnMgr.getLockManager().unlock(locks.get(0)); + txnMgr.commitTxn(); locks = txnMgr.getLockManager().getLocks(false, false); Assert.assertEquals(0, locks.size()); } @@ -382,10 +391,13 @@ public void testDDLShared() throws Exception { @Test public void testDDLNoLock() throws Exception { WriteEntity we = addTableOutput(WriteEntity.WriteType.DDL_NO_LOCK); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.CREATEDATABASE); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertNull(locks); + txnMgr.rollbackTxn(); + } @Test @@ -406,11 +418,12 @@ public void concurrencyFalse() throws Exception { @Test public void testLockAcquisitionAndRelease() throws Exception { addTableInput(); - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY); + txnMgr.openTxn(ctx, "fred"); txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); - txnMgr.releaseLocks(locks); + txnMgr.commitTxn(); locks = txnMgr.getLockManager().getLocks(false, false); Assert.assertEquals(0, locks.size()); } @@ -421,7 +434,7 @@ public void testHeartbeater() throws Exception { addTableInput(); LockException exception = null; - QueryPlan qp = new MockQueryPlan(this); + QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY); // Case 1: If there's no delay for the heartbeat, txn should be able to commit txnMgr.openTxn(ctx, "fred"); @@ -493,7 +506,8 @@ public void tearDown() throws Exception { private final HashSet outputs = new HashSet<>(); private final String queryId; - MockQueryPlan(TestDbTxnManager test) { + MockQueryPlan(TestDbTxnManager test, HiveOperation operation) { + super(operation); inputs.addAll(test.readEntities); outputs.addAll(test.writeEntities); queryId = makeQueryId(); diff --git 
ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 7cae109..428f4d9 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -44,14 +44,20 @@ import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; /** * See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager} @@ -65,11 +71,13 @@ * The later not only controls concurrency precisely but is the only way to run in UT env with DerbyDB. 
*/ public class TestDbTxnManager2 { + private static final Logger LOG = LoggerFactory.getLogger(TestDbTxnManager2.class); private static HiveConf conf = new HiveConf(Driver.class); private HiveTxnManager txnMgr; private Context ctx; private Driver driver; TxnStore txnHandler; + public TestDbTxnManager2() throws Exception { conf @@ -106,7 +114,6 @@ public void testLocksInSubquery() throws Exception { checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')")); checkCmdOnDriver(driver.compileAndRespond("delete from S where a in (select a from T where b = 1)")); - txnMgr.openTxn(ctx, "one"); txnMgr.acquireLocks(driver.getPlan(), ctx, "one"); List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -115,7 +122,6 @@ public void testLocksInSubquery() throws Exception { txnMgr.rollbackTxn(); checkCmdOnDriver(driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)")); - txnMgr.openTxn(ctx, "one"); txnMgr.acquireLocks(driver.getPlan(), ctx, "one"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -124,13 +130,13 @@ public void testLocksInSubquery() throws Exception { txnMgr.rollbackTxn(); checkCmdOnDriver(driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)")); - txnMgr.openTxn(ctx, "three"); txnMgr.acquireLocks(driver.getPlan(), ctx, "three"); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "S", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "R", null, locks); + txnMgr.rollbackTxn(); } @Test public void createTable() throws Exception { @@ -141,7 +147,7 @@ public void createTable() throws Exception { List locks = getLocks(); 
Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", null, null, locks); - txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks()); + txnMgr.commitTxn(); Assert.assertEquals("Lock remained", 0, getLocks().size()); } @Test @@ -158,7 +164,7 @@ public void insertOverwriteCreate() throws Exception { Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T2", null, locks); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T3", null, locks); - txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks()); + txnMgr.commitTxn(); Assert.assertEquals("Lock remained", 0, getLocks().size()); cpr = driver.run("drop table if exists T1"); checkCmdOnDriver(cpr); @@ -179,7 +185,7 @@ public void insertOverwritePartitionedCreate() throws Exception { Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T5", null, locks); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T4", null, locks); - txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks()); + txnMgr.commitTxn(); Assert.assertEquals("Lock remained", 0, getLocks().size()); cpr = driver.run("drop table if exists T5"); checkCmdOnDriver(cpr); @@ -195,23 +201,23 @@ public void basicBlocking() throws Exception { checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets S lock on T6 List selectLocks = ctx.getHiveLocks(); + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); cpr = driver.compileAndRespond("drop table if exists T6"); checkCmdOnDriver(cpr); //tries to get X lock on T1 and gets Waiting state - LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); + LockState lockState = ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); List 
locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, locks); checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, locks); - txnMgr.getLockManager().releaseLocks(selectLocks);//release S on T6 + txnMgr.rollbackTxn();//release S on T6 //attempt to X on T6 again - succeed lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid()); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T6", null, locks); - List xLock = new ArrayList(0); - xLock.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr.getLockManager().releaseLocks(xLock); + txnMgr2.rollbackTxn(); cpr = driver.run("drop table if exists T6"); locks = getLocks(); Assert.assertEquals("Unexpected number of locks found", 0, locks.size()); @@ -224,26 +230,24 @@ public void lockConflictDbTable() throws Exception { checkCmdOnDriver(cpr); cpr = driver.run("create table if not exists temp.T7(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6"); + cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6");//gets SS lock on T7 checkCmdOnDriver(cpr); - txnMgr.openTxn(ctx, "Fifer"); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); - checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); + checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp")); //txnMgr2.openTxn("Fiddler"); - ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);//gets SS lock on T7 + ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false); List 
locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "temp", "T7", null, locks); checkLock(LockType.EXCLUSIVE, LockState.WAITING, "temp", null, null, locks); txnMgr.commitTxn(); - ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid()); + ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(1).getLockid()); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "temp", null, null, locks); - List xLock = new ArrayList(0); - xLock.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr2.getLockManager().releaseLocks(xLock); + txnMgr2.commitTxn(); } @Test public void updateSelectUpdate() throws Exception { @@ -252,12 +256,12 @@ public void updateSelectUpdate() throws Exception { checkCmdOnDriver(cpr); cpr = driver.compileAndRespond("delete from T8 where b = 89"); checkCmdOnDriver(cpr); - txnMgr.openTxn(ctx, "Fifer"); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8 + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + txnMgr2.setAutoCommit(false); + swapTxnManager(txnMgr2); cpr = driver.compileAndRespond("select a from T8");//gets S lock on T8 checkCmdOnDriver(cpr); - HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn(ctx, "Fiddler"); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fiddler"); checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1")); ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer @@ -273,6 +277,7 @@ public void updateSelectUpdate() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks); txnMgr2.commitTxn(); + 
swapTxnManager(txnMgr); cpr = driver.run("drop table if exists T6"); locks = getLocks(); Assert.assertEquals("Unexpected number of locks found", 0, locks.size()); @@ -284,21 +289,22 @@ public void testLockRetryLimit() throws Exception { dropTable(new String[] {"T9"}); conf.setIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES, 2); conf.setBoolVar(HiveConf.ConfVars.TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT, true); - HiveTxnManager otherTxnMgr = new DbTxnManager(); - ((DbTxnManager)otherTxnMgr).setHiveConf(conf); CommandProcessorResponse cpr = driver.run("create table T9(a int)"); checkCmdOnDriver(cpr); cpr = driver.compileAndRespond("select * from T9"); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Vincent Vega"); - List locks = getLocks(txnMgr); + List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T9", null, locks); - + + + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); cpr = driver.compileAndRespond("drop table T9"); checkCmdOnDriver(cpr); try { - otherTxnMgr.acquireLocks(driver.getPlan(), ctx, "Winston Winnfield"); + txnMgr2.acquireLocks(driver.getPlan(), ctx, "Winston Winnfield"); } catch(LockException ex) { Assert.assertEquals("Got wrong lock exception", ErrorMsg.LOCK_ACQUIRE_TIMEDOUT, ex.getCanonicalErrorMsg()); @@ -306,7 +312,7 @@ public void testLockRetryLimit() throws Exception { locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T9", null, locks); - otherTxnMgr.closeTxnManager(); + txnMgr2.closeTxnManager(); } /** @@ -324,13 +330,15 @@ public void testLockBlockedBy() throws Exception { cpr = driver.compileAndRespond("select * from TAB_BLOCKED"); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "I AM SAM"); - List locks = getLocks(txnMgr); + List locks = 
getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks); + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); cpr = driver.compileAndRespond("drop table TAB_BLOCKED"); checkCmdOnDriver(cpr); - ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "SAM I AM", false);//make non-blocking - locks = getLocks(txnMgr); + ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "SAM I AM", false);//make non-blocking + locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks); checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "TAB_BLOCKED", null, locks); @@ -597,9 +605,10 @@ public void checkExpectedLocks() throws Exception { List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", null, locks); - List relLocks = new ArrayList(1); - relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr.getLockManager().releaseLocks(relLocks); +// List relLocks = new ArrayList(1); +// relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); +// txnMgr.getLockManager().releaseLocks(relLocks); + txnMgr.rollbackTxn(); cpr = driver.compileAndRespond("insert into nonAcidPart partition(p=1) values(5,6)"); checkCmdOnDriver(cpr); @@ -607,9 +616,10 @@ public void checkExpectedLocks() throws Exception { locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", "p=1", locks); - relLocks = new ArrayList(1); - relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr.getLockManager().releaseLocks(relLocks); +// relLocks = new ArrayList(1); 
+// relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); +// txnMgr.getLockManager().releaseLocks(relLocks); + txnMgr.rollbackTxn(); cpr = driver.compileAndRespond("insert into acidPart partition(p) values(1,2,3)"); checkCmdOnDriver(cpr); @@ -617,9 +627,10 @@ public void checkExpectedLocks() throws Exception { locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", null, locks); - relLocks = new ArrayList(1); - relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr.getLockManager().releaseLocks(relLocks); +// relLocks = new ArrayList(1); +// relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); +// txnMgr.getLockManager().releaseLocks(relLocks); + txnMgr.rollbackTxn(); cpr = driver.compileAndRespond("insert into acidPart partition(p=1) values(5,6)"); checkCmdOnDriver(cpr); @@ -627,19 +638,21 @@ public void checkExpectedLocks() throws Exception { locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", "p=1", locks); - relLocks = new ArrayList(1); - relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr.getLockManager().releaseLocks(relLocks); - +// relLocks = new ArrayList(1); +// relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); +// txnMgr.getLockManager().releaseLocks(relLocks); + txnMgr.rollbackTxn(); + cpr = driver.compileAndRespond("update acidPart set b = 17 where a = 1"); checkCmdOnDriver(cpr); lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks); - relLocks = new ArrayList(1); - relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - 
txnMgr.getLockManager().releaseLocks(relLocks); +// List relLocks = new ArrayList(1); +// relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); +// txnMgr.getLockManager().releaseLocks(relLocks); + txnMgr.rollbackTxn(); cpr = driver.compileAndRespond("update acidPart set b = 17 where p = 1"); checkCmdOnDriver(cpr); @@ -647,9 +660,10 @@ public void checkExpectedLocks() throws Exception { locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks);//https://issues.apache.org/jira/browse/HIVE-13212 - relLocks = new ArrayList(1); - relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr.getLockManager().releaseLocks(relLocks); +// relLocks = new ArrayList(1); +// relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); +// txnMgr.getLockManager().releaseLocks(relLocks); + txnMgr.rollbackTxn(); } /** * Check to make sure we acquire proper locks for queries involving acid and non-acid tables @@ -726,6 +740,16 @@ public static ShowLocksResponseElement checkLock(LockType expectedType, LockStat throw new IllegalStateException("How did it get here?!"); } + /** + * SessionState is stored in ThreadLocal; UnitTest runs in a single thread (otherwise Derby wedges) + * {@link HiveTxnManager} instances are per SessionState. 
+ * So to be able to simulate concurrent locks/transactions s/o forking threads we just swap + * the TxnManager instance in the session (hacky but nothing is actually threading so it allows us + * to write good tests) + */ + private static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) { + return SessionState.get().setTxnMgr(txnMgr); + } @Test public void testShowLocksFilterOptions() throws Exception { CommandProcessorResponse cpr = driver.run("drop table if exists db1.t14"); @@ -756,26 +780,32 @@ public void testShowLocksFilterOptions() throws Exception { // Acquire different locks at different levels + HiveTxnManager txnMgr1 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr1); cpr = driver.compileAndRespond("insert into table db1.t14 partition (ds='today') values (1, 2)"); checkCmdOnDriver(cpr); - txnMgr.acquireLocks(driver.getPlan(), ctx, "Tom"); + txnMgr1.acquireLocks(driver.getPlan(), ctx, "Tom"); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); cpr = driver.compileAndRespond("insert into table db1.t14 partition (ds='tomorrow') values (3, 4)"); checkCmdOnDriver(cpr); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Jerry"); HiveTxnManager txnMgr3 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr3); cpr = driver.compileAndRespond("select * from db2.t15"); checkCmdOnDriver(cpr); txnMgr3.acquireLocks(driver.getPlan(), ctx, "Donald"); HiveTxnManager txnMgr4 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr4); cpr = driver.compileAndRespond("select * from db2.t16"); checkCmdOnDriver(cpr); txnMgr4.acquireLocks(driver.getPlan(), ctx, "Hillary"); HiveTxnManager txnMgr5 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr5); cpr = driver.compileAndRespond("select * from db2.t14"); checkCmdOnDriver(cpr); txnMgr5.acquireLocks(driver.getPlan(), ctx, "Obama"); 
@@ -799,6 +829,7 @@ public void testShowLocksFilterOptions() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "db2", "t14", null, locks); // SHOW LOCKS t14 + swapTxnManager(txnMgr); cpr = driver.run("use db1"); checkCmdOnDriver(cpr); locks = getLocksWithFilterOptions(txnMgr, null, "t14", null); @@ -847,14 +878,13 @@ public void testWriteSetTracking1() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART")); - txnMgr.openTxn(ctx, "Nicholas"); - checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Nicholas"); - HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); txnMgr.commitTxn(); - txnMgr2.openTxn(ctx, "Alexandra"); + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); - txnMgr2.acquireLocks(driver.getPlan(), ctx, "Nicholas"); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + txnMgr2.acquireLocks(driver.getPlan(), ctx, "Alexandra"); txnMgr2.commitTxn(); } private void dropTable(String[] tabs) throws Exception { @@ -899,16 +929,17 @@ public void testWriteSetTracking3() throws Exception { "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)")); - HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - long txnId = txnMgr.openTxn(ctx, "Known"); - long txnId2 = txnMgr2.openTxn(ctx, "Unknown"); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + long txnId = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "Known"); List locks = 
getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks); + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + long txnId2 = txnMgr2.getCurrentTxnId(); ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false); locks = getLocks(txnMgr2);//should not matter which txnMgr is used here Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -933,7 +964,7 @@ public void testWriteSetTracking3() throws Exception { Assert.assertTrue("Didn't get exception", expectedException != null); Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg()); Assert.assertEquals("Exception msg didn't match", - "Aborting [txnid:3,3] due to a write conflict on default/TAB_PART/p=blah committed by [txnid:2,3] u/u", + "Aborting [txnid:"+txnId2+","+txnId2+"] due to a write conflict on default/TAB_PART/p=blah committed by [txnid:"+txnId+","+txnId2+"] u/u", expectedException.getCause().getMessage()); } /** @@ -1067,7 +1098,7 @@ public void testWriteSetTracking6() throws Exception { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn(ctx, "Horton"); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Horton"); Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); @@ -1080,6 +1111,7 @@ public void testWriteSetTracking6() throws Exception { locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, 
locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks); + txnMgr.commitTxn(); TestTxnCommands2.runHouseKeeperService(new AcidWriteSetService(), conf); Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); } @@ -1097,40 +1129,41 @@ public void testWriteSetTracking7() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab2 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - + swapTxnManager(txnMgr2); //test with predicates such that partition pruning works - txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'")); + long idTxnUpdate1 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks); //now start concurrent txn - txnMgr.openTxn(ctx, "T3"); + swapTxnManager(txnMgr); checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'")); + long idTxnUpdate2 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks); - //this simulates the completion of txnid:2 + //this simulates the completion of txnid:idTxnUpdate1 AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr2.commitTxn();//txnid:2 + 
txnMgr2.commitTxn();//txnid:idTxnUpdate1 locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks); - //completion of txnid:3 + //completion of txnid:idTxnUpdate2 adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2", Collections.singletonList("p=one")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr.commitTxn();//txnid:3 + txnMgr.commitTxn();//txnid:idTxnUpdate2 //now both txns concurrently updated TAB2 but different partitions. Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), @@ -1147,8 +1180,9 @@ public void testWriteSetTracking7() throws Exception { "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:4 - txnMgr2.openTxn(ctx, "T5"); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); + long idTxnUpdate3 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T5"); locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1156,8 +1190,9 @@ public void testWriteSetTracking7() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn(ctx, "T6"); + swapTxnManager(txnMgr); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2")); + long idTxnUpdate4 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 4, locks.size()); @@ -1166,24 +1201,24 @@ public void testWriteSetTracking7() throws Exception { checkLock(LockType.SHARED_WRITE, 
LockState.WAITING, "default", "TAB1", "p=two", locks); checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=one", locks); - //this simulates the completion of txnid:5 + //this simulates the completion of txnid:idTxnUpdate3 adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=one")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr2.commitTxn();//txnid:5 + txnMgr2.commitTxn();//txnid:idTxnUpdate3 ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); - //completion of txnid:6 + //completion of txnid:idTxnUpdate4 adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr.commitTxn();//txnid:6 + txnMgr.commitTxn();//txnid:idTxnUpdate4 Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'")); @@ -1202,10 +1237,11 @@ public void testWriteSetTracking8() throws Exception { CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 + checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) 
values(1,1,'one'),(2,2,'two')")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn(ctx, "T2"); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); + long idTxnUpdate1 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1213,8 +1249,9 @@ public void testWriteSetTracking8() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn(ctx, "T3"); + swapTxnManager(txnMgr); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'")); + long idTxnUpdate2 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 3, locks.size()); @@ -1222,23 +1259,23 @@ public void testWriteSetTracking8() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks); - //this simulates the completion of txnid:2 + //this simulates the completion of txnid:idTxnUpdate1 AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=one")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr2.commitTxn();//txnid:2 + txnMgr2.commitTxn();//txnid:idTxnUpdate1 ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); - //completion of txnid:3 + //completion of 
txnid:idTxnUpdate2 adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr.commitTxn();//txnid:3 + txnMgr.commitTxn();//txnid:idTxnUpdate2 Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'")); @@ -1256,10 +1293,11 @@ public void testWriteSetTracking9() throws Exception { CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 + checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn(ctx, "T2"); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1")); + long idTxnUpdate1 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 2, locks.size()); @@ -1267,8 +1305,9 @@ public void testWriteSetTracking9() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn(ctx, "T3"); + swapTxnManager(txnMgr); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); + long idTxnDelete1 = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 3, 
locks.size()); @@ -1276,30 +1315,30 @@ public void testWriteSetTracking9() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks); - //this simulates the completion of txnid:2 + //this simulates the completion of txnid:idTxnUpdate1 AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=one")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr2.commitTxn();//txnid:2 + txnMgr2.commitTxn();//txnid:idTxnUpdate1 ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); - //completion of txnid:3 + //completion of txnid:idTxnUpdate2 adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.DELETE); txnHandler.addDynamicPartitions(adp); - txnMgr.commitTxn();//txnid:3 + txnMgr.commitTxn();//txnid:idTxnUpdate2 Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), - 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=1 and ctc_table='tab1'")); + 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + (idTxnUpdate1 - 1) + " and ctc_table='tab1'")); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), - 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2 and ctc_table='tab1' and ctc_partition='p=one'")); + 1, TxnDbUtil.countQueryAgent("select count(*) from 
COMPLETED_TXN_COMPONENTS where ctc_txnid=" + idTxnUpdate1 + " and ctc_table='tab1' and ctc_partition='p=one'")); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), - 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=3 and ctc_table='tab1' and ctc_partition='p=two'")); + 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + idTxnDelete1 + " and ctc_table='tab1' and ctc_partition='p=two'")); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'")); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), @@ -1318,7 +1357,7 @@ public void testWriteSetTracking10() throws Exception { checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn(ctx, "T2"); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); @@ -1327,7 +1366,7 @@ public void testWriteSetTracking10() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); //now start concurrent txn - txnMgr.openTxn(ctx, "T3"); + swapTxnManager(txnMgr); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); @@ -1336,32 +1375,32 @@ public void testWriteSetTracking10() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); 
checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks); - //this simulates the completion of txnid:2 + //this simulates the completion of "Update tab2" txn AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.UPDATE); txnHandler.addDynamicPartitions(adp); - txnMgr2.commitTxn();//txnid:2 + txnMgr2.commitTxn();//"Update tab2" ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); - //completion of txnid:3 + //completion of "delete from tab1" txn adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.DELETE); txnHandler.addDynamicPartitions(adp); LockException exception = null; try { - txnMgr.commitTxn();//txnid:3 + txnMgr.commitTxn();//"delete from tab1" } catch(LockException e) { exception = e; } Assert.assertNotEquals("Expected exception", null, exception); Assert.assertEquals("Exception msg doesn't match", - "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3] d/u", + "Aborting [txnid:5,5] due to a write conflict on default/tab1/p=two committed by [txnid:4,5] d/u", exception.getCause().getMessage()); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), @@ -1378,19 +1417,22 @@ public void testWriteSetTracking11() throws Exception { CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); checkCmdOnDriver(cpr); - 
checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 + checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')")); HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - txnMgr2.openTxn(ctx, "T2"); - checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2")); + swapTxnManager(txnMgr2); + checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2"));//start "delete from tab1" txn + long txnIdDelete = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); List locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); - //now start concurrent txn - txnMgr.openTxn(ctx, "T3"); + //now start concurrent "select * from tab1" txn + swapTxnManager(txnMgr); + txnMgr.setAutoCommit(false); checkCmdOnDriver(driver.compileAndRespond("select * from tab1 where b=1 and p='one'")); + long txnIdSelect = txnMgr.getCurrentTxnId(); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); @@ -1402,12 +1444,12 @@ public void testWriteSetTracking11() throws Exception { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks); - //this simulates the completion of txnid:2 + //this simulates the completion of "delete from tab1" txn AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.DELETE); 
txnHandler.addDynamicPartitions(adp); - txnMgr2.commitTxn();//txnid:2 + txnMgr2.commitTxn();//"delete from tab1" txn ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(4).getLockid());//retest WAITING locks (both have same ext id) locks = getLocks(txnMgr); @@ -1415,21 +1457,21 @@ public void testWriteSetTracking11() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", "p=one", locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks); - //completion of txnid:3 + //completion of txnid:txnIdSelect adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", Collections.singletonList("p=two")); adp.setOperationType(DataOperationType.DELETE); txnHandler.addDynamicPartitions(adp); - txnMgr.commitTxn();//txnid:3 + txnMgr.commitTxn();//"select * from tab1" txn Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=2")); + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete)); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=3")); + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect)); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and 
ws_operation_type='d' and ws_table='tab1' and ws_txnid=2")); + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete)); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=3")); + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect)); Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); } @@ -1448,7 +1490,7 @@ public void testCompletedTxnComponents() throws Exception { 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS")); //only expect transactional components to be in COMPLETED_TXN_COMPONENTS Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), - 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=1 and ctc_table='tab1'")); + 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=6 and ctc_table='tab1'")); } /** @@ -1465,17 +1507,17 @@ public void testMultiInsert() throws Exception { cpr = driver.run("create table if not exists tab_not_acid (a int, b int, p string)"); checkCmdOnDriver(cpr); checkCmdOnDriver(driver.run("insert into tab_not_acid values(1,1,'one'),(2,2,'two')")); - checkCmdOnDriver(driver.run("insert into tab1 partition(p) values(3,3,'one'),(4,4,'two')"));//txinid:1 + checkCmdOnDriver(driver.run("insert into tab1 partition(p) values(3,3,'one'),(4,4,'two')"));//txinid:8 //writing both acid and non-acid 
resources in the same txn //tab1 write is a dynamic partition insert - checkCmdOnDriver(driver.run("from tab_not_acid insert into tab1 partition(p)(a,b,p) select a,b,p insert into tab_not_acid(a,b) select a,b where p='two'"));//txnid:2 + checkCmdOnDriver(driver.run("from tab_not_acid insert into tab1 partition(p)(a,b,p) select a,b,p insert into tab_not_acid(a,b) select a,b where p='two'"));//txnid:9 Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS")); //only expect transactional components to be in COMPLETED_TXN_COMPONENTS Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), - 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2")); + 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=9")); Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), - 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2 and ctc_table='tab1'")); + 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=9 and ctc_table='tab1'")); } //todo: Concurrent insert/update of same partition - should pass @@ -1546,13 +1588,13 @@ private void testMerge3Way(boolean cc) throws Exception { "(9,100,1,2), (3,4,1,2), (5,13,1,3), (7,8,2,2), (14,15,2,1)")); - long txnId1 = txnMgr.openTxn(ctx, "T1"); checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s on t.a=s.b " + "when matched and t.a=5 then update set b=s.b " + //updates p=1/q=3 "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2 "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1 + long txnId1 = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); - List locks = getLocks(txnMgr); + 
List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 5, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); @@ -1562,13 +1604,14 @@ private void testMerge3Way(boolean cc) throws Exception { //start concurrent txn DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - long txnId2 = txnMgr2.openTxn(ctx, "T2"); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " + "when matched and t.a=" + (cc ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2 "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2 "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1 + long txnId2 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T1", false); - locks = getLocks(txnMgr2); + locks = getLocks(); Assert.assertEquals("Unexpected lock count", 10, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); @@ -1639,7 +1682,7 @@ private void testMerge3Way(boolean cc) throws Exception { //re-check locks which were in Waiting state - should now be Acquired ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId); - locks = getLocks(txnMgr2); + locks = getLocks(); Assert.assertEquals("Unexpected lock count", 5, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source2", null, locks); @@ -1693,9 +1736,9 @@ private void testMerge3Way(boolean cc) throws Exception { } if(cc) { Assert.assertNotNull("didn't get exception", expectedException); - 
Assert.assertEquals("Transaction manager has aborted the transaction txnid:3. Reason: " + - "Aborting [txnid:3,3] due to a write conflict on default/target/p=1/q=3 " + - "committed by [txnid:2,3] u/u", expectedException.getMessage()); + Assert.assertEquals("Transaction manager has aborted the transaction txnid:11. Reason: " + + "Aborting [txnid:11,11] due to a write conflict on default/target/p=1/q=3 " + + "committed by [txnid:10,11] u/u", expectedException.getMessage()); Assert.assertEquals( "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), @@ -1754,13 +1797,13 @@ private void testMergeUnpartitioned(boolean causeConflict) throws Exception { checkCmdOnDriver(driver.run("insert into target values (1,2), (3,4), (5,6), (7,8)")); checkCmdOnDriver(driver.run("create table source (a int, b int)")); - long txnid1 = txnMgr.openTxn(ctx, "T1"); if(causeConflict) { checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where a=1")); } else { checkCmdOnDriver(driver.compileAndRespond("insert into target values(9,10),(11,12)")); } + long txnid1 = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " + @@ -1774,14 +1817,15 @@ private void testMergeUnpartitioned(boolean causeConflict) throws Exception { LockState.ACQUIRED, "default", "target", null, locks); DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); //start a 2nd (overlapping) txn - long txnid2 = txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s " + "on t.a=s.a " + "when matched then delete " + "when not matched then insert values(s.a,s.b)")); + long txnid2 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false); - locks = getLocks(txnMgr); + 
locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", null, locks); @@ -1801,7 +1845,7 @@ private void testMergeUnpartitioned(boolean causeConflict) throws Exception { //re-check locks which were in Waiting state - should now be Acquired ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId); - locks = getLocks(txnMgr); + locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", null, locks); @@ -1830,7 +1874,7 @@ private void testMergeUnpartitioned(boolean causeConflict) throws Exception { Assert.assertTrue("Didn't get exception", expectedException != null); Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg()); Assert.assertEquals("Exception msg didn't match", - "Aborting [txnid:3,3] due to a write conflict on default/target committed by [txnid:2,3] d/u", + "Aborting [txnid:7,7] due to a write conflict on default/target committed by [txnid:6,7] d/u", expectedException.getCause().getMessage()); } else { @@ -1922,21 +1966,22 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)")); checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int)")); - long txnId1 = txnMgr.openTxn(ctx, "T1"); checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where p=1")); + long txnId1 = txnMgr.getCurrentTxnId(); txnMgr.acquireLocks(driver.getPlan(), ctx, "T1"); - List locks = getLocks(txnMgr); + List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", 
locks); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks); DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); //start a 2nd (overlapping) txn - long txnid2 = txnMgr2.openTxn(ctx, "T2"); checkCmdOnDriver(driver.compileAndRespond("merge into target using source " + "on target.p=source.p1 and target.a=source.a1 " + "when matched then update set b = 11 " + "when not matched then insert values(a1,b1,p1,q1)")); + long txnid2 = txnMgr2.getCurrentTxnId(); txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false); locks = getLocks(txnMgr); Assert.assertEquals("Unexpected lock count", 7, locks.size()); @@ -1982,7 +2027,7 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { //re-check locks which were in Waiting state - should now be Acquired ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId); - locks = getLocks(txnMgr); + locks = getLocks(); Assert.assertEquals("Unexpected lock count", 5, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks); @@ -2031,7 +2076,7 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { Assert.assertTrue("Didn't get exception", expectedException != null); Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg()); Assert.assertEquals("Exception msg didn't match", - "Aborting [txnid:3,3] due to a write conflict on default/target/p=1/q=2 committed by [txnid:2,3] u/u", + "Aborting [txnid:7,7] due to a write conflict on default/target/p=1/q=2 committed by [txnid:6,7] u/u", expectedException.getCause().getMessage()); } else { @@ -2061,6 +2106,7 @@ public void testShowTablesLock() throws Exception { checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "t", null, locks); DbTxnManager 
txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("show tables")); txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fidler"); locks = getLocks(); @@ -2068,17 +2114,17 @@ public void testShowTablesLock() throws Exception { checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "t", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", null, null, locks); txnMgr.commitTxn(); - txnMgr2.releaseLocks(txnMgr2.getLockManager().getLocks(false, false)); + txnMgr2.rollbackTxn(); Assert.assertEquals("Lock remained", 0, getLocks().size()); Assert.assertEquals("Lock remained", 0, getLocks(txnMgr2).size()); + swapTxnManager(txnMgr); cpr = driver.run( "create table if not exists T2 (a int, b int) partitioned by (p int) clustered by (a) " + "into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')"); checkCmdOnDriver(cpr); - txnid1 = txnMgr.openTxn(ctx, "Fifer"); checkCmdOnDriver(driver.compileAndRespond("insert into T2 partition(p=1) values(1,3)")); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); locks = getLocks(); @@ -2086,6 +2132,7 @@ public void testShowTablesLock() throws Exception { checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "t2", "p=1", locks); txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgr2); checkCmdOnDriver(driver.compileAndRespond("show tables")); ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fidler", false); locks = getLocks(); @@ -2093,7 +2140,7 @@ public void testShowTablesLock() throws Exception { checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "t2", "p=1", locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", null, null, locks); txnMgr.commitTxn(); - txnMgr2.releaseLocks(txnMgr2.getLockManager().getLocks(false, false)); + txnMgr2.commitTxn(); Assert.assertEquals("Lock remained", 0, 
getLocks().size()); Assert.assertEquals("Lock remained", 0, getLocks(txnMgr2).size()); }