diff --git hcatalog/core/pom.xml hcatalog/core/pom.xml
index b5e85cd..9a81e8c 100644
--- hcatalog/core/pom.xml
+++ hcatalog/core/pom.xml
@@ -71,6 +71,12 @@
jackson-mapper-asl
${jackson.version}
+
+ org.mockito
+ mockito-all
+ ${mockito-all.version}
+ test
+
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
index 360e77b..1907d8f 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
@@ -183,4 +183,14 @@ private void readObject(ObjectInputStream ois)
new ObjectInputStream(new InflaterInputStream(ois));
partitions = (List)partInfoReader.readObject();
}
+
+
+ /**
+ * Built to enable testing with this class in other packages.
+ * @param partitions
+ */
+ public void setPartitionsForTesting(List partitions) {
+ this.partitions = partitions;
+ }
+
}
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbLockManager.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbLockManager.java
new file mode 100644
index 0000000..6893ae3
--- /dev/null
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbLockManager.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.txn;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.thrift.TException;
+
+import java.util.List;
+
+/**
+ * Database lock manager for HCatalog. Extends
+ * {@link org.apache.hadoop.hive.ql.lockmgr.DbLockManager} with the ability to fetch back its
+ * lock info from the database. This is useful for MR jobs where the OutputCommitter
+ * needs to be retried.
+ */
+public class HCatDbLockManager extends DbLockManager {
+
+ static final private Log LOG = LogFactory.getLog(HCatDbLockManager.class.getName());
+
+
+ private String jobid;
+
+ HCatDbLockManager(HiveMetaStoreClient client, String jobid) {
+ super(client, true);
+ this.jobid = jobid;
+ }
+
+ @Override
+ public List getLocks(boolean verifyTablePartitions, boolean fetchData)
+ throws LockException {
+ if (locks == null || locks.size() == 0) {
+ reconstructLockInfo();
+ }
+ return super.getLocks(verifyTablePartitions, fetchData);
+ }
+
+ private void reconstructLockInfo() throws LockException {
+ // We need to find the transaction associated with this client. Usually this means we are in
+ // a retry state and thus we no longer know our transaction id.
+ try {
+ List lockList = client.showLocks().getLocks();
+ for (ShowLocksResponseElement lock : lockList) {
+ if (lock.getUser().contains('-' + jobid)) {
+ locks.add(new DbHiveLock(lock.getLockid()));
+ LOG.debug("Recovering lock with id " + lock.getLockid() + " from metastore");
+ }
+ }
+ } catch (TException e) {
+ throw new LockException("Unable to communicate with metastore to find our locks: "
+ + e.getMessage(), e);
+ }
+
+ }
+}
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbTxnManager.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbTxnManager.java
new file mode 100644
index 0000000..85fa4fe
--- /dev/null
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbTxnManager.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.txn;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.PartInfo;
+import org.apache.thrift.TException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Database transaction manager for HCatalog. Extends
+ * {@link org.apache.hadoop.hive.ql.lockmgr.DbTxnManager} with the ability to fetch back its
+ * transaction info from the database. This is useful for MR jobs where the OutputCommitter
+ * needs to be retried.
+ */
+public class HCatDbTxnManager extends DbTxnManager {
+
+ static final private Log LOG = LogFactory.getLog(HCatDbTxnManager.class.getName());
+
+ // The jobid is appended to the user name so that we can find our transactions in case we need
+ // to reconstruct the list.
+ private String jobid;
+
+ HCatDbTxnManager() {
+ super(true);
+ }
+
+ /**
+ * Set the jobid for this transaction manager. This must be called before any of the other
+ * calls in this class.
+ * @param jobid Hadoop jobid.
+ */
+ public void setJobid(String jobid) {
+ this.jobid = jobid;
+ }
+
+ @Override
+ protected DbLockManager instantiateLockMgr(HiveMetaStoreClient client) {
+ return new HCatDbLockManager(client, jobid);
+ }
+
+ @Override
+ public void openTxn(String user) throws LockException {
+ super.openTxn(buildUserName(user));
+ }
+
+ /**
+ * An HCatalog specific version of acquireLocks.
+ * @param input Input information for this job
+ * @param output Output information for this job
+ * @param user Name of the user
+ */
+ public void acquireLocks(InputJobInfo input, OutputJobInfo output, String user)
+ throws LockException {
+ init();
+ getLockManager();
+ LockRequestBuilder rqstBuilder = new LockRequestBuilder();
+
+ LOG.debug("Setting lock request transaction id to " + txnId);
+ rqstBuilder.setTransactionId(txnId).setUser(buildUserName(user));
+
+ if (input != null) {
+ List parts = input.getPartitions();
+ if (parts != null && parts.size() > 0) {
+ for (PartInfo part : parts) {
+ addInputComponent(rqstBuilder, input, part);
+ }
+ } else {
+ addInputComponent(rqstBuilder, input, null);
+ }
+ }
+
+ if (output != null) {
+ LockComponentBuilder compBuilder = new LockComponentBuilder();
+ compBuilder.setShared() // Shared because we are just inserting
+ .setDbName(output.getDatabaseName())
+ .setTableName(output.getTableName());
+ Map partVals = output.getPartitionValues();
+ if (partVals != null && partVals.size() > 0 && !output.isDynamicPartitioningUsed()) {
+ compBuilder.setPartitionName(FileUtils.makePartName(
+ new ArrayList(partVals.keySet()), new ArrayList(partVals.values())));
+ }
+ LockComponent component = compBuilder.build();
+ LOG.debug("Adding lock component to lock request " + component.toString());
+ rqstBuilder.addLockComponent(component);
+ }
+ lockMgr.lock(rqstBuilder.build());
+ }
+
+ @Override
+ public void commitTxn() throws LockException {
+ if (txnId < 1) reconstructTxnInfo();
+ super.commitTxn();
+ }
+
+ @Override
+ public void rollbackTxn() throws LockException {
+ if (txnId < 1) reconstructTxnInfo();
+ super.rollbackTxn();
+ }
+
+ @Override
+ public void heartbeat() throws LockException {
+ if (txnId < 1) reconstructTxnInfo();
+ super.heartbeat();
+ }
+
+ /**
+ * We override finalize to remove the
+ * {@link org.apache.hadoop.hive.ql.lockmgr.HiveTxnManagerImpl} implementation which calls
+ * {@link org.apache.hadoop.hive.ql.lockmgr.HiveTxnManagerImpl#destruct()}. We don't want that
+ * called in the general case when this is finalized because it rolls back transactions and
+ * releases locks. Since the object might be finalized due to OutputCommitter failure,
+ * which will later be retried, we don't want to have destruct called. We can't override
+ * destruct because we still want
+ * {@link org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager#closeTxnManager()} to call it and
+ * have the expected effects of rolling back any open transactions and releasing any currently
+ * held locks.
+ * @throws Throwable
+ */
+ @Override
+ protected void finalize() throws Throwable {
+ }
+
+ /**
+ * For testing only.
+ * @return
+ */
+ String getJobid() {
+ return jobid;
+ }
+
+ private void addInputComponent(LockRequestBuilder rqstBuilder, InputJobInfo input,
+ PartInfo part) {
+ LockComponentBuilder compBuilder = new LockComponentBuilder();
+ compBuilder.setShared()
+ .setDbName(input.getDatabaseName())
+ .setTableName(input.getTableName());
+ if (part != null) {
+ Map partVals = part.getPartitionValues();
+ compBuilder.setPartitionName(FileUtils.makePartName(new ArrayList(partVals.keySet()),
+ new ArrayList(partVals.values())));
+ }
+ LockComponent component = compBuilder.build();
+ LOG.debug("Adding lock component to lock request " + component.toString());
+ rqstBuilder.addLockComponent(component);
+ }
+
+ private String buildUserName(String user) {
+ return user + '-' + jobid;
+ }
+
+ private void reconstructTxnInfo() throws LockException {
+ // We need to find the transaction associated with this client. Usually this means we are in
+ // a retry state and thus we no longer know our transaction id.
+ try {
+ List txns = client.showTxns().getOpen_txns();
+ for (TxnInfo txn : txns) {
+ if (txn.getUser().contains('-' + jobid)) {
+ txnId = txn.getId();
+ return;
+ }
+ }
+ throw new LockException("Unable to find a transaction that matches our jobid: " + jobid);
+ } catch (TException e) {
+ throw new LockException("Unable to communicate with metastore to find our transaction id: "
+ + e.getMessage(), e);
+ }
+ }
+}
diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/txn/TestHCatDbTxnManager.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/txn/TestHCatDbTxnManager.java
new file mode 100644
index 0000000..b9f0653
--- /dev/null
+++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/txn/TestHCatDbTxnManager.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.txn;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.PartInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.lang.reflect.InvocationTargetException;import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Tests for {@link org.apache.hive.hcatalog.txn.HCatDbTxnManager}
+ */
+public class TestHCatDbTxnManager {
+ static final private Log LOG = LogFactory.getLog(TestHCatDbTxnManager.class.getName());
+
+ private static HiveStorageHandler mockStorageHandler;
+
+ private HiveConf conf;
+ private HCatDbTxnManager txnMgr;
+
+ public TestHCatDbTxnManager() throws Exception {
+ tearDown(); // Just in case there were left overs from a previous run.
+ }
+
+ @Test
+ public void testSingleTableRead() throws Exception {
+ InputJobInfo input = buildInput("mydb", "mytable", null);
+ txnMgr.acquireLocks(input, null, "fred");
+ List locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.getLockManager().unlock(locks.get(0));
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testSinglePartitionRead() throws Exception {
+ InputJobInfo input = buildInput("mydb", "mytable",
+ addPartition(null, addPartValue(null, "ds", "today")));
+ txnMgr.acquireLocks(input, null, "fred");
+ List locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.getLockManager().unlock(locks.get(0));
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testSinglePartitionMultiValueRead() throws Exception {
+ InputJobInfo input = buildInput("mydb", "mytable",
+ addPartition(null, addPartValue(addPartValue(null, "region", "us"), "ds", "today")));
+ txnMgr.acquireLocks(input, null, "fred");
+ List locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.getLockManager().unlock(locks.get(0));
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testMultiPartitionRead() throws Exception {
+ InputJobInfo input = buildInput("mydb", "mytable",
+ addPartition(addPartition(null, addPartValue(null, "ds", "yeserday")),
+ addPartValue(null, "ds", "today")));
+ txnMgr.acquireLocks(input, null, "fred");
+ List locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(2,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.getLockManager().unlock(locks.get(0));
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testUnlockWithNewTxnMgr() throws Exception {
+ InputJobInfo input = buildInput("mydb", "mytable",
+ addPartition(null, addPartValue(null, "ds", "today")));
+ txnMgr.acquireLocks(input, null, "fred");
+ List locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).getLockId()));
+ String jobid = txnMgr.getJobid();
+ // Don't overwrite the existing txnMgr, as that may cause it to call finalize and release its
+ // locks, which we don't want
+ HCatDbTxnManager newTxnMgr =
+ (HCatDbTxnManager)TxnManagerFactory.getTxnManagerFactory(true).getTxnManager(conf);
+ newTxnMgr.setJobid(jobid);
+ newTxnMgr.getLockManager().unlock(locks.get(0));
+ locks = newTxnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testShowWithNewTxnMgr() throws Exception {
+ InputJobInfo input = buildInput("mydb", "mytable",
+ addPartition(null, addPartValue(null, "ds", "today")));
+ txnMgr.acquireLocks(input, null, "fred");
+ String jobid = txnMgr.getJobid();
+ // Don't overwrite the existing txnMgr, as that may cause it to call finalize and release its
+ // locks, which we don't want.
+ HCatDbTxnManager newTxnMgr =
+ (HCatDbTxnManager)TxnManagerFactory.getTxnManagerFactory(true).getTxnManager(conf);
+ newTxnMgr.setJobid(jobid);
+ List locks = newTxnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ newTxnMgr.getLockManager().unlock(locks.get(0));
+ locks = newTxnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testSingleTableWrite() throws Exception {
+ OutputJobInfo output = OutputJobInfo.create("mydb", "mytable", null);
+ txnMgr.openTxn("fred");
+ txnMgr.acquireLocks(null, output, "fred");
+ List locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.commitTxn();
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testSinglePartitionWrite() throws Exception {
+ OutputJobInfo output = OutputJobInfo.create("mydb", "mytable",
+ addPartValue(null, "ds", "today"));
+ txnMgr.openTxn("fred");
+ txnMgr.acquireLocks(null, output, "fred");
+ List locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.commitTxn();
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testDynamicPartitionWrite() throws Exception {
+ OutputJobInfo output = OutputJobInfo.create("mydb", "mytable", null);
+ output.setDynamicPartitioningKeys(Arrays.asList(new String[]{"ds"}));
+ txnMgr.openTxn("fred");
+ txnMgr.acquireLocks(null, output, "fred");
+ List locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ // Make sure we're locking the whole table, since this is dynamic partitioning
+ ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks();
+ List elms = rsp.getLocks();
+ Assert.assertEquals(1, elms.size());
+ Assert.assertNotNull(elms.get(0).getTablename());
+ Assert.assertNull(elms.get(0).getPartname());
+ txnMgr.commitTxn();
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testRollback() throws Exception {
+ OutputJobInfo output = OutputJobInfo.create("mydb", "mytable", null);
+ txnMgr.openTxn("fred");
+ txnMgr.acquireLocks(null, output, "fred");
+ List locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(1,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.rollbackTxn();
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testReadWrite() throws Exception {
+ InputJobInfo input = buildInput("mydb", "mytable",
+ addPartition(null, addPartValue(null, "ds", "today")));
+ OutputJobInfo output = OutputJobInfo.create("mydb", "yourtable",
+ addPartValue(null, "ds", "today"));
+ txnMgr.openTxn("fred");
+ txnMgr.acquireLocks(input, output, "fred");
+ List locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(2,
+ TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId()));
+ txnMgr.commitTxn();
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Before
+ public void setup() throws Exception {
+ conf = new HiveConf();
+ TxnDbUtil.setConfValues(conf);
+ TxnDbUtil.prepDb();
+ HiveTxnManager tmpMgr = TxnManagerFactory.getTxnManagerFactory(true).getTxnManager(conf);
+ Assert.assertTrue(tmpMgr instanceof HCatDbTxnManager);
+ txnMgr = (HCatDbTxnManager)tmpMgr;
+ txnMgr.setJobid(UUID.randomUUID().toString());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TxnDbUtil.cleanDb();
+ }
+
+ private InputJobInfo buildInput(String dbName, String tableName,
+ List