diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index e70c92eef4..9fab7c5a49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -682,12 +682,19 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command
         // Currently, we acquire a snapshot, we compile the query wrt that snapshot,
         // and then, we acquire locks. If snapshot is still valid, we continue as usual.
         // But if snapshot is not valid, we recompile the query.
+        if (driverContext.isOutdatedTxn()) {
+          driverContext.getTxnManager().rollbackTxn();
+
+          String userFromUGI = DriverUtils.getUserFromUGI(driverContext);
+          driverContext.getTxnManager().openTxn(context, userFromUGI, driverContext.getTxnType());
+          lockAndRespond();
+        }
         driverContext.setRetrial(true);
         driverContext.getBackupContext().addSubContext(context);
         driverContext.getBackupContext().setHiveLocks(context.getHiveLocks());
         context = driverContext.getBackupContext();
         driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY,
-            driverContext.getTxnManager().getValidTxns().toString());
+          driverContext.getTxnManager().getValidTxns().toString());
         if (driverContext.getPlan().hasAcidResourcesInQuery()) {
           validTxnManager.recordValidWriteIds();
         }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index a8c83fc504..0afa657ccb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -59,6 +59,7 @@
   // either initTxnMgr or from the SessionState, in that order.
   private HiveTxnManager txnManager;
   private TxnType txnType = TxnType.DEFAULT;
+  private boolean outdatedTxn;
 
   private StatsSource statsSource;
   // Boolean to store information about whether valid txn list was generated
@@ -155,6 +156,14 @@ public TxnType getTxnType() {
     return txnType;
   }
 
+  public void setOutdatedTxn(boolean outdated) {
+    this.outdatedTxn = outdated;
+  }
+
+  public boolean isOutdatedTxn() {
+    return outdatedTxn;
+  }
+
   public void setTxnType(TxnType txnType) {
     this.txnType = txnType;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ValidTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/ValidTxnManager.java
index 7d49c57dda..e5f8ce005f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ValidTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ValidTxnManager.java
@@ -32,9 +32,11 @@
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -91,11 +93,22 @@ boolean isValidTxnListState() throws LockException {
       return true; // Nothing to check
     }
 
-    String currentTxnString = driverContext.getTxnManager().getValidTxns().toString();
+    GetOpenTxnsResponse openTxns = driverContext.getTxnManager().getOpenTxns();
+    ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(openTxns, 0);
+    long txnId = driverContext.getTxnManager().getCurrentTxnId();
+
+    String currentTxnString;
+    if (validTxnList.isTxnRangeValid(txnId + 1, openTxns.getTxn_high_water_mark()) != ValidTxnList.RangeResponse.NONE) {
+      // If here, there was another txn opened & committed between current SNAPSHOT generation and locking.
+      validTxnList.removeException(txnId);
+      currentTxnString = validTxnList.toString();
+    } else {
+      currentTxnString = TxnCommonUtils.createValidReadTxnList(openTxns, txnId).toString();
+    }
+
     if (currentTxnString.equals(txnString)) {
       return true; // Still valid, nothing more to do
     }
-
     return checkWriteIds(currentTxnString, nonSharedLockedTables, txnWriteIdListString);
   }
@@ -142,9 +155,17 @@ private boolean checkWriteIds(String currentTxnString, Set<String> nonSharedLock
       if (nonSharedLockedTables.contains(fullQNameForLock)) {
         // Check if table is transactional
         if (AcidUtils.isTransactionalTable(tableInfo.getValue())) {
+          ValidWriteIdList writeIdList = txnWriteIdList.getTableValidWriteIdList(tableInfo.getKey());
+          ValidWriteIdList currentWriteIdList = currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getKey());
+          // Check if there was a conflicting write between current SNAPSHOT generation and locking.
+          // If yes, mark current transaction as outdated.
+          if (currentWriteIdList.isWriteIdRangeValid(writeIdList.getHighWatermark() + 1,
+              currentWriteIdList.getHighWatermark()) != ValidWriteIdList.RangeResponse.NONE) {
+            driverContext.setOutdatedTxn(true);
+            return false;
+          }
           // Check that write id is still valid
-          if (!TxnIdUtils.checkEquivalentWriteIds(txnWriteIdList.getTableValidWriteIdList(tableInfo.getKey()),
-              currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getKey()))) {
+          if (!TxnIdUtils.checkEquivalentWriteIds(writeIdList, currentWriteIdList)) {
             // Write id has changed, it is not valid anymore, we need to recompile
             return false;
           }
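The two hunks above boil down to a window test: an id is committed when it is at or below the new high watermark and not in the open/aborted exception list, and the snapshot is outdated as soon as anything in the newly visible id range is committed. Below is a minimal, self-contained sketch of that test in plain Java; it deliberately does not use Hive classes, and all class and method names are illustrative only.

import java.util.Arrays;

public class SnapshotWindowCheckSketch {

  // True if any id in (lowerBoundExclusive, newHighWatermark] is committed,
  // i.e. at or below the new high watermark and not listed as open/aborted.
  static boolean committedInWindow(long lowerBoundExclusive, long newHighWatermark, long[] openOrAborted) {
    for (long id = lowerBoundExclusive + 1; id <= newHighWatermark; id++) {
      if (Arrays.binarySearch(openOrAborted, id) < 0) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // Compile-time snapshot ended at id 10; at lock time the high watermark is 12.
    System.out.println(committedInWindow(10, 12, new long[]{12}));     // true: txn 11 committed in between
    System.out.println(committedInWindow(10, 12, new long[]{11, 12})); // false: 11 and 12 are still open
  }
}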
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 71afcbdc68..6b163d6c95 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -39,6 +39,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.ql.Context;
@@ -751,6 +752,15 @@ private void stopHeartbeat() {
     }
   }
 
+  @Override
+  public GetOpenTxnsResponse getOpenTxns() throws LockException {
+    try {
+      return getMS().getOpenTxns();
+    } catch (TException e) {
+      throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+    }
+  }
+
   @Override
   public ValidTxnList getValidTxns() throws LockException {
     assert isTxnOpen();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 0383881acc..29266db126 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -20,13 +20,13 @@
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.Context;
@@ -265,6 +265,11 @@ public void heartbeat() throws LockException {
     // No-op
   }
 
+  @Override
+  public GetOpenTxnsResponse getOpenTxns() throws LockException {
+    return new GetOpenTxnsResponse();
+  }
+
   @Override
   public ValidTxnList getValidTxns() throws LockException {
     return new ValidReadTxnList();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 600289f837..5c75e63cb7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.api.TxnType;
@@ -172,6 +173,8 @@ void replTableWriteIdState(String validWriteIdList, String dbName, String tableN
    */
   void heartbeat() throws LockException;
 
+  GetOpenTxnsResponse getOpenTxns() throws LockException;
+
   /**
    * Get the transactions that are currently valid.  The resulting
   * {@link ValidTxnList} object can be passed as string to the processing
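The new getOpenTxns() exposes the raw GetOpenTxnsResponse rather than a prebuilt ValidTxnList, which lets the caller pick the reference transaction when building a snapshot (ValidTxnManager above builds one snapshot with 0 and one with the current txn id). A hypothetical caller, shown only as a usage sketch and not part of this patch, could look like this:

import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;

public class OpenTxnsUsageSketch {

  // Builds a read snapshot from the raw open-txn response, treating currentTxnId as the caller's own txn.
  static ValidTxnList snapshotFor(HiveTxnManager txnManager, long currentTxnId) throws LockException {
    GetOpenTxnsResponse openTxns = txnManager.getOpenTxns();
    return TxnCommonUtils.createValidReadTxnList(openTxns, currentTxnId);
  }
}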
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 8a15b7cc5d..e7e6215b8b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -2226,6 +2226,60 @@ private void testMergeInsertLocking(boolean sharedWrite) throws Exception {
       LockState.WAITING, "default", "target", null, locks);
   }
 
+  @Test
+  public void testInsertMergeInsertConcurrentSnapshotInvalidateNoDuplicates() throws Exception {
+    testConcurrentMergeInsertSnapshotInvalidate("insert into target values (5, 6)", false);
+  }
+  @Test
+  public void testInsertMergeInsertConcurrentSharedWriteSnapshotInvalidateNoDuplicates() throws Exception {
+    testConcurrentMergeInsertSnapshotInvalidate("insert into target values (5, 6)", true);
+  }
+  @Test
+  public void test2MergeInsertsConcurrentSnapshotInvalidateNoDuplicates() throws Exception {
+    testConcurrentMergeInsertSnapshotInvalidate("merge into target t using source s on t.a = s.a " +
+        "when not matched then insert values (s.a, s.b)", false);
+  }
+  @Test
+  public void test2MergeInsertsConcurrentSharedWriteSnapshotInvalidateNoDuplicates() throws Exception {
+    testConcurrentMergeInsertSnapshotInvalidate("merge into target t using source s on t.a = s.a " +
+        "when not matched then insert values (s.a, s.b)", true);
+  }
+  @Test
+  public void testMergeInsertNoSnapshotInvalidateNoDuplicates() throws Exception {
+    testConcurrentMergeInsertSnapshotInvalidate("insert into source values (3, 4)", false);
+  }
+
+  private void testConcurrentMergeInsertSnapshotInvalidate(String query, boolean sharedWrite) throws Exception {
+    dropTable(new String[]{"target", "source"});
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
+
+    driver.run("create table target (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
+    driver.run("insert into target values (1,2), (3,4)");
+    driver.run("create table source (a int, b int)");
+    driver.run("insert into source values (5,6), (7,8)");
+
+    driver.compileAndRespond("merge into target t using source s on t.a = s.a " +
+        "when not matched then insert values (s.a, s.b)");
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+    driver2.run(query);
+    driver2.run("select * from target");
+
+    swapTxnManager(txnMgr);
+    try {
+      driver.run();
+    } catch (Exception ex) {
+      Assert.assertTrue(ex.getCause().getMessage().contains("due to a write conflict"));
+    }
+
+    swapTxnManager(txnMgr2);
+    driver2.run("select * from target");
+    List<String> res = new ArrayList<>();
+    driver2.getFetchTask().fetch(res);
+    Assert.assertEquals("Duplicate records found", 4, res.size());
+  }
+
   @Test
   public void test2MergeInsertsConcurrentNoDuplicates() throws Exception {
     testConcurrentMergeInsertNoDuplicates("merge into target t using source s on t.a = s.a " +
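These tests guard against duplicate rows from a MERGE ... WHEN NOT MATCHED THEN INSERT whose snapshot predates a concurrently committed insert. As a toy simulation in plain Java, with no Hive involved and all names invented, this is what goes wrong without the recompile/retry added in this patch:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergeInsertRaceSketch {
  public static void main(String[] args) {
    List<Integer> target = new ArrayList<>(Arrays.asList(1, 3));
    List<Integer> source = Arrays.asList(5, 7);

    // Session A compiles the merge against a snapshot of target taken now.
    List<Integer> snapshotOfTarget = new ArrayList<>(target);

    // Session B commits "insert into target values (5, ...)" before A acquires its locks.
    target.add(5);

    // If A executes against the stale snapshot, key 5 looks "not matched" and is inserted again.
    for (int key : source) {
      if (!snapshotOfTarget.contains(key)) {
        target.add(key);
      }
    }
    System.out.println(target); // [1, 3, 5, 5, 7] -> a duplicate, absent the retry added in this patch
  }
}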
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 65df9c2ba9..7a5304d120 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -3113,6 +3113,11 @@ public boolean removeMasterKey(Integer keySeq) throws TException {
     return keyList.toArray(new String[keyList.size()]);
   }
 
+  @Override
+  public GetOpenTxnsResponse getOpenTxns() throws TException {
+    return client.get_open_txns();
+  }
+
   @Override
   public ValidTxnList getValidTxns() throws TException {
     return TxnCommonUtils.createValidReadTxnList(client.get_open_txns(), 0);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 887d4303f4..6074c09569 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -2858,6 +2858,8 @@ Function getFunction(String catName, String dbName, String funcName)
    */
   GetAllFunctionsResponse getAllFunctions() throws MetaException, TException;
 
+  GetOpenTxnsResponse getOpenTxns() throws TException;
+
   /**
    * Get a structure that details valid transactions.
    * @return list of valid transactions
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
index 312936efa8..951c673c97 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
@@ -2268,6 +2268,11 @@ public boolean removeMasterKey(Integer keySeq) throws TException {
     return keyList.toArray(new String[keyList.size()]);
   }
 
+  @Override
+  public GetOpenTxnsResponse getOpenTxns() throws TException {
+    return client.get_open_txns();
+  }
+
   @Override
   public ValidTxnList getValidTxns() throws TException {
     return TxnCommonUtils.createValidReadTxnList(client.get_open_txns(), 0);
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
index b8ff03f9c4..0d730b2208 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.common;
 
+import org.apache.commons.lang.ArrayUtils;
+
 import java.util.Arrays;
 import java.util.BitSet;
 
@@ -54,6 +56,11 @@ public ValidReadTxnList(String value) {
     readFromString(value);
   }
 
+  @Override
+  public void removeException(long txnId) {
+    exceptions = ArrayUtils.remove(exceptions, Arrays.binarySearch(exceptions, txnId));
+  }
+
   @Override
   public boolean isTxnValid(long txnid) {
     if (txnid > highWatermark) {
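Note that removeException assumes the id being removed is actually present in the sorted exceptions array; otherwise Arrays.binarySearch returns a negative insertion point and ArrayUtils.remove throws IndexOutOfBoundsException. The caller in ValidTxnManager satisfies this because the id it removes is the session's own, still-open transaction. A standalone sketch of the array manipulation, illustrative only:

import java.util.Arrays;

import org.apache.commons.lang.ArrayUtils;

public class RemoveExceptionSketch {
  public static void main(String[] args) {
    long[] exceptions = {7L, 9L, 12L}; // open/aborted txn ids below the high watermark, sorted
    long txnId = 9L;                   // the session's own (open) transaction

    // Same idiom as ValidReadTxnList.removeException above.
    long[] trimmed = ArrayUtils.remove(exceptions, Arrays.binarySearch(exceptions, txnId));
    System.out.println(Arrays.toString(trimmed)); // prints [7, 12]
  }
}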
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
index d4c3b09730..65ce8b4efa 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
@@ -31,6 +31,8 @@
    */
   public static final String VALID_TXNS_KEY = "hive.txn.valid.txns";
 
+  void removeException(long txnId);
+
   /**
    * The response to a range query.  NONE means no values in this range match,
    * SOME mean that some do, and ALL means that every value does.