diff --git hcatalog/core/pom.xml hcatalog/core/pom.xml index b5e85cd..9a81e8c 100644 --- hcatalog/core/pom.xml +++ hcatalog/core/pom.xml @@ -71,6 +71,12 @@ jackson-mapper-asl ${jackson.version} + + org.mockito + mockito-all + ${mockito-all.version} + test + diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java index 360e77b..1907d8f 100644 --- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java @@ -183,4 +183,14 @@ private void readObject(ObjectInputStream ois) new ObjectInputStream(new InflaterInputStream(ois)); partitions = (List)partInfoReader.readObject(); } + + + /** + * Built to enable testing with this class in other packages. + * @param partitions + */ + public void setPartitionsForTesting(List partitions) { + this.partitions = partitions; + } + } diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbLockManager.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbLockManager.java new file mode 100644 index 0000000..6893ae3 --- /dev/null +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbLockManager.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.hcatalog.txn; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; +import org.apache.hadoop.hive.ql.lockmgr.DbLockManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.thrift.TException; + +import java.util.List; + +/** + * Database lock manager for HCatalog. Extends + * {@link org.apache.hadoop.hive.ql.lockmgr.DbLockManager} with the ability to fetch back its + * lock info from the database. This is useful for MR jobs where the OutputCommitter + * needs to be retried. + */ +public class HCatDbLockManager extends DbLockManager { + + static final private Log LOG = LogFactory.getLog(HCatDbLockManager.class.getName()); + + + private String jobid; + + HCatDbLockManager(HiveMetaStoreClient client, String jobid) { + super(client, true); + this.jobid = jobid; + } + + @Override + public List getLocks(boolean verifyTablePartitions, boolean fetchData) + throws LockException { + if (locks == null || locks.size() == 0) { + reconstructLockInfo(); + } + return super.getLocks(verifyTablePartitions, fetchData); + } + + private void reconstructLockInfo() throws LockException { + // We need to find the transaction associated with this client. Usually this means we are in + // a retry state and thus we no longer know our transaction id. + try { + List lockList = client.showLocks().getLocks(); + for (ShowLocksResponseElement lock : lockList) { + if (lock.getUser().contains('-' + jobid)) { + locks.add(new DbHiveLock(lock.getLockid())); + LOG.debug("Recovering lock with id " + lock.getLockid() + " from metastore"); + } + } + } catch (TException e) { + throw new LockException("Unable to communicate with metastore to find our locks: " + + e.getMessage(), e); + } + + } +} diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbTxnManager.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbTxnManager.java new file mode 100644 index 0000000..85fa4fe --- /dev/null +++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/txn/HCatDbTxnManager.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.hcatalog.txn; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.LockComponentBuilder; +import org.apache.hadoop.hive.metastore.LockRequestBuilder; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.ql.lockmgr.DbLockManager; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hive.hcatalog.mapreduce.InputJobInfo; +import org.apache.hive.hcatalog.mapreduce.OutputJobInfo; +import org.apache.hive.hcatalog.mapreduce.PartInfo; +import org.apache.thrift.TException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Database transaction manager for HCatalog. Extends + * {@link org.apache.hadoop.hive.ql.lockmgr.DbTxnManager} with the ability to fetch back its + * transaction info from the database. This is useful for MR jobs where the OutputCommitter + * needs to be retried. + */ +public class HCatDbTxnManager extends DbTxnManager { + + static final private Log LOG = LogFactory.getLog(HCatDbTxnManager.class.getName()); + + // The jobid is appended to the user name so that we can find our transactions in case we need + // to reconstruct the list. + private String jobid; + + HCatDbTxnManager() { + super(true); + } + + /** + * Set the jobid for this transaction manager. This must be called before any of the other + * calls in this class. + * @param jobid Hadoop jobid. + */ + public void setJobid(String jobid) { + this.jobid = jobid; + } + + @Override + protected DbLockManager instantiateLockMgr(HiveMetaStoreClient client) { + return new HCatDbLockManager(client, jobid); + } + + @Override + public void openTxn(String user) throws LockException { + super.openTxn(buildUserName(user)); + } + + /** + * An HCatalog specific version of acquireLocks. + * @param input Input information for this job + * @param output Output information for this job + * @param user Name of the user + */ + public void acquireLocks(InputJobInfo input, OutputJobInfo output, String user) + throws LockException { + init(); + getLockManager(); + LockRequestBuilder rqstBuilder = new LockRequestBuilder(); + + LOG.debug("Setting lock request transaction id to " + txnId); + rqstBuilder.setTransactionId(txnId).setUser(buildUserName(user)); + + if (input != null) { + List parts = input.getPartitions(); + if (parts != null && parts.size() > 0) { + for (PartInfo part : parts) { + addInputComponent(rqstBuilder, input, part); + } + } else { + addInputComponent(rqstBuilder, input, null); + } + } + + if (output != null) { + LockComponentBuilder compBuilder = new LockComponentBuilder(); + compBuilder.setShared() // Shared because we are just inserting + .setDbName(output.getDatabaseName()) + .setTableName(output.getTableName()); + Map partVals = output.getPartitionValues(); + if (partVals != null && partVals.size() > 0 && !output.isDynamicPartitioningUsed()) { + compBuilder.setPartitionName(FileUtils.makePartName( + new ArrayList(partVals.keySet()), new ArrayList(partVals.values()))); + } + LockComponent component = compBuilder.build(); + LOG.debug("Adding lock component to lock request " + component.toString()); + rqstBuilder.addLockComponent(component); + } + lockMgr.lock(rqstBuilder.build()); + } + + @Override + public void commitTxn() throws LockException { + if (txnId < 1) reconstructTxnInfo(); + super.commitTxn(); + } + + @Override + public void rollbackTxn() throws LockException { + if (txnId < 1) reconstructTxnInfo(); + super.rollbackTxn(); + } + + @Override + public void heartbeat() throws LockException { + if (txnId < 1) reconstructTxnInfo(); + super.heartbeat(); + } + + /** + * We override finalize to remove the + * {@link org.apache.hadoop.hive.ql.lockmgr.HiveTxnManagerImpl} implmentation which calls + * {@link org.apache.hadoop.hive.ql.lockmgr.HiveTxnManagerImpl#destruct()}. We don't want that + * called in the general case when this is finalized because it rolls back transactions and + * releases locks. Since the object might be finalized due to OutputCommitter failure, + * which will later be retried, we don't want to have destruct called. We can't override + * destruct because we still want + * {@link org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager#closeTxnManager()} to call it and + * have the expected effects of rolling back any open transactions and releasing any currently + * held locks. + * @throws Throwable + */ + @Override + protected void finalize() throws Throwable { + } + + /** + * For testing only. + * @return + */ + String getJobid() { + return jobid; + } + + private void addInputComponent(LockRequestBuilder rqstBuilder, InputJobInfo input, + PartInfo part) { + LockComponentBuilder compBuilder = new LockComponentBuilder(); + compBuilder.setShared() + .setDbName(input.getDatabaseName()) + .setTableName(input.getTableName()); + if (part != null) { + Map partVals = part.getPartitionValues(); + compBuilder.setPartitionName(FileUtils.makePartName(new ArrayList(partVals.keySet()), + new ArrayList(partVals.values()))); + } + LockComponent component = compBuilder.build(); + LOG.debug("Adding lock component to lock request " + component.toString()); + rqstBuilder.addLockComponent(component); + } + + private String buildUserName(String user) { + return user + '-' + jobid; + } + + private void reconstructTxnInfo() throws LockException { + // We need to find the transaction associated with this client. Usually this means we are in + // a retry state and thus we no longer know our transaction id. + try { + List txns = client.showTxns().getOpen_txns(); + for (TxnInfo txn : txns) { + if (txn.getUser().contains('-' + jobid)) { + txnId = txn.getId(); + return; + } + } + throw new LockException("Unable to find a transaction that matches our jobid: " + jobid); + } catch (TException e) { + throw new LockException("Unable to communicate with metastore to find our transaction id: " + + e.getMessage(), e); + } + } +} diff --git hcatalog/core/src/test/java/org/apache/hive/hcatalog/txn/TestHCatDbTxnManager.java hcatalog/core/src/test/java/org/apache/hive/hcatalog/txn/TestHCatDbTxnManager.java new file mode 100644 index 0000000..b9f0653 --- /dev/null +++ hcatalog/core/src/test/java/org/apache/hive/hcatalog/txn/TestHCatDbTxnManager.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.hcatalog.txn; + +import junit.framework.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.lockmgr.DbLockManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hive.hcatalog.mapreduce.InputJobInfo; +import org.apache.hive.hcatalog.mapreduce.OutputJobInfo; +import org.apache.hive.hcatalog.mapreduce.PartInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass;import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.lang.reflect.InvocationTargetException;import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays;import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Tests for {@link org.apache.hive.hcatalog.txn.HCatDbTxnManager} + */ +public class TestHCatDbTxnManager { + static final private Log LOG = LogFactory.getLog(TestHCatDbTxnManager.class.getName()); + + private static HiveStorageHandler mockStorageHandler; + + private HiveConf conf; + private HCatDbTxnManager txnMgr; + + public TestHCatDbTxnManager() throws Exception { + tearDown(); // Just in case there were left overs from a previous run. + } + + @Test + public void testSingleTableRead() throws Exception { + InputJobInfo input = buildInput("mydb", "mytable", null); + txnMgr.acquireLocks(input, null, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.getLockManager().unlock(locks.get(0)); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testSinglePartitionRead() throws Exception { + InputJobInfo input = buildInput("mydb", "mytable", + addPartition(null, addPartValue(null, "ds", "today"))); + txnMgr.acquireLocks(input, null, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.getLockManager().unlock(locks.get(0)); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testSinglePartitionMultiValueRead() throws Exception { + InputJobInfo input = buildInput("mydb", "mytable", + addPartition(null, addPartValue(addPartValue(null, "region", "us"), "ds", "today"))); + txnMgr.acquireLocks(input, null, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.getLockManager().unlock(locks.get(0)); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testMultiPartitionRead() throws Exception { + InputJobInfo input = buildInput("mydb", "mytable", + addPartition(addPartition(null, addPartValue(null, "ds", "yeserday")), + addPartValue(null, "ds", "today"))); + txnMgr.acquireLocks(input, null, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(2, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.getLockManager().unlock(locks.get(0)); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testUnlockWithNewTxnMgr() throws Exception { + InputJobInfo input = buildInput("mydb", "mytable", + addPartition(null, addPartValue(null, "ds", "today"))); + txnMgr.acquireLocks(input, null, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).getLockId())); + String jobid = txnMgr.getJobid(); + // Don't overwrite the existing txnMgr, as they may cause it to call finalize and release its + // locks, which we don't want + HCatDbTxnManager newTxnMgr = + (HCatDbTxnManager)TxnManagerFactory.getTxnManagerFactory(true).getTxnManager(conf); + newTxnMgr.setJobid(jobid); + newTxnMgr.getLockManager().unlock(locks.get(0)); + locks = newTxnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testShowWithNewTxnMgr() throws Exception { + InputJobInfo input = buildInput("mydb", "mytable", + addPartition(null, addPartValue(null, "ds", "today"))); + txnMgr.acquireLocks(input, null, "fred"); + String jobid = txnMgr.getJobid(); + // Don't overwrite the existing txnMgr, as they may cause it to call finalize and release its + // locks, which we don't want. + HCatDbTxnManager newTxnMgr = + (HCatDbTxnManager)TxnManagerFactory.getTxnManagerFactory(true).getTxnManager(conf); + newTxnMgr.setJobid(jobid); + List locks = newTxnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + newTxnMgr.getLockManager().unlock(locks.get(0)); + locks = newTxnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testSingleTableWrite() throws Exception { + OutputJobInfo output = OutputJobInfo.create("mydb", "mytable", null); + txnMgr.openTxn("fred"); + txnMgr.acquireLocks(null, output, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.commitTxn(); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testSinglePartitionWrite() throws Exception { + OutputJobInfo output = OutputJobInfo.create("mydb", "mytable", + addPartValue(null, "ds", "today")); + txnMgr.openTxn("fred"); + txnMgr.acquireLocks(null, output, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.commitTxn(); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testDynamicPartitionWrite() throws Exception { + OutputJobInfo output = OutputJobInfo.create("mydb", "mytable", null); + output.setDynamicPartitioningKeys(Arrays.asList(new String[]{"ds"})); + txnMgr.openTxn("fred"); + txnMgr.acquireLocks(null, output, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + // Make sure we're locking the whole table, since this is dynamic partitioning + ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks(); + List elms = rsp.getLocks(); + Assert.assertEquals(1, elms.size()); + Assert.assertNotNull(elms.get(0).getTablename()); + Assert.assertNull(elms.get(0).getPartname()); + txnMgr.commitTxn(); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testRollback() throws Exception { + OutputJobInfo output = OutputJobInfo.create("mydb", "mytable", null); + txnMgr.openTxn("fred"); + txnMgr.acquireLocks(null, output, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(1, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.rollbackTxn(); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testReadWrite() throws Exception { + InputJobInfo input = buildInput("mydb", "mytable", + addPartition(null, addPartValue(null, "ds", "today"))); + OutputJobInfo output = OutputJobInfo.create("mydb", "yourtable", + addPartValue(null, "ds", "today")); + txnMgr.openTxn("fred"); + txnMgr.acquireLocks(input, output, "fred"); + List locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(1, locks.size()); + Assert.assertEquals(2, + TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock)locks.get(0)).getLockId())); + txnMgr.commitTxn(); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Before + public void setup() throws Exception { + conf = new HiveConf(); + TxnDbUtil.setConfValues(conf); + TxnDbUtil.prepDb(); + HiveTxnManager tmpMgr = TxnManagerFactory.getTxnManagerFactory(true).getTxnManager(conf); + Assert.assertTrue(tmpMgr instanceof HCatDbTxnManager); + txnMgr = (HCatDbTxnManager)tmpMgr; + txnMgr.setJobid(UUID.randomUUID().toString()); + } + + @After + public void tearDown() throws Exception { + TxnDbUtil.cleanDb(); + } + + private InputJobInfo buildInput(String dbName, String tableName, + List> parts) throws Exception { + InputJobInfo input = InputJobInfo.create(dbName, tableName, "whatever", null); + if (parts != null) { + List pis = new ArrayList(parts.size()); + for (Map m : parts) { + PartInfo pi = new PartInfo(null, mockStorageHandler, null, null, null, + null); + pi.setPartitionValues(m); + pis.add(pi); + } + input.setPartitionsForTesting(pis); + } + return input; + } + + private Map addPartValue(Map p, String key, String value) { + if (p == null) p = new HashMap(); + p.put(key, value); + return p; + } + + private List> addPartition(List> list, + Map partVals) { + if (list == null) list = new ArrayList>(); + list.add(partVals); + return list; + } + + @BeforeClass + public static void buildMockStorageHandler() { + mockStorageHandler = Mockito.mock(HiveStorageHandler.class); + final InputFormat mif = Mockito.mock(InputFormat.class); + Mockito.when(mockStorageHandler.getInputFormatClass()).thenAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + return mif.getClass(); + } + } + ); + final OutputFormat mof = Mockito.mock(OutputFormat.class); + Mockito.when(mockStorageHandler.getOutputFormatClass()).thenAnswer( + new Answer() { + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + return mof.getClass(); + } + } + ); + final SerDe ms = Mockito.mock(SerDe.class); + Mockito.when(mockStorageHandler.getSerDeClass()).thenAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + return ms.getClass(); + } + } + ); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 5f02950..caf361d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -40,9 +40,9 @@ static final private Log LOG = LogFactory.getLog(CLASS_NAME); private static final long MAX_SLEEP = 15000; + protected Set locks; + protected HiveMetaStoreClient client; private HiveLockManagerCtx context; - private Set locks; - private HiveMetaStoreClient client; private long nextSleep = 50; DbLockManager(HiveMetaStoreClient client) { @@ -50,6 +50,10 @@ this.client = client; } + protected DbLockManager(HiveMetaStoreClient client, boolean fromHcat) { + this(client); + } + @Override public void setContext(HiveLockManagerCtx ctx) throws LockException { context = ctx; @@ -73,7 +77,7 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode, * @param lock lock request * @throws LockException */ - List lock(LockRequest lock) throws LockException { + public List lock(LockRequest lock) throws LockException { try { LOG.debug("Requesting lock"); LockResponse res = client.lock(lock); @@ -174,14 +178,18 @@ public void refresh() { // NOP } - static class DbHiveLock extends HiveLock { + public static class DbHiveLock extends HiveLock { long lockId; - DbHiveLock(long id) { + public DbHiveLock(long id) { lockId = id; } + public long getLockId() { + return lockId; + } + @Override public HiveLockObject getHiveLockObject() { throw new UnsupportedOperationException(); diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index f74f683..6eaf786 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -45,13 +45,23 @@ static final private String CLASS_NAME = DbTxnManager.class.getName(); static final private Log LOG = LogFactory.getLog(CLASS_NAME); - private DbLockManager lockMgr = null; - private HiveMetaStoreClient client = null; - private long txnId = 0; + protected DbLockManager lockMgr = null; + protected HiveMetaStoreClient client = null; + protected long txnId = 0; + /** + * Do not instantiate directly, use {@link org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory}. + */ DbTxnManager() { } + /** + * A constructor for extending classes. + * @param fromHCat + */ + protected DbTxnManager(boolean fromHCat) { + } + @Override public void openTxn(String user) throws LockException { init(); @@ -68,7 +78,7 @@ public void openTxn(String user) throws LockException { public HiveLockManager getLockManager() throws LockException { init(); if (lockMgr == null) { - lockMgr = new DbLockManager(client); + lockMgr = instantiateLockMgr(client); } return lockMgr; } @@ -170,6 +180,8 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws Lo compBuilder.setTableName(t.getTableName()); break; + // TODO, I don't think this is right. For dynamic partitioning case (DUMMYPARTITION) we + // should be setting the lock on the whole table I think. case PARTITION: case DUMMYPARTITION: compBuilder.setPartitionName(output.getPartition().getName()); @@ -306,7 +318,11 @@ protected void destruct() { } } - private void init() throws LockException { + protected DbLockManager instantiateLockMgr(HiveMetaStoreClient client) { + return new DbLockManager(client); + } + + protected void init() throws LockException { if (client == null) { if (conf == null) { throw new RuntimeException("Must call setHiveConf before any other " + diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/TxnManagerFactory.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/TxnManagerFactory.java index 4d616d0..3ecc018 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/TxnManagerFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/TxnManagerFactory.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.lockmgr; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.util.ReflectionUtils; @@ -32,19 +34,27 @@ */ public class TxnManagerFactory { + static final private Log LOG = LogFactory.getLog(TxnManagerFactory.class.getName()); + private static TxnManagerFactory self; + private boolean inHcat; /** * Get the singleton instance of this factory. + * @param hcat should be set to true if this is an Hcatalog process rather than a Hive client. * @return this factory */ - public static synchronized TxnManagerFactory getTxnManagerFactory() { + public static synchronized TxnManagerFactory getTxnManagerFactory(boolean hcat) { if (self == null) { - self = new TxnManagerFactory(); + self = new TxnManagerFactory(hcat); } return self; } + public static TxnManagerFactory getTxnManagerFactory() { + return getTxnManagerFactory(false); + } + /** * Create a new transaction manager. The transaction manager to * instantiate will be determined by the hive.txn.manager value in the @@ -64,6 +74,11 @@ public HiveTxnManager getTxnManager(HiveConf conf) throws if (txnMgrName == null || txnMgrName.isEmpty()) { throw new LockException(ErrorMsg.TXNMGR_NOT_SPECIFIED.getMsg()); } + if (inHcat && txnMgrName.equals(DbTxnManager.class.getName())) { + // This is unfortunate, but I don't see a way around it. I could create an HCatalog + // specific TxnManagerFactory, but I'd end up duplicating a lot of logic. + txnMgrName = "org.apache.hive.hcatalog.txn.HCatDbTxnManager"; + } // Instantiate the chosen transaction manager try { @@ -72,11 +87,13 @@ public HiveTxnManager getTxnManager(HiveConf conf) throws impl.setHiveConf(conf); txnMgr = impl; } catch (ClassNotFoundException e) { + LOG.error("Unable to instantiate " + txnMgrName + ", " + e.getMessage()); throw new LockException(ErrorMsg.TXNMGR_NOT_INSTANTIATED.getMsg()); } return txnMgr; } - private TxnManagerFactory() { + private TxnManagerFactory(boolean hcat) { + inHcat = hcat; } }