diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index cbb3a72..039e756 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1681,6 +1681,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal " of the lock manager is dumped to log file. This is for debugging. See also " + "hive.lock.numretries and hive.lock.sleep.between.retries."), + HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" + + "current open transactions reach this limit, future open transaction requests will be \n" + + "rejected, until this number goes below the limit."), + HIVE_COUNT_OPEN_TXNS_INTERVAL("hive.count.open.txns.interval", "1s", + new TimeValidator(TimeUnit.SECONDS), "Time in seconds between checks to count open transactions."), + HIVE_TXN_MAX_OPEN_BATCH("hive.txn.max.open.batch", 1000, "Maximum number of transactions that can be fetched in one call to open_txns().\n" + "This controls how many transactions streaming agents such as Flume or Storm open\n" + diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index bc818e0..917edc0 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.metastore.HouseKeeperService; import org.apache.hadoop.hive.metastore.Warehouse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,6 +164,15 @@ public static OpertaionType fromLockType(LockType lockType) { } } + // Maximum number of open transactions that's allowed + private static volatile int maxOpenTxns = 0; + // Current number of open txns + private static volatile long numOpenTxns = 0; + // Whether number of open transactions reaches the threshold + private static volatile boolean tooManyOpenTxns = false; + // The AcidHouseKeeperService for counting open transactions + private static volatile HouseKeeperService openTxnsCounter = null; + /** * Number of consecutive deadlocks we have seen */ @@ -232,6 +242,7 @@ public void setConf(HiveConf conf) { TimeUnit.MILLISECONDS); retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS); deadlockRetryInterval = retryInterval / 10; + maxOpenTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS); } public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { @@ -358,7 +369,45 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { return getOpenTxns(); } } + + private static void startHouseKeeperService(HiveConf conf, Class c){ + try { + openTxnsCounter = (HouseKeeperService)c.newInstance(); + openTxnsCounter.start(conf); + } catch (Exception ex) { + LOG.error("Failed to start {}" , openTxnsCounter.getClass() + + ". The system will not handle {} " , openTxnsCounter.getServiceDescription(), + ". Root Cause: ", ex); + } + } + public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { + if (openTxnsCounter == null) { + synchronized (TxnHandler.class) { + try { + if (openTxnsCounter == null) { + startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidOpenTxnsCounterService")); + } + } catch (ClassNotFoundException e) { + throw new MetaException(e.getMessage()); + } + } + } + + if (!tooManyOpenTxns && numOpenTxns >= maxOpenTxns) { + tooManyOpenTxns = true; + } + if (tooManyOpenTxns) { + if (numOpenTxns < maxOpenTxns * 0.9) { + tooManyOpenTxns = false; + } else { + LOG.warn("Maximum allowed number of open transactions (" + maxOpenTxns + ") has been " + + "reached. Current number of open transactions: " + numOpenTxns); + throw new MetaException("Maximum allowed number of open transactions has been reached. " + + "See hive.max.open.txns."); + } + } + int numTxns = rqst.getNum_txns(); try { Connection dbConn = null; @@ -2832,6 +2881,36 @@ public void performTimeOuts() { } } + public void countOpenTxns() throws MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select count(*) from TXNS where txn_state = '" + TXN_OPEN + "'"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.error("Transaction database not properly configured, " + + "can't find txn_state from TXNS."); + } else { + numOpenTxns = rs.getLong(1); + } + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + LOG.info("Failed to update number of open transactions"); + checkRetryable(dbConn, e, "countOpenTxns()"); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + countOpenTxns(); + } + } + private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException { if (connPool != null) return; diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 12be862..5b56aaf 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -77,6 +77,12 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException; /** + * Get the count for open transactions. + * @throws MetaException + */ + public void countOpenTxns() throws MetaException; + + /** * Open a set of transactions * @param rqst request to open transactions * @return information on opened transactions diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java deleted file mode 100644 index 80e3cd6..0000000 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ /dev/null @@ -1,455 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.txn; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; -import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; -import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; -import org.apache.hadoop.hive.metastore.api.CompactionRequest; -import org.apache.hadoop.hive.metastore.api.CompactionType; -import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; -import org.apache.hadoop.hive.metastore.api.LockComponent; -import org.apache.hadoop.hive.metastore.api.LockLevel; -import org.apache.hadoop.hive.metastore.api.LockRequest; -import org.apache.hadoop.hive.metastore.api.LockResponse; -import org.apache.hadoop.hive.metastore.api.LockState; -import org.apache.hadoop.hive.metastore.api.LockType; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; -import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; -import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertNotNull; -import static junit.framework.Assert.assertNull; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; - -/** - * Tests for TxnHandler. - */ -public class TestCompactionTxnHandler { - - private HiveConf conf = new HiveConf(); - private TxnStore txnHandler; - - public TestCompactionTxnHandler() throws Exception { - TxnDbUtil.setConfValues(conf); - tearDown(); - } - - @Test - public void testFindNextToCompact() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - long now = System.currentTimeMillis(); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - assertEquals("foo", ci.dbname); - assertEquals("bar", ci.tableName); - assertEquals("ds=today", ci.partName); - assertEquals(CompactionType.MINOR, ci.type); - assertNull(ci.runAs); - assertNull(txnHandler.findNextToCompact("fred")); - - txnHandler.setRunAs(ci.id, "bob"); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - assertEquals(1, compacts.size()); - ShowCompactResponseElement c = compacts.get(0); - assertEquals("foo", c.getDbname()); - assertEquals("bar", c.getTablename()); - assertEquals("ds=today", c.getPartitionname()); - assertEquals(CompactionType.MINOR, c.getType()); - assertEquals("working", c.getState()); - assertTrue(c.getStart() - 5000 < now && c.getStart() + 5000 > now); - assertEquals("fred", c.getWorkerid()); - assertEquals("bob", c.getRunAs()); - } - - @Test - public void testFindNextToCompact2() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - - rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=yesterday"); - txnHandler.compact(rqst); - - long now = System.currentTimeMillis(); - boolean expectToday = false; - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - assertEquals("foo", ci.dbname); - assertEquals("bar", ci.tableName); - if ("ds=today".equals(ci.partName)) expectToday = false; - else if ("ds=yesterday".equals(ci.partName)) expectToday = true; - else fail("partition name should have been today or yesterday but was " + ci.partName); - assertEquals(CompactionType.MINOR, ci.type); - - ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - assertEquals("foo", ci.dbname); - assertEquals("bar", ci.tableName); - if (expectToday) assertEquals("ds=today", ci.partName); - else assertEquals("ds=yesterday", ci.partName); - assertEquals(CompactionType.MINOR, ci.type); - - assertNull(txnHandler.findNextToCompact("fred")); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - assertEquals(2, compacts.size()); - for (ShowCompactResponseElement e : compacts) { - assertEquals("working", e.getState()); - assertTrue(e.getStart() - 5000 < now && e.getStart() + 5000 > now); - assertEquals("fred", e.getWorkerid()); - } - } - - @Test - public void testFindNextToCompactNothingToCompact() throws Exception { - assertNull(txnHandler.findNextToCompact("fred")); - } - - @Test - public void testMarkCompacted() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - - txnHandler.markCompacted(ci); - assertNull(txnHandler.findNextToCompact("fred")); - - - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - assertEquals(1, compacts.size()); - ShowCompactResponseElement c = compacts.get(0); - assertEquals("foo", c.getDbname()); - assertEquals("bar", c.getTablename()); - assertEquals("ds=today", c.getPartitionname()); - assertEquals(CompactionType.MINOR, c.getType()); - assertEquals("ready for cleaning", c.getState()); - assertNull(c.getWorkerid()); - } - - @Test - public void testFindNextToClean() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - - assertEquals(0, txnHandler.findReadyToClean().size()); - txnHandler.markCompacted(ci); - assertNull(txnHandler.findNextToCompact("fred")); - - List toClean = txnHandler.findReadyToClean(); - assertEquals(1, toClean.size()); - assertNull(txnHandler.findNextToCompact("fred")); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - assertEquals(1, compacts.size()); - ShowCompactResponseElement c = compacts.get(0); - assertEquals("foo", c.getDbname()); - assertEquals("bar", c.getTablename()); - assertEquals("ds=today", c.getPartitionname()); - assertEquals(CompactionType.MINOR, c.getType()); - assertEquals("ready for cleaning", c.getState()); - assertNull(c.getWorkerid()); - } - - @Test - public void testMarkCleaned() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); - CompactionInfo ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - - assertEquals(0, txnHandler.findReadyToClean().size()); - txnHandler.markCompacted(ci); - assertNull(txnHandler.findNextToCompact("fred")); - - List toClean = txnHandler.findReadyToClean(); - assertEquals(1, toClean.size()); - assertNull(txnHandler.findNextToCompact("fred")); - txnHandler.markCleaned(ci); - assertNull(txnHandler.findNextToCompact("fred")); - assertEquals(0, txnHandler.findReadyToClean().size()); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - assertEquals(1, rsp.getCompactsSize()); - assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); - } - - @Test - public void testRevokeFromLocalWorkers() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - txnHandler.compact(rqst); - rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR); - txnHandler.compact(rqst); - rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR); - txnHandler.compact(rqst); - assertNotNull(txnHandler.findNextToCompact("fred-193892")); - assertNotNull(txnHandler.findNextToCompact("bob-193892")); - assertNotNull(txnHandler.findNextToCompact("fred-193893")); - txnHandler.revokeFromLocalWorkers("fred"); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - assertEquals(3, compacts.size()); - boolean sawWorkingBob = false; - int initiatedCount = 0; - for (ShowCompactResponseElement c : compacts) { - if (c.getState().equals("working")) { - assertEquals("bob-193892", c.getWorkerid()); - sawWorkingBob = true; - } else if (c.getState().equals("initiated")) { - initiatedCount++; - } else { - fail("Unexpected state"); - } - } - assertTrue(sawWorkingBob); - assertEquals(2, initiatedCount); - } - - @Test - public void testRevokeTimedOutWorkers() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - txnHandler.compact(rqst); - rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR); - txnHandler.compact(rqst); - - assertNotNull(txnHandler.findNextToCompact("fred-193892")); - Thread.sleep(200); - assertNotNull(txnHandler.findNextToCompact("fred-193892")); - txnHandler.revokeTimedoutWorkers(100); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - assertEquals(2, compacts.size()); - boolean sawWorking = false, sawInitiated = false; - for (ShowCompactResponseElement c : compacts) { - if (c.getState().equals("working")) sawWorking = true; - else if (c.getState().equals("initiated")) sawInitiated = true; - else fail("Unexpected state"); - } - assertTrue(sawWorking); - assertTrue(sawInitiated); - } - - @Test - public void testFindPotentialCompactions() throws Exception { - // Test that committing unlocks - long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, - "mydb"); - comp.setTablename("mytable"); - List components = new ArrayList(1); - components.add(comp); - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, - "mydb"); - comp.setTablename("yourtable"); - comp.setPartitionname("mypartition"); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.commitTxn(new CommitTxnRequest(txnid)); - assertEquals(0, txnHandler.numLocksInLockTable()); - - Set potentials = txnHandler.findPotentialCompactions(100); - assertEquals(2, potentials.size()); - boolean sawMyTable = false, sawYourTable = false; - for (CompactionInfo ci : potentials) { - sawMyTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("mytable") && - ci.partName == null); - sawYourTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("yourtable") && - ci.partName.equals("mypartition")); - } - assertTrue(sawMyTable); - assertTrue(sawYourTable); - } - - // TODO test changes to mark cleaned to clean txns and txn_components - - @Test - public void testMarkCleanedCleansTxnsAndTxnComponents() - throws Exception { - long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, - "mydb"); - comp.setTablename("mytable"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.abortTxn(new AbortTxnRequest(txnid)); - - txnid = openTxn(); - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("yourtable"); - components = new ArrayList(1); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.abortTxn(new AbortTxnRequest(txnid)); - - txnid = openTxn(); - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("foo"); - comp.setPartitionname("bar"); - components = new ArrayList(1); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("foo"); - comp.setPartitionname("baz"); - components = new ArrayList(1); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.abortTxn(new AbortTxnRequest(txnid)); - - CompactionInfo ci = new CompactionInfo(); - - // Now clean them and check that they are removed from the count. - CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR); - txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); - ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - txnHandler.markCompacted(ci); - - List toClean = txnHandler.findReadyToClean(); - assertEquals(1, toClean.size()); - txnHandler.markCleaned(ci); - - // Check that we are cleaning up the empty aborted transactions - GetOpenTxnsResponse txnList = txnHandler.getOpenTxns(); - assertEquals(3, txnList.getOpen_txnsSize()); - txnHandler.cleanEmptyAbortedTxns(); - txnList = txnHandler.getOpenTxns(); - assertEquals(2, txnList.getOpen_txnsSize()); - - rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR); - rqst.setPartitionname("bar"); - txnHandler.compact(rqst); - assertEquals(0, txnHandler.findReadyToClean().size()); - ci = txnHandler.findNextToCompact("fred"); - assertNotNull(ci); - txnHandler.markCompacted(ci); - - toClean = txnHandler.findReadyToClean(); - assertEquals(1, toClean.size()); - txnHandler.markCleaned(ci); - - txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")); - txnHandler.cleanEmptyAbortedTxns(); - txnList = txnHandler.getOpenTxns(); - assertEquals(3, txnList.getOpen_txnsSize()); - } - - @Test - public void addDynamicPartitions() throws Exception { - String dbName = "default"; - String tableName = "adp_table"; - OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")); - long txnId = openTxns.getTxn_ids().get(0); - // lock a table, as in dynamic partitions - LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName); - lc.setTablename(tableName); - LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost"); - lr.setTxnid(txnId); - LockResponse lock = txnHandler.lock(lr); - assertEquals(LockState.ACQUIRED, lock.getState()); - - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnId, dbName, tableName, - Arrays.asList("ds=yesterday", "ds=today"))); - txnHandler.commitTxn(new CommitTxnRequest(txnId)); - - Set potentials = txnHandler.findPotentialCompactions(1000); - assertEquals(2, potentials.size()); - SortedSet sorted = new TreeSet(potentials); - - int i = 0; - for (CompactionInfo ci : sorted) { - assertEquals(dbName, ci.dbname); - assertEquals(tableName, ci.tableName); - switch (i++) { - case 0: assertEquals("ds=today", ci.partName); break; - case 1: assertEquals("ds=yesterday", ci.partName); break; - default: throw new RuntimeException("What?"); - } - } - } - - @Before - public void setUp() throws Exception { - TxnDbUtil.prepDb(); - txnHandler = TxnUtils.getTxnStore(conf); - } - - @After - public void tearDown() throws Exception { - TxnDbUtil.cleanDb(); - } - - private long openTxn() throws MetaException { - List txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids(); - return txns.get(0); - } - -} diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java deleted file mode 100644 index 1a118a9..0000000 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ /dev/null @@ -1,1399 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.txn; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; -import org.apache.hadoop.hive.metastore.api.CheckLockRequest; -import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; -import org.apache.hadoop.hive.metastore.api.CompactionRequest; -import org.apache.hadoop.hive.metastore.api.CompactionType; -import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; -import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; -import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; -import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; -import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; -import org.apache.hadoop.hive.metastore.api.LockComponent; -import org.apache.hadoop.hive.metastore.api.LockLevel; -import org.apache.hadoop.hive.metastore.api.LockRequest; -import org.apache.hadoop.hive.metastore.api.LockResponse; -import org.apache.hadoop.hive.metastore.api.LockState; -import org.apache.hadoop.hive.metastore.api.LockType; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchLockException; -import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; -import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; -import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; -import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; -import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; -import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; -import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; -import org.apache.hadoop.hive.metastore.api.TxnAbortedException; -import org.apache.hadoop.hive.metastore.api.TxnInfo; -import org.apache.hadoop.hive.metastore.api.TxnOpenException; -import org.apache.hadoop.hive.metastore.api.TxnState; -import org.apache.hadoop.hive.metastore.api.UnlockRequest; -import org.apache.hadoop.util.StringUtils; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.core.LoggerContext; -import org.apache.logging.log4j.core.config.Configuration; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertFalse; -import static junit.framework.Assert.assertNull; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; - -/** - * Tests for TxnHandler. - */ -public class TestTxnHandler { - static final private String CLASS_NAME = TxnHandler.class.getName(); - private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); - - private HiveConf conf = new HiveConf(); - private TxnStore txnHandler; - - public TestTxnHandler() throws Exception { - TxnDbUtil.setConfValues(conf); - LoggerContext ctx = (LoggerContext) LogManager.getContext(false); - Configuration conf = ctx.getConfiguration(); - conf.getLoggerConfig(CLASS_NAME).setLevel(Level.DEBUG); - ctx.updateLoggers(conf); - tearDown(); - } - - @Test - public void testValidTxnsEmpty() throws Exception { - GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); - assertEquals(0L, txnsInfo.getTxn_high_water_mark()); - assertTrue(txnsInfo.getOpen_txns().isEmpty()); - GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); - assertEquals(0L, txns.getTxn_high_water_mark()); - assertTrue(txns.getOpen_txns().isEmpty()); - } - - @Test - public void testOpenTxn() throws Exception { - long first = openTxn(); - assertEquals(1L, first); - long second = openTxn(); - assertEquals(2L, second); - GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); - assertEquals(2L, txnsInfo.getTxn_high_water_mark()); - assertEquals(2, txnsInfo.getOpen_txns().size()); - assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId()); - assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(0).getState()); - assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId()); - assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState()); - assertEquals("me", txnsInfo.getOpen_txns().get(1).getUser()); - assertEquals("localhost", txnsInfo.getOpen_txns().get(1).getHostname()); - - GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); - assertEquals(2L, txns.getTxn_high_water_mark()); - assertEquals(2, txns.getOpen_txns().size()); - boolean[] saw = new boolean[3]; - for (int i = 0; i < saw.length; i++) saw[i] = false; - for (Long tid : txns.getOpen_txns()) { - saw[tid.intValue()] = true; - } - for (int i = 1; i < saw.length; i++) assertTrue(saw[i]); - } - - @Test - public void testAbortTxn() throws Exception { - OpenTxnsResponse openedTxns = txnHandler.openTxns(new OpenTxnRequest(2, "me", "localhost")); - List txnList = openedTxns.getTxn_ids(); - long first = txnList.get(0); - assertEquals(1L, first); - long second = txnList.get(1); - assertEquals(2L, second); - txnHandler.abortTxn(new AbortTxnRequest(1)); - GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); - assertEquals(2L, txnsInfo.getTxn_high_water_mark()); - assertEquals(2, txnsInfo.getOpen_txns().size()); - assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId()); - assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState()); - assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId()); - assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState()); - - GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); - assertEquals(2L, txns.getTxn_high_water_mark()); - assertEquals(2, txns.getOpen_txns().size()); - boolean[] saw = new boolean[3]; - for (int i = 0; i < saw.length; i++) saw[i] = false; - for (Long tid : txns.getOpen_txns()) { - saw[tid.intValue()] = true; - } - for (int i = 1; i < saw.length; i++) assertTrue(saw[i]); - } - - @Test - public void testAbortInvalidTxn() throws Exception { - boolean caught = false; - try { - txnHandler.abortTxn(new AbortTxnRequest(195L)); - } catch (NoSuchTxnException e) { - caught = true; - } - assertTrue(caught); - } - - @Test - public void testValidTxnsNoneOpen() throws Exception { - txnHandler.openTxns(new OpenTxnRequest(2, "me", "localhost")); - txnHandler.commitTxn(new CommitTxnRequest(1)); - txnHandler.commitTxn(new CommitTxnRequest(2)); - GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); - assertEquals(2L, txnsInfo.getTxn_high_water_mark()); - assertEquals(0, txnsInfo.getOpen_txns().size()); - GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); - assertEquals(2L, txns.getTxn_high_water_mark()); - assertEquals(0, txns.getOpen_txns().size()); - } - - @Test - public void testValidTxnsSomeOpen() throws Exception { - txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost")); - txnHandler.abortTxn(new AbortTxnRequest(1)); - txnHandler.commitTxn(new CommitTxnRequest(2)); - GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); - assertEquals(3L, txnsInfo.getTxn_high_water_mark()); - assertEquals(2, txnsInfo.getOpen_txns().size()); - assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId()); - assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState()); - assertEquals(3L, txnsInfo.getOpen_txns().get(1).getId()); - assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState()); - - GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); - assertEquals(3L, txns.getTxn_high_water_mark()); - assertEquals(2, txns.getOpen_txns().size()); - boolean[] saw = new boolean[4]; - for (int i = 0; i < saw.length; i++) saw[i] = false; - for (Long tid : txns.getOpen_txns()) { - saw[tid.intValue()] = true; - } - assertTrue(saw[1]); - assertFalse(saw[2]); - assertTrue(saw[3]); - } - - @Test - public void testLockDifferentDBs() throws Exception { - // Test that two different databases don't collide on their locks - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockSameDB() throws Exception { - // Test that two different databases don't collide on their locks - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockDbLocksTable() throws Exception { - // Test that locking a database prevents locking of tables in the database - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockDbDoesNotLockTableInDifferentDB() throws Exception { - // Test that locking a database prevents locking of tables in the database - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb"); - comp.setTablename("mytable"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockDifferentTables() throws Exception { - // Test that two different tables don't collide on their locks - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("yourtable"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockSameTable() throws Exception { - // Test that two different tables don't collide on their locks - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockTableLocksPartition() throws Exception { - // Test that locking a table prevents locking of partitions of the table - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockDifferentTableDoesntLockPartition() throws Exception { - // Test that locking a table prevents locking of partitions of the table - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("yourtable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockDifferentPartitions() throws Exception { - // Test that two different partitions don't collide on their locks - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("yourpartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockSamePartition() throws Exception { - // Test that two different partitions don't collide on their locks - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockSRSR() throws Exception { - // Test that two shared read locks can share a partition - LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockESRSR() throws Exception { - // Test that exclusive lock blocks shared reads - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockSRSW() throws Exception { - // Test that write can acquire after read - LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockESRSW() throws Exception { - // Test that exclusive lock blocks read and write - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockSRE() throws Exception { - // Test that read blocks exclusive - LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockESRE() throws Exception { - // Test that exclusive blocks read and exclusive - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockSWSR() throws Exception { - // Test that read can acquire after write - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockSWSWSR() throws Exception { - // Test that write blocks write but read can still acquire - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testLockSWSWSW() throws Exception { - // Test that write blocks two writes - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockEESW() throws Exception { - // Test that exclusive blocks exclusive and write - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testLockEESR() throws Exception { - // Test that exclusive blocks exclusive and read - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.WAITING); - } - - @Test - public void testCheckLockAcquireAfterWaiting() throws Exception { - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - long txnId = openTxn(); - req.setTxnid(txnId); - LockResponse res = txnHandler.lock(req); - long lockid1 = res.getLockid(); - assertTrue(res.getState() == LockState.ACQUIRED); - - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(openTxn()); - res = txnHandler.lock(req); - long lockid2 = res.getLockid(); - assertTrue(res.getState() == LockState.WAITING); - - txnHandler.abortTxn(new AbortTxnRequest(txnId)); - res = txnHandler.checkLock(new CheckLockRequest(lockid2)); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testCheckLockNoSuchLock() throws Exception { - try { - txnHandler.checkLock(new CheckLockRequest(23L)); - fail("Allowed to check lock on non-existent lock"); - } catch (NoSuchLockException e) { - } - } - - @Test - public void testCheckLockTxnAborted() throws Exception { - // Test that when a transaction is aborted, the heartbeat fails - long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - LockResponse res = txnHandler.lock(req); - long lockid = res.getLockid(); - txnHandler.abortTxn(new AbortTxnRequest(txnid)); - try { - // This will throw NoSuchLockException (even though it's the - // transaction we've closed) because that will have deleted the lock. - txnHandler.checkLock(new CheckLockRequest(lockid)); - fail("Allowed to check lock on aborted transaction."); - } catch (NoSuchLockException e) { - } - } - - @Test - public void testMultipleLock() throws Exception { - // Test more than one lock can be handled in a lock request - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(2); - components.add(comp); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("anotherpartition"); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - long lockid = res.getLockid(); - assertTrue(res.getState() == LockState.ACQUIRED); - res = txnHandler.checkLock(new CheckLockRequest(lockid)); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.unlock(new UnlockRequest(lockid)); - assertEquals(0, txnHandler.numLocksInLockTable()); - } - - @Test - public void testMultipleLockWait() throws Exception { - // Test that two shared read locks can share a partition - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(2); - components.add(comp); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("anotherpartition"); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - long lockid1 = res.getLockid(); - assertTrue(res.getState() == LockState.ACQUIRED); - - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components = new ArrayList(1); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - long lockid2 = res.getLockid(); - assertTrue(res.getState() == LockState.WAITING); - - txnHandler.unlock(new UnlockRequest(lockid1)); - - res = txnHandler.checkLock(new CheckLockRequest(lockid2)); - assertTrue(res.getState() == LockState.ACQUIRED); - } - - @Test - public void testUnlockOnCommit() throws Exception { - // Test that committing unlocks - long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.commitTxn(new CommitTxnRequest(txnid)); - assertEquals(0, txnHandler.numLocksInLockTable()); - } - - @Test - public void testUnlockOnAbort() throws Exception { - // Test that committing unlocks - long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.abortTxn(new AbortTxnRequest(txnid)); - assertEquals(0, txnHandler.numLocksInLockTable()); - } - - @Test - public void testUnlockWithTxn() throws Exception { - LOG.debug("Starting testUnlockWithTxn"); - // Test that attempting to unlock locks associated with a transaction - // generates an error - long txnid = openTxn(); - LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - LockResponse res = txnHandler.lock(req); - long lockid = res.getLockid(); - try { - txnHandler.unlock(new UnlockRequest(lockid)); - fail("Allowed to unlock lock associated with transaction."); - } catch (TxnOpenException e) { - } - } - - @Test - public void testHeartbeatTxnAborted() throws Exception { - // Test that when a transaction is aborted, the heartbeat fails - openTxn(); - txnHandler.abortTxn(new AbortTxnRequest(1)); - HeartbeatRequest h = new HeartbeatRequest(); - h.setTxnid(1); - try { - txnHandler.heartbeat(h); - fail("Told there was a txn, when it should have been aborted."); - } catch (TxnAbortedException e) { - } - } - - @Test - public void testHeartbeatNoTxn() throws Exception { - // Test that when a transaction is aborted, the heartbeat fails - HeartbeatRequest h = new HeartbeatRequest(); - h.setTxnid(939393L); - try { - txnHandler.heartbeat(h); - fail("Told there was a txn, when there wasn't."); - } catch (NoSuchTxnException e) { - } - } - - @Test - public void testHeartbeatLock() throws Exception { - conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS); - HeartbeatRequest h = new HeartbeatRequest(); - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - h.setLockid(res.getLockid()); - for (int i = 0; i < 30; i++) { - try { - txnHandler.heartbeat(h); - } catch (NoSuchLockException e) { - fail("Told there was no lock, when the heartbeat should have kept it."); - } - } - } - - @Test - public void heartbeatTxnRange() throws Exception { - long txnid = openTxn(); - assertEquals(1, txnid); - txnid = openTxn(); - txnid = openTxn(); - HeartbeatTxnRangeResponse rsp = - txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3)); - assertEquals(0, rsp.getAborted().size()); - assertEquals(0, rsp.getNosuch().size()); - } - - @Test - public void heartbeatTxnRangeOneCommitted() throws Exception { - long txnid = openTxn(); - assertEquals(1, txnid); - txnHandler.commitTxn(new CommitTxnRequest(1)); - txnid = openTxn(); - txnid = openTxn(); - HeartbeatTxnRangeResponse rsp = - txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3)); - assertEquals(1, rsp.getNosuchSize()); - Long txn = rsp.getNosuch().iterator().next(); - assertEquals(1L, (long)txn); - assertEquals(0, rsp.getAborted().size()); - } - - @Test - public void heartbeatTxnRangeOneAborted() throws Exception { - long txnid = openTxn(); - assertEquals(1, txnid); - txnid = openTxn(); - txnid = openTxn(); - txnHandler.abortTxn(new AbortTxnRequest(3)); - HeartbeatTxnRangeResponse rsp = - txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3)); - assertEquals(1, rsp.getAbortedSize()); - Long txn = rsp.getAborted().iterator().next(); - assertEquals(3L, (long)txn); - assertEquals(0, rsp.getNosuch().size()); - } - - @Test - public void testLockTimeout() throws Exception { - long timeout = txnHandler.setTimeout(1); - try { - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - Thread.sleep(10); - txnHandler.performTimeOuts(); - txnHandler.checkLock(new CheckLockRequest(res.getLockid())); - fail("Told there was a lock, when it should have timed out."); - } catch (NoSuchLockException e) { - } finally { - txnHandler.setTimeout(timeout); - } - } - - @Test - public void testRecoverManyTimeouts() throws Exception { - long timeout = txnHandler.setTimeout(1); - try { - txnHandler.openTxns(new OpenTxnRequest(503, "me", "localhost")); - Thread.sleep(10); - txnHandler.performTimeOuts(); - GetOpenTxnsInfoResponse rsp = txnHandler.getOpenTxnsInfo(); - int numAborted = 0; - for (TxnInfo txnInfo : rsp.getOpen_txns()) { - assertEquals(TxnState.ABORTED, txnInfo.getState()); - numAborted++; - } - assertEquals(503, numAborted); - } finally { - txnHandler.setTimeout(timeout); - } - - - } - - @Test - public void testHeartbeatNoLock() throws Exception { - HeartbeatRequest h = new HeartbeatRequest(); - h.setLockid(29389839L); - try { - txnHandler.heartbeat(h); - fail("Told there was a lock, when there wasn't."); - } catch (NoSuchLockException e) { - } - } - - @Test - public void testCompactMajorWithPartition() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MAJOR); - rqst.setPartitionname("ds=today"); - txnHandler.compact(rqst); - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - assertEquals(1, compacts.size()); - ShowCompactResponseElement c = compacts.get(0); - assertEquals("foo", c.getDbname()); - assertEquals("bar", c.getTablename()); - assertEquals("ds=today", c.getPartitionname()); - assertEquals(CompactionType.MAJOR, c.getType()); - assertEquals("initiated", c.getState()); - assertEquals(0L, c.getStart()); - } - - @Test - public void testCompactMinorNoPartition() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - rqst.setRunas("fred"); - txnHandler.compact(rqst); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - assertEquals(1, compacts.size()); - ShowCompactResponseElement c = compacts.get(0); - assertEquals("foo", c.getDbname()); - assertEquals("bar", c.getTablename()); - assertNull(c.getPartitionname()); - assertEquals(CompactionType.MINOR, c.getType()); - assertEquals("initiated", c.getState()); - assertEquals(0L, c.getStart()); - assertEquals("fred", c.getRunAs()); - } - - @Test - public void showLocks() throws Exception { - long begining = System.currentTimeMillis(); - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - List components = new ArrayList(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - - // Open txn - long txnid = openTxn(); - comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "mydb"); - comp.setTablename("mytable"); - components = new ArrayList(1); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); - res = txnHandler.lock(req); - - // Locks not associated with a txn - components = new ArrayList(1); - comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "yourdb"); - comp.setTablename("yourtable"); - comp.setPartitionname("yourpartition"); - components.add(comp); - req = new LockRequest(components, "you", "remotehost"); - res = txnHandler.lock(req); - - ShowLocksResponse rsp = txnHandler.showLocks(new ShowLocksRequest()); - List locks = rsp.getLocks(); - assertEquals(3, locks.size()); - boolean[] saw = new boolean[locks.size()]; - for (int i = 0; i < saw.length; i++) saw[i] = false; - for (ShowLocksResponseElement lock : locks) { - if (lock.getLockid() == 1) { - assertEquals(0, lock.getTxnid()); - assertEquals("mydb", lock.getDbname()); - assertNull(lock.getTablename()); - assertNull(lock.getPartname()); - assertEquals(LockState.ACQUIRED, lock.getState()); - assertEquals(LockType.EXCLUSIVE, lock.getType()); - assertTrue(lock.toString(), 0 != lock.getLastheartbeat()); - assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining - + " and " + System.currentTimeMillis(), - begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat()); - assertEquals("me", lock.getUser()); - assertEquals("localhost", lock.getHostname()); - saw[0] = true; - } else if (lock.getLockid() == 2) { - assertEquals(1, lock.getTxnid()); - assertEquals("mydb", lock.getDbname()); - assertEquals("mytable", lock.getTablename()); - assertNull(lock.getPartname()); - assertEquals(LockState.WAITING, lock.getState()); - assertEquals(LockType.SHARED_READ, lock.getType()); - assertTrue(lock.toString(), 0 == lock.getLastheartbeat() && - lock.getTxnid() != 0); - assertEquals(0, lock.getAcquiredat()); - assertEquals("me", lock.getUser()); - assertEquals("localhost", lock.getHostname()); - saw[1] = true; - } else if (lock.getLockid() == 3) { - assertEquals(0, lock.getTxnid()); - assertEquals("yourdb", lock.getDbname()); - assertEquals("yourtable", lock.getTablename()); - assertEquals("yourpartition", lock.getPartname()); - assertEquals(LockState.ACQUIRED, lock.getState()); - assertEquals(LockType.SHARED_READ, lock.getType()); - assertTrue(lock.toString(), begining <= lock.getLastheartbeat() && - System.currentTimeMillis() >= lock.getLastheartbeat()); - assertTrue(begining <= lock.getAcquiredat() && - System.currentTimeMillis() >= lock.getAcquiredat()); - assertEquals("you", lock.getUser()); - assertEquals("remotehost", lock.getHostname()); - saw[2] = true; - } else { - fail("Unknown lock id"); - } - } - for (int i = 0; i < saw.length; i++) assertTrue("Didn't see lock id " + i, saw[i]); - } - - @Test - @Ignore("Wedges Derby") - public void deadlockDetected() throws Exception { - LOG.debug("Starting deadlock test"); - if (txnHandler instanceof TxnHandler) { - final TxnHandler tHndlr = (TxnHandler)txnHandler; - Connection conn = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); - Statement stmt = conn.createStatement(); - long now = tHndlr.getDbTime(conn); - stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " + - "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " + - "'scooby.com')"); - stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " + - "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " + - "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" + - tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " + - "'scooby.com')"); - conn.commit(); - tHndlr.closeDbConn(conn); - - final AtomicBoolean sawDeadlock = new AtomicBoolean(); - - final Connection conn1 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); - final Connection conn2 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); - try { - - for (int i = 0; i < 5; i++) { - Thread t1 = new Thread() { - @Override - public void run() { - try { - try { - updateTxns(conn1); - updateLocks(conn1); - Thread.sleep(1000); - conn1.commit(); - LOG.debug("no exception, no deadlock"); - } catch (SQLException e) { - try { - tHndlr.checkRetryable(conn1, e, "thread t1"); - LOG.debug("Got an exception, but not a deadlock, SQLState is " + - e.getSQLState() + " class of exception is " + e.getClass().getName() + - " msg is <" + e.getMessage() + ">"); - } catch (TxnHandler.RetryException de) { - LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + - "exception is " + e.getClass().getName() + " msg is <" + e - .getMessage() + ">"); - sawDeadlock.set(true); - } - } - conn1.rollback(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - - Thread t2 = new Thread() { - @Override - public void run() { - try { - try { - updateLocks(conn2); - updateTxns(conn2); - Thread.sleep(1000); - conn2.commit(); - LOG.debug("no exception, no deadlock"); - } catch (SQLException e) { - try { - tHndlr.checkRetryable(conn2, e, "thread t2"); - LOG.debug("Got an exception, but not a deadlock, SQLState is " + - e.getSQLState() + " class of exception is " + e.getClass().getName() + - " msg is <" + e.getMessage() + ">"); - } catch (TxnHandler.RetryException de) { - LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + - "exception is " + e.getClass().getName() + " msg is <" + e - .getMessage() + ">"); - sawDeadlock.set(true); - } - } - conn2.rollback(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - - t1.start(); - t2.start(); - t1.join(); - t2.join(); - if (sawDeadlock.get()) break; - } - assertTrue(sawDeadlock.get()); - } finally { - conn1.rollback(); - tHndlr.closeDbConn(conn1); - conn2.rollback(); - tHndlr.closeDbConn(conn2); - } - } - } - - /** - * This cannnot be run against Derby (thus in UT) but it can run againt MySQL. - * 1. add to metastore/pom.xml - * - * mysql - * mysql-connector-java - * 5.1.30 - * - * 2. Hack in the c'tor of this class - * conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:mysql://localhost/metastore"); - * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "hive"); - * conf.setVar(HiveConf.ConfVars.METASTOREPWD, "hive"); - * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver"); - * 3. Remove TxnDbUtil.prepDb(); in TxnHandler.checkQFileTestHack() - * - */ - @Ignore("multiple threads wedge Derby") - @Test - public void testMutexAPI() throws Exception { - final TxnStore.MutexAPI api = txnHandler.getMutexAPI(); - final AtomicInteger stepTracker = new AtomicInteger(0); - /** - * counter = 0; - * Thread1 counter=1, lock, wait 3s, check counter(should be 2), counter=3, unlock - * Thread2 counter=2, lock (and block), inc counter, should be 4 - */ - Thread t1 = new Thread("MutexTest1") { - public void run() { - try { - stepTracker.incrementAndGet();//now 1 - TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name()); - Thread.sleep(4000); - //stepTracker should now be 2 which indicates t2 has started - Assert.assertEquals("Thread2 should have started by now but not done work", 2, stepTracker.get()); - stepTracker.incrementAndGet();//now 3 - handle.releaseLocks(); - } - catch(Exception ex) { - throw new RuntimeException(ex.getMessage(), ex); - } - } - }; - t1.setDaemon(true); - ErrorHandle ueh1 = new ErrorHandle(); - t1.setUncaughtExceptionHandler(ueh1); - Thread t2 = new Thread("MutexTest2") { - public void run() { - try { - stepTracker.incrementAndGet();//now 2 - //this should block until t1 unlocks - TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name()); - stepTracker.incrementAndGet();//now 4 - Assert.assertEquals(4, stepTracker.get()); - handle.releaseLocks(); - stepTracker.incrementAndGet();//now 5 - } - catch(Exception ex) { - throw new RuntimeException(ex.getMessage(), ex); - } - } - }; - t2.setDaemon(true); - ErrorHandle ueh2 = new ErrorHandle(); - t2.setUncaughtExceptionHandler(ueh2); - t1.start(); - try { - Thread.sleep(1000); - } - catch(InterruptedException ex) { - LOG.info("Sleep was interrupted"); - } - t2.start(); - t1.join(6000);//so that test doesn't block - t2.join(6000); - - if(ueh1.error != null) { - Assert.assertTrue("Unexpected error from t1: " + StringUtils.stringifyException(ueh1.error), false); - } - if (ueh2.error != null) { - Assert.assertTrue("Unexpected error from t2: " + StringUtils.stringifyException(ueh2.error), false); - } - Assert.assertEquals("5 means both threads have completed", 5, stepTracker.get()); - } - private final static class ErrorHandle implements Thread.UncaughtExceptionHandler { - Throwable error = null; - @Override - public void uncaughtException(Thread t, Throwable e) { - LOG.error("Uncaught exception from " + t.getName() + ": " + e.getMessage()); - error = e; - } - } - - @Test - public void testRetryableRegex() throws Exception { - SQLException sqlException = new SQLException("ORA-08177: can't serialize access for this transaction", "72000"); - // Note that we have 3 regex'es below - conf.setVar(HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, "^Deadlock detected, roll back,.*08177.*,.*08178.*"); - boolean result = TxnHandler.isRetryable(conf, sqlException); - Assert.assertTrue("regex should be retryable", result); - - sqlException = new SQLException("This error message, has comma in it"); - conf.setVar(HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, ".*comma.*"); - result = TxnHandler.isRetryable(conf, sqlException); - Assert.assertTrue("regex should be retryable", result); - } - - private void updateTxns(Connection conn) throws SQLException { - Statement stmt = conn.createStatement(); - stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1"); - } - - private void updateLocks(Connection conn) throws SQLException { - Statement stmt = conn.createStatement(); - stmt.executeUpdate("update HIVE_LOCKS set hl_last_heartbeat = hl_last_heartbeat + 1"); - } - - @Before - public void setUp() throws Exception { - TxnDbUtil.prepDb(); - txnHandler = TxnUtils.getTxnStore(conf); - } - - @After - public void tearDown() throws Exception { - TxnDbUtil.cleanDb(); - } - - private long openTxn() throws MetaException { - List txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids(); - return txns.get(0); - } - -} diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java new file mode 100644 index 0000000..f5eb8a1 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +/** + * Background running thread, periodically updating number of open transactions. + * Runs inside Hive Metastore Service. + */ +public class AcidOpenTxnsCounterService extends HouseKeeperServiceBase { + private static final Logger LOG = LoggerFactory.getLogger(AcidOpenTxnsCounterService.class); + @Override + protected long getStartDelayMs() { + return 100; // in miliseconds + } + @Override + protected long getIntervalMs() { + return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_COUNT_OPEN_TXNS_INTERVAL, TimeUnit.MILLISECONDS); + } + @Override + protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) { + return new OpenTxnsCounter(hiveConf, isAliveCounter); + } + @Override + public String getServiceDescription() { + return "Count number of open transactions"; + } + private static final class OpenTxnsCounter implements Runnable { + private final TxnStore txnHandler; + private final AtomicInteger isAliveCounter; + private OpenTxnsCounter(HiveConf hiveConf, AtomicInteger isAliveCounter) { + txnHandler = TxnUtils.getTxnStore(hiveConf); + this.isAliveCounter = isAliveCounter; + } + @Override + public void run() { + try { + long startTime = System.currentTimeMillis(); + txnHandler.countOpenTxns(); + int count = isAliveCounter.incrementAndGet(); + LOG.info("OpenTxnsCounter ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count); + } + catch(Throwable t) { + LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); + } + } + } +} \ No newline at end of file diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java new file mode 100644 index 0000000..80e3cd6 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -0,0 +1,455 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.LockLevel; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertNull; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; + +/** + * Tests for TxnHandler. + */ +public class TestCompactionTxnHandler { + + private HiveConf conf = new HiveConf(); + private TxnStore txnHandler; + + public TestCompactionTxnHandler() throws Exception { + TxnDbUtil.setConfValues(conf); + tearDown(); + } + + @Test + public void testFindNextToCompact() throws Exception { + CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + long now = System.currentTimeMillis(); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + assertNotNull(ci); + assertEquals("foo", ci.dbname); + assertEquals("bar", ci.tableName); + assertEquals("ds=today", ci.partName); + assertEquals(CompactionType.MINOR, ci.type); + assertNull(ci.runAs); + assertNull(txnHandler.findNextToCompact("fred")); + + txnHandler.setRunAs(ci.id, "bob"); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + assertEquals(1, compacts.size()); + ShowCompactResponseElement c = compacts.get(0); + assertEquals("foo", c.getDbname()); + assertEquals("bar", c.getTablename()); + assertEquals("ds=today", c.getPartitionname()); + assertEquals(CompactionType.MINOR, c.getType()); + assertEquals("working", c.getState()); + assertTrue(c.getStart() - 5000 < now && c.getStart() + 5000 > now); + assertEquals("fred", c.getWorkerid()); + assertEquals("bob", c.getRunAs()); + } + + @Test + public void testFindNextToCompact2() throws Exception { + CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + + rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); + rqst.setPartitionname("ds=yesterday"); + txnHandler.compact(rqst); + + long now = System.currentTimeMillis(); + boolean expectToday = false; + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + assertNotNull(ci); + assertEquals("foo", ci.dbname); + assertEquals("bar", ci.tableName); + if ("ds=today".equals(ci.partName)) expectToday = false; + else if ("ds=yesterday".equals(ci.partName)) expectToday = true; + else fail("partition name should have been today or yesterday but was " + ci.partName); + assertEquals(CompactionType.MINOR, ci.type); + + ci = txnHandler.findNextToCompact("fred"); + assertNotNull(ci); + assertEquals("foo", ci.dbname); + assertEquals("bar", ci.tableName); + if (expectToday) assertEquals("ds=today", ci.partName); + else assertEquals("ds=yesterday", ci.partName); + assertEquals(CompactionType.MINOR, ci.type); + + assertNull(txnHandler.findNextToCompact("fred")); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + assertEquals(2, compacts.size()); + for (ShowCompactResponseElement e : compacts) { + assertEquals("working", e.getState()); + assertTrue(e.getStart() - 5000 < now && e.getStart() + 5000 > now); + assertEquals("fred", e.getWorkerid()); + } + } + + @Test + public void testFindNextToCompactNothingToCompact() throws Exception { + assertNull(txnHandler.findNextToCompact("fred")); + } + + @Test + public void testMarkCompacted() throws Exception { + CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + assertNotNull(ci); + + txnHandler.markCompacted(ci); + assertNull(txnHandler.findNextToCompact("fred")); + + + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + assertEquals(1, compacts.size()); + ShowCompactResponseElement c = compacts.get(0); + assertEquals("foo", c.getDbname()); + assertEquals("bar", c.getTablename()); + assertEquals("ds=today", c.getPartitionname()); + assertEquals(CompactionType.MINOR, c.getType()); + assertEquals("ready for cleaning", c.getState()); + assertNull(c.getWorkerid()); + } + + @Test + public void testFindNextToClean() throws Exception { + CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + assertEquals(0, txnHandler.findReadyToClean().size()); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + assertNotNull(ci); + + assertEquals(0, txnHandler.findReadyToClean().size()); + txnHandler.markCompacted(ci); + assertNull(txnHandler.findNextToCompact("fred")); + + List toClean = txnHandler.findReadyToClean(); + assertEquals(1, toClean.size()); + assertNull(txnHandler.findNextToCompact("fred")); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + assertEquals(1, compacts.size()); + ShowCompactResponseElement c = compacts.get(0); + assertEquals("foo", c.getDbname()); + assertEquals("bar", c.getTablename()); + assertEquals("ds=today", c.getPartitionname()); + assertEquals(CompactionType.MINOR, c.getType()); + assertEquals("ready for cleaning", c.getState()); + assertNull(c.getWorkerid()); + } + + @Test + public void testMarkCleaned() throws Exception { + CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + assertEquals(0, txnHandler.findReadyToClean().size()); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + assertNotNull(ci); + + assertEquals(0, txnHandler.findReadyToClean().size()); + txnHandler.markCompacted(ci); + assertNull(txnHandler.findNextToCompact("fred")); + + List toClean = txnHandler.findReadyToClean(); + assertEquals(1, toClean.size()); + assertNull(txnHandler.findNextToCompact("fred")); + txnHandler.markCleaned(ci); + assertNull(txnHandler.findNextToCompact("fred")); + assertEquals(0, txnHandler.findReadyToClean().size()); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + assertEquals(1, rsp.getCompactsSize()); + assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + } + + @Test + public void testRevokeFromLocalWorkers() throws Exception { + CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); + txnHandler.compact(rqst); + rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR); + txnHandler.compact(rqst); + rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR); + txnHandler.compact(rqst); + assertNotNull(txnHandler.findNextToCompact("fred-193892")); + assertNotNull(txnHandler.findNextToCompact("bob-193892")); + assertNotNull(txnHandler.findNextToCompact("fred-193893")); + txnHandler.revokeFromLocalWorkers("fred"); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + assertEquals(3, compacts.size()); + boolean sawWorkingBob = false; + int initiatedCount = 0; + for (ShowCompactResponseElement c : compacts) { + if (c.getState().equals("working")) { + assertEquals("bob-193892", c.getWorkerid()); + sawWorkingBob = true; + } else if (c.getState().equals("initiated")) { + initiatedCount++; + } else { + fail("Unexpected state"); + } + } + assertTrue(sawWorkingBob); + assertEquals(2, initiatedCount); + } + + @Test + public void testRevokeTimedOutWorkers() throws Exception { + CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); + txnHandler.compact(rqst); + rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR); + txnHandler.compact(rqst); + + assertNotNull(txnHandler.findNextToCompact("fred-193892")); + Thread.sleep(200); + assertNotNull(txnHandler.findNextToCompact("fred-193892")); + txnHandler.revokeTimedoutWorkers(100); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + assertEquals(2, compacts.size()); + boolean sawWorking = false, sawInitiated = false; + for (ShowCompactResponseElement c : compacts) { + if (c.getState().equals("working")) sawWorking = true; + else if (c.getState().equals("initiated")) sawInitiated = true; + else fail("Unexpected state"); + } + assertTrue(sawWorking); + assertTrue(sawInitiated); + } + + @Test + public void testFindPotentialCompactions() throws Exception { + // Test that committing unlocks + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, + "mydb"); + comp.setTablename("mytable"); + List components = new ArrayList(1); + components.add(comp); + comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, + "mydb"); + comp.setTablename("yourtable"); + comp.setPartitionname("mypartition"); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + assertEquals(0, txnHandler.numLocksInLockTable()); + + Set potentials = txnHandler.findPotentialCompactions(100); + assertEquals(2, potentials.size()); + boolean sawMyTable = false, sawYourTable = false; + for (CompactionInfo ci : potentials) { + sawMyTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("mytable") && + ci.partName == null); + sawYourTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("yourtable") && + ci.partName.equals("mypartition")); + } + assertTrue(sawMyTable); + assertTrue(sawYourTable); + } + + // TODO test changes to mark cleaned to clean txns and txn_components + + @Test + public void testMarkCleanedCleansTxnsAndTxnComponents() + throws Exception { + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, + "mydb"); + comp.setTablename("mytable"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + + txnid = openTxn(); + comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("yourtable"); + components = new ArrayList(1); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + + txnid = openTxn(); + comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("foo"); + comp.setPartitionname("bar"); + components = new ArrayList(1); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("foo"); + comp.setPartitionname("baz"); + components = new ArrayList(1); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + + CompactionInfo ci = new CompactionInfo(); + + // Now clean them and check that they are removed from the count. + CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR); + txnHandler.compact(rqst); + assertEquals(0, txnHandler.findReadyToClean().size()); + ci = txnHandler.findNextToCompact("fred"); + assertNotNull(ci); + txnHandler.markCompacted(ci); + + List toClean = txnHandler.findReadyToClean(); + assertEquals(1, toClean.size()); + txnHandler.markCleaned(ci); + + // Check that we are cleaning up the empty aborted transactions + GetOpenTxnsResponse txnList = txnHandler.getOpenTxns(); + assertEquals(3, txnList.getOpen_txnsSize()); + txnHandler.cleanEmptyAbortedTxns(); + txnList = txnHandler.getOpenTxns(); + assertEquals(2, txnList.getOpen_txnsSize()); + + rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR); + rqst.setPartitionname("bar"); + txnHandler.compact(rqst); + assertEquals(0, txnHandler.findReadyToClean().size()); + ci = txnHandler.findNextToCompact("fred"); + assertNotNull(ci); + txnHandler.markCompacted(ci); + + toClean = txnHandler.findReadyToClean(); + assertEquals(1, toClean.size()); + txnHandler.markCleaned(ci); + + txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")); + txnHandler.cleanEmptyAbortedTxns(); + txnList = txnHandler.getOpenTxns(); + assertEquals(3, txnList.getOpen_txnsSize()); + } + + @Test + public void addDynamicPartitions() throws Exception { + String dbName = "default"; + String tableName = "adp_table"; + OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")); + long txnId = openTxns.getTxn_ids().get(0); + // lock a table, as in dynamic partitions + LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName); + lc.setTablename(tableName); + LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost"); + lr.setTxnid(txnId); + LockResponse lock = txnHandler.lock(lr); + assertEquals(LockState.ACQUIRED, lock.getState()); + + txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnId, dbName, tableName, + Arrays.asList("ds=yesterday", "ds=today"))); + txnHandler.commitTxn(new CommitTxnRequest(txnId)); + + Set potentials = txnHandler.findPotentialCompactions(1000); + assertEquals(2, potentials.size()); + SortedSet sorted = new TreeSet(potentials); + + int i = 0; + for (CompactionInfo ci : sorted) { + assertEquals(dbName, ci.dbname); + assertEquals(tableName, ci.tableName); + switch (i++) { + case 0: assertEquals("ds=today", ci.partName); break; + case 1: assertEquals("ds=yesterday", ci.partName); break; + default: throw new RuntimeException("What?"); + } + } + } + + @Before + public void setUp() throws Exception { + TxnDbUtil.prepDb(); + txnHandler = TxnUtils.getTxnStore(conf); + } + + @After + public void tearDown() throws Exception { + TxnDbUtil.cleanDb(); + } + + private long openTxn() throws MetaException { + List txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids(); + return txns.get(0); + } + +} diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java new file mode 100644 index 0000000..1a118a9 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -0,0 +1,1399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.LockLevel; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; +import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; +import org.apache.hadoop.hive.metastore.api.TxnState; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; +import org.apache.hadoop.util.StringUtils; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.Configuration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertNull; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; + +/** + * Tests for TxnHandler. + */ +public class TestTxnHandler { + static final private String CLASS_NAME = TxnHandler.class.getName(); + private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + + private HiveConf conf = new HiveConf(); + private TxnStore txnHandler; + + public TestTxnHandler() throws Exception { + TxnDbUtil.setConfValues(conf); + LoggerContext ctx = (LoggerContext) LogManager.getContext(false); + Configuration conf = ctx.getConfiguration(); + conf.getLoggerConfig(CLASS_NAME).setLevel(Level.DEBUG); + ctx.updateLoggers(conf); + tearDown(); + } + + @Test + public void testValidTxnsEmpty() throws Exception { + GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); + assertEquals(0L, txnsInfo.getTxn_high_water_mark()); + assertTrue(txnsInfo.getOpen_txns().isEmpty()); + GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); + assertEquals(0L, txns.getTxn_high_water_mark()); + assertTrue(txns.getOpen_txns().isEmpty()); + } + + @Test + public void testOpenTxn() throws Exception { + long first = openTxn(); + assertEquals(1L, first); + long second = openTxn(); + assertEquals(2L, second); + GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); + assertEquals(2L, txnsInfo.getTxn_high_water_mark()); + assertEquals(2, txnsInfo.getOpen_txns().size()); + assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId()); + assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(0).getState()); + assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId()); + assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState()); + assertEquals("me", txnsInfo.getOpen_txns().get(1).getUser()); + assertEquals("localhost", txnsInfo.getOpen_txns().get(1).getHostname()); + + GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); + assertEquals(2L, txns.getTxn_high_water_mark()); + assertEquals(2, txns.getOpen_txns().size()); + boolean[] saw = new boolean[3]; + for (int i = 0; i < saw.length; i++) saw[i] = false; + for (Long tid : txns.getOpen_txns()) { + saw[tid.intValue()] = true; + } + for (int i = 1; i < saw.length; i++) assertTrue(saw[i]); + } + + @Test + public void testAbortTxn() throws Exception { + OpenTxnsResponse openedTxns = txnHandler.openTxns(new OpenTxnRequest(2, "me", "localhost")); + List txnList = openedTxns.getTxn_ids(); + long first = txnList.get(0); + assertEquals(1L, first); + long second = txnList.get(1); + assertEquals(2L, second); + txnHandler.abortTxn(new AbortTxnRequest(1)); + GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); + assertEquals(2L, txnsInfo.getTxn_high_water_mark()); + assertEquals(2, txnsInfo.getOpen_txns().size()); + assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId()); + assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState()); + assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId()); + assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState()); + + GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); + assertEquals(2L, txns.getTxn_high_water_mark()); + assertEquals(2, txns.getOpen_txns().size()); + boolean[] saw = new boolean[3]; + for (int i = 0; i < saw.length; i++) saw[i] = false; + for (Long tid : txns.getOpen_txns()) { + saw[tid.intValue()] = true; + } + for (int i = 1; i < saw.length; i++) assertTrue(saw[i]); + } + + @Test + public void testAbortInvalidTxn() throws Exception { + boolean caught = false; + try { + txnHandler.abortTxn(new AbortTxnRequest(195L)); + } catch (NoSuchTxnException e) { + caught = true; + } + assertTrue(caught); + } + + @Test + public void testValidTxnsNoneOpen() throws Exception { + txnHandler.openTxns(new OpenTxnRequest(2, "me", "localhost")); + txnHandler.commitTxn(new CommitTxnRequest(1)); + txnHandler.commitTxn(new CommitTxnRequest(2)); + GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); + assertEquals(2L, txnsInfo.getTxn_high_water_mark()); + assertEquals(0, txnsInfo.getOpen_txns().size()); + GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); + assertEquals(2L, txns.getTxn_high_water_mark()); + assertEquals(0, txns.getOpen_txns().size()); + } + + @Test + public void testValidTxnsSomeOpen() throws Exception { + txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost")); + txnHandler.abortTxn(new AbortTxnRequest(1)); + txnHandler.commitTxn(new CommitTxnRequest(2)); + GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); + assertEquals(3L, txnsInfo.getTxn_high_water_mark()); + assertEquals(2, txnsInfo.getOpen_txns().size()); + assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId()); + assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState()); + assertEquals(3L, txnsInfo.getOpen_txns().get(1).getId()); + assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState()); + + GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); + assertEquals(3L, txns.getTxn_high_water_mark()); + assertEquals(2, txns.getOpen_txns().size()); + boolean[] saw = new boolean[4]; + for (int i = 0; i < saw.length; i++) saw[i] = false; + for (Long tid : txns.getOpen_txns()) { + saw[tid.intValue()] = true; + } + assertTrue(saw[1]); + assertFalse(saw[2]); + assertTrue(saw[3]); + } + + @Test + public void testLockDifferentDBs() throws Exception { + // Test that two different databases don't collide on their locks + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + } + + @Test + public void testLockSameDB() throws Exception { + // Test that two different databases don't collide on their locks + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + } + + @Test + public void testLockDbLocksTable() throws Exception { + // Test that locking a database prevents locking of tables in the database + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + } + + @Test + public void testLockDbDoesNotLockTableInDifferentDB() throws Exception { + // Test that locking a database prevents locking of tables in the database + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb"); + comp.setTablename("mytable"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + } + + @Test + public void testLockDifferentTables() throws Exception { + // Test that two different tables don't collide on their locks + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("yourtable"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + } + + @Test + public void testLockSameTable() throws Exception { + // Test that two different tables don't collide on their locks + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + } + + @Test + public void testLockTableLocksPartition() throws Exception { + // Test that locking a table prevents locking of partitions of the table + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + } + + @Test + public void testLockDifferentTableDoesntLockPartition() throws Exception { + // Test that locking a table prevents locking of partitions of the table + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("yourtable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + } + + @Test + public void testLockDifferentPartitions() throws Exception { + // Test that two different partitions don't collide on their locks + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("yourpartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + } + + @Test + public void testLockSamePartition() throws Exception { + // Test that two different partitions don't collide on their locks + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + } + + @Test + public void testLockSRSR() throws Exception { + // Test that two shared read locks can share a partition + LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + } + + @Test + public void testLockESRSR() throws Exception { + // Test that exclusive lock blocks shared reads + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + + comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + } + + @Test + public void testLockSRSW() throws Exception { + // Test that write can acquire after read + LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + } + + @Test + public void testLockESRSW() throws Exception { + // Test that exclusive lock blocks read and write + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + + comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + } + + @Test + public void testLockSRE() throws Exception { + // Test that read blocks exclusive + LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + } + + @Test + public void testLockESRE() throws Exception { + // Test that exclusive blocks read and exclusive + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + } + + @Test + public void testLockSWSR() throws Exception { + // Test that read can acquire after write + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + } + + @Test + public void testLockSWSWSR() throws Exception { + // Test that write blocks write but read can still acquire + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + + comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + } + + @Test + public void testLockSWSWSW() throws Exception { + // Test that write blocks two writes + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + + comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + } + + @Test + public void testLockEESW() throws Exception { + // Test that exclusive blocks exclusive and write + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + + comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + } + + @Test + public void testLockEESR() throws Exception { + // Test that exclusive blocks exclusive and read + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + + comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.WAITING); + } + + @Test + public void testCheckLockAcquireAfterWaiting() throws Exception { + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + long txnId = openTxn(); + req.setTxnid(txnId); + LockResponse res = txnHandler.lock(req); + long lockid1 = res.getLockid(); + assertTrue(res.getState() == LockState.ACQUIRED); + + comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components.clear(); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); + res = txnHandler.lock(req); + long lockid2 = res.getLockid(); + assertTrue(res.getState() == LockState.WAITING); + + txnHandler.abortTxn(new AbortTxnRequest(txnId)); + res = txnHandler.checkLock(new CheckLockRequest(lockid2)); + assertTrue(res.getState() == LockState.ACQUIRED); + } + + @Test + public void testCheckLockNoSuchLock() throws Exception { + try { + txnHandler.checkLock(new CheckLockRequest(23L)); + fail("Allowed to check lock on non-existent lock"); + } catch (NoSuchLockException e) { + } + } + + @Test + public void testCheckLockTxnAborted() throws Exception { + // Test that when a transaction is aborted, the heartbeat fails + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + long lockid = res.getLockid(); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + try { + // This will throw NoSuchLockException (even though it's the + // transaction we've closed) because that will have deleted the lock. + txnHandler.checkLock(new CheckLockRequest(lockid)); + fail("Allowed to check lock on aborted transaction."); + } catch (NoSuchLockException e) { + } + } + + @Test + public void testMultipleLock() throws Exception { + // Test more than one lock can be handled in a lock request + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(2); + components.add(comp); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("anotherpartition"); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + long lockid = res.getLockid(); + assertTrue(res.getState() == LockState.ACQUIRED); + res = txnHandler.checkLock(new CheckLockRequest(lockid)); + assertTrue(res.getState() == LockState.ACQUIRED); + txnHandler.unlock(new UnlockRequest(lockid)); + assertEquals(0, txnHandler.numLocksInLockTable()); + } + + @Test + public void testMultipleLockWait() throws Exception { + // Test that two shared read locks can share a partition + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(2); + components.add(comp); + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("anotherpartition"); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + long lockid1 = res.getLockid(); + assertTrue(res.getState() == LockState.ACQUIRED); + + + comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + components = new ArrayList(1); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + res = txnHandler.lock(req); + long lockid2 = res.getLockid(); + assertTrue(res.getState() == LockState.WAITING); + + txnHandler.unlock(new UnlockRequest(lockid1)); + + res = txnHandler.checkLock(new CheckLockRequest(lockid2)); + assertTrue(res.getState() == LockState.ACQUIRED); + } + + @Test + public void testUnlockOnCommit() throws Exception { + // Test that committing unlocks + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + assertEquals(0, txnHandler.numLocksInLockTable()); + } + + @Test + public void testUnlockOnAbort() throws Exception { + // Test that committing unlocks + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + txnHandler.abortTxn(new AbortTxnRequest(txnid)); + assertEquals(0, txnHandler.numLocksInLockTable()); + } + + @Test + public void testUnlockWithTxn() throws Exception { + LOG.debug("Starting testUnlockWithTxn"); + // Test that attempting to unlock locks associated with a transaction + // generates an error + long txnid = openTxn(); + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + long lockid = res.getLockid(); + try { + txnHandler.unlock(new UnlockRequest(lockid)); + fail("Allowed to unlock lock associated with transaction."); + } catch (TxnOpenException e) { + } + } + + @Test + public void testHeartbeatTxnAborted() throws Exception { + // Test that when a transaction is aborted, the heartbeat fails + openTxn(); + txnHandler.abortTxn(new AbortTxnRequest(1)); + HeartbeatRequest h = new HeartbeatRequest(); + h.setTxnid(1); + try { + txnHandler.heartbeat(h); + fail("Told there was a txn, when it should have been aborted."); + } catch (TxnAbortedException e) { + } + } + + @Test + public void testHeartbeatNoTxn() throws Exception { + // Test that when a transaction is aborted, the heartbeat fails + HeartbeatRequest h = new HeartbeatRequest(); + h.setTxnid(939393L); + try { + txnHandler.heartbeat(h); + fail("Told there was a txn, when there wasn't."); + } catch (NoSuchTxnException e) { + } + } + + @Test + public void testHeartbeatLock() throws Exception { + conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS); + HeartbeatRequest h = new HeartbeatRequest(); + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + h.setLockid(res.getLockid()); + for (int i = 0; i < 30; i++) { + try { + txnHandler.heartbeat(h); + } catch (NoSuchLockException e) { + fail("Told there was no lock, when the heartbeat should have kept it."); + } + } + } + + @Test + public void heartbeatTxnRange() throws Exception { + long txnid = openTxn(); + assertEquals(1, txnid); + txnid = openTxn(); + txnid = openTxn(); + HeartbeatTxnRangeResponse rsp = + txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3)); + assertEquals(0, rsp.getAborted().size()); + assertEquals(0, rsp.getNosuch().size()); + } + + @Test + public void heartbeatTxnRangeOneCommitted() throws Exception { + long txnid = openTxn(); + assertEquals(1, txnid); + txnHandler.commitTxn(new CommitTxnRequest(1)); + txnid = openTxn(); + txnid = openTxn(); + HeartbeatTxnRangeResponse rsp = + txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3)); + assertEquals(1, rsp.getNosuchSize()); + Long txn = rsp.getNosuch().iterator().next(); + assertEquals(1L, (long)txn); + assertEquals(0, rsp.getAborted().size()); + } + + @Test + public void heartbeatTxnRangeOneAborted() throws Exception { + long txnid = openTxn(); + assertEquals(1, txnid); + txnid = openTxn(); + txnid = openTxn(); + txnHandler.abortTxn(new AbortTxnRequest(3)); + HeartbeatTxnRangeResponse rsp = + txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3)); + assertEquals(1, rsp.getAbortedSize()); + Long txn = rsp.getAborted().iterator().next(); + assertEquals(3L, (long)txn); + assertEquals(0, rsp.getNosuch().size()); + } + + @Test + public void testLockTimeout() throws Exception { + long timeout = txnHandler.setTimeout(1); + try { + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + Thread.sleep(10); + txnHandler.performTimeOuts(); + txnHandler.checkLock(new CheckLockRequest(res.getLockid())); + fail("Told there was a lock, when it should have timed out."); + } catch (NoSuchLockException e) { + } finally { + txnHandler.setTimeout(timeout); + } + } + + @Test + public void testRecoverManyTimeouts() throws Exception { + long timeout = txnHandler.setTimeout(1); + try { + txnHandler.openTxns(new OpenTxnRequest(503, "me", "localhost")); + Thread.sleep(10); + txnHandler.performTimeOuts(); + GetOpenTxnsInfoResponse rsp = txnHandler.getOpenTxnsInfo(); + int numAborted = 0; + for (TxnInfo txnInfo : rsp.getOpen_txns()) { + assertEquals(TxnState.ABORTED, txnInfo.getState()); + numAborted++; + } + assertEquals(503, numAborted); + } finally { + txnHandler.setTimeout(timeout); + } + + + } + + @Test + public void testHeartbeatNoLock() throws Exception { + HeartbeatRequest h = new HeartbeatRequest(); + h.setLockid(29389839L); + try { + txnHandler.heartbeat(h); + fail("Told there was a lock, when there wasn't."); + } catch (NoSuchLockException e) { + } + } + + @Test + public void testCompactMajorWithPartition() throws Exception { + CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MAJOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + assertEquals(1, compacts.size()); + ShowCompactResponseElement c = compacts.get(0); + assertEquals("foo", c.getDbname()); + assertEquals("bar", c.getTablename()); + assertEquals("ds=today", c.getPartitionname()); + assertEquals(CompactionType.MAJOR, c.getType()); + assertEquals("initiated", c.getState()); + assertEquals(0L, c.getStart()); + } + + @Test + public void testCompactMinorNoPartition() throws Exception { + CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); + rqst.setRunas("fred"); + txnHandler.compact(rqst); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + assertEquals(1, compacts.size()); + ShowCompactResponseElement c = compacts.get(0); + assertEquals("foo", c.getDbname()); + assertEquals("bar", c.getTablename()); + assertNull(c.getPartitionname()); + assertEquals(CompactionType.MINOR, c.getType()); + assertEquals("initiated", c.getState()); + assertEquals(0L, c.getStart()); + assertEquals("fred", c.getRunAs()); + } + + @Test + public void showLocks() throws Exception { + long begining = System.currentTimeMillis(); + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + List components = new ArrayList(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + + // Open txn + long txnid = openTxn(); + comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "mydb"); + comp.setTablename("mytable"); + components = new ArrayList(1); + components.add(comp); + req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + res = txnHandler.lock(req); + + // Locks not associated with a txn + components = new ArrayList(1); + comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "yourdb"); + comp.setTablename("yourtable"); + comp.setPartitionname("yourpartition"); + components.add(comp); + req = new LockRequest(components, "you", "remotehost"); + res = txnHandler.lock(req); + + ShowLocksResponse rsp = txnHandler.showLocks(new ShowLocksRequest()); + List locks = rsp.getLocks(); + assertEquals(3, locks.size()); + boolean[] saw = new boolean[locks.size()]; + for (int i = 0; i < saw.length; i++) saw[i] = false; + for (ShowLocksResponseElement lock : locks) { + if (lock.getLockid() == 1) { + assertEquals(0, lock.getTxnid()); + assertEquals("mydb", lock.getDbname()); + assertNull(lock.getTablename()); + assertNull(lock.getPartname()); + assertEquals(LockState.ACQUIRED, lock.getState()); + assertEquals(LockType.EXCLUSIVE, lock.getType()); + assertTrue(lock.toString(), 0 != lock.getLastheartbeat()); + assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining + + " and " + System.currentTimeMillis(), + begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat()); + assertEquals("me", lock.getUser()); + assertEquals("localhost", lock.getHostname()); + saw[0] = true; + } else if (lock.getLockid() == 2) { + assertEquals(1, lock.getTxnid()); + assertEquals("mydb", lock.getDbname()); + assertEquals("mytable", lock.getTablename()); + assertNull(lock.getPartname()); + assertEquals(LockState.WAITING, lock.getState()); + assertEquals(LockType.SHARED_READ, lock.getType()); + assertTrue(lock.toString(), 0 == lock.getLastheartbeat() && + lock.getTxnid() != 0); + assertEquals(0, lock.getAcquiredat()); + assertEquals("me", lock.getUser()); + assertEquals("localhost", lock.getHostname()); + saw[1] = true; + } else if (lock.getLockid() == 3) { + assertEquals(0, lock.getTxnid()); + assertEquals("yourdb", lock.getDbname()); + assertEquals("yourtable", lock.getTablename()); + assertEquals("yourpartition", lock.getPartname()); + assertEquals(LockState.ACQUIRED, lock.getState()); + assertEquals(LockType.SHARED_READ, lock.getType()); + assertTrue(lock.toString(), begining <= lock.getLastheartbeat() && + System.currentTimeMillis() >= lock.getLastheartbeat()); + assertTrue(begining <= lock.getAcquiredat() && + System.currentTimeMillis() >= lock.getAcquiredat()); + assertEquals("you", lock.getUser()); + assertEquals("remotehost", lock.getHostname()); + saw[2] = true; + } else { + fail("Unknown lock id"); + } + } + for (int i = 0; i < saw.length; i++) assertTrue("Didn't see lock id " + i, saw[i]); + } + + @Test + @Ignore("Wedges Derby") + public void deadlockDetected() throws Exception { + LOG.debug("Starting deadlock test"); + if (txnHandler instanceof TxnHandler) { + final TxnHandler tHndlr = (TxnHandler)txnHandler; + Connection conn = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Statement stmt = conn.createStatement(); + long now = tHndlr.getDbTime(conn); + stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " + + "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " + + "'scooby.com')"); + stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " + + "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " + + "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" + + tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " + + "'scooby.com')"); + conn.commit(); + tHndlr.closeDbConn(conn); + + final AtomicBoolean sawDeadlock = new AtomicBoolean(); + + final Connection conn1 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); + final Connection conn2 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); + try { + + for (int i = 0; i < 5; i++) { + Thread t1 = new Thread() { + @Override + public void run() { + try { + try { + updateTxns(conn1); + updateLocks(conn1); + Thread.sleep(1000); + conn1.commit(); + LOG.debug("no exception, no deadlock"); + } catch (SQLException e) { + try { + tHndlr.checkRetryable(conn1, e, "thread t1"); + LOG.debug("Got an exception, but not a deadlock, SQLState is " + + e.getSQLState() + " class of exception is " + e.getClass().getName() + + " msg is <" + e.getMessage() + ">"); + } catch (TxnHandler.RetryException de) { + LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + + "exception is " + e.getClass().getName() + " msg is <" + e + .getMessage() + ">"); + sawDeadlock.set(true); + } + } + conn1.rollback(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + + Thread t2 = new Thread() { + @Override + public void run() { + try { + try { + updateLocks(conn2); + updateTxns(conn2); + Thread.sleep(1000); + conn2.commit(); + LOG.debug("no exception, no deadlock"); + } catch (SQLException e) { + try { + tHndlr.checkRetryable(conn2, e, "thread t2"); + LOG.debug("Got an exception, but not a deadlock, SQLState is " + + e.getSQLState() + " class of exception is " + e.getClass().getName() + + " msg is <" + e.getMessage() + ">"); + } catch (TxnHandler.RetryException de) { + LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + + "exception is " + e.getClass().getName() + " msg is <" + e + .getMessage() + ">"); + sawDeadlock.set(true); + } + } + conn2.rollback(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + + t1.start(); + t2.start(); + t1.join(); + t2.join(); + if (sawDeadlock.get()) break; + } + assertTrue(sawDeadlock.get()); + } finally { + conn1.rollback(); + tHndlr.closeDbConn(conn1); + conn2.rollback(); + tHndlr.closeDbConn(conn2); + } + } + } + + /** + * This cannnot be run against Derby (thus in UT) but it can run againt MySQL. + * 1. add to metastore/pom.xml + * + * mysql + * mysql-connector-java + * 5.1.30 + * + * 2. Hack in the c'tor of this class + * conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:mysql://localhost/metastore"); + * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "hive"); + * conf.setVar(HiveConf.ConfVars.METASTOREPWD, "hive"); + * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver"); + * 3. Remove TxnDbUtil.prepDb(); in TxnHandler.checkQFileTestHack() + * + */ + @Ignore("multiple threads wedge Derby") + @Test + public void testMutexAPI() throws Exception { + final TxnStore.MutexAPI api = txnHandler.getMutexAPI(); + final AtomicInteger stepTracker = new AtomicInteger(0); + /** + * counter = 0; + * Thread1 counter=1, lock, wait 3s, check counter(should be 2), counter=3, unlock + * Thread2 counter=2, lock (and block), inc counter, should be 4 + */ + Thread t1 = new Thread("MutexTest1") { + public void run() { + try { + stepTracker.incrementAndGet();//now 1 + TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name()); + Thread.sleep(4000); + //stepTracker should now be 2 which indicates t2 has started + Assert.assertEquals("Thread2 should have started by now but not done work", 2, stepTracker.get()); + stepTracker.incrementAndGet();//now 3 + handle.releaseLocks(); + } + catch(Exception ex) { + throw new RuntimeException(ex.getMessage(), ex); + } + } + }; + t1.setDaemon(true); + ErrorHandle ueh1 = new ErrorHandle(); + t1.setUncaughtExceptionHandler(ueh1); + Thread t2 = new Thread("MutexTest2") { + public void run() { + try { + stepTracker.incrementAndGet();//now 2 + //this should block until t1 unlocks + TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name()); + stepTracker.incrementAndGet();//now 4 + Assert.assertEquals(4, stepTracker.get()); + handle.releaseLocks(); + stepTracker.incrementAndGet();//now 5 + } + catch(Exception ex) { + throw new RuntimeException(ex.getMessage(), ex); + } + } + }; + t2.setDaemon(true); + ErrorHandle ueh2 = new ErrorHandle(); + t2.setUncaughtExceptionHandler(ueh2); + t1.start(); + try { + Thread.sleep(1000); + } + catch(InterruptedException ex) { + LOG.info("Sleep was interrupted"); + } + t2.start(); + t1.join(6000);//so that test doesn't block + t2.join(6000); + + if(ueh1.error != null) { + Assert.assertTrue("Unexpected error from t1: " + StringUtils.stringifyException(ueh1.error), false); + } + if (ueh2.error != null) { + Assert.assertTrue("Unexpected error from t2: " + StringUtils.stringifyException(ueh2.error), false); + } + Assert.assertEquals("5 means both threads have completed", 5, stepTracker.get()); + } + private final static class ErrorHandle implements Thread.UncaughtExceptionHandler { + Throwable error = null; + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.error("Uncaught exception from " + t.getName() + ": " + e.getMessage()); + error = e; + } + } + + @Test + public void testRetryableRegex() throws Exception { + SQLException sqlException = new SQLException("ORA-08177: can't serialize access for this transaction", "72000"); + // Note that we have 3 regex'es below + conf.setVar(HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, "^Deadlock detected, roll back,.*08177.*,.*08178.*"); + boolean result = TxnHandler.isRetryable(conf, sqlException); + Assert.assertTrue("regex should be retryable", result); + + sqlException = new SQLException("This error message, has comma in it"); + conf.setVar(HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, ".*comma.*"); + result = TxnHandler.isRetryable(conf, sqlException); + Assert.assertTrue("regex should be retryable", result); + } + + private void updateTxns(Connection conn) throws SQLException { + Statement stmt = conn.createStatement(); + stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1"); + } + + private void updateLocks(Connection conn) throws SQLException { + Statement stmt = conn.createStatement(); + stmt.executeUpdate("update HIVE_LOCKS set hl_last_heartbeat = hl_last_heartbeat + 1"); + } + + @Before + public void setUp() throws Exception { + TxnDbUtil.prepDb(); + txnHandler = TxnUtils.getTxnStore(conf); + } + + @After + public void tearDown() throws Exception { + TxnDbUtil.cleanDb(); + } + + private long openTxn() throws MetaException { + List txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids(); + return txns.get(0); + } + +} diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index d80a03e..a1bd0fb 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -26,9 +26,12 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HouseKeeperService; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; @@ -39,6 +42,7 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService; +import org.apache.hadoop.hive.ql.txn.AcidOpenTxnsCounterService; import org.apache.hadoop.hive.ql.txn.compactor.Cleaner; import org.apache.hadoop.hive.ql.txn.compactor.Initiator; import org.apache.hadoop.hive.ql.txn.compactor.Worker; @@ -704,7 +708,7 @@ public static void runHouseKeeperService(HouseKeeperService houseKeeperService, while(houseKeeperService.getIsAliveCounter() <= lastCount) { if(iterCount++ >= maxIter) { //prevent test hangs - throw new IllegalStateException("HouseKeeper didn't run after " + iterCount + " waits"); + throw new IllegalStateException("HouseKeeper didn't run after " + (iterCount - 1) + " waits"); } try { Thread.sleep(100);//make sure it has run at least once @@ -794,6 +798,41 @@ public void testFailHeartbeater() throws Exception { Assert.assertTrue(exception.getMessage().contains("HIVETESTMODEFAILHEARTBEATER=true")); } + @Test + public void testOpenTxnsCounter() throws Exception { + hiveConf.setIntVar(HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS, 3); + hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_COUNT_OPEN_TXNS_INTERVAL, 10, TimeUnit.MILLISECONDS); + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + OpenTxnsResponse openTxnsResponse = txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost")); + + AcidOpenTxnsCounterService openTxnsCounterService = new AcidOpenTxnsCounterService(); + runHouseKeeperService(openTxnsCounterService, hiveConf); // will update current number of open txns to 3 + + MetaException exception = null; + // This should fail once it finds out the threshold has been reached + try { + txnHandler.openTxns(new OpenTxnRequest(1, "you", "localhost")); + } catch (MetaException e) { + exception = e; + } + Assert.assertNotNull("Opening new transaction shouldn't be allowed", exception); + Assert.assertTrue(exception.getMessage().equals("Maximum allowed number of open transactions has been reached. See hive.max.open.txns.")); + + // After committing the initial txns, and updating current number of open txns back to 0, + // new transactions should be allowed to open + for (long txnid : openTxnsResponse.getTxn_ids()) { + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + } + runHouseKeeperService(openTxnsCounterService, hiveConf); // will update current number of open txns back to 0 + exception = null; + try { + txnHandler.openTxns(new OpenTxnRequest(1, "him", "localhost")); + } catch (MetaException e) { + exception = e; + } + Assert.assertNull(exception); + } + /** * takes raw data and turns it into a string as if from Driver.getResults() * sorts rows in dictionary order