diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index dc7b2877bf..b0a2a8d20d 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -142,12 +142,14 @@ public void setup() throws Exception { hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + TxnDbUtil.setConfValues(hiveConf); TxnDbUtil.cleanDb(hiveConf); TxnDbUtil.prepDb(hiveConf); conf = hiveConf; HiveConf.setBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS, true); + HiveConf.setIntVar(conf, ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0); msClient = new HiveMetaStoreClient(conf); driver = DriverFactory.newDriver(hiveConf); SessionState.start(new CliSessionState(hiveConf)); @@ -793,6 +795,272 @@ private void majorCompactAfterAbort(boolean newStreamingAPI) throws Exception { checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1); } + @Test + public void testCleanAbortCompactAfterAbortTwoPartitions() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + HiveStreamingConnection connection1 = prepareTableTwoPartitionsAndConnection(dbName, tblName, 1); + HiveStreamingConnection connection2 = prepareTableTwoPartitionsAndConnection(dbName, tblName, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,3".getBytes()); + connection2.write("2,3".getBytes()); + connection2.write("3,3".getBytes()); + connection2.abortTransaction(); + + assertAndCompactCleanAbort(dbName, tblName); + + connection1.close(); + connection2.close(); + } + + @Test + public void testCleanAbortCompactAfterAbort() throws Exception { + String dbName = "default"; + String tblName = "cws"; + + // Create three folders with two different transactions + HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, tblName, 1); + HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, tblName, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,3".getBytes()); + connection2.write("2,3".getBytes()); + connection2.write("3,3".getBytes()); + connection2.abortTransaction(); + + assertAndCompactCleanAbort(dbName, tblName); + + connection1.close(); + connection2.close(); + } + + private void assertAndCompactCleanAbort(String dbName, String tblName) throws Exception { + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + FileStatus[] stat = + fs.listStatus(new Path(table.getSd().getLocation())); + if (3 != stat.length) { + Assert.fail("Expecting three directories corresponding to three partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + 
Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 2, count); + + runInitiator(conf); + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_TYPE='p'"); + // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table + // with rows in TXN_COMPONENTS + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 1, count); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompacts().size()); + Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState()); + Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename()); + Assert.assertEquals(CompactionType.CLEAN_ABORTED, + rsp.getCompacts().get(0).getType()); + + runCleaner(conf); + + // After the cleaner runs TXN_COMPONENTS and COMPACTION_QUEUE should have zero rows, also the folders should have been deleted. + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 0, count); + + stat = + fs.listStatus(new Path(table.getSd().getLocation())); + if (0 != stat.length) { + Assert.fail("Expecting compaction to have cleaned the directories, FileStatus[] stat " + Arrays.toString(stat)); + } + + rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompacts().size()); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename()); + Assert.assertEquals(CompactionType.CLEAN_ABORTED, + rsp.getCompacts().get(0).getType()); + } + + @Test + public void testCleanAbortCompactSeveralTables() throws Exception { + String dbName = "default"; + String tblName1 = "cws1"; + String tblName2 = "cws2"; + + HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, tblName1, 1); + HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, tblName2, 1); + + connection1.beginTransaction(); + connection1.write("1,1".getBytes()); + connection1.write("2,2".getBytes()); + connection1.abortTransaction(); + + connection2.beginTransaction(); + connection2.write("1,1".getBytes()); + connection2.write("2,2".getBytes()); + connection2.abortTransaction(); + + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + FileSystem fs = FileSystem.get(conf); + Table table1 = msClient.getTable(dbName, tblName1); + FileStatus[] stat = + fs.listStatus(new Path(table1.getSd().getLocation())); + if (2 != stat.length) { + Assert.fail("Expecting two directories corresponding to two partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + Table table2 = msClient.getTable(dbName, tblName2); + stat = fs.listStatus(new Path(table2.getSd().getLocation())); + if (2 != stat.length) { + Assert.fail("Expecting two directories corresponding to two partitions, FileStatus[] stat " + Arrays.toString(stat)); + } + + int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'"); + // We should have two rows corresponding to the two aborted transactions + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 2, count); + + runInitiator(conf); + count 
= TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_TYPE='p'");
+    // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
+    // with rows in TXN_COMPONENTS
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 2, count);
+
+    runCleaner(conf);
+
+    // After the cleaner runs, TXN_COMPONENTS and COMPACTION_QUEUE should have zero rows and the folders should have been deleted.
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count);
+
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 0, count);
+
+    stat = fs.listStatus(new Path(table1.getSd().getLocation()));
+    if (0 != stat.length) {
+      Assert.fail("Expecting compaction to have cleaned the directories, FileStatus[] stat " + Arrays.toString(stat));
+    }
+
+    connection1.close();
+    connection2.close();
+  }
+
+  @Test
+  public void testCleanAbortCorrectlyCleaned() throws Exception {
+    // Test that TXN_COMPONENTS is cleaned up properly when the transaction commits
+    String dbName = "default";
+    String tblName = "cws";
+    HiveStreamingConnection connection = prepareTableAndConnection(dbName, tblName, 1);
+    connection.beginTransaction();
+    connection.write("1,1".getBytes());
+    connection.write("2,2".getBytes());
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'");
+    // We should have one row corresponding to the still-open transaction
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 1, count);
+
+    connection.commitTransaction();
+
+    // After commit the row should have been deleted,
+    // so no row with operation type 'p' should remain
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where TC_OPERATION_TYPE='p'");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count);
+  }
+
+  @Test
+  public void testCleanAbortAndMinorCompact() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+
+    HiveStreamingConnection connection = prepareTableAndConnection(dbName, tblName, 1);
+
+    connection.beginTransaction();
+    connection.write("1,1".getBytes());
+    connection.write("2,2".getBytes());
+    connection.abortTransaction();
+
+    executeStatementOnDriver("insert into " + tblName + " partition (a) values (1, '1')", driver);
+
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0);
+    runInitiator(conf);
+    runWorker(conf);
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE");
+    Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 2, count);
+    // Cleaning should happen in threads concurrently for the minor compaction and the clean-abort one.
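+    // Both queue entries are expected to be drained by this single cleaner pass: the minor
+    // compaction entry for the committed delta and the CLEAN_ABORTED entry for the aborted write,
+    // as asserted below.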
+ runCleaner(conf); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 0, count); + + count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS"); + Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 0, count); + + } + + private HiveStreamingConnection prepareTableAndConnection(String dbName, String tblName, int batchSize) throws Exception { + String agentInfo = "UT_" + Thread.currentThread().getName(); + + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(b STRING) " + + " PARTITIONED BY (a INT)" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); + + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + + // Create three folders with two different transactions + return HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withAgentInfo(agentInfo) + .withHiveConf(conf) + .withRecordWriter(writer) + .withStreamingOptimizations(true) + // Transaction size has to be one or exception should happen. + .withTransactionBatchSize(batchSize) + .connect(); + } + + private HiveStreamingConnection prepareTableTwoPartitionsAndConnection(String dbName, String tblName, int batchSize) throws Exception { + String agentInfo = "UT_" + Thread.currentThread().getName(); + + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(c STRING) " + + " PARTITIONED BY (a INT, b INT)" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); + + StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder() + .withFieldDelimiter(',') + .build(); + + // Create three folders with two different transactions + return HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tblName) + .withAgentInfo(agentInfo) + .withHiveConf(conf) + .withRecordWriter(writer) + .withStreamingOptimizations(true) + // Transaction size has to be one or exception should happen. + .withTransactionBatchSize(batchSize) + .connect(); + } @Test public void mmTable() throws Exception { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 5dbf634825..b5ef4200b6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -27,6 +27,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -42,6 +44,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.common.*; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -2532,4 +2535,76 @@ public static void validateAcidPartitionLocation(String location, Configuration throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ex.getMessage()), ex); } } + + /** + * Look for delta directories matching the list of writeIds and deletes them. 
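+   * Only directories whose name starts with the {@code delta_x_x} prefix of one of the given
+   * writeIds are matched (names containing '=', i.e. partition directories, are skipped);
+   * directories left empty after the deletes are removed as well.
+   * <p>A minimal usage sketch (the partition path and writeId values are illustrative only):
+   * <pre>{@code
+   *   Set<Long> abortedWriteIds = new HashSet<>(Arrays.asList(18L, 19L));
+   *   List<FileStatus> removed =
+   *       AcidUtils.deleteDeltaDirectories(new Path("/warehouse/cws/a=1"), conf, abortedWriteIds);
+   * }</pre>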
+   * @param rootPartition root partition to look for the delta directories
+   * @param conf configuration
+   * @param writeIds list of writeIds to look for in the delta directories
+   * @return list of deleted directories.
+   * @throws IOException
+   */
+  public static List<FileStatus> deleteDeltaDirectories(Path rootPartition,
+      Configuration conf, Set<Long> writeIds) throws IOException {
+    FileSystem fs = rootPartition.getFileSystem(conf);
+    PathFilter filter = (p) -> {
+      String name = p.getName();
+      for (Long wId : writeIds) {
+        if (name.startsWith(deltaSubdir(wId, wId)) && !name.contains("=")) {
+          return true;
+        }
+      }
+      return false;
+    };
+    List<FileStatus> deleted = new ArrayList<>();
+    deleteDeltaDirectoriesAux(rootPartition, fs, filter, deleted);
+    return deleted;
+  }
+
+  private static void deleteDeltaDirectoriesAux(Path root, FileSystem fs, PathFilter filter, List<FileStatus> deleted) throws IOException {
+    RemoteIterator<FileStatus> it = listIterator(fs, root, null);
+    while (it.hasNext()) {
+      FileStatus fStatus = it.next();
+      if (fStatus.isDirectory()) {
+        if (filter.accept(fStatus.getPath())) {
+          fs.delete(fStatus.getPath(), true);
+          deleted.add(fStatus);
+        } else {
+          deleteDeltaDirectoriesAux(fStatus.getPath(), fs, filter, deleted);
+          if (isDirectoryEmpty(fs, fStatus.getPath())) {
+            fs.delete(fStatus.getPath(), false);
+            deleted.add(fStatus);
+          }
+        }
+      }
+    }
+  }
+
+  private static boolean isDirectoryEmpty(FileSystem fs, Path path) throws IOException {
+    RemoteIterator<FileStatus> it = listIterator(fs, path, null);
+    return !it.hasNext();
+  }
+
+  private static RemoteIterator<FileStatus> listIterator(FileSystem fs, Path path, PathFilter filter)
+      throws IOException {
+    try {
+      return new ToFileStatusIterator(SHIMS.listLocatedHdfsStatusIterator(fs, path, filter));
+    } catch (Throwable t) {
+      return HdfsUtils.listLocatedStatusIterator(fs, path, filter);
+    }
+  }
+
+  static class ToFileStatusIterator implements RemoteIterator<FileStatus> {
+    private final RemoteIterator<HdfsFileStatusWithId> it;
+    ToFileStatusIterator(RemoteIterator<HdfsFileStatusWithId> it) {
+      this.it = it;
+    }
+    @Override public boolean hasNext() throws IOException {
+      return it.hasNext();
+    }
+
+    @Override public FileStatus next() throws IOException {
+      return it.next().getFileStatus();
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
index 3482cfce36..84540024a3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
@@ -94,6 +94,63 @@ public static long createTestFileId(
     return result;
   }
+  /**
+   * Lists files and folders in path and filters them according to the filter.
+   * Returns an iterator.
+   * @param fs filesystem
+   * @param path path to list
+   * @param filter filter to apply to the found paths when listing.
+   * @return iterator with the files and paths.
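+   *         (a {@code null} filter accepts every path)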
+ * @throws IOException + */ + public static RemoteIterator listLocatedStatusIterator(final FileSystem fs, + final Path path, + final PathFilter filter + ) throws IOException { + return new FilterRemoteIterator(fs.listLocatedStatus(path), filter); + } + + static class FilterRemoteIterator implements RemoteIterator { + private final RemoteIterator it; + private final PathFilter filter; + private boolean nextUsed = true; + private FileStatus next = null; + + FilterRemoteIterator(RemoteIterator it, PathFilter filter) { + this.it = it; + this.filter = filter; + } + + @Override public boolean hasNext() throws IOException { + if (!nextUsed) { + return next != null; + } + next = getNext(); + nextUsed = false; + return next != null; + } + + @Override public FileStatus next() throws IOException { + if (!nextUsed) { + nextUsed = true; + return next; + } + return getNext(); + } + + private FileStatus getNext() throws IOException { + while (it.hasNext()) { + FileStatus fStatus = it.next(); + if (filter == null) { + return fStatus; + } else if (filter.accept(fStatus.getPath())) { + return fStatus; + } + } + return null; + } + } + // TODO: this relies on HDFS not changing the format; we assume if we could get inode ID, this // is still going to work. Otherwise, file IDs can be turned off. Later, we should use // as public utility method in HDFS to obtain the inode-based path. diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 06b0209aa0..9fefc55cb5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -25,9 +25,11 @@ import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; @@ -49,8 +51,13 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -65,11 +72,31 @@ private long cleanerCheckInterval = 0; private ReplChangeManager replChangeManager; + private CleanerExecutorService executorService; + + private final int KEEPALIVE_SECONDS = 60; + + /** + * These locks prevent a concurrent p-clean(where the whole table will be scanned) + * and partition clean on a table. When a partition clean needs to happen + * a shared lock will be acquired and when a p-clean happens an exclusive one + * will be acquired. 
+ */ + private final Map tableLock = new ConcurrentHashMap<>(); @Override public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception { super.init(stop, looped); replChangeManager = ReplChangeManager.getInstance(conf); + int coreThreads = MetastoreConf.getIntVar(conf, + MetastoreConf.ConfVars.COMPACTOR_CORE_CLEANER_THREADS); + int maxThreads = MetastoreConf.getIntVar(conf, + MetastoreConf.ConfVars.COMPACTOR_MAX_CLEANER_THREADS); + assert coreThreads > 0; + assert maxThreads > 0; + assert maxThreads >= coreThreads; + executorService = new CleanerExecutorService(coreThreads, maxThreads, KEEPALIVE_SECONDS, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + ShutdownHookManager.addShutdownHook(executorService::shutdownNow); } @Override @@ -92,9 +119,39 @@ public void run() { handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name()); startedAt = System.currentTimeMillis(); long minOpenTxnId = txnHandler.findMinOpenTxnId(); - for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { - clean(compactionInfo, minOpenTxnId); + Collection calls = new ArrayList<>(); + List cis = txnHandler.findReadyToClean(); + LOG.info("Found " + cis.size() + " potentials compations to clean"); + + HashMap dbs = new HashMap<>(); + HashMap tbls = new HashMap<>(); + for(CompactionInfo compactionInfo : cis) { + Database db = null; + if (dbs.containsKey(compactionInfo.dbname)) { + db = dbs.get(compactionInfo.dbname); + } else { + db = rs.getDatabase(getDefaultCatalog(conf), compactionInfo.dbname); + dbs.put(compactionInfo.dbname, db); + } + + Table tbl = null; + String tableKey = compactionInfo.dbname + "." + compactionInfo.tableName; + if (tbls.containsKey(tableKey)) { + tbl = tbls.get(tableKey); + } else { + tbl = resolveTable(compactionInfo); + tbls.put(tableKey, tbl); + } + calls.add(new CleanWork(compactionInfo, minOpenTxnId, db, tbl, resolvePartition(compactionInfo))); } + // We have to wait now for all the Callables to finish before proceeding + // because otherwise we could start again the same cleaning work. When the + // cleaning work finishes markCleaned will be called and rows will be removed + // from TXN_COMPONENTS that should prevent this. + // TODO: optimize this so we don't have this constraint. + // Maybe adding a new state besides READY_FOR_CLEANING, like CLEANING_RUNNING + executorService.executeAll(calls); + } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor cleaner, " + StringUtils.stringifyException(t)); @@ -103,6 +160,7 @@ public void run() { if (handle != null) { handle.releaseLocks(); } + tableLock.clear(); } if (setLooped) { looped.set(true); @@ -121,150 +179,299 @@ public void run() { } while (!stop.get()); } - private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException { - LOG.info("Starting cleaning for " + ci); - try { - Table t = resolveTable(ci); - if (t == null) { - // The table was dropped before we got around to cleaning it. - LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." 
+ - idWatermark(ci)); - txnHandler.markCleaned(ci); + private boolean tryLockTable(String tableName, boolean shared) { + NonReentrantReadWriteLock rwLock = tableLock.computeIfAbsent(tableName, (k) -> new NonReentrantReadWriteLock()); + if (shared) { + return rwLock.tryAcquireRead(); + } else { + return rwLock.tryAcquireWrite(); + } + } + + private void unlockTable(String tableName, boolean shared) { + NonReentrantReadWriteLock rwLock = tableLock.get(tableName); + if (shared) { + rwLock.releaseRead(); + } else { + rwLock.releaseWrite(); + } + } + + private static String idWatermark(CompactionInfo ci) { + return " id=" + ci.id; + } + + /** + * In the first one we scan all the directories and delete files of + * transaction that were aborted before addDynamicPartitions is called. + * The second one does the "regular" clean and removes files as a result + * of compaction and other kinds of aborted transactions. + */ + private class CleanWork implements Runnable, + CleanerExecutorService.CleanWorkI { + final CompactionInfo ci; + final long minOpenTxnGLB; + final Database db; + final Table tbl; + final Partition partition; + + /** + * Contructor that corresponds to the second kind of clean work. + * @param ci compaction info. + */ + CleanWork(CompactionInfo ci, long minOpenTxnGLB, Database db, Table tbl, Partition partition) { + this.ci = ci; + this.minOpenTxnGLB = minOpenTxnGLB; + this.db = db; + this.tbl = tbl; + this.partition = partition; + } + + @Override + public void run() { + try { + clean(); + } catch (Throwable t) { + LOG.error("Caught an exception in the main loop of compactor cleaner, " + + StringUtils.stringifyException(t)); + } + } + + void clean() throws MetaException { + if (ci.isCleanAbortedCompaction()) { + cleanAborted(); + } else { + cleanRegular(); + } + } + + private void cleanAborted() throws MetaException { + if (ci.writeIds == null || ci.writeIds.size() == 0) { + LOG.warn("Attempted cleaning aborted transaction with empty writeId list"); return; } - Partition p = null; - if (ci.partName != null) { - p = resolvePartition(ci); - if (p == null) { - // The partition was dropped before we got around to cleaning it. - LOG.info("Unable to find partition " + ci.getFullPartitionName() + - ", assuming it was dropped." + idWatermark(ci)); + LOG.info("Starting abort cleaning for table " + ci.getFullTableName() + + ". This will scan all the partition directories."); + try { + if (tbl == null) { + // The table was dropped before we got around to cleaning it. + LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." 
+ + idWatermark(ci)); + txnHandler.markCleaned(ci); + return; + } + + StorageDescriptor sd = resolveStorageDescriptor(tbl, null); + + if (runJobAsSelf(ci.runAs)) { + rmFilesClean(sd.getLocation(), ci); + } else { + LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName()); + UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs, + UserGroupInformation.getLoginUser()); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + rmFilesClean(sd.getLocation(), ci); + return null; + } + }); + try { + FileSystem.closeAllForUGI(ugi); + } catch (IOException exception) { + LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + + ci.getFullPartitionName() + idWatermark(ci), exception); + } + } + txnHandler.markCleaned(ci); + } catch (Exception e) { + LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " + + StringUtils.stringifyException(e)); + txnHandler.markFailed(ci); + } + } + + private void cleanRegular() throws MetaException { + LOG.info("Starting cleaning for " + ci); + try { + if (tbl == null) { + // The table was dropped before we got around to cleaning it. + LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." + + idWatermark(ci)); txnHandler.markCleaned(ci); return; } + if (ci.partName != null) { + if (partition == null) { + // The partition was dropped before we got around to cleaning it. + LOG.info("Unable to find partition " + ci.getFullPartitionName() + + ", assuming it was dropped." + idWatermark(ci)); + txnHandler.markCleaned(ci); + return; + } + } + StorageDescriptor sd = resolveStorageDescriptor(tbl, partition); + final String location = sd.getLocation(); + ValidTxnList validTxnList = + TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB); + //save it so that getAcidState() sees it + conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); + /** + * {@code validTxnList} is capped by minOpenTxnGLB so if + * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta + * produced by a compactor, that means every reader that could be active right now see it + * as well. That means if this base/delta shadows some earlier base/delta, the it will be + * used in favor of any files that it shadows. Thus the shadowed files are safe to delete. + * + * + * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted + * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID. + * See {@link TxnStore#markCleaned(CompactionInfo)} for details. + * For example given partition P1, txnid:150 starts and sees txnid:149 as open. + * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved + * writeId:17. Compactor will produce base_17_c160. + * Suppose txnid:149 writes delta_18_18 + * to P1 and aborts. Compactor can only remove TXN_COMPONENTS entries + * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but + * not visible based on 'validTxnList' capped at minOpenTxn so it will not not be cleaned by + * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the + * metadata that says that 18 is aborted. + * In a slightly different case, whatever txn created delta_18 (and all other txn) may have + * committed by the time cleaner runs and so cleaner will indeed see delta_18_18 and remove + * it (since it has nothing but aborted data). 
But we can't tell which actually happened + * in markCleaned() so make sure it doesn't delete meta above CG_CQ_HIGHEST_WRITE_ID. + * + * We could perhaps make cleaning of aborted and obsolete and remove all aborted files up + * to the current Min Open Write Id, this way aborted TXN_COMPONENTS meta can be removed + * as well up to that point which may be higher than CQ_HIGHEST_WRITE_ID. This could be + * useful if there is all of a sudden a flood of aborted txns. (For another day). + */ + List tblNames = Collections.singletonList( + TableName.getDbTable(tbl.getDbName(), tbl.getTableName())); + GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tblNames); + rqst.setValidTxnList(validTxnList.writeToString()); + GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(rqst); + //we could have no write IDs for a table if it was never written to but + // since we are in the Cleaner phase of compactions, there must have + // been some delta/base dirs + assert rsp != null && rsp.getTblValidWriteIdsSize() == 1; + //Creating 'reader' list since we are interested in the set of 'obsolete' files + ValidReaderWriteIdList validWriteIdList = + TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0)); + + if (runJobAsSelf(ci.runAs)) { + rmFilesRegular(location, validWriteIdList, ci); + } else { + LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName()); + UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs, + UserGroupInformation.getLoginUser()); + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + rmFilesRegular(location, validWriteIdList, ci); + return null; + } + }); + try { + FileSystem.closeAllForUGI(ugi); + } catch (IOException exception) { + LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + + ci.getFullPartitionName() + idWatermark(ci), exception); + } + } + txnHandler.markCleaned(ci); + } catch (Exception e) { + LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " + + StringUtils.stringifyException(e)); + txnHandler.markFailed(ci); + } + } + + private void rmFilesClean(String rootLocation, CompactionInfo ci) throws IOException, NoSuchObjectException { + List deleted = AcidUtils.deleteDeltaDirectories(new Path(rootLocation), conf, ci.writeIds); + + if (deleted.size() == 0) { + LOG.info("No files were deleted in the clean abort compaction: " + idWatermark(ci)); + return; + } + + FileSystem fs = deleted.get(0).getPath().getFileSystem(conf); + Boolean isSourceOfRepl = ReplChangeManager.isSourceOfReplication(db); + + for (FileStatus dead : deleted) { + Path deadPath = dead.getPath(); + LOG.debug("Deleted path " + deadPath.toString()); + if (isSourceOfRepl) { + replChangeManager.recycle(deadPath, ReplChangeManager.RecycleType.MOVE, true); + } + fs.delete(deadPath, true); } - StorageDescriptor sd = resolveStorageDescriptor(t, p); - final String location = sd.getLocation(); - ValidTxnList validTxnList = - TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB); - //save it so that getAcidState() sees it - conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); + } + + private void rmFilesRegular(String location, ValidWriteIdList writeIdList, CompactionInfo ci) + throws IOException, NoSuchObjectException { + Path locPath = new Path(location); + AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList); + List obsoleteDirs = dir.getObsolete(); /** - * {@code validTxnList} 
is capped by minOpenTxnGLB so if - * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta - * produced by a compactor, that means every reader that could be active right now see it - * as well. That means if this base/delta shadows some earlier base/delta, the it will be - * used in favor of any files that it shadows. Thus the shadowed files are safe to delete. - * - * - * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted - * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID. - * See {@link TxnStore#markCleaned(CompactionInfo)} for details. - * For example given partition P1, txnid:150 starts and sees txnid:149 as open. - * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved - * writeId:17. Compactor will produce base_17_c160. - * Suppose txnid:149 writes delta_18_18 - * to P1 and aborts. Compactor can only remove TXN_COMPONENTS entries - * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but - * not visible based on 'validTxnList' capped at minOpenTxn so it will not not be cleaned by - * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the - * metadata that says that 18 is aborted. - * In a slightly different case, whatever txn created delta_18 (and all other txn) may have - * committed by the time cleaner runs and so cleaner will indeed see delta_18_18 and remove - * it (since it has nothing but aborted data). But we can't tell which actually happened - * in markCleaned() so make sure it doesn't delete meta above CG_CQ_HIGHEST_WRITE_ID. - * - * We could perhaps make cleaning of aborted and obsolete and remove all aborted files up - * to the current Min Open Write Id, this way aborted TXN_COMPONENTS meta can be removed - * as well up to that point which may be higher than CQ_HIGHEST_WRITE_ID. This could be - * useful if there is all of a sudden a flood of aborted txns. (For another day). + * add anything in 'dir' that only has data from aborted transactions - no one should be + * trying to read anything in that dir (except getAcidState() that only reads the name of + * this dir itself) + * So this may run ahead of {@link CompactionInfo#highestWriteId} but it's ok (suppose there + * are no active txns when cleaner runs). The key is to not delete metadata about aborted + * txns with write IDs > {@link CompactionInfo#highestWriteId}. 
+ * See {@link TxnStore#markCleaned(CompactionInfo)} */ - List tblNames = Collections.singletonList( - TableName.getDbTable(t.getDbName(), t.getTableName())); - GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tblNames); - rqst.setValidTxnList(validTxnList.writeToString()); - GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(rqst); - //we could have no write IDs for a table if it was never written to but - // since we are in the Cleaner phase of compactions, there must have - // been some delta/base dirs - assert rsp != null && rsp.getTblValidWriteIdsSize() == 1; - //Creating 'reader' list since we are interested in the set of 'obsolete' files - ValidReaderWriteIdList validWriteIdList = - TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0)); - - if (runJobAsSelf(ci.runAs)) { - removeFiles(location, validWriteIdList, ci); - } else { - LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName()); - UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs, - UserGroupInformation.getLoginUser()); - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - removeFiles(location, validWriteIdList, ci); - return null; - } - }); - try { - FileSystem.closeAllForUGI(ugi); - } catch (IOException exception) { - LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + - ci.getFullPartitionName() + idWatermark(ci), exception); + obsoleteDirs.addAll(dir.getAbortedDirectories()); + List filesToDelete = new ArrayList<>(obsoleteDirs.size()); + StringBuilder extraDebugInfo = new StringBuilder("["); + for (FileStatus stat : obsoleteDirs) { + filesToDelete.add(stat.getPath()); + extraDebugInfo.append(stat.getPath().getName()).append(","); + if(!FileUtils.isPathWithinSubtree(stat.getPath(), locPath)) { + LOG.info(idWatermark(ci) + " found unexpected file: " + stat.getPath()); + } + } + extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']'); + LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() + + " obsolete directories from " + location + ". 
" + extraDebugInfo.toString()); + if (filesToDelete.size() < 1) { + LOG.warn("Hmm, nothing to delete in the cleaner for directory " + location + + ", that hardly seems right."); + return; + } + + FileSystem fs = filesToDelete.get(0).getFileSystem(conf); + Boolean isSourceOfRepl = ReplChangeManager.isSourceOfReplication(db); + + for (Path dead : filesToDelete) { + LOG.debug("Deleted path " + dead.toString()); + if (isSourceOfRepl) { + replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true); } + fs.delete(dead, true); } - txnHandler.markCleaned(ci); - } catch (Exception e) { - LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " + - StringUtils.stringifyException(e)); - txnHandler.markFailed(ci); } - } - private static String idWatermark(CompactionInfo ci) { - return " id=" + ci.id; - } - private void removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci) - throws IOException, NoSuchObjectException { - Path locPath = new Path(location); - AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList); - List obsoleteDirs = dir.getObsolete(); + /** - * add anything in 'dir' that only has data from aborted transactions - no one should be - * trying to read anything in that dir (except getAcidState() that only reads the name of - * this dir itself) - * So this may run ahead of {@link CompactionInfo#highestWriteId} but it's ok (suppose there - * are no active txns when cleaner runs). The key is to not delete metadata about aborted - * txns with write IDs > {@link CompactionInfo#highestWriteId}. - * See {@link TxnStore#markCleaned(CompactionInfo)} + * Gives higher priority to regular clean tasks. Lower value + * means more priority + * @return priority. */ - obsoleteDirs.addAll(dir.getAbortedDirectories()); - List filesToDelete = new ArrayList<>(obsoleteDirs.size()); - StringBuilder extraDebugInfo = new StringBuilder("["); - for (FileStatus stat : obsoleteDirs) { - filesToDelete.add(stat.getPath()); - extraDebugInfo.append(stat.getPath().getName()).append(","); - if(!FileUtils.isPathWithinSubtree(stat.getPath(), locPath)) { - LOG.info(idWatermark(ci) + " found unexpected file: " + stat.getPath()); + public int getPriority() { + if (ci.isCleanAbortedCompaction()) { + return 2; + } else { + return 1; } } - extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']'); - LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() + - " obsolete directories from " + location + ". 
" + extraDebugInfo.toString()); - if (filesToDelete.size() < 1) { - LOG.warn("Hmm, nothing to delete in the cleaner for directory " + location + - ", that hardly seems right."); - return; - } - FileSystem fs = filesToDelete.get(0).getFileSystem(conf); - Database db = rs.getDatabase(getDefaultCatalog(conf), ci.dbname); - Boolean isSourceOfRepl = ReplChangeManager.isSourceOfReplication(db); + public boolean tryAcquire() { + return tryLockTable(ci.getFullTableName(), !ci.isCleanAbortedCompaction()); + } - for (Path dead : filesToDelete) { - LOG.debug("Going to delete path " + dead.toString()); - if (isSourceOfRepl) { - replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true); - } - fs.delete(dead, true); + public void release() { + unlockTable(ci.getFullTableName(), !ci.isCleanAbortedCompaction()); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CleanerExecutorService.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CleanerExecutorService.java new file mode 100644 index 0000000000..2832e56577 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CleanerExecutorService.java @@ -0,0 +1,173 @@ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Comparator; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * The way to use this class would be + * {@code + * cleanerExecutorService.executeAll(tasks, comparator); + * } + */ +public class CleanerExecutorService extends ThreadPoolExecutor { + + static final private Logger LOG = LoggerFactory.getLogger(CleanerExecutorService.class.getName()); + + private final int corePoolSize; + + private AtomicInteger finishedTasks; + private CountDownLatch latch; + private final Lock lock = new ReentrantLock(); + private Condition condition; + private Set toRelease; + + public CleanerExecutorService(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + this.corePoolSize = corePoolSize; + assert corePoolSize > 0; + } + + /** + * This executor will try to acquire the resources for the thread to run + * before executing the runnable. This way the runnable doesn't start + * and gets blocked acquiring the resources, when other runnables would + * be able to run. + * + * Submit several tasks. This method will respect the priority of the tasks. + * @param tasksCollection tasks to execute. 
+ */ + public void executeAll(Collection tasksCollection) throws InterruptedException { + Comparator comparator = Comparator.comparing(CleanWorkI::getPriority); + PriorityQueue tasksQueue = new PriorityQueue<>(corePoolSize, comparator); + tasksQueue.addAll(tasksCollection); + + int remaining = tasksCollection.size(); + latch = new CountDownLatch(tasksCollection.size()); + finishedTasks = new AtomicInteger(0); + condition = lock.newCondition(); + toRelease = ConcurrentHashMap.newKeySet(); + + Collection toReAdd = new HashSet<>(); + + + int beforefinishedTasks = 0; + + while (remaining > 0) { + int currentSize = tasksQueue.size(); + toReAdd.clear(); + + // From the remaining items in the queue, we try to submit the most we can + while (currentSize > 0) { + CleanWorkI task = tasksQueue.poll(); + + if (task == null) { + LOG.info("null value found submitted to CleanerExecutorService. This shouldn't happen"); + remaining--; + currentSize--; + continue; + } + + boolean acquired = task.tryAcquire(); + if (acquired) { + execute(task); + remaining--; + } else { + // read to the queue the resources can't be acquired now. + toReAdd.add(task); + } + currentSize--; + } + tasksQueue.addAll(toReAdd); + + if (remaining > 0) { + try { + lock.lock(); + int afterFinishedTasks = finishedTasks.get(); + if (afterFinishedTasks > beforefinishedTasks) { + // If some tasks have finish maybe now we can acquire the resources. + // Iterate again to see if we submit anything. + } else { + // If there are tasks remaining and no tasks have finished in this time + // then wait until any tasks finish to try to submit more. + condition.await(); + } + beforefinishedTasks = afterFinishedTasks; + // wait until a worker finishes to go on trying to submit. + } finally { + lock.unlock(); + } + releaseFinished(); + } + } + latch.await(); + releaseFinished(); + } + + synchronized private void releaseFinished() { + Set toReleaseCopy = new HashSet<>(); + for (CleanWorkI task: toRelease) { + task.release(); + toReleaseCopy.add(task); + } + for (CleanWorkI task: toReleaseCopy) { + toRelease.remove(task); + } + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + super.beforeExecute(t, r); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + + // Resources must be released from the same thread that acquired them. + toRelease.add((CleanWorkI)r); + + try { + lock.lock(); + finishedTasks.incrementAndGet(); + condition.signal(); + } finally { + lock.unlock(); + } + latch.countDown(); + } + + public interface CleanWorkI extends Runnable { + /** + * Get the priority. + * @return priority. + */ + int getPriority(); + + /** + * Acquire resource if any. + * @return if the resources were acquired + */ + boolean tryAcquire(); + + /** + * Release resources. 
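+     * Invoked from the submitting thread once the task has finished executing
+     * (see {@code releaseFinished} in {@link CleanerExecutorService}).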
+ */ + void release(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index a0df82cb20..61caa3cca2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -96,7 +96,7 @@ public void run() { LOG.debug("Found " + potentials.size() + " potential compactions, " + "checking to see if we should compact any of them"); for (CompactionInfo ci : potentials) { - LOG.info("Checking to see if we should compact " + ci.getFullPartitionName()); + LOG.info("Checking to see if we should compact " + ci.getFullPartitionName() + " with type " + ci.type); try { Table t = resolveTable(ci); if (t == null) { @@ -114,7 +114,7 @@ public void run() { // Check to see if this is a table level request on a partitioned table. If so, // then it's a dynamic partitioning case and we shouldn't check the table itself. - if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 && + if (!ci.isCleanAbortedCompaction() && t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 && ci.partName == null) { LOG.debug("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" + " partitioning"); @@ -140,7 +140,7 @@ public void run() { // Figure out who we should run the file operations as Partition p = resolvePartition(ci); - if (p == null && ci.partName != null) { + if (!ci.isCleanAbortedCompaction() && p == null && ci.partName != null) { LOG.info("Can't find partition " + ci.getFullPartitionName() + ", assuming it has been dropped and moving on."); continue; @@ -219,12 +219,15 @@ private boolean lookForCurrentCompactions(ShowCompactResponse compactions, CompactionInfo ci) { if (compactions.getCompacts() != null) { for (ShowCompactResponseElement e : compactions.getCompacts()) { - if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) && + if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) && e.getDbname().equals(ci.dbname) && e.getTablename().equals(ci.tableName) && (e.getPartitionname() == null && ci.partName == null || e.getPartitionname().equals(ci.partName))) { return true; + } else if (e.getState().equals(TxnStore.CLEANING_RESPONSE) && e.getDbname().equals(ci.dbname) + && e.getTablename().equals(ci.tableName)) { + return true; } } } @@ -241,7 +244,11 @@ private CompactionType checkForCompaction(final CompactionInfo ci, if (ci.tooManyAborts) { LOG.debug("Found too many aborted transactions for " + ci.getFullPartitionName() + ", " + "initiating major compaction"); - return CompactionType.MAJOR; + if (ci.isCleanAbortedCompaction()) { + return CompactionType.CLEAN_ABORTED; + } else { + return CompactionType.MAJOR; + } } if (runJobAsSelf(runAs)) { @@ -270,6 +277,10 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi StorageDescriptor sd, Map tblproperties) throws IOException, InterruptedException { + if (ci.isCleanAbortedCompaction()) { + return CompactionType.CLEAN_ABORTED; + } + boolean noBase = false; Path location = new Path(sd.getLocation()); FileSystem fs = location.getFileSystem(conf); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/NonReentrantReadWriteLock.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/NonReentrantReadWriteLock.java new file mode 100644 index 0000000000..e69840c1a4 --- /dev/null +++ 
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/NonReentrantReadWriteLock.java @@ -0,0 +1,25 @@ +package org.apache.hadoop.hive.ql.txn.compactor; + +import java.util.concurrent.Semaphore; + + +public class NonReentrantReadWriteLock { + final int PERMITS = Integer.MAX_VALUE; + Semaphore semaphore = new Semaphore(PERMITS); + + boolean tryAcquireRead() { + return semaphore.tryAcquire(1); + } + + void releaseRead() { + semaphore.release(1); + } + + boolean tryAcquireWrite() { + return semaphore.tryAcquire(PERMITS); + } + + void releaseWrite() { + semaphore.release(PERMITS); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 5e085f84af..cacdd32d45 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -1750,7 +1750,7 @@ private void testMerge3Way(boolean cc) throws Exception { Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 0, + 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1)); //complete 1st txn long writeId = txnMgr.getTableWriteId("default", "target"); @@ -1816,7 +1816,7 @@ private void testMerge3Way(boolean cc) throws Exception { Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 0, + 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2)); //complete 2nd txn writeId = txnMgr2.getTableWriteId("default", "target"); @@ -2043,7 +2043,8 @@ public void testDynamicPartitionInsert() throws Exception { Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 0, + // We have one before addDynamicPartitions in case the txn fails before. 
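+      // (presumably the table-level marker entry this patch now writes up front;
+      //  TestCompactor checks these rows via TC_OPERATION_TYPE='p')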
+ 1, TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid1)); //now actually write to table to generate some partitions checkCmdOnDriver(driver.run("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)")); @@ -2138,7 +2139,7 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 0,//because it's using a DP write + 1,//because it's using a DP write TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1)); //complete T1 transaction (simulate writing to 2 partitions) long writeId = txnMgr.getTableWriteId("default", "target"); @@ -2174,7 +2175,7 @@ private void testMergePartitioned(boolean causeConflict) throws Exception { Assert.assertEquals( "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " + TxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), - 0,//because it's using a DP write + 1,//because it's using a DP write TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2)); //complete T2 txn //simulate Insert into 2 partitions diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerExecutorService.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerExecutorService.java new file mode 100644 index 0000000000..dc46d8a0f5 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerExecutorService.java @@ -0,0 +1,191 @@ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class TestCleanerExecutorService { + private final Map + tableLock = new ConcurrentHashMap<>(); + private List ends = Collections.synchronizedList(new ArrayList<>()); + + + @Before + public void setup() { + ends.clear(); + } + + @Test + public void testPriorityIsRespected() throws Exception { + CleanerExecutorService service = new CleanerExecutorService(2, + 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + Collection tasks = new ArrayList<>(); + + tasks.add(new CleanWork(0, "A", false, 1)); + tasks.add(new CleanWork(1, "A", false, 1)); + tasks.add(new CleanWork(2, "A", false, 1)); + tasks.add(new CleanWork(3, "A", false, 1)); + tasks.add(new CleanWork(4, "A", false, 1)); + tasks.add(new CleanWork(-5, "A", false, 1)); + tasks.add(new CleanWork(-1, "A", false, 1)); + service.executeAll(tasks); + + List expected = Collections.synchronizedList(new ArrayList<>()); + expected.add(new CleanWork(-5, "A", false, 1)); + expected.add(new CleanWork(-1, "A", false, 1)); + expected.add(new CleanWork(0, "A", false, 1)); + expected.add(new CleanWork(1, "A", false, 1)); + expected.add(new CleanWork(2, "A", false, 1)); + expected.add(new CleanWork(3, "A", false, 1)); + expected.add(new CleanWork(4, "A", false, 1)); + + Assert.assertEquals(expected, ends); + service.shutdown(); + } + + @Test + public void testSharedRunConcurrently() throws Exception { + CleanerExecutorService service = new CleanerExecutorService(1, + 1, 60, TimeUnit.SECONDS, new 
LinkedBlockingQueue<>()); + Collection tasks = new ArrayList<>(); + + tasks.add(new CleanWork(0, "A", false, 1)); + tasks.add(new CleanWork(1, "A", false, 1)); + tasks.add(new CleanWork(2, "A", false, 1)); + tasks.add(new CleanWork(3, "A", true, 1)); + tasks.add(new CleanWork(4, "A", true, 1)); + tasks.add(new CleanWork(-5, "A", true, 1)); + tasks.add(new CleanWork(-1, "A", false, 1)); + service.executeAll(tasks); + + List expected = Collections.synchronizedList(new ArrayList<>()); + expected.add(new CleanWork(-5, "A", true, 1)); + expected.add(new CleanWork(3, "A", true, 1)); + expected.add(new CleanWork(4, "A", true, 1)); + expected.add(new CleanWork(-1, "A", false, 1)); + expected.add(new CleanWork(0, "A", false, 1)); + expected.add(new CleanWork(1, "A", false, 1)); + expected.add(new CleanWork(2, "A", false, 1)); + + Assert.assertEquals(expected, ends); + service.shutdown(); + } + + @Test + public void testSharedAreBlockedByExclusiveTwoTables() throws Exception { + CleanerExecutorService service = new CleanerExecutorService(2, + 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + Collection tasks = new ArrayList<>(); + + tasks.add(new CleanWork(2, "A", false, 1)); + tasks.add(new CleanWork(1, "A", false, 1)); + tasks.add(new CleanWork(4, "B", false, 2)); + tasks.add(new CleanWork(3, "B", false, 1)); + tasks.add(new CleanWork(-5, "A", true, 5)); + tasks.add(new CleanWork(-1, "B", true, 1)); + tasks.add(new CleanWork(0, "A", false, 1)); + service.executeAll(tasks); + + List expected = Collections.synchronizedList(new ArrayList<>()); + expected.add(new CleanWork(-1, "B", true, 1)); + expected.add(new CleanWork(3, "B", false, 1)); + expected.add(new CleanWork(4, "B", false, 2)); + expected.add(new CleanWork(-5, "A", true, 5)); + + expected.add(new CleanWork(0, "A", false, 1)); + expected.add(new CleanWork(1, "A", false, 1)); + expected.add(new CleanWork(2, "A", false, 1)); + + Assert.assertEquals(expected, ends); + service.shutdown(); + } + + private boolean tryLockTable(String tableName, boolean shared) { + NonReentrantReadWriteLock + rwLock = tableLock.computeIfAbsent(tableName, (k) -> new NonReentrantReadWriteLock()); + if (shared) { + return rwLock.tryAcquireRead(); + } else { + return rwLock.tryAcquireWrite(); + } + } + + private void unlockTable(String tableName, boolean shared) { + NonReentrantReadWriteLock rwLock = tableLock.get(tableName); + if (shared) { + rwLock.releaseRead(); + } else { + rwLock.releaseWrite(); + } + } + + private class CleanWork implements Runnable, + CleanerExecutorService.CleanWorkI { + int priority; + String name; + boolean shared; + double sleep; + + public CleanWork(int priority, String name, boolean shared, double sleep) { + this.priority = priority; + this.name = name; + this.shared = shared; + this.sleep = sleep; + } + + @Override + public int getPriority() { + return priority; + } + + @Override + public boolean tryAcquire() { + return tryLockTable(name, shared); + } + + @Override + public void release() { + unlockTable(name, shared); + } + + @Override + public void run() { + try { + Thread.sleep((int)(sleep * 100)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + ends.add(this); + } + + @Override public String toString() { + return "CleanWork{" + "priority=" + priority + ", name='" + name + + '\'' + ", shared=" + shared + ", sleep=" + sleep + '}'; + } + + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + CleanWork cleanWork = (CleanWork) 
o; + return priority == cleanWork.priority && shared == cleanWork.shared + && sleep == cleanWork.sleep && Objects.equals(name, cleanWork.name); + } + + @Override public int hashCode() { + + return Objects.hash(priority, name, shared, sleep); + } + } +} diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index b6f70ebe63..4905a4d837 100644 --- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -794,6 +794,95 @@ public Long getFileId() { return result; } + @Override + public RemoteIterator listLocatedHdfsStatusIterator( + FileSystem fs, Path path, PathFilter filter) throws IOException { + return new FileIterator(fs, path, filter); + } + + class FileIterator implements RemoteIterator { + private final DFSClient dfsc; + private final URI fsUri; + private final Path p; + private final String src; + + private DirectoryListing current; + private PathFilter filter; + private int i = 0; + private HdfsFileStatusWithId next = null; + private boolean nextUsed = true; + + org.apache.hadoop.hdfs.protocol.HdfsFileStatus[] hfss = null; + + FileIterator(FileSystem fs, Path p, PathFilter filter) throws IOException { + DistributedFileSystem dfs = ensureDfs(fs); + dfsc = dfs.getClient(); + src = p.toUri().getPath(); + current = dfsc.listPaths(src, + org.apache.hadoop.hdfs.protocol.HdfsFileStatus.EMPTY_NAME, true); + if (current == null) { // the directory does not exist + throw new FileNotFoundException("File " + p + " does not exist."); + } + fsUri = fs.getUri(); + this.filter = filter; + this.p = p; + } + + @Override + public boolean hasNext() throws IOException { + if (!nextUsed) { + return next != null; + } + next = getNext(); + nextUsed = false; + return next != null; + } + + @Override + public HdfsFileStatusWithId next() throws IOException { + if (!nextUsed) { + nextUsed = true; + return next; + } + return getNext(); + } + + private HdfsFileStatusWithId getNext() throws IOException { + if (!nextUsed) { + return next; + } + while (current != null) { + // First time we call getNext + if (hfss == null) { + hfss = current.getPartialListing(); + i = 0; + } else if (hfss.length == i) { + current = current.hasMore() ? 
dfsc
+              .listPaths(src, current.getLastName(), true) : null;
+          if (current == null) {
+            return null;
+          }
+          hfss = current.getPartialListing();
+          i = 0;
+        }
+
+        while (i < hfss.length) {
+          HdfsLocatedFileStatus next = (HdfsLocatedFileStatus) (hfss[i]);
+          i++;
+          if (filter != null) {
+            Path filterPath = next.getFullPath(p).makeQualified(fsUri, null);
+            if (!filter.accept(filterPath))
+              continue;
+          }
+          LocatedFileStatus lfs = next.makeQualifiedLocated(fsUri, p);
+          return new HdfsFileStatusWithIdImpl(lfs, next.getFileId());
+        }
+      }
+      return null;
+
+    }
+  }
+
   private DistributedFileSystem ensureDfs(FileSystem fs) {
     if (!(fs instanceof DistributedFileSystem)) {
       throw new UnsupportedOperationException("Only supported for DFS; got " + fs.getClass());
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index c569b242ae..2afbc5f841 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -26,6 +26,7 @@
 import java.security.NoSuchAlgorithmException;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeMap;
@@ -40,6 +41,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.LongWritable;
@@ -253,6 +255,18 @@ RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter repor
   List<HdfsFileStatusWithId> listLocatedHdfsStatus(
       FileSystem fs, Path path, PathFilter filter) throws IOException;
 
+  /**
+   * Returns an iterator over the files in a directory. Useful
+   * when a large number of files/directories are expected to be listed.
+   * @param fs filesystem
+   * @param path path to list
+   * @param filter filter to apply to the files and folders in the path
+   * @return iterator over the listed files
+   * @throws IOException
+   */
+  RemoteIterator<HdfsFileStatusWithId> listLocatedHdfsStatusIterator(
+      FileSystem fs, Path path, PathFilter filter) throws IOException;
+
   /**
    * For file status returned by listLocatedStatus, convert them into a list
    * of block locations.
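
The two hunks above expose an iterator-based listing in the shim layer so that very large directories can be streamed page by page (via DFSClient.listPaths) instead of being materialized in one list by listLocatedHdfsStatus. A minimal usage sketch, not part of the patch; the wrapper class, method name and the null filter are illustrative, and it only works against HDFS because the shim requires a DistributedFileSystem:

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.hive.shims.ShimLoader;

public class ListingSketch {
  // Stream a (potentially huge) directory through the new shim method instead of
  // calling listLocatedHdfsStatus, which loads the whole listing into memory.
  static void printFileIds(FileSystem fs, Path dir) throws IOException {
    HadoopShims shims = ShimLoader.getHadoopShims();
    // null filter = accept everything; pages of the listing are fetched lazily.
    RemoteIterator<HdfsFileStatusWithId> it =
        shims.listLocatedHdfsStatusIterator(fs, dir, null);
    while (it.hasNext()) {
      HdfsFileStatusWithId file = it.next();
      System.out.println(file.getFileStatus().getPath() + " fileId=" + file.getFileId());
    }
  }
}
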
diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java index 7450b27cf3..b9b4e5d0ba 100644 --- standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java +++ standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionType.java @@ -13,7 +13,8 @@ public enum CompactionType implements org.apache.thrift.TEnum { MINOR(1), - MAJOR(2); + MAJOR(2), + CLEAN_ABORTED(3); private final int value; @@ -38,6 +39,8 @@ public static CompactionType findByValue(int value) { return MINOR; case 2: return MAJOR; + case 3: + return CLEAN_ABORTED; default: return null; } diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php index 13e287e352..9841f410f6 100644 --- standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php +++ standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php @@ -99,9 +99,11 @@ final class LockType { final class CompactionType { const MINOR = 1; const MAJOR = 2; + const CLEAN_ABORTED = 3; static public $__names = array( 1 => 'MINOR', 2 => 'MAJOR', + 3 => 'CLEAN_ABORTED', ); } diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 8f149d1d6e..20fc495143 100644 --- standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -143,15 +143,18 @@ class LockType: class CompactionType: MINOR = 1 MAJOR = 2 + CLEAN_ABORTED = 3 _VALUES_TO_NAMES = { 1: "MINOR", 2: "MAJOR", + 3: "CLEAN_ABORTED", } _NAMES_TO_VALUES = { "MINOR": 1, "MAJOR": 2, + "CLEAN_ABORTED": 3, } class GrantRevokeType: diff --git standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index 9e5f0860f2..8dec1a2640 100644 --- standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -68,8 +68,9 @@ end module CompactionType MINOR = 1 MAJOR = 2 - VALUE_MAP = {1 => "MINOR", 2 => "MAJOR"} - VALID_VALUES = Set.new([MINOR, MAJOR]).freeze + CLEAN_ABORTED = 3 + VALUE_MAP = {1 => "MINOR", 2 => "MAJOR", 3 => "CLEAN_ABORTED"} + VALID_VALUES = Set.new([MINOR, MAJOR, CLEAN_ABORTED]).freeze end module GrantRevokeType diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index be1f8c7849..83c20b9cae 100644 --- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -400,6 +400,15 @@ public static ConfVars getMetaConf(String name) { "tables or partitions to be compacted once they are determined to need compaction.\n" + "It will also increase the background load on the 
Hadoop cluster as more MapReduce jobs\n" +
         "will be running in the background."),
+    COMPACTOR_CORE_CLEANER_THREADS("metastore.compactor.cleaner.core.threads",
+        "hive.metastore.compactor.cleaner.threads", 2,
+        "Determines together with hive.metastore.compactor.max.threads how many cleaner threads"
+            + " will do cleaning-related work. A way to know if more threads are needed"
+            + " is to check the rows in table COMPACTION_QUEUE with cq_state='r' (ready for cleaning)."
+            + " If too many of these rows start to queue up, a higher number is needed."),
+    COMPACTOR_MAX_CLEANER_THREADS("metastore.compactor.cleaner.max.threads",
+        "hive.metastore.compactor.max.threads", 4,
+        "Maximum number of threads that will run cleaning tasks"),
     COMPACTOR_MINOR_STATS_COMPRESSION(
         "metastore.compactor.enable.stats.compression",
         "metastore.compactor.enable.stats.compression", true,
diff --git standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index 9576f8775a..8f6df0f014 100644
--- standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -176,6 +176,7 @@ enum LockType {
 enum CompactionType {
     MINOR = 1,
     MAJOR = 2,
+    CLEAN_ABORTED = 3,
 }
 
 enum GrantRevokeType {
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index ea70503988..9dc24246ff 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -26,6 +26,9 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
 
 /**
  * Information on a possible or running compaction.
@@ -63,6 +66,9 @@
   private String fullPartitionName = null;
   private String fullTableName = null;
 
+  // This is used for compactions of type 'p' (clean aborted). It won't be serialized.
+ public Set writeIds; + public CompactionInfo(String dbname, String tableName, String partName, CompactionType type) { this.dbname = dbname; this.tableName = tableName; @@ -99,10 +105,19 @@ public String getFullTableName() { } return fullTableName; } + + public boolean isMinorCompaction() { + return CompactionType.MINOR == type; + } + public boolean isMajorCompaction() { return CompactionType.MAJOR == type; } + public boolean isCleanAbortedCompaction() { + return CompactionType.CLEAN_ABORTED == type; + } + @Override public int compareTo(CompactionInfo o) { return getFullPartitionName().compareTo(o.getFullPartitionName()); @@ -117,13 +132,17 @@ public String toString() { "properties:" + properties + "," + "runAs:" + runAs + "," + "tooManyAborts:" + tooManyAborts + "," + - "highestWriteId:" + highestWriteId; + "highestWriteId:" + highestWriteId + "," + + "writeIds:" + writeIds; } @Override public int hashCode() { int result = 17; result = 31 * result + this.getFullPartitionName().hashCode(); + if (isCleanAbortedCompaction()) { + result += Objects.hash(type); + } return result; } @@ -161,6 +180,7 @@ static CompactionInfo loadFullFromCompactionQueue(ResultSet rs) throws SQLExcept fullCi.hadoopJobId = rs.getString(13); return fullCi; } + static void insertIntoCompletedCompactions(PreparedStatement pStmt, CompactionInfo ci, long endTime) throws SQLException { pStmt.setLong(1, ci.id); pStmt.setString(2, ci.dbname); diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 8253ccb9c9..eb74d8ac57 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -83,10 +83,11 @@ public CompactionTxnHandler() { } rs.close(); - // Check for aborted txns + // Check for aborted txns that are not 'p' type s = "select tc_database, tc_table, tc_partition " + "from TXNS, TXN_COMPONENTS " + "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " + + " and tc_operation_type <> '" + OperationType.ALL_PARTITIONS.getSqlConst() + "' " + "group by tc_database, tc_table, tc_partition " + "having count(*) > " + maxAborted; @@ -100,6 +101,26 @@ public CompactionTxnHandler() { info.tooManyAborts = true; response.add(info); } + rs.close(); + + // Check for aborted txns that are 'p' type + s = "select tc_database, tc_table, tc_partition, tc_operation_type " + + "from TXNS, TXN_COMPONENTS " + + "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " + + " and tc_operation_type = '" + OperationType.ALL_PARTITIONS.getSqlConst() + "' " + + "group by tc_database, tc_table, tc_partition, tc_operation_type "; + + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + CompactionInfo info = new CompactionInfo(); + info.dbname = rs.getString(1); + info.tableName = rs.getString(2); + info.partName = rs.getString(3); + info.type = dbCompactionType2ThriftType(rs.getString(4).charAt(0)); + info.tooManyAborts = true; + response.add(info); + } LOG.debug("Going to rollback"); dbConn.rollback(); @@ -259,14 +280,32 @@ public void markCompacted(CompactionInfo info) throws MetaException { info.tableName = rs.getString(3); info.partName = rs.getString(4); switch (rs.getString(5).charAt(0)) { - case MAJOR_TYPE: 
info.type = CompactionType.MAJOR; break; - case MINOR_TYPE: info.type = CompactionType.MINOR; break; - default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); + case TxnHandler.MAJOR_TYPE: info.type = CompactionType.MAJOR; break; + case TxnHandler.MINOR_TYPE: info.type = CompactionType.MINOR; break; + case TxnHandler.CLEAN_ABORTED: info.type = CompactionType.CLEAN_ABORTED; break; + default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); } info.runAs = rs.getString(6); info.highestWriteId = rs.getLong(7); rc.add(info); } + + for (CompactionInfo ci: rc) { + if (ci.type.equals(CompactionType.CLEAN_ABORTED)) { + String s2 = "select tc_writeid from TXN_COMPONENTS where tc_database=? AND tc_table=? AND tc_operation_type=?"; + PreparedStatement pStmt = dbConn.prepareStatement(s2); + LOG.debug("Going to execute query <" + s2 + ">"); + pStmt.setString(1, ci.dbname); + pStmt.setString(2, ci.tableName); + pStmt.setString(3, String.valueOf(OperationType.ALL_PARTITIONS.getSqlConst())); + rs = pStmt.executeQuery(); + ci.writeIds = new HashSet<>(); + while(rs.next()) { + ci.writeIds.add(rs.getLong(1)); + } + } + } + LOG.debug("Going to rollback"); dbConn.rollback(); return rc; @@ -354,8 +393,10 @@ public void markCleaned(CompactionInfo info) throws MetaException { pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?"); pStmt.setLong(1, info.id); rs = pStmt.executeQuery(); + Set writeIds = info.writeIds; if(rs.next()) { info = CompactionInfo.loadFullFromCompactionQueue(rs); + info.writeIds = writeIds; } else { throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE"); @@ -399,8 +440,12 @@ public void markCleaned(CompactionInfo info) throws MetaException { } LOG.debug("Going to execute update <" + s + ">"); if ((updCount = pStmt.executeUpdate()) < 1) { - LOG.error("Expected to remove at least one row from completed_txn_components when " + - "marking compaction entry as clean!"); + // In the case of clean abort commit hasn't happened so completed_txn_components hasn't been filled + if (!info.isCleanAbortedCompaction()) { + LOG.error( + "Expected to remove at least one row from completed_txn_components when " + + "marking compaction entry as clean!"); + } } /** * compaction may remove data from aborted txns above tc_writeid bit it only guarantees to @@ -412,6 +457,14 @@ public void markCleaned(CompactionInfo info) throws MetaException { TXN_ABORTED + "' and tc_database = ? 
and tc_table = ?"; if (info.highestWriteId != 0) s += " and tc_writeid <= ?"; if (info.partName != null) s += " and tc_partition = ?"; + if (info.writeIds != null && info.writeIds.size() > 0) { + String[] wriStr = new String[info.writeIds.size()]; + int i = 0; + for (Long writeId: writeIds) { + wriStr[i++] = writeId.toString(); + } + s += " and tc_writeid in (" + String.join(",", wriStr) + ")"; + } pStmt = dbConn.prepareStatement(s); paramCount = 1; @@ -449,6 +502,14 @@ public void markCleaned(CompactionInfo info) throws MetaException { if (info.partName != null) { suffix.append(" and tc_partition = ?"); } + if (info.writeIds != null && info.writeIds.size() > 0) { + String[] wriStr = new String[info.writeIds.size()]; + int i = 0; + for (Long writeId: writeIds) { + wriStr[i++] = writeId.toString(); + } + suffix.append(" and tc_writeid in (").append(String.join(",", wriStr)).append(")"); + } // Populate the complete query with provided prefix and suffix List counts = TxnUtils diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 91a9ab4053..029d116172 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -159,6 +159,7 @@ // Compactor types static final protected char MAJOR_TYPE = 'a'; static final protected char MINOR_TYPE = 'i'; + static final protected char CLEAN_ABORTED = 'p'; // Transaction states static final protected char TXN_ABORTED = 'a'; @@ -188,7 +189,7 @@ * These are the valid values for TXN_COMPONENTS.TC_OPERATION_TYPE */ enum OperationType { - SELECT('s'), INSERT('i'), UPDATE('u'), DELETE('d'), COMPACT('c'); + SELECT('s'), INSERT('i'), UPDATE('u'), DELETE('d'), COMPACT('c'), ALL_PARTITIONS('p'); private final char sqlConst; OperationType(char sqlConst) { this.sqlConst = sqlConst; @@ -208,6 +209,8 @@ public static OperationType fromString(char sqlConst) { return DELETE; case 'c': return COMPACT; + case 'p': + return ALL_PARTITIONS; default: throw new IllegalArgumentException(quoteChar(sqlConst)); } @@ -1225,7 +1228,8 @@ public void commitTxn(CommitTxnRequest rqst) "' from TXN_COMPONENTS where tc_txnid = " + txnid + //we only track compactor activity in TXN_COMPONENTS to handle the case where the //compactor txn aborts - so don't bother copying it to COMPLETED_TXN_COMPONENTS - " AND tc_operation_type <> " + quoteChar(OperationType.COMPACT.sqlConst); + " AND tc_operation_type <> " + quoteChar(OperationType.COMPACT.sqlConst) + + " AND tc_operation_type <> " + quoteChar(OperationType.ALL_PARTITIONS.sqlConst); LOG.debug("Going to execute insert <" + s + ">"); if ((stmt.executeUpdate(s)) < 1) { @@ -1740,7 +1744,6 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds // This is for idempotent case. return new AllocateTableWriteIdsResponse(txnToWriteIds); } - long srcWriteId = 0; if (rqst.isSetReplPolicy()) { // In replication flow, we always need to allocate write ID equal to that of source. @@ -2281,6 +2284,10 @@ private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character tx return null; } + private static String tableWithWriteIdToHash(String dbName, String tblName, Long writeId) { + return dbName + tblName + writeId; + } + /** * This enters locks into the queue in {@link #LOCK_WAITING} mode. 
* @@ -2332,6 +2339,8 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); + Set tableAndWriteId = new HashSet<>(); + if (txnid > 0) { List rows = new ArrayList<>(); List> paramsList = new ArrayList<>(); @@ -2343,6 +2352,8 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc continue; } boolean updateTxnComponents; + boolean updateDynamicPartition = false; + OperationType op = null; if(!lc.isSetOperationType()) { //request came from old version of the client updateTxnComponents = true;//this matches old behavior @@ -2352,17 +2363,13 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc case INSERT: case UPDATE: case DELETE: - if(!lc.isSetIsDynamicPartitionWrite()) { - //must be old client talking, i.e. we don't know if it's DP so be conservative - updateTxnComponents = true; - } - else { - /** - * we know this is part of DP operation and so we'll get - * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list - * of partitions actually chaged. - */ - updateTxnComponents = !lc.isIsDynamicPartitionWrite(); + updateTxnComponents = true; + op = OperationType.fromDataOperationType(lc.getOperationType()); + if (lc.isSetIsDynamicPartitionWrite()) { + if (lc.isIsDynamicPartitionWrite()) { + updateDynamicPartition = true; + op = OperationType.ALL_PARTITIONS; + } } break; case SELECT: @@ -2402,11 +2409,19 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc if (rs.next()) { writeId = rs.getLong(1); } + if (updateDynamicPartition) { + // Only add one row per writeId per table + String hash = tableWithWriteIdToHash(dbName, tblName, writeId); + if (tableAndWriteId.contains(hash)) { + continue; + } + tableAndWriteId.add(hash); + } } rows.add(txnid + ", ?, " + (tblName == null ? "null" : "?") + ", " + (partName == null ? "null" : "?")+ "," + - quoteString(OperationType.fromDataOperationType(lc.getOperationType()).toString())+ "," + + quoteString(op.toString())+ "," + (writeId == null ? "null" : writeId)); List params = new ArrayList<>(); params.add(dbName); @@ -2987,10 +3002,15 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException { long id = generateCompactionQueueId(stmt); List params = new ArrayList<>(); + // There could a clean abort compaction for the same database, table and with null + // partition but in this case it would be in READY_FOR_CLEANING state. StringBuilder sb = new StringBuilder("select cq_id, cq_state from COMPACTION_QUEUE where"). append(" cq_state IN(").append(quoteChar(INITIATED_STATE)). - append(",").append(quoteChar(WORKING_STATE)). - append(") AND cq_database=?"). + append(",").append(quoteChar(WORKING_STATE)); + if (rqst.getType().equals(CompactionType.CLEAN_ABORTED)) { + sb.append(",").append(quoteChar(READY_FOR_CLEANING)); + } + sb.append(") AND cq_database=?"). 
append(" AND cq_table=?").append(" AND "); params.add(rqst.getDbname()); params.add(rqst.getTablename()); @@ -3001,6 +3021,12 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException { params.add(rqst.getPartitionname()); } + // This means even if we have several writeIds for the same table + // only one entry will be inserted in COMPACTION_QUEUE + if (rqst.getType().equals(CompactionType.CLEAN_ABORTED)) { + sb.append(" AND cq_type=").append(quoteChar(CLEAN_ABORTED)); + } + pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(), params); LOG.debug("Going to execute query <" + sb.toString() + ">"); ResultSet rs = pst.executeQuery(); @@ -3037,7 +3063,14 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException { } else { buf.append("'"); } - buf.append(INITIATED_STATE); + char state = INITIATED_STATE; + // We send the work directly to the cleaning stage because + // the worker doesn't have to do anything, we only have to delete + // files that may have been written. + if (rqst.getType().equals(CompactionType.CLEAN_ABORTED)) { + state = READY_FOR_CLEANING; + } + buf.append(state); buf.append("', '"); switch (rqst.getType()) { case MAJOR: @@ -3048,6 +3081,10 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException { buf.append(MINOR_TYPE); break; + case CLEAN_ABORTED: + buf.append(CLEAN_ABORTED); + break; + default: LOG.debug("Going to rollback"); dbConn.rollback(); @@ -3131,6 +3168,7 @@ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaExcep switch (rs.getString(5).charAt(0)) { case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break; case MINOR_TYPE: e.setType(CompactionType.MINOR); break; + case CLEAN_ABORTED: e.setType(CompactionType.CLEAN_ABORTED); break; default: //do nothing to handle RU/D if we add another status } @@ -3223,6 +3261,15 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) for(PreparedStatement pst : insertPreparedStmts) { modCount = pst.executeUpdate(); } + + // Delete from TXN_COMPONENTS the row indicating to scan all partition in + // case of failure because at this point partitions are added and we know + // what to scan. + PreparedStatement deletePreparedStmt = sqlGenerator.prepareStmtWithParameters(dbConn, + "delete from TXN_COMPONENTS where TC_TXNID=? 
and TC_OPERATION_TYPE=?",
+          Arrays.asList(Long.toString(rqst.getTxnid()), OperationType.ALL_PARTITIONS.toString()));
+      deletePreparedStmt.execute();
+
       LOG.debug("Going to commit");
       dbConn.commit();
     } catch (SQLException e) {
@@ -5223,6 +5270,8 @@ static CompactionType dbCompactionType2ThriftType(char dbValue) {
       return CompactionType.MAJOR;
     case MINOR_TYPE:
       return CompactionType.MINOR;
+    case CLEAN_ABORTED:
+      return CompactionType.CLEAN_ABORTED;
     default:
       LOG.warn("Unexpected compaction type " + dbValue);
       return null;
@@ -5234,6 +5283,8 @@ static Character thriftCompactionType2DbType(CompactionType ct) {
       return MAJOR_TYPE;
     case MINOR:
       return MINOR_TYPE;
+    case CLEAN_ABORTED:
+      return CLEAN_ABORTED;
     default:
       LOG.warn("Unexpected compaction type " + ct);
       return null;
diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
index 898a94dcd6..ae37b2ae1d 100644
--- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
+++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
@@ -2365,7 +2365,7 @@ public long allocateTableWriteId(long txnId, String dbName, String tableName) th
     rqst.setSrcTxnToWriteIdList(srcTxnToWriteIdList);
     return allocateTableWriteIdsBatchIntr(rqst);
   }
- 
+
   private List<TxnToWriteId> allocateTableWriteIdsBatchIntr(AllocateTableWriteIdsRequest rqst) throws TException {
     return client.allocate_table_write_ids(rqst).getTxnToWriteIds();
   }
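
End-to-end, the TxnHandler changes record aborted dynamic-partition writes with a 'p' (ALL_PARTITIONS) row in TXN_COMPONENTS, turn them into CLEAN_ABORTED compactions that are queued directly in the ready-for-cleaning state, and let the cleaning side prune TXN_COMPONENTS by write id once the aborted deltas are gone. A minimal sketch of exercising that queueing path, assuming a metastore Configuration is available; the database/table names are illustrative, and in the patch itself such requests are created by the compaction Initiator rather than called directly:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;

public class CleanAbortedSketch {
  // Enqueue a CLEAN_ABORTED compaction for an illustrative table and check that
  // it lands directly in the "ready for cleaning" state.
  static void requestCleanAborted(Configuration conf) throws MetaException {
    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
    CompactionRequest rqst =
        new CompactionRequest("default", "cws", CompactionType.CLEAN_ABORTED);
    txnHandler.compact(rqst);

    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    // compact() skips the INITIATED/WORKING states for this type, so no worker
    // ever runs; the Cleaner only has to delete whatever the aborted txns wrote.
    assert TxnStore.CLEANING_RESPONSE.equals(rsp.getCompacts().get(0).getState());
  }
}
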