diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 82ba775286..6a8853d994 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -30,11 +31,13 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -45,9 +48,11 @@
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -704,16 +709,7 @@ public void majorCompactWhileStreaming() throws Exception {
     // it has an open txn in it
     writeBatch(connection, writer, true);
 
-    // Now, compact
-    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
-    txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
-    Worker t = new Worker();
-    t.setThreadId((int) t.getId());
-    t.setConf(conf);
-    AtomicBoolean stop = new AtomicBoolean(true);
-    AtomicBoolean looped = new AtomicBoolean();
-    t.init(stop, looped);
-    t.run();
+    runMajorCompaction(dbName, tblName);
 
     // Find the location of the table
     IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
@@ -827,16 +823,7 @@ public void majorCompactAfterAbort() throws Exception {
     txnBatch.abort();
 
-    // Now, compact
-    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
-    txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
-    Worker t = new Worker();
-    t.setThreadId((int) t.getId());
-    t.setConf(conf);
-    AtomicBoolean stop = new AtomicBoolean(true);
-    AtomicBoolean looped = new AtomicBoolean();
-    t.init(stop, looped);
-    t.run();
+    runMajorCompaction(dbName, tblName);
 
     // Find the location of the table
     IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
@@ -860,6 +847,198 @@ public void majorCompactAfterAbort() throws Exception {
     }
   }
 
+
+  @Test
+  public void mmTable() throws Exception {
+    String dbName = "default";
+    String tblName = "mm_nonpart";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS TEXTFILE" +
+        " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+        driver);
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    msClient.close();
+
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+    verifyFooBarResult(tblName, 1);
+
+    // Check that we have two deltas.
+    FileSystem fs = FileSystem.get(conf);
+    verifyDeltaCount(table.getSd(), fs, 2);
+
+    runMajorCompaction(dbName, tblName);
+    verifyFooBarResult(tblName, 1);
+    verifyHasBase(table.getSd(), fs, "base_0000002");
+
+    // Make sure we don't compact if we don't need to compact.
+    runMajorCompaction(dbName, tblName);
+    verifyFooBarResult(tblName, 1);
+    verifyHasBase(table.getSd(), fs, "base_0000002");
+  }
+
+
+  @Test
+  public void mmTableOpenWriteId() throws Exception {
+    String dbName = "default";
+    String tblName = "mm_nonpart";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS TEXTFILE" +
+        " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+        driver);
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    msClient.close();
+
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+    verifyFooBarResult(tblName, 1);
+
+    long openTxnId = msClient.openTxn("test");
+    long openWriteId = msClient.allocateTableWriteId(openTxnId, dbName, tblName);
+    Assert.assertEquals(3, openWriteId); // Just check to make sure base_5 below is not new.
+
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+    verifyFooBarResult(tblName, 2);
+
+    runMajorCompaction(dbName, tblName); // Don't compact 4 and 5; 3 is opened.
+    FileSystem fs = FileSystem.get(conf);
+    verifyHasBase(table.getSd(), fs, "base_0000002");
+    verifyDirCount(table.getSd(), fs, 1, AcidUtils.baseFileFilter);
+    verifyFooBarResult(tblName, 2);
+
+    runCleaner(conf);
+    verifyHasDir(table.getSd(), fs, "delta_0000004_0000004_0000", AcidUtils.deltaFileFilter);
+    verifyHasDir(table.getSd(), fs, "delta_0000005_0000005_0000", AcidUtils.deltaFileFilter);
+    verifyFooBarResult(tblName, 2);
+
+    msClient.abortTxns(Lists.newArrayList(openTxnId)); // Now abort 3.
+    runMajorCompaction(dbName, tblName); // Compact 4 and 5.
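+    // Write ID 3 was the open one capping the compactor's high watermark at 2; with it aborted,
+    // the next major compaction can fold deltas 4 and 5 into a new base (base_0000005 below).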
+    verifyFooBarResult(tblName, 2);
+    verifyHasBase(table.getSd(), fs, "base_0000005");
+    runCleaner(conf);
+    verifyDeltaCount(table.getSd(), fs, 0);
+  }
+
+  private void verifyHasBase(
+      StorageDescriptor sd, FileSystem fs, String baseName) throws Exception {
+    verifyHasDir(sd, fs, baseName, AcidUtils.baseFileFilter);
+  }
+
+  private void verifyHasDir(
+      StorageDescriptor sd, FileSystem fs, String name, PathFilter filter) throws Exception {
+    FileStatus[] stat = fs.listStatus(new Path(sd.getLocation()), filter);
+    for (FileStatus file : stat) {
+      if (name.equals(file.getPath().getName())) return;
+    }
+    Assert.fail("Cannot find " + name + ": " + Arrays.toString(stat));
+  }
+
+  private void verifyDeltaCount(
+      StorageDescriptor sd, FileSystem fs, int count) throws Exception {
+    verifyDirCount(sd, fs, count, AcidUtils.deltaFileFilter);
+  }
+
+  private void verifyDirCount(
+      StorageDescriptor sd, FileSystem fs, int count, PathFilter filter) throws Exception {
+    FileStatus[] stat = fs.listStatus(new Path(sd.getLocation()), filter);
+    Assert.assertEquals(Arrays.toString(stat), count, stat.length);
+  }
+
+  @Test
+  public void mmTablePartitioned() throws Exception {
+    String dbName = "default";
+    String tblName = "mm_part";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " PARTITIONED BY(ds int) STORED AS TEXTFILE" +
+        " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+        driver);
+
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 1)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 1)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 1)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 2)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 2)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 3)", driver);
+
+    verifyFooBarResult(tblName, 3);
+
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Partition p1 = msClient.getPartition(dbName, tblName, "ds=1"),
+        p2 = msClient.getPartition(dbName, tblName, "ds=2"),
+        p3 = msClient.getPartition(dbName, tblName, "ds=3");
+    msClient.close();
+
+    FileSystem fs = FileSystem.get(conf);
+    verifyDeltaCount(p1.getSd(), fs, 3);
+    verifyDeltaCount(p2.getSd(), fs, 2);
+    verifyDeltaCount(p3.getSd(), fs, 1);
+
+    runMajorCompaction(dbName, tblName, "ds=1", "ds=2", "ds=3");
+
+    verifyFooBarResult(tblName, 3);
+    verifyDeltaCount(p3.getSd(), fs, 1);
+    verifyHasBase(p1.getSd(), fs, "base_0000006");
+    verifyHasBase(p2.getSd(), fs, "base_0000006");
+
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 2)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 2)", driver);
+
+    runMajorCompaction(dbName, tblName, "ds=1", "ds=2", "ds=3");
+
+    // Make sure we don't compact if we don't need to compact; but do if we do.
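+    // ds=1 saw no new writes since base_0000006, so it is left alone; ds=2 got two new deltas and
+    // is re-compacted into base_0000008; ds=3 still has only one delta, so no base is created.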
+    verifyFooBarResult(tblName, 4);
+    verifyDeltaCount(p3.getSd(), fs, 1);
+    verifyHasBase(p1.getSd(), fs, "base_0000006");
+    verifyHasBase(p2.getSd(), fs, "base_0000008");
+
+  }
+
+  private void verifyFooBarResult(String tblName, int count) throws Exception, IOException {
+    List<String> valuesReadFromHiveDriver = new ArrayList<String>();
+    executeStatementOnDriver("SELECT a,b FROM " + tblName, driver);
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(2 * count, valuesReadFromHiveDriver.size());
+    int fooCount = 0, barCount = 0;
+    for (String s : valuesReadFromHiveDriver) {
+      if ("1\tfoo".equals(s)) {
+        ++fooCount;
+      } else if ("2\tbar".equals(s)) {
+        ++barCount;
+      } else {
+        Assert.fail("Unexpected " + s);
+      }
+    }
+    Assert.assertEquals(fooCount, count);
+    Assert.assertEquals(barCount, count);
+  }
+
+  private void runMajorCompaction(
+      String dbName, String tblName, String... partNames) throws MetaException {
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setConf(conf);
+    t.init(new AtomicBoolean(true), new AtomicBoolean());
+    if (partNames.length == 0) {
+      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
+      t.run();
+    } else {
+      for (String partName : partNames) {
+        CompactionRequest cr = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+        cr.setPartitionname(partName);
+        txnHandler.compact(cr);
+        t.run();
+      }
+    }
+  }
+
   @Test
   public void majorCompactWhileStreamingForSplitUpdate() throws Exception {
     String dbName = "default";
@@ -885,16 +1064,7 @@ public void majorCompactWhileStreamingForSplitUpdate() throws Exception {
     // Start a third batch, but don't close it.
     writeBatch(connection, writer, true);
 
-    // Now, compact
-    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
-    txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
-    Worker t = new Worker();
-    t.setThreadId((int) t.getId());
-    t.setConf(conf);
-    AtomicBoolean stop = new AtomicBoolean(true);
-    AtomicBoolean looped = new AtomicBoolean();
-    t.init(stop, looped);
-    t.run();
+    runMajorCompaction(dbName, tblName);
 
     // Find the location of the table
     IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
@@ -1461,6 +1631,7 @@ static void executeStatementOnDriver(String cmd, IDriver driver) throws Exceptio
       throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + cpr);
     }
   }
+
   static void createTestDataFile(String filename, String[] lines) throws IOException {
     FileWriter writer = null;
     try {
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index a88453c978..be0e8c9462 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -46,6 +46,8 @@
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -61,6 +63,7 @@
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
@@ -208,6 +211,8 @@
   private CacheUsage cacheUsage;
   private CacheEntry usedCacheEntry;
+  private boolean isCompactionDriver = false;
+  private ValidReaderWriteIdList compactionWriteIds = null;
 
   private enum DriverState {
     INITIALIZED,
@@ -540,8 +545,10 @@ private void compile(String command, boolean resetTaskIds, boolean deferClose) t
     conf.setQueryString(queryStr);
     // FIXME: sideeffect will leave the last query set at the session level
-    SessionState.get().getConf().setQueryString(queryStr);
-    SessionState.get().setupQueryCurrentTimestamp();
+    if (SessionState.get() != null) {
+      SessionState.get().getConf().setQueryString(queryStr);
+      SessionState.get().setupQueryCurrentTimestamp();
+    }
 
     // Whether any error occurred during query compilation. Used for query lifetime hook.
     boolean compileError = false;
@@ -1308,7 +1315,16 @@ private void recordValidWriteIds(HiveTxnManager txnMgr) throws LockException {
       throw new IllegalStateException("calling recordValidWritsIdss() without initializing ValidTxnList " +
           JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
     }
-    ValidTxnWriteIdList txnWriteIds = txnMgr.getValidWriteIds(getTransactionalTableList(plan), txnString);
+    List<String> txnTables = getTransactionalTableList(plan);
+    ValidTxnWriteIdList txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
+    if (this.isCompactionDriver) {
+      if (txnTables.size() != 1) {
+        throw new LockException("Unexpected tables in compaction: " + txnTables);
+      }
+      ValidWriteIdList originalIds = txnWriteIds.getTableValidWriteIdList(txnTables.get(0));
+      this.compactionWriteIds = (ValidReaderWriteIdList) originalIds;
+      TxnUtils.updateForCompactionQuery(compactionWriteIds);
+    }
     String writeIdStr = txnWriteIds.toString();
     conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, writeIdStr);
     if (plan.getFetchTask() != null) {
@@ -2755,4 +2771,12 @@ public boolean hasResultSet() {
       return false;
     }
   }
+
+  public void setIsCompactionDriver(boolean val) {
+    this.isCompactionDriver = val;
+  }
+
+  public ValidReaderWriteIdList getCompactionWriteIds() {
+    return compactionWriteIds;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index b1c2288d01..adaa701a95 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -21,6 +21,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,22 +36,32 @@
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -70,6 +81,7 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
@@ -128,7 +140,7 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag
     }
     job.set(FINAL_LOCATION, sd.getLocation());
-    job.set(TMP_LOCATION, sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString());
+    job.set(TMP_LOCATION, generateTmpPath(sd));
     job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat());
     job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat());
     job.setBoolean(IS_COMPRESSED, sd.isCompressed());
@@ -200,19 +212,15 @@ private void overrideMRProps(JobConf job, Map<String, String> properties) {
    * @param ci CompactionInfo
    * @throws java.io.IOException if the job fails
    */
-  void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, ValidWriteIdList writeIds,
+  void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
       CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException {
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) &&
       conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
     }
-    // For MM tables we don't need to launch MR jobs as there is no compaction needed.
-    // We just need to delete the directories for aborted transactions.
     if (AcidUtils.isInsertOnlyTable(t.getParameters())) {
-      LOG.debug("Going to delete directories for aborted transactions for MM table "
-          + t.getDbName() + "." + t.getTableName());
-      removeFiles(conf, sd.getLocation(), writeIds, t);
+      runMmCompaction(conf, t, p, sd, writeIds, ci);
       return;
     }
@@ -294,6 +302,117 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, ValidWrit
     su.gatherStats();
   }
 
+  private void runMmCompaction(HiveConf conf, Table t, Partition p,
+      StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
+    LOG.debug("Going to delete directories for aborted transactions for MM table "
+        + t.getDbName() + "." + t.getTableName());
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()),
+        conf, writeIds, Ref.from(false), false, t.getParameters());
+    removeFilesForMmTable(conf, dir);
+
+    // Then, actually do the compaction.
+    if (!ci.isMajorCompaction()) {
+      // Not supported for MM tables right now.
+      LOG.info("Not compacting " + sd.getLocation() + "; not a major compaction");
+      return;
+    }
+
+    int deltaCount = dir.getCurrentDirectories().size();
+    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) <= 1) {
+      LOG.debug("Not compacting " + sd.getLocation() + "; current base is "
+          + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas");
+      return;
+    }
+    try {
+      String tmpLocation = generateTmpPath(sd);
+      long maxTxn = Long.MIN_VALUE;
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        maxTxn = Math.max(maxTxn, delta.getMaxWriteId());
+      }
+      assert maxTxn != Long.MIN_VALUE;
+      Path baseLocation = new Path(tmpLocation, "_base");
+      ValidReaderWriteIdList actualWriteIds = runMmCompactionQuery(conf, t, p, baseLocation);
+      commitMmCompaction(tmpLocation, sd.getLocation(), conf, actualWriteIds);
+    } catch (HiveException e) {
+      throw new IOException(e);
+    }
+    return;
+  }
+
+  private String generateTmpPath(StorageDescriptor sd) {
+    return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
+  }
+
+  private ValidReaderWriteIdList runMmCompactionQuery(
+      HiveConf conf, Table t, Partition p, Path baseLocation)
+      throws IOException, HiveException {
+    conf = new HiveConf(conf);
+    String fullName = t.getDbName() + "." + t.getTableName();
+    // TODO: replace with alter table concatenate when that is ready,
+    // to be able to utilize the fast path for ORC and RC.
+    String query = "insert overwrite directory '" + baseLocation + "' ";
+    String filter = "";
+    if (p != null) {
+      conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+      filter = " where ";
+      List<String> vals = p.getValues();
+      List<FieldSchema> keys = t.getPartitionKeys();
+      assert keys.size() == vals.size();
+      for (int i = 0; i < keys.size(); ++i) {
+        filter += (i == 0 ? "`" : " and `") + (keys.get(i).getName() + "`='" + vals.get(i) + "'");
+      }
+      query += " select ";
+      // Use table descriptor for columns.
+      List<FieldSchema> cols = t.getSd().getCols();
+      for (int i = 0; i < cols.size(); ++i) {
+        query += (i == 0 ? "`" : ", `") + (cols.get(i).getName() + "`");
+      }
+    } else {
+      query += "select *";
+    }
+    query += " from " + fullName + filter;
+
+    LOG.info("Compacting a MM table via " + query);
+
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    SessionState sessionState = SessionState.get();
+    if (sessionState == null) {
+      // Note: we assume that workers run on the same threads repeatedly, so we can set up
+      // the session here and it will be reused without explicitly storing in the worker.
+      sessionState = new SessionState(conf, user);
+      SessionState.setCurrentSessionState(sessionState);
+    }
+
+    ValidReaderWriteIdList result = null;
+    boolean isOk = false;
+    try {
+      QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build();
+      Driver driver = new Driver(qs, user, null, null);
+      driver.setIsCompactionDriver(true);
+      try {
+        CommandProcessorResponse cpr = driver.run(query);
+        if (cpr.getResponseCode() != 0) {
+          LOG.error("Failed to run " + query, cpr.getException());
+          throw new HiveException("Failed to run " + query, cpr.getException());
+        }
+        result = driver.getCompactionWriteIds();
+      } finally {
+        driver.close();
+        driver.destroy();
+      }
+      isOk = true;
+    } finally {
+      if (!isOk) {
+        try {
+          sessionState.close(); // This also resets SessionState.get.
+        } catch (Throwable th) {
+          LOG.warn("Failed to close a bad session", th);
+          SessionState.detachSession();
+        }
+      }
+    }
+    return result;
+  }
+
   /**
    * @param baseDir if not null, it's either table/partition root folder or base_xxxx.
    * If it's base_xxxx, it's in dirsToSearch, else the actual original files
@@ -309,6 +428,10 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa
       dirsToSearch = new StringableList();
     }
     StringableList deltaDirs = new StringableList();
+    // Note: if compaction creates a delta, it won't replace an existing base dir, so the txn ID
+    // of the base dir won't be a part of delta's range. If otoh compaction creates a base,
+    // we don't care about this value because bases don't have min txn ID in the name.
+    // However logically this should also take base into account if it's included.
     long minTxn = Long.MAX_VALUE;
     long maxTxn = Long.MIN_VALUE;
     for (AcidUtils.ParsedDelta delta : parsedDeltas) {
@@ -378,10 +501,7 @@ private void setColumnTypes(JobConf job, List<FieldSchema> cols) {
   }
 
   // Remove the directories for aborted transactions only
-  private void removeFiles(HiveConf conf, String location, ValidWriteIdList writeIdList, Table t)
-      throws IOException {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, writeIdList,
-        Ref.from(false), false, t.getParameters());
+  private void removeFilesForMmTable(HiveConf conf, Directory dir) throws IOException {
     // For MM table, we only want to delete delta dirs for aborted txns.
     List<FileStatus> abortedDirs = dir.getAbortedDirectories();
     List<Path> filesToDelete = new ArrayList<>(abortedDirs.size());
@@ -389,11 +509,9 @@
       filesToDelete.add(stat.getPath());
     }
     if (filesToDelete.size() < 1) {
-      LOG.warn("Hmm, nothing to delete in the worker for directory " + location +
-          ", that hardly seems right.");
       return;
     }
-    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + location);
+    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
     FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
     for (Path dead : filesToDelete) {
       LOG.debug("Going to delete path " + dead.toString());
@@ -969,4 +1087,37 @@ public void abortJob(JobContext context, int status) throws IOException {
       fs.delete(tmpLocation, true);
     }
   }
+
+  /**
+   * Note: similar logic to the main committer; however, no ORC versions and stuff like that.
+   * @param from The temp directory used for compactor output. Not the actual base/delta.
+   * @param to The final directory; basically a SD directory. Not the actual base/delta.
+   * @param destPathOpts The options used to create the actual base/delta directory.
+   */
+  private void commitMmCompaction(String from, String to, Configuration conf,
+      ValidReaderWriteIdList actualWriteIds) throws IOException {
+    Path fromPath = new Path(from), toPath = new Path(to);
+    FileSystem fs = fromPath.getFileSystem(conf);
+    // Assume the high watermark can be used as maximum transaction ID.
+    long maxTxn = actualWriteIds.getHighWatermark();
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+        .writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0).statementId(-1);
+    Path newDeltaDir = AcidUtils.createFilename(toPath, options).getParent();
+    if (!fs.exists(fromPath)) {
+      LOG.info(from + " not found. Assuming 0 splits. Creating " + newDeltaDir);
+      fs.mkdirs(newDeltaDir);
+      AcidUtils.MetaDataFile.createCompactorMarker(toPath, fs);
+      return;
+    }
+    LOG.info("Moving contents of " + from + " to " + to);
+    FileStatus[] children = fs.listStatus(fromPath);
+    if (children.length != 1) {
+      throw new IOException("Unexpected files in the source: " + Arrays.toString(children));
+    }
+    for (FileStatus fileStatus : children) {
+      fs.rename(fileStatus.getPath(), newDeltaDir);
+      AcidUtils.MetaDataFile.createCompactorMarker(newDeltaDir, fs);
+    }
+    fs.delete(fromPath, true);
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 22765b8e63..eeaae57c5b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -240,11 +240,6 @@ private CompactionType checkForCompaction(final CompactionInfo ci,
       return CompactionType.MAJOR;
     }
 
-    // If it is for insert-only transactional table, return null.
-    if (AcidUtils.isInsertOnlyTable(tblproperties)) {
-      return null;
-    }
-
     if (runJobAsSelf(runAs)) {
       return determineCompactionType(ci, writeIds, sd, tblproperties);
     } else {
@@ -333,14 +328,20 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi
         HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) :
         Integer.parseInt(deltaNumProp);
     boolean enough = deltas.size() > deltaNumThreshold;
-    if (enough) {
-      LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold +
-          (enough ? "" : "not") + " and no base, requesting " + (noBase ? "major" : "minor") +
-          " compaction");
-      // If there's no base file, do a major compaction
-      return noBase ? CompactionType.MAJOR : CompactionType.MINOR;
+    if (!enough) {
+      return null;
+    }
+    if (AcidUtils.isInsertOnlyTable(tblproperties)) {
+      LOG.debug("Requesting a major compaction for a MM table; found " + deltas.size() +
+          " delta files, threshold is " + deltaNumThreshold);
+      return CompactionType.MAJOR;
     }
-    return null;
+    // TODO: this log statement looks wrong
+    LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold +
+        (enough ? "" : "not") + " and no base, requesting " + (noBase ? "major" : "minor") +
+        " compaction");
+    // If there's no base file, do a major compaction
+    return noBase ? CompactionType.MAJOR : CompactionType.MINOR;
   }
 
   private long sumDirSize(FileSystem fs, Path dir) throws IOException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index fe0aaa4ff5..74612990de 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -84,6 +84,7 @@ public void run() {
       // so wrap it in a big catch Throwable statement.
       try {
         final CompactionInfo ci = txnHandler.findNextToCompact(name);
+        LOG.debug("Processing compaction request " + ci);
 
         if (ci == null && !stop.get()) {
           try {
@@ -170,14 +171,15 @@ public void run() {
             launchedJob = true;
             try {
               if (runJobAsSelf(runAs)) {
-                mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler);
+                mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, txnHandler);
               } else {
                 UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
                   UserGroupInformation.getLoginUser());
+                final Partition fp = p;
                 ugi.doAs(new PrivilegedExceptionAction<Object>() {
                   @Override
                   public Object run() throws Exception {
-                    mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler);
+                    mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, ci, su, txnHandler);
                     return null;
                   }
                 });
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 7b02865e18..10c318a7d5 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
@@ -155,6 +156,16 @@ public static ValidCompactorWriteIdList createValidCompactWriteIdList(TableValid
     }
   }
 
+  public static void updateForCompactionQuery(ValidReaderWriteIdList ids) {
+    // This is based on the existing valid write ID list that was built for a select query;
+    // therefore we assume all the aborted txns, etc. were already accounted for.
+    // All we do is adjust the high watermark to only include contiguous txns.
+    Long minOpenWriteId = ids.getMinOpenWriteId();
+    if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) {
+      ids.updateHighWatermark(ids.getMinOpenWriteId() - 1);
+    }
+  }
+
   /**
    * Get an instance of the TxnStore that is appropriate for this store
    * @param conf configuration
diff --git storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
index 107ea9028a..9ff93a5ec6 100644
--- storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
+++ storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
@@ -250,5 +250,9 @@ public RangeResponse isWriteIdRangeAborted(long minWriteId, long maxWriteId) {
       return RangeResponse.SOME;
     }
   }
+
+  public void updateHighWatermark(long value) {
+    this.highWatermark = value;
+  }
 }