diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index b78bea2..f84c940 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -39,6 +39,7 @@ import javax.security.auth.login.LoginException; +import com.google.common.base.Joiner; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,7 +57,6 @@ import org.apache.hadoop.util.Shell; import org.apache.hive.common.HiveCompat; -import com.google.common.base.Joiner; /** * Hive Configuration. @@ -607,6 +607,10 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore", "Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. \n" + "This class is used to store and retrieval of raw metadata objects such as table, database"), + METASTORE_TXN_STORE_IMPL("hive.metastore.txn.store.impl", + "org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler", + "Name of class that implements org.apache.hadoop.hive.metastore.txn.TxnStore. This " + + "class is used to store and retrieve transactions and locks"), METASTORE_CONNECTION_DRIVER("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver", "Driver class name for a JDBC metastore"), METASTORE_MANAGER_FACTORY_CLASS("javax.jdo.PersistenceManagerFactoryClass", diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 9c0f374..37bbab8 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -8,7 +8,6 @@ import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -20,10 +19,10 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.io.AcidInputFormat; @@ -187,7 +186,7 @@ public void schemaEvolutionAddColDynamicPartitioningInsert() throws Exception { initiator.init(stop, new AtomicBoolean()); initiator.run(); - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); Assert.assertEquals(4, compacts.size()); @@ -290,7 +289,7 @@ public void schemaEvolutionAddColDynamicPartitioningUpdate() throws Exception { initiator.init(stop, new AtomicBoolean()); initiator.run(); - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); Assert.assertEquals(4, compacts.size()); @@ -363,7 +362,7 @@ public void testStatsAfterCompactionPartTbl() throws Exception { execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " + tblName + " after load:"); - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); CompactionInfo ci = new CompactionInfo("default", tblName, "bkt=0", CompactionType.MAJOR); LOG.debug("List of stats columns before analyze Part1: " + txnHandler.findColumnsWithStats(ci)); Worker.StatsUpdater su = Worker.StatsUpdater.init(ci, colNames, conf, @@ -498,7 +497,7 @@ public void dynamicPartitioningInsert() throws Exception { initiator.init(stop, new AtomicBoolean()); initiator.run(); - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); Assert.assertEquals(2, compacts.size()); @@ -538,7 +537,7 @@ public void dynamicPartitioningUpdate() throws Exception { initiator.init(stop, new AtomicBoolean()); initiator.run(); - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); Assert.assertEquals(2, compacts.size()); @@ -580,7 +579,7 @@ public void dynamicPartitioningDelete() throws Exception { initiator.init(stop, new AtomicBoolean()); initiator.run(); - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); Assert.assertEquals(1, compacts.size()); @@ -620,7 +619,7 @@ public void minorCompactWhileStreaming() throws Exception { writeBatch(connection, writer, true); // Now, compact - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); Worker t = new Worker(); t.setThreadId((int) t.getId()); @@ -682,7 +681,7 @@ public void majorCompactWhileStreaming() throws Exception { writeBatch(connection, writer, true); // Now, compact - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR)); Worker t = new Worker(); t.setThreadId((int) t.getId()); @@ -738,7 +737,7 @@ public void minorCompactAfterAbort() throws Exception { txnBatch.abort(); // Now, compact - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR)); Worker t = new Worker(); t.setThreadId((int) t.getId()); @@ -804,7 +803,7 @@ public void majorCompactAfterAbort() throws Exception { // Now, compact - CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR)); Worker t = new Worker(); t.setThreadId((int) t.getId()); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java index 767bc54..b241e9e 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java @@ -25,7 +25,8 @@ import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; /** @@ -33,7 +34,7 @@ */ public class AcidEventListener extends MetaStoreEventListener { - private TxnHandler txnHandler; + private TxnStore txnHandler; private HiveConf hiveConf; public AcidEventListener(Configuration configuration) { @@ -46,24 +47,47 @@ public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException { // We can loop thru all the tables to check if they are ACID first and then perform cleanup, // but it's more efficient to unconditionally perform cleanup for the database, especially // when there are a lot of tables - txnHandler = new TxnHandler(hiveConf); + txnHandler = getTxnHandler(); txnHandler.cleanupRecords(HiveObjectType.DATABASE, dbEvent.getDatabase(), null, null); } @Override public void onDropTable(DropTableEvent tableEvent) throws MetaException { - if (TxnHandler.isAcidTable(tableEvent.getTable())) { - txnHandler = new TxnHandler(hiveConf); + if (TxnUtils.isAcidTable(tableEvent.getTable())) { + txnHandler = getTxnHandler(); txnHandler.cleanupRecords(HiveObjectType.TABLE, null, tableEvent.getTable(), null); } } @Override public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException { - if (TxnHandler.isAcidTable(partitionEvent.getTable())) { - txnHandler = new TxnHandler(hiveConf); + if (TxnUtils.isAcidTable(partitionEvent.getTable())) { + txnHandler = getTxnHandler(); txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, partitionEvent.getTable(), partitionEvent.getPartitionIterator()); } } + private TxnStore getTxnHandler() { + boolean hackOn = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEST) || + HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST); + String origTxnMgr = null; + boolean origConcurrency = false; + + // Since TxnUtils.getTxnStore calls TxnHandler.setConf -> checkQFileTestHack -> TxnDbUtil.setConfValues, + // which may change the values of below two entries, we need to avoid pulluting the original values + if (hackOn) { + origTxnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER); + origConcurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); + } + + txnHandler = TxnUtils.getTxnStore(hiveConf); + + // Set them back + if (hackOn) { + hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, origTxnMgr); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, origConcurrency); + } + + return txnHandler; + } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index fba545d..bf65532 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimaps; - import org.apache.commons.cli.OptionBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -176,7 +175,8 @@ import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege; import org.apache.hadoop.hive.metastore.model.MTablePrivilege; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.shims.HadoopShims; @@ -308,9 +308,9 @@ protected synchronized RawStore initialValue() { } }; - private final ThreadLocal threadLocalTxn = new ThreadLocal() { + private static final ThreadLocal threadLocalTxn = new ThreadLocal() { @Override - protected synchronized TxnHandler initialValue() { + protected TxnStore initialValue() { return null; } }; @@ -584,10 +584,10 @@ public RawStore getMS() throws MetaException { return ms; } - private TxnHandler getTxnHandler() { - TxnHandler txn = threadLocalTxn.get(); + private TxnStore getTxnHandler() { + TxnStore txn = threadLocalTxn.get(); if (txn == null) { - txn = new TxnHandler(hiveConf); + txn = TxnUtils.getTxnStore(hiveConf); threadLocalTxn.set(txn); } return txn; diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 393ef3b..50bf43c 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -134,7 +134,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownTableException; import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; @@ -1846,12 +1846,12 @@ public void cancelDelegationToken(String tokenStrForm) throws MetaException, TEx @Override public ValidTxnList getValidTxns() throws TException { - return TxnHandler.createValidReadTxnList(client.get_open_txns(), 0); + return TxnUtils.createValidReadTxnList(client.get_open_txns(), 0); } @Override public ValidTxnList getValidTxns(long currentTxn) throws TException { - return TxnHandler.createValidReadTxnList(client.get_open_txns(), currentTxn); + return TxnUtils.createValidReadTxnList(client.get_open_txns(), currentTxn); } @Override diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 28e06ed..f7c738a 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -20,27 +20,33 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.util.StringUtils; -import java.sql.*; -import java.util.*; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; /** * Extends the transaction handler with methods needed only by the compactor threads. These * methods are not available through the thrift interface. */ -public class CompactionTxnHandler extends TxnHandler { +class CompactionTxnHandler extends TxnHandler { static final private String CLASS_NAME = CompactionTxnHandler.class.getName(); static final private Log LOG = LogFactory.getLog(CLASS_NAME); // Always access COMPACTION_QUEUE before COMPLETED_TXN_COMPONENTS // See TxnHandler for notes on how to deal with deadlocks. Follow those notes. - public CompactionTxnHandler(HiveConf conf) { - super(conf); + public CompactionTxnHandler() { } /** @@ -385,7 +391,7 @@ public void markCleaned(CompactionInfo info) throws MetaException { } // Populate the complete query with provided prefix and suffix - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false); for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); @@ -450,7 +456,7 @@ public void cleanEmptyAbortedTxns() throws MetaException { prefix.append("delete from TXNS where "); suffix.append(""); - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false); for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); @@ -620,27 +626,6 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { } /** - * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a - * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to - * compact the files, and thus treats only open transactions as invalid. Additionally any - * txnId > highestOpenTxnId is also invalid. This is avoid creating something like - * delta_17_120 where txnId 80, for example, is still open. - * @param txns txn list from the metastore - * @return a valid txn list. - */ - public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) { - long highWater = txns.getTxn_high_water_mark(); - long minOpenTxn = Long.MAX_VALUE; - long[] exceptions = new long[txns.getOpen_txnsSize()]; - int i = 0; - for (TxnInfo txn : txns.getOpen_txns()) { - if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId()); - exceptions[i++] = txn.getId(); - } - highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1; - return new ValidCompactorTxnList(exceptions, -1, highWater); - } - /** * Record the highest txn id that the {@code ci} compaction job will pay attention to. */ public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException { @@ -746,7 +731,7 @@ public void purgeCompactionHistory() throws MetaException { prefix.append("delete from COMPLETED_COMPACTIONS where "); suffix.append(""); - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false); for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 0ddc078..350c5a6 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -87,14 +87,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class TxnHandler { - // Compactor states (Should really be enum) - static final public String INITIATED_RESPONSE = "initiated"; - static final public String WORKING_RESPONSE = "working"; - static final public String CLEANING_RESPONSE = "ready for cleaning"; - static final public String FAILED_RESPONSE = "failed"; - static final public String SUCCEEDED_RESPONSE = "succeeded"; - static final public String ATTEMPTED_RESPONSE = "attempted"; +abstract class TxnHandler implements TxnStore { static final protected char INITIATED_STATE = 'i'; static final protected char WORKING_STATE = 'w'; @@ -131,7 +124,7 @@ * Number of consecutive deadlocks we have seen */ private int deadlockCnt; - private final long deadlockRetryInterval; + private long deadlockRetryInterval; protected HiveConf conf; protected DatabaseProduct dbProduct; @@ -139,8 +132,8 @@ private long timeout; private String identifierQuoteString; // quotes to use for quoting tables, where necessary - private final long retryInterval; - private final int retryLimit; + private long retryInterval; + private int retryLimit; private int retryNum; /** * Derby specific concurrency control @@ -157,7 +150,10 @@ // in mind. To do this they should call checkRetryable() AFTER rolling back the db transaction, // and then they should catch RetryException and call themselves recursively. See commitTxn for an example. - public TxnHandler(HiveConf conf) { + public TxnHandler() { + } + + public void setConf(HiveConf conf) { this.conf = conf; checkQFileTestHack(); @@ -183,7 +179,6 @@ public TxnHandler(HiveConf conf) { TimeUnit.MILLISECONDS); retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS); deadlockRetryInterval = retryInterval / 10; - } public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { @@ -1211,6 +1206,7 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) * Clean up corresponding records in metastore tables, specifically: * TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS */ + @Override public void cleanupRecords(HiveObjectType type, Database db, Table table, Iterator partitionIterator) throws MetaException { try { @@ -1485,7 +1481,7 @@ private static boolean needNewQuery(HiveConf conf, StringBuilder sb) { * For testing only, do not use. */ @VisibleForTesting - int numLocksInLockTable() throws SQLException, MetaException { + public int numLocksInLockTable() throws SQLException, MetaException { Connection dbConn = null; Statement stmt = null; ResultSet rs = null; @@ -1508,7 +1504,7 @@ int numLocksInLockTable() throws SQLException, MetaException { /** * For testing only, do not use. */ - long setTimeout(long milliseconds) { + public long setTimeout(long milliseconds) { long previous_timeout = timeout; timeout = milliseconds; return previous_timeout; @@ -1975,7 +1971,7 @@ private int abortTxns(Connection dbConn, List txnids, long max_heartbeat) suffix.append(""); } - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", true, false); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", true, false); for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); @@ -1998,7 +1994,7 @@ private int abortTxns(Connection dbConn, List txnids, long max_heartbeat) prefix.append("delete from HIVE_LOCKS where "); suffix.append(""); - TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "hl_txnid", false, false); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "hl_txnid", false, false); for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java new file mode 100644 index 0000000..6fc6ed9 --- /dev/null +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -0,0 +1,364 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.HiveObjectType; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; + +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +/** + * A handler to answer transaction related calls that come into the metastore + * server. + * + * Note on log messages: Please include txnid:X and lockid info using + * {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)} + * and {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages. + * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated, + * so keeping the format consistent makes grep'ing the logs much easier. + * + * Note on HIVE_LOCKS.hl_last_heartbeat. + * For locks that are part of transaction, we set this 0 (would rather set it to NULL but + * Currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding + * transaction in TXNS. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface TxnStore { + + // Compactor states (Should really be enum) + static final public String INITIATED_RESPONSE = "initiated"; + static final public String WORKING_RESPONSE = "working"; + static final public String CLEANING_RESPONSE = "ready for cleaning"; + static final public String FAILED_RESPONSE = "failed"; + static final public String SUCCEEDED_RESPONSE = "succeeded"; + static final public String ATTEMPTED_RESPONSE = "attempted"; + + public static final int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 1000; + + public void setConf(HiveConf conf); + + /** + * Get information about open transactions. This gives extensive information about the + * transactions rather than just the list of transactions. This should be used when the need + * is to see information about the transactions (e.g. show transactions). + * @return information about open transactions + * @throws MetaException + */ + public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException; + + /** + * Get list of valid transactions. This gives just the list of transactions that are open. + * @return list of open transactions, as well as a high water mark. + * @throws MetaException + */ + public GetOpenTxnsResponse getOpenTxns() throws MetaException; + + /** + * Open a set of transactions + * @param rqst request to open transactions + * @return information on opened transactions + * @throws MetaException + */ + public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException; + + /** + * Abort (rollback) a transaction. + * @param rqst info on transaction to abort + * @throws NoSuchTxnException + * @throws MetaException + */ + public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException; + + /** + * Commit a transaction + * @param rqst info on transaction to commit + * @throws NoSuchTxnException + * @throws TxnAbortedException + * @throws MetaException + */ + public void commitTxn(CommitTxnRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException; + + /** + * Obtain a lock. + * @param rqst information on the lock to obtain. If the requester is part of a transaction + * the txn information must be included in the lock request. + * @return info on the lock, including whether it was obtained. + * @throws NoSuchTxnException + * @throws TxnAbortedException + * @throws MetaException + */ + public LockResponse lock(LockRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException; + + /** + * Check whether a lock has been obtained. This is used after {@link #lock} returned a wait + * state. + * @param rqst info on the lock to check + * @return info on the state of the lock + * @throws NoSuchTxnException + * @throws NoSuchLockException + * @throws TxnAbortedException + * @throws MetaException + */ + public LockResponse checkLock(CheckLockRequest rqst) + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException; + + /** + * Unlock a lock. It is not legal to call this if the caller is part of a txn. In that case + * the txn should be committed or aborted instead. (Note someday this will change since + * multi-statement transactions will allow unlocking in the transaction.) + * @param rqst lock to unlock + * @throws NoSuchLockException + * @throws TxnOpenException + * @throws MetaException + */ + public void unlock(UnlockRequest rqst) + throws NoSuchLockException, TxnOpenException, MetaException; + + /** + * Get information on current locks. + * @param rqst lock information to retrieve + * @return lock information. + * @throws MetaException + */ + public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException; + + /** + * Send a heartbeat for a lock or a transaction + * @param ids lock and/or txn id to heartbeat + * @throws NoSuchTxnException + * @throws NoSuchLockException + * @throws TxnAbortedException + * @throws MetaException + */ + public void heartbeat(HeartbeatRequest ids) + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException; + + /** + * Heartbeat a group of transactions together + * @param rqst set of transactions to heartbat + * @return info on txns that were heartbeated + * @throws MetaException + */ + public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst) + throws MetaException; + + /** + * Submit a compaction request into the queue. This is called when a user manually requests a + * compaction. + * @param rqst information on what to compact + * @return id of the compaction that has been started + * @throws MetaException + */ + public long compact(CompactionRequest rqst) throws MetaException; + + /** + * Show list of current compactions + * @param rqst info on which compactions to show + * @return compaction information + * @throws MetaException + */ + public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException; + + /** + * Add information on a set of dynamic partitions that participated in a transaction. + * @param rqst dynamic partition info. + * @throws NoSuchTxnException + * @throws TxnAbortedException + * @throws MetaException + */ + public void addDynamicPartitions(AddDynamicPartitions rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException; + + /** + * Clean up corresponding records in metastore tables + * @param type Hive object type + * @param db database object + * @param table table object + * @param partitionIterator partition iterator + * @throws MetaException + */ + public void cleanupRecords(HiveObjectType type, Database db, Table table, + Iterator partitionIterator) throws MetaException; + /** + * Timeout transactions and/or locks. This should only be called by the compactor. + */ + public void performTimeOuts(); + + /** + * This will look through the completed_txn_components table and look for partitions or tables + * that may be ready for compaction. Also, look through txns and txn_components tables for + * aborted transactions that we should add to the list. + * @param maxAborted Maximum number of aborted queries to allow before marking this as a + * potential compaction. + * @return list of CompactionInfo structs. These will not have id, type, + * or runAs set since these are only potential compactions not actual ones. + */ + public Set findPotentialCompactions(int maxAborted) throws MetaException; + + /** + * Sets the user to run as. This is for the case + * where the request was generated by the user and so the worker must set this value later. + * @param cq_id id of this entry in the queue + * @param user user to run the jobs as + */ + public void setRunAs(long cq_id, String user) throws MetaException; + + /** + * This will grab the next compaction request off of + * the queue, and assign it to the worker. + * @param workerId id of the worker calling this, will be recorded in the db + * @return an info element for this compaction request, or null if there is no work to do now. + */ + public CompactionInfo findNextToCompact(String workerId) throws MetaException; + + /** + * This will mark an entry in the queue as compacted + * and put it in the ready to clean state. + * @param info info on the compaction entry to mark as compacted. + */ + public void markCompacted(CompactionInfo info) throws MetaException; + + /** + * Find entries in the queue that are ready to + * be cleaned. + * @return information on the entry in the queue. + */ + public List findReadyToClean() throws MetaException; + + /** + * This will remove an entry from the queue after + * it has been compacted. + * + * @param info info on the compaction entry to remove + */ + public void markCleaned(CompactionInfo info) throws MetaException; + + /** + * Mark a compaction entry as failed. This will move it to the compaction history queue with a + * failed status. It will NOT clean up aborted transactions in the table/partition associated + * with this compaction. + * @param info information on the compaction that failed. + * @throws MetaException + */ + public void markFailed(CompactionInfo info) throws MetaException; + + /** + * Clean up aborted transactions from txns that have no components in txn_components. The reson such + * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and + * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called. + */ + public void cleanEmptyAbortedTxns() throws MetaException; + + /** + * This will take all entries assigned to workers + * on a host return them to INITIATED state. The initiator should use this at start up to + * clean entries from any workers that were in the middle of compacting when the metastore + * shutdown. It does not reset entries from worker threads on other hosts as those may still + * be working. + * @param hostname Name of this host. It is assumed this prefixes the thread's worker id, + * so that like hostname% will match the worker id. + */ + public void revokeFromLocalWorkers(String hostname) throws MetaException; + + /** + * This call will return all compaction queue + * entries assigned to a worker but over the timeout back to the initiated state. + * This should be called by the initiator on start up and occasionally when running to clean up + * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called + * first. + * @param timeout number of milliseconds since start time that should elapse before a worker is + * declared dead. + */ + public void revokeTimedoutWorkers(long timeout) throws MetaException; + + /** + * Queries metastore DB directly to find columns in the table which have statistics information. + * If {@code ci} includes partition info then per partition stats info is examined, otherwise + * table level stats are examined. + * @throws MetaException + */ + public List findColumnsWithStats(CompactionInfo ci) throws MetaException; + + /** + * Record the highest txn id that the {@code ci} compaction job will pay attention to. + */ + public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException; + + /** + * For any given compactable entity (partition, table if not partitioned) the history of compactions + * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the + * history such that a configurable number of each type of state is present. Any other entries + * can be purged. This scheme has advantage of always retaining the last failure/success even if + * it's not recent. + * @throws MetaException + */ + public void purgeCompactionHistory() throws MetaException; + + /** + * Determine if there are enough consecutive failures compacting a table or partition that no + * new automatic compactions should be scheduled. User initiated compactions do not do this + * check. + * @param ci Table or partition to check. + * @return true if it is ok to compact, false if there have been too many failures. + * @throws MetaException + */ + public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException; + + + @VisibleForTesting + public int numLocksInLockTable() throws SQLException, MetaException; + + @VisibleForTesting + long setTimeout(long milliseconds); +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java new file mode 100644 index 0000000..f60e34b --- /dev/null +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.metastore.api.TxnState; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class TxnUtils { + private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class); + + /** + * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a + * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to + * read the files, and thus treats both open and aborted transactions as invalid. + * @param txns txn list from the metastore + * @param currentTxn Current transaction that the user has open. If this is greater than 0 it + * will be removed from the exceptions list so that the user sees his own + * transaction as valid. + * @return a valid txn list. + */ + public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) { + long highWater = txns.getTxn_high_water_mark(); + Set open = txns.getOpen_txns(); + long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)]; + int i = 0; + for(long txn: open) { + if (currentTxn > 0 && currentTxn == txn) continue; + exceptions[i++] = txn; + } + return new ValidReadTxnList(exceptions, highWater); + } + + /** + * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a + * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to + * compact the files, and thus treats only open transactions as invalid. Additionally any + * txnId > highestOpenTxnId is also invalid. This is avoid creating something like + * delta_17_120 where txnId 80, for example, is still open. + * @param txns txn list from the metastore + * @return a valid txn list. + */ + public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) { + long highWater = txns.getTxn_high_water_mark(); + long minOpenTxn = Long.MAX_VALUE; + long[] exceptions = new long[txns.getOpen_txnsSize()]; + int i = 0; + for (TxnInfo txn : txns.getOpen_txns()) { + if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId()); + exceptions[i++] = txn.getId(); + } + highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1; + return new ValidCompactorTxnList(exceptions, -1, highWater); + } + + /** + * Get an instance of the TxnStore that is appropriate for this store + * @param conf configuration + * @return txn store + */ + public static TxnStore getTxnStore(HiveConf conf) { + String className = conf.getVar(HiveConf.ConfVars.METASTORE_TXN_STORE_IMPL); + try { + TxnStore handler = ((Class) MetaStoreUtils.getClass( + className)).newInstance(); + handler.setConf(conf); + return handler; + } catch (Exception e) { + LOG.error("Unable to instantiate raw store directly in fastpath mode", e); + throw new RuntimeException(e); + } + } + + /** Checks if a table is a valid ACID table. + * Note, users are responsible for using the correct TxnManager. We do not look at + * SessionState.get().getTxnMgr().supportsAcid() here + * @param table table + * @return true if table is a legit ACID table, false otherwise + */ + public static boolean isAcidTable(Table table) { + if (table == null) { + return false; + } + Map parameters = table.getParameters(); + String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); + } + /** + * Build a query (or queries if one query is too big) with specified "prefix" and "suffix", + * while populating the IN list into multiple OR clauses, e.g. id in (1,2,3) OR id in (4,5,6) + * For NOT IN case, NOT IN list is broken into multiple AND clauses. + * @param queries array of complete query strings + * @param prefix part of the query that comes before IN list + * @param suffix part of the query that comes after IN list + * @param inList the list containing IN list values + * @param inColumn column name of IN list operator + * @param addParens add a pair of parenthesis outside the IN lists + * e.g. ( id in (1,2,3) OR id in (4,5,6) ) + * @param notIn clause to be broken up is NOT IN + */ + public static void buildQueryWithINClause(HiveConf conf, List queries, StringBuilder prefix, + StringBuilder suffix, List inList, + String inColumn, boolean addParens, boolean notIn) { + int batchSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE); + int numWholeBatches = inList.size() / batchSize; + StringBuilder buf = new StringBuilder(); + buf.append(prefix); + if (addParens) { + buf.append("("); + } + buf.append(inColumn); + if (notIn) { + buf.append(" not in ("); + } else { + buf.append(" in ("); + } + + for (int i = 0; i <= numWholeBatches; i++) { + if (needNewQuery(conf, buf)) { + // Wrap up current query string + if (addParens) { + buf.append(")"); + } + buf.append(suffix); + queries.add(buf.toString()); + + // Prepare a new query string + buf.setLength(0); + } + + if (i > 0) { + if (notIn) { + if (buf.length() == 0) { + buf.append(prefix); + if (addParens) { + buf.append("("); + } + } else { + buf.append(" and "); + } + buf.append(inColumn); + buf.append(" not in ("); + } else { + if (buf.length() == 0) { + buf.append(prefix); + if (addParens) { + buf.append("("); + } + } else { + buf.append(" or "); + } + buf.append(inColumn); + buf.append(" in ("); + } + } + + if (i * batchSize == inList.size()) { + // At this point we just realized we don't need another query + return; + } + for (int j = i * batchSize; j < (i + 1) * batchSize && j < inList.size(); j++) { + buf.append(inList.get(j)).append(","); + } + buf.setCharAt(buf.length() - 1, ')'); + } + + if (addParens) { + buf.append(")"); + } + buf.append(suffix); + queries.add(buf.toString()); + } + + /** Estimate if the size of a string will exceed certain limit */ + private static boolean needNewQuery(HiveConf conf, StringBuilder sb) { + int queryMemoryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH); + // http://www.javamex.com/tutorials/memory/string_memory_usage.shtml + long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8); + return sizeInBytes / 1024 > queryMemoryLimit; + } +} diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 051da60..bdeacb9 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -42,8 +42,7 @@ public class TestCompactionTxnHandler { private HiveConf conf = new HiveConf(); - private CompactionTxnHandler txnHandler; - static final private Log LOG = LogFactory.getLog(TestCompactionTxnHandler.class); + private TxnStore txnHandler; public TestCompactionTxnHandler() throws Exception { TxnDbUtil.setConfValues(conf); @@ -424,7 +423,7 @@ public void addDynamicPartitions() throws Exception { @Before public void setUp() throws Exception { TxnDbUtil.prepDb(); - txnHandler = new CompactionTxnHandler(conf); + txnHandler = TxnUtils.getTxnStore(conf); } @After diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 930af7c..c27dce0 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -34,7 +34,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static junit.framework.Assert.*; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertNull; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; /** * Tests for TxnHandler. @@ -44,7 +48,7 @@ static final private Log LOG = LogFactory.getLog(CLASS_NAME); private HiveConf conf = new HiveConf(); - private TxnHandler txnHandler; + private TxnStore txnHandler; public TestTxnHandler() throws Exception { TxnDbUtil.setConfValues(conf); @@ -1111,99 +1115,102 @@ public void showLocks() throws Exception { @Ignore public void deadlockDetected() throws Exception { LOG.debug("Starting deadlock test"); - Connection conn = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE); - Statement stmt = conn.createStatement(); - long now = txnHandler.getDbTime(conn); - stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " + - "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " + - "'scooby.com')"); - stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " + - "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " + - "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" + - txnHandler.LOCK_WAITING + "', '" + txnHandler.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " + - "'scooby.com')"); - conn.commit(); - txnHandler.closeDbConn(conn); - - final AtomicBoolean sawDeadlock = new AtomicBoolean(); - - final Connection conn1 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE); - final Connection conn2 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE); - try { + if (txnHandler instanceof TxnHandler) { + final TxnHandler tHndlr = (TxnHandler)txnHandler; + Connection conn = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Statement stmt = conn.createStatement(); + long now = tHndlr.getDbTime(conn); + stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " + + "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " + + "'scooby.com')"); + stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " + + "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " + + "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" + + tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " + + "'scooby.com')"); + conn.commit(); + tHndlr.closeDbConn(conn); + + final AtomicBoolean sawDeadlock = new AtomicBoolean(); + + final Connection conn1 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); + final Connection conn2 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); + try { - for (int i = 0; i < 5; i++) { - Thread t1 = new Thread() { - @Override - public void run() { - try { + for (int i = 0; i < 5; i++) { + Thread t1 = new Thread() { + @Override + public void run() { try { - updateTxns(conn1); - updateLocks(conn1); - Thread.sleep(1000); - conn1.commit(); - LOG.debug("no exception, no deadlock"); - } catch (SQLException e) { try { - txnHandler.checkRetryable(conn1, e, "thread t1"); - LOG.debug("Got an exception, but not a deadlock, SQLState is " + - e.getSQLState() + " class of exception is " + e.getClass().getName() + - " msg is <" + e.getMessage() + ">"); - } catch (TxnHandler.RetryException de) { - LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + - "exception is " + e.getClass().getName() + " msg is <" + e - .getMessage() + ">"); - sawDeadlock.set(true); + updateTxns(conn1); + updateLocks(conn1); + Thread.sleep(1000); + conn1.commit(); + LOG.debug("no exception, no deadlock"); + } catch (SQLException e) { + try { + tHndlr.checkRetryable(conn1, e, "thread t1"); + LOG.debug("Got an exception, but not a deadlock, SQLState is " + + e.getSQLState() + " class of exception is " + e.getClass().getName() + + " msg is <" + e.getMessage() + ">"); + } catch (TxnHandler.RetryException de) { + LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + + "exception is " + e.getClass().getName() + " msg is <" + e + .getMessage() + ">"); + sawDeadlock.set(true); + } } + conn1.rollback(); + } catch (Exception e) { + throw new RuntimeException(e); } - conn1.rollback(); - } catch (Exception e) { - throw new RuntimeException(e); } - } - }; + }; - Thread t2 = new Thread() { - @Override - public void run() { - try { + Thread t2 = new Thread() { + @Override + public void run() { try { - updateLocks(conn2); - updateTxns(conn2); - Thread.sleep(1000); - conn2.commit(); - LOG.debug("no exception, no deadlock"); - } catch (SQLException e) { try { - txnHandler.checkRetryable(conn2, e, "thread t2"); - LOG.debug("Got an exception, but not a deadlock, SQLState is " + - e.getSQLState() + " class of exception is " + e.getClass().getName() + - " msg is <" + e.getMessage() + ">"); - } catch (TxnHandler.RetryException de) { - LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + - "exception is " + e.getClass().getName() + " msg is <" + e - .getMessage() + ">"); - sawDeadlock.set(true); + updateLocks(conn2); + updateTxns(conn2); + Thread.sleep(1000); + conn2.commit(); + LOG.debug("no exception, no deadlock"); + } catch (SQLException e) { + try { + tHndlr.checkRetryable(conn2, e, "thread t2"); + LOG.debug("Got an exception, but not a deadlock, SQLState is " + + e.getSQLState() + " class of exception is " + e.getClass().getName() + + " msg is <" + e.getMessage() + ">"); + } catch (TxnHandler.RetryException de) { + LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + + "exception is " + e.getClass().getName() + " msg is <" + e + .getMessage() + ">"); + sawDeadlock.set(true); + } } + conn2.rollback(); + } catch (Exception e) { + throw new RuntimeException(e); } - conn2.rollback(); - } catch (Exception e) { - throw new RuntimeException(e); } - } - }; - - t1.start(); - t2.start(); - t1.join(); - t2.join(); - if (sawDeadlock.get()) break; + }; + + t1.start(); + t2.start(); + t1.join(); + t2.join(); + if (sawDeadlock.get()) break; + } + assertTrue(sawDeadlock.get()); + } finally { + conn1.rollback(); + tHndlr.closeDbConn(conn1); + conn2.rollback(); + tHndlr.closeDbConn(conn2); } - assertTrue(sawDeadlock.get()); - } finally { - conn1.rollback(); - txnHandler.closeDbConn(conn1); - conn2.rollback(); - txnHandler.closeDbConn(conn2); } } @@ -1297,7 +1304,7 @@ private void runAgainstDerby(List queries) throws Exception { @Before public void setUp() throws Exception { TxnDbUtil.prepDb(); - txnHandler = new TxnHandler(conf); + txnHandler = TxnUtils.getTxnStore(conf); } @After diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java index abe1e37..6c27515 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java @@ -37,7 +37,7 @@ public void testBadConnection() throws Exception { conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "blah"); RuntimeException e = null; try { - TxnHandler txnHandler1 = new TxnHandler(conf); + TxnUtils.getTxnStore(conf); } catch(RuntimeException ex) { LOG.info("Expected error: " + ex.getMessage(), ex); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java index a91ca5c..59c8fe4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java @@ -18,20 +18,12 @@ package org.apache.hadoop.hive.ql.txn; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HouseKeeperService; -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; -import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; -import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; -import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -60,10 +52,10 @@ public String getServiceDescription() { } private static final class ObsoleteEntryReaper implements Runnable { - private final CompactionTxnHandler txnHandler; + private final TxnStore txnHandler; private final AtomicInteger isAliveCounter; private ObsoleteEntryReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) { - txnHandler = new CompactionTxnHandler(hiveConf); + txnHandler = TxnUtils.getTxnStore(hiveConf); this.isAliveCounter = isAliveCounter; } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java index 38151fb..de74a7b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java @@ -20,15 +20,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HouseKeeperService; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; -import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; -import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -58,10 +53,10 @@ public String getServiceDescription() { } private static final class TimedoutTxnReaper implements Runnable { - private final TxnHandler txnHandler; + private final TxnStore txnHandler; private final AtomicInteger isAliveCounter; private TimedoutTxnReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) { - txnHandler = new TxnHandler(hiveConf); + txnHandler = TxnUtils.getTxnStore(hiveConf); this.isAliveCounter = isAliveCounter; } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index c956f58..ae8865c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -31,7 +31,8 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; @@ -50,7 +51,7 @@ static final private Log LOG = LogFactory.getLog(CLASS_NAME); protected HiveConf conf; - protected CompactionTxnHandler txnHandler; + protected TxnStore txnHandler; protected RawStore rs; protected int threadId; protected AtomicBoolean stop; @@ -75,7 +76,7 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException setDaemon(true); // this means the process will exit without waiting for this thread // Get our own instance of the transaction handler - txnHandler = new CompactionTxnHandler(conf); + txnHandler = TxnUtils.getTxnStore(conf); // Get our own connection to the database so we can get table and partition information. rs = RawStoreProxy.getProxy(conf, conf, diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index c023c27..1898a4d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -37,8 +37,8 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; @@ -81,7 +81,7 @@ public void run() { try {//todo: add method to only get current i.e. skip history - more efficient ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); ValidTxnList txns = - CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); + TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); Set potentials = txnHandler.findPotentialCompactions(abortedThreshold); LOG.debug("Found " + potentials.size() + " potential compactions, " + "checking to see if we should compact any of them"); @@ -184,7 +184,7 @@ private boolean lookForCurrentCompactions(ShowCompactResponse compactions, CompactionInfo ci) { if (compactions.getCompacts() != null) { for (ShowCompactResponseElement e : compactions.getCompacts()) { - if ((e.getState().equals(TxnHandler.WORKING_RESPONSE) || e.getState().equals(TxnHandler.INITIATED_RESPONSE)) && + if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) && e.getDbname().equals(ci.dbname) && e.getTablename().equals(ci.tableName) && (e.getPartitionname() == null && ci.partName == null || diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 59a765b..516b92e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -27,13 +27,15 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetAddress; @@ -135,7 +137,7 @@ public void run() { final boolean isMajor = ci.isMajorCompaction(); final ValidTxnList txns = - CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); + TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo()); LOG.debug("ValidCompactTxnList: " + txns.writeToString()); txnHandler.setCompactionHighestTxnId(ci, txns.getHighWatermark()); final StringBuilder jobName = new StringBuilder(name); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 44b77e7..6f8dc35 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -32,10 +32,10 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService; @@ -486,7 +486,7 @@ public void testInitiatorWithMultipleFailedCompactions() throws Exception { hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true); int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); - CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf); + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); AtomicBoolean stop = new AtomicBoolean(true); //create failed compactions for(int i = 0; i < numFailedCompactions; i++) { @@ -556,27 +556,27 @@ public void testInitiatorWithMultipleFailedCompactions() throws Exception { private int working; private int total; } - private static CompactionsByState countCompacts(TxnHandler txnHandler) throws MetaException { + private static CompactionsByState countCompacts(TxnStore txnHandler) throws MetaException { ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); CompactionsByState compactionsByState = new CompactionsByState(); compactionsByState.total = resp.getCompactsSize(); for(ShowCompactResponseElement compact : resp.getCompacts()) { - if(TxnHandler.FAILED_RESPONSE.equals(compact.getState())) { + if(TxnStore.FAILED_RESPONSE.equals(compact.getState())) { compactionsByState.failed++; } - else if(TxnHandler.CLEANING_RESPONSE.equals(compact.getState())) { + else if(TxnStore.CLEANING_RESPONSE.equals(compact.getState())) { compactionsByState.readyToClean++; } - else if(TxnHandler.INITIATED_RESPONSE.equals(compact.getState())) { + else if(TxnStore.INITIATED_RESPONSE.equals(compact.getState())) { compactionsByState.initiated++; } - else if(TxnHandler.SUCCEEDED_RESPONSE.equals(compact.getState())) { + else if(TxnStore.SUCCEEDED_RESPONSE.equals(compact.getState())) { compactionsByState.succeeded++; } - else if(TxnHandler.WORKING_RESPONSE.equals(compact.getState())) { + else if(TxnStore.WORKING_RESPONSE.equals(compact.getState())) { compactionsByState.working++; } - else if(TxnHandler.ATTEMPTED_RESPONSE.equals(compact.getState())) { + else if(TxnStore.ATTEMPTED_RESPONSE.equals(compact.getState())) { compactionsByState.attempted++; } } @@ -632,7 +632,7 @@ public void writeBetweenWorkerAndCleaner() throws Exception { runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3"); //run Worker to execute compaction - CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf); + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR)); Worker t = new Worker(); t.setThreadId((int) t.getId()); diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java index a4f7e5b..99705b4 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java @@ -23,7 +23,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryPlan; @@ -42,7 +42,11 @@ import org.junit.Before; import org.junit.Test; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -243,10 +247,10 @@ public void testLockTimeout() throws Exception { } expireLocks(txnMgr, 5); //create a lot of locks - for(int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) { + for(int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) { ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat } - expireLocks(txnMgr, TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17); + expireLocks(txnMgr, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17); } private void expireLocks(HiveTxnManager txnMgr, int numLocksBefore) throws Exception { DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager(); diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index 5545574..cac4623 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -20,15 +20,31 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.MetaStoreThread; -import org.apache.hadoop.hive.metastore.api.*; -import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -39,7 +55,11 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Progressable; import org.apache.thrift.TException; @@ -62,7 +82,7 @@ static final private String CLASS_NAME = CompactorTest.class.getName(); static final private Log LOG = LogFactory.getLog(CLASS_NAME); - protected CompactionTxnHandler txnHandler; + protected TxnStore txnHandler; protected IMetaStoreClient ms; protected long sleepTime = 1000; protected HiveConf conf; @@ -75,7 +95,7 @@ protected CompactorTest() throws Exception { TxnDbUtil.setConfValues(conf); TxnDbUtil.cleanDb(); ms = new HiveMetaStoreClient(conf); - txnHandler = new CompactionTxnHandler(conf); + txnHandler = TxnUtils.getTxnStore(conf); tmpdir = new File(System.getProperty("java.io.tmpdir") + System.getProperty("file.separator") + "compactor_test_tables"); tmpdir.mkdir(); diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index 913c8bc..17634f0 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -25,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.junit.Test; import java.util.ArrayList; @@ -73,7 +73,7 @@ public void cleanupAfterMajorTableCompaction() throws Exception { // Check there are no compactions requests left. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); // Check that the files are removed List paths = getDirectories(conf, t, null); @@ -105,7 +105,7 @@ public void cleanupAfterMajorPartitionCompaction() throws Exception { // Check there are no compactions requests left. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); // Check that the files are removed List paths = getDirectories(conf, t, p); @@ -135,7 +135,7 @@ public void cleanupAfterMinorTableCompaction() throws Exception { // Check there are no compactions requests left. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); // Check that the files are removed List paths = getDirectories(conf, t, null); @@ -174,7 +174,7 @@ public void cleanupAfterMinorPartitionCompaction() throws Exception { // Check there are no compactions requests left. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); // Check that the files are removed List paths = getDirectories(conf, t, p); @@ -329,7 +329,7 @@ public void notBlockedBySubsequentLock() throws Exception { rsp = txnHandler.showCompact(new ShowCompactRequest()); compacts = rsp.getCompacts(); Assert.assertEquals(1, compacts.size()); - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); } @Test @@ -403,7 +403,7 @@ public void partitionNotBlockedBySubsequentLock() throws Exception { rsp = txnHandler.showCompact(new ShowCompactRequest()); compacts = rsp.getCompacts(); Assert.assertEquals(1, compacts.size()); - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); } @Test @@ -429,7 +429,7 @@ public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception { // Check there are no compactions requests left. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); // Check that the files are removed List paths = getDirectories(conf, t, p); @@ -460,7 +460,7 @@ public void droppedTable() throws Exception { // Check there are no compactions requests left. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); } @Test @@ -488,7 +488,7 @@ public void droppedPartition() throws Exception { // Check there are no compactions requests left. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); } @Override boolean useHive130DeltaDirName() { diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index 07f477d..f84bd7e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -17,13 +17,12 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; -import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.junit.Before; import org.junit.Test; @@ -205,12 +204,12 @@ public void cleanEmptyAbortedTxns() throws Exception { LockResponse res = txnHandler.lock(req); txnHandler.abortTxn(new AbortTxnRequest(txnid)); - for (int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50; i++) { + for (int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50; i++) { txnid = openTxn(); txnHandler.abortTxn(new AbortTxnRequest(txnid)); } GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns(); - Assert.assertEquals(TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize()); + Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize()); startInitiator(); diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index 8862402..381eeb3 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -22,13 +22,18 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; @@ -935,7 +940,7 @@ public void droppedTable() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); Assert.assertEquals(1, compacts.size()); - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(compacts.get(0).getState())); + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(compacts.get(0).getState())); } @Test @@ -960,6 +965,6 @@ public void droppedPartition() throws Exception { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); Assert.assertEquals(1, compacts.size()); - Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); } }