diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 5dfa7ca974..6f64290120 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidWriteIdList; @@ -55,6 +54,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler.getMSForConf; import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; /** @@ -224,7 +224,7 @@ private static String idWatermark(CompactionInfo ci) { return " id=" + ci.id; } private void removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci) - throws IOException, NoSuchObjectException { + throws IOException, NoSuchObjectException, MetaException { Path locPath = new Path(location); AcidUtils.Directory dir = AcidUtils.getAcidState(locPath.getFileSystem(conf), locPath, conf, writeIdList, Ref.from( false), false, null, false); @@ -258,8 +258,8 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti } FileSystem fs = filesToDelete.get(0).getFileSystem(conf); - Database db = rs.getDatabase(getDefaultCatalog(conf), ci.dbname); - Boolean isSourceOfRepl = ReplChangeManager.isSourceOfReplication(db); + Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname); + boolean isSourceOfRepl = ReplChangeManager.isSourceOfReplication(db); for (Path dead : filesToDelete) { LOG.debug("Going to delete path " + dead.toString()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index 99da86f910..b378d40964 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -53,7 +53,7 @@ protected static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); protected HiveConf conf; - protected RawStore rs; + protected AtomicBoolean stop; protected AtomicBoolean looped; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java index 8bfb524ac6..aa258b331f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java @@ -18,14 +18,11 @@ package org.apache.hadoop.hive.ql.txn.compactor; import org.apache.hadoop.hive.metastore.MetaStoreThread; -import org.apache.hadoop.hive.metastore.RawStore; -import org.apache.hadoop.hive.metastore.RawStoreProxy; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -36,6 +33,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler.getMSForConf; import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; /** @@ -45,7 +43,6 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaStoreThread { protected TxnStore txnHandler; - protected RawStore rs; protected int threadId; @Override @@ -59,15 +56,11 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { // Get our own instance of the transaction handler txnHandler = TxnUtils.getTxnStore(conf); - - // Get our own connection to the database so we can get table and partition information. - rs = RawStoreProxy.getProxy(conf, conf, - MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL), threadId); } @Override Table resolveTable(CompactionInfo ci) throws MetaException { try { - return rs.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName); + return getMSForConf(conf).getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName); } catch (MetaException e) { LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage()); throw e; @@ -76,7 +69,7 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { @Override boolean replIsCompactionDisabledForDatabase(String dbName) throws TException { try { - Database database = rs.getDatabase(getDefaultCatalog(conf), dbName); + Database database = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), dbName); // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail. boolean isReplCompactDisabled = ReplUtils.isFirstIncPending(database.getParameters()); if (isReplCompactDisabled) { @@ -91,7 +84,7 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { @Override List getPartitionsByNames(CompactionInfo ci) throws MetaException { try { - return rs.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName, + return getMSForConf(conf).getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName, Collections.singletonList(ci.partName)); } catch (MetaException e) { LOG.error("Unable to get partitions by name for CompactionInfo=" + ci);