diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 5966740f88..c047c18034 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -45,9 +46,11 @@
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -704,16 +707,7 @@ public void majorCompactWhileStreaming() throws Exception {
     // it has an open txn in it
     writeBatch(connection, writer, true);
 
-    // Now, compact
-    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
-    txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
-    Worker t = new Worker();
-    t.setThreadId((int) t.getId());
-    t.setConf(conf);
-    AtomicBoolean stop = new AtomicBoolean(true);
-    AtomicBoolean looped = new AtomicBoolean();
-    t.init(stop, looped);
-    t.run();
+    runMajorCompaction(dbName, tblName);
 
     // Find the location of the table
     IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
@@ -827,16 +821,7 @@ public void majorCompactAfterAbort() throws Exception {
 
     txnBatch.abort();
 
-    // Now, compact
-    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
-    txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
-    Worker t = new Worker();
-    t.setThreadId((int) t.getId());
-    t.setConf(conf);
-    AtomicBoolean stop = new AtomicBoolean(true);
-    AtomicBoolean looped = new AtomicBoolean();
-    t.init(stop, looped);
-    t.run();
+    runMajorCompaction(dbName, tblName);
 
     // Find the location of the table
     IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
@@ -860,6 +845,142 @@ public void majorCompactAfterAbort() throws Exception {
     }
   }
 
+
+  @Test
+  public void mmTable() throws Exception {
+    String dbName = "default";
+    String tblName = "mm_nonpart";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS TEXTFILE" +
+        " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+        driver);
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    msClient.close();
+
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+    verifyFooBarResult(tblName, 1);
+
+    // Check that we have two deltas.
+    FileSystem fs = FileSystem.get(conf);
+    verifyDeltaCount(table.getSd(), fs, 2);
+
+    runMajorCompaction(dbName, tblName);
+    verifyFooBarResult(tblName, 1);
+    verifyHasBase(table.getSd(), fs, "base_0000003");
+
+    // Make sure we don't compact if we don't need to compact.
+    runMajorCompaction(dbName, tblName);
+    verifyFooBarResult(tblName, 1);
+    verifyHasBase(table.getSd(), fs, "base_0000003");
+  }
+
+  private void verifyHasBase(
+      StorageDescriptor sd, FileSystem fs, String baseName) throws Exception {
+    FileStatus[] stat = fs.listStatus(new Path(sd.getLocation()), AcidUtils.baseFileFilter);
+    for (FileStatus file : stat) {
+      if (baseName.equals(file.getPath().getName())) return;
+    }
+    Assert.fail("Cannot find " + baseName + ": " + Arrays.toString(stat));
+  }
+
+  private void verifyDeltaCount(
+      StorageDescriptor sd, FileSystem fs, int count) throws Exception {
+    FileStatus[] stat = fs.listStatus(new Path(sd.getLocation()), AcidUtils.deltaFileFilter);
+    Assert.assertEquals(Arrays.toString(stat), count, stat.length);
+  }
+
+  @Test
+  public void mmTablePartitioned() throws Exception {
+    String dbName = "default";
+    String tblName = "mm_part";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " PARTITIONED BY(ds int) STORED AS TEXTFILE" +
+        " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+        driver);
+
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 1)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 1)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 1)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 2)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 2)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 3)", driver);
+
+    verifyFooBarResult(tblName, 3);
+
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Partition p1 = msClient.getPartition(dbName, tblName, "ds=1"),
+        p2 = msClient.getPartition(dbName, tblName, "ds=2"),
+        p3 = msClient.getPartition(dbName, tblName, "ds=3");
+    msClient.close();
+
+    FileSystem fs = FileSystem.get(conf);
+    verifyDeltaCount(p1.getSd(), fs, 3);
+    verifyDeltaCount(p2.getSd(), fs, 2);
+    verifyDeltaCount(p3.getSd(), fs, 1);
+
+    runMajorCompaction(dbName, tblName, "ds=1", "ds=2", "ds=3");
+
+    verifyFooBarResult(tblName, 3);
+    verifyDeltaCount(p3.getSd(), fs, 1);
+    verifyHasBase(p1.getSd(), fs, "base_0000007");
+    verifyHasBase(p2.getSd(), fs, "base_0000008");
+
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 2)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 2)", driver);
+
+    runMajorCompaction(dbName, tblName, "ds=1", "ds=2", "ds=3");
+
+    // Make sure we don't compact if we don't need to compact; but do if we do.
+    verifyFooBarResult(tblName, 4);
+    verifyDeltaCount(p3.getSd(), fs, 1);
+    verifyHasBase(p1.getSd(), fs, "base_0000007");
+    verifyHasBase(p2.getSd(), fs, "base_0000011");
+
+  }
+
+  private void verifyFooBarResult(String tblName, int count) throws Exception, IOException {
+    List<String> valuesReadFromHiveDriver = new ArrayList<String>();
+    executeStatementOnDriver("SELECT a,b FROM " + tblName, driver);
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(2 * count, valuesReadFromHiveDriver.size());
+    int fooCount = 0, barCount = 0;
+    for (String s : valuesReadFromHiveDriver) {
+      if ("1\tfoo".equals(s)) {
+        ++fooCount;
+      } else if ("2\tbar".equals(s)) {
+        ++barCount;
+      } else {
+        Assert.fail("Unexpected " + s);
+      }
+    }
+    Assert.assertEquals(fooCount, count);
+    Assert.assertEquals(barCount, count);
+  }
+
+  private void runMajorCompaction(
+      String dbName, String tblName, String... partNames) throws MetaException {
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setConf(conf);
+    t.init(new AtomicBoolean(true), new AtomicBoolean());
+    if (partNames.length == 0) {
+      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
+      t.run();
+    } else {
+      for (String partName : partNames) {
+        CompactionRequest cr = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+        cr.setPartitionname(partName);
+        txnHandler.compact(cr);
+        t.run();
+      }
+    }
+  }
+
   @Test
   public void majorCompactWhileStreamingForSplitUpdate() throws Exception {
     String dbName = "default";
@@ -885,16 +1006,7 @@ public void majorCompactWhileStreamingForSplitUpdate() throws Exception {
     // Start a third batch, but don't close it.
     writeBatch(connection, writer, true);
 
-    // Now, compact
-    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
-    txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
-    Worker t = new Worker();
-    t.setThreadId((int) t.getId());
-    t.setConf(conf);
-    AtomicBoolean stop = new AtomicBoolean(true);
-    AtomicBoolean looped = new AtomicBoolean();
-    t.init(stop, looped);
-    t.run();
+    runMajorCompaction(dbName, tblName);
 
     // Find the location of the table
     IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index b1c2288d01..6685284926 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -37,20 +37,32 @@
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -70,6 +82,7 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
@@ -200,19 +213,37 @@ private void overrideMRProps(JobConf job, Map<String, String> properties) {
    * @param ci CompactionInfo
    * @throws java.io.IOException if the job fails
    */
-  void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, ValidWriteIdList writeIds,
+  void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
            CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException {
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) &&
        conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
     }
-    // For MM tables we don't need to launch MR jobs as there is no compaction needed.
-    // We just need to delete the directories for aborted transactions.
-    if (AcidUtils.isInsertOnlyTable(t.getParameters())) {
+    // For MM tables, we can cheaply delete the directories for aborted transactions.
+    boolean isMmTable = AcidUtils.isInsertOnlyTable(t.getParameters());
+    if (isMmTable) {
       LOG.debug("Going to delete directories for aborted transactions for MM table "
           + t.getDbName() + "." + t.getTableName());
-      removeFiles(conf, sd.getLocation(), writeIds, t);
+      AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()),
+          conf, writeIds, Ref.from(false), false, t.getParameters());
+      removeFilesForMmTable(conf, dir);
+      if (!ci.isMajorCompaction()) {
+        // Not supported for MM tables right now.
+        LOG.info("Not compacting " + sd.getLocation() + "; not a major compaction");
+        return;
+      }
+
+      int deltaCount = dir.getCurrentDirectories().size();
+      if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) <= 1) {
+        LOG.debug("Not compacting " + sd.getLocation() + "; current base is "
+            + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas");
+        return;
+      }
+      try {
+        runMmCompactionQuery(conf, t, p);
+      } catch (HiveException e) {
+        throw new IOException(e);
+      }
       return;
     }
@@ -294,6 +325,63 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, ValidWrit
     su.gatherStats();
   }
 
+  private void runMmCompactionQuery(
+      HiveConf conf, Table t, Partition p) throws IOException, HiveException {
+    conf = new HiveConf(conf);
+    String fullName = t.getDbName() + "." + t.getTableName();
+    // TODO: replace with alter table concatenate when that is ready,
+    // to be able to utilize the fast path for ORC and RC.
+    String query = "insert overwrite table " + fullName + " ";
+    String filter = "";
+    if (p != null) {
+      conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+      query += " partition (";
+      filter = " where ";
+      List<String> vals = p.getValues();
+      List<FieldSchema> keys = t.getPartitionKeys();
+      assert keys.size() == vals.size();
+      for (int i = 0; i < keys.size(); ++i) {
+        String str = "`" + keys.get(i).getName() + "`='" + vals.get(i) + "'";
+        query += (i == 0 ? "" : ", ") + str;
+        filter += (i == 0 ? "" : " and ") + str;
+      }
+      query += ") select ";
+      // Use table descriptor for columns.
+      for (int i = 0; i < t.getSd().getCols().size(); ++i) {
+        query += (i == 0 ? "`" : ", `") + (t.getSd().getCols().get(i).getName() + "`");
+      }
+    } else {
+      query += "select *";
+    }
+    query += " from " + fullName + filter;
+
+    LOG.info("Compacting a MM table via " + query);
+
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    if (SessionState.get() == null) {
+      // TODO: move to global? should be ok if it's always the same thread.
+      SessionState sessionState = new SessionState(conf, user);
+      SessionState.setCurrentSessionState(sessionState);
+    }
+
+    try {
+      QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build();
+      Driver driver = new Driver(qs, user, null, null);
+      try {
+        CommandProcessorResponse cpr = driver.run(query);
+        if (cpr.getResponseCode() != 0) {
+          LOG.error("Failed to run " + query, cpr.getException());
+          throw new HiveException("Failed to run " + query, cpr.getException());
+        }
+      } finally {
+        driver.close();
+        driver.destroy();
+      }
+    } finally {
+      //txnManager.closeTxnManager();
+    }
+  }
+
   /**
    * @param baseDir if not null, it's either table/partition root folder or base_xxxx.
    * If it's base_xxxx, it's in dirsToSearch, else the actual original files
@@ -378,22 +466,22 @@ private void setColumnTypes(JobConf job, List<FieldSchema> cols) {
   }
 
   // Remove the directories for aborted transactions only
-  private void removeFiles(HiveConf conf, String location, ValidWriteIdList writeIdList, Table t)
-      throws IOException {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, writeIdList,
-        Ref.from(false), false, t.getParameters());
+  private void removeFilesForMmTable(HiveConf conf, Directory dir) throws IOException {
     // For MM table, we only want to delete delta dirs for aborted txns.
     List<FileStatus> abortedDirs = dir.getAbortedDirectories();
     List<Path> filesToDelete = new ArrayList<>(abortedDirs.size());
     for (FileStatus stat : abortedDirs) {
       filesToDelete.add(stat.getPath());
     }
+    // TODO: Also delete obsolete directories? How do we account for readers?
+    /*List<FileStatus> obsolete = dir.getObsolete();
+    for (FileStatus stat : obsolete) {
+      filesToDelete.add(stat.getPath());
+    }*/
     if (filesToDelete.size() < 1) {
-      LOG.warn("Hmm, nothing to delete in the worker for directory " + location +
-          ", that hardly seems right.");
       return;
     }
-    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + location);
+    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
     FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
     for (Path dead : filesToDelete) {
       LOG.debug("Going to delete path " + dead.toString());
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index fe0aaa4ff5..74612990de 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -84,6 +84,7 @@ public void run() {
     // so wrap it in a big catch Throwable statement.
     try {
       final CompactionInfo ci = txnHandler.findNextToCompact(name);
+      LOG.debug("Processing compaction request " + ci);
 
       if (ci == null && !stop.get()) {
         try {
@@ -170,14 +171,15 @@ public void run() {
           launchedJob = true;
           try {
             if (runJobAsSelf(runAs)) {
-              mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler);
+              mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, txnHandler);
             } else {
               UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
                 UserGroupInformation.getLoginUser());
+              final Partition fp = p;
               ugi.doAs(new PrivilegedExceptionAction<Object>() {
                 @Override
                 public Object run() throws Exception {
-                  mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler);
+                  mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, ci, su, txnHandler);
                   return null;
                 }
               });
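
Note: as a rough sketch of what the new runMmCompactionQuery assembles (using, for illustration only, the mm_nonpart and mm_part tables created by the test changes above), the rewritten statement comes out as

  insert overwrite table default.mm_nonpart select * from default.mm_nonpart

for an unpartitioned MM table, and, for a compaction request against partition ds=1 of the partitioned table,

  insert overwrite table default.mm_part partition (`ds`='1') select `a`, `b` from default.mm_part where `ds`='1'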