diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 34df01e60e..77cf958a68 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2632,8 +2632,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "The default value is 1000000, since the data limit of a znode is 1MB"),
     HIVE_MM_ALLOW_ORIGINALS("hive.mm.allow.originals", false,
         "Whether to allow original files in MM tables. Conversion to MM may be expensive if\n" +
-        "this is set to false, however unless MAPREDUCE-7086 fix is present, queries that\n" +
-        "read MM tables with original files will fail. The default in Hive 3.0 is false."),
+        "this is set to false; however, unless the MAPREDUCE-7086 fix is present (Hadoop 3.1.1+),\n" +
+        "queries that read non-ORC MM tables with original files will fail. The default in\n" +
+        "Hive 3.0 is false."),
 
     // Zookeeper related configs
     HIVE_ZOOKEEPER_USE_KERBEROS("hive.zookeeper.kerberos.enabled", true,
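Per the description above, ORC original files can be read even without the MAPREDUCE-7086 fix, which is why the TestCompactor changes below keep originals enabled for ORC and disable them for text tables. A minimal sketch of that toggle (the driver and format variables are the ones in the test code that follows):

    // Allow originals only for ORC; text tables would hit MAPREDUCE-7086 on read.
    HiveConf testConf = driver.getConf();
    testConf.setBoolVar(HiveConf.ConfVars.HIVE_MM_ALLOW_ORIGINALS, "orc".equals(format));
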
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 95fa6641f2..28a2ed7887 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -826,96 +826,131 @@ public void mmTable() throws Exception {
     verifyHasBase(table.getSd(), fs, "base_0000002_v0000006");
   }
 
-  @Test
-  public void mmTableOriginalsOrc() throws Exception {
-    mmTableOriginals("ORC");
+  @Test public void mmTableOriginalsMajorOrc() throws Exception {
+    mmTableOriginalsMajor("orc", true);
   }
 
-  @Test
-  public void mmTableOriginalsText() throws Exception {
-    mmTableOriginals("TEXTFILE");
+  @Test public void mmTableOriginalsMajorText() throws Exception {
+    mmTableOriginalsMajor("textfile", false);
   }
 
-  private void mmTableOriginals(String format) throws Exception {
-    // Originals split won't work due to MAPREDUCE-7086 issue in FileInputFormat.
-    boolean isBrokenUntilMapreduce7086 = "TEXTFILE".equals(format);
+  /**
+   * Major compact an mm table that contains original files.
+   */
+  private void mmTableOriginalsMajor(String format, boolean allowOriginals) throws Exception {
+    driver.getConf().setBoolVar(ConfVars.HIVE_MM_ALLOW_ORIGINALS, allowOriginals);
     String dbName = "default";
     String tblName = "mm_nonpart";
     executeStatementOnDriver("drop table if exists " + tblName, driver);
-    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " +
-        format + " TBLPROPERTIES ('transactional'='false')", driver);
-    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + format
+        + " TBLPROPERTIES ('transactional'='false')", driver);
     Table table = msClient.getTable(dbName, tblName);
 
     executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver);
-    executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver);
-    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM "
-        + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " (a,b) VALUES(2, 'bar')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName
+        + " UNION ALL SELECT a,b FROM " + tblName, driver);
     verifyFooBarResult(tblName, 3);
 
     FileSystem fs = FileSystem.get(conf);
     executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES "
-      + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
+        + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
 
-    verifyFooBarResult(tblName, 3);
+    if (allowOriginals) {
+      verifyDeltaCount(table.getSd(), fs, 0);
+      verifyFooBarResult(tblName, 3);
 
-    runMajorCompaction(dbName, tblName);
-    verifyFooBarResult(tblName, 3);
-    verifyHasBase(table.getSd(), fs, "base_0000001_v0000009");
+      runMajorCompaction(dbName, tblName);
+      verifyFooBarResult(tblName, 3);
+      verifyHasBase(table.getSd(), fs, "base_0000001_v0000009");
+    } else {
+      verifyDeltaCount(table.getSd(), fs, 1);
+      // 1 delta dir won't be compacted. Skip testing major compaction.
+    }
+  }
+
+  @Test public void mmMajorOriginalsDeltasOrc() throws Exception {
+    mmMajorOriginalsDeltas("orc", true);
+  }
+
+  @Test public void mmMajorOriginalsDeltasText() throws Exception {
+    mmMajorOriginalsDeltas("textfile", false);
+  }
 
-    // Try with an extra delta.
+  /**
+   * Major compact an mm table with both original and delta files.
+   */
+  private void mmMajorOriginalsDeltas(String format, boolean allowOriginals) throws Exception {
+    driver.getConf().setBoolVar(ConfVars.HIVE_MM_ALLOW_ORIGINALS, allowOriginals);
+    String dbName = "default";
+    String tblName = "mm_nonpart";
+    FileSystem fs = FileSystem.get(conf);
     executeStatementOnDriver("drop table if exists " + tblName, driver);
-    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " +
-        format + " TBLPROPERTIES ('transactional'='false')", driver);
-    table = msClient.getTable(dbName, tblName);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + format
+        + " TBLPROPERTIES ('transactional'='false')", driver);
+    Table table = msClient.getTable(dbName, tblName);
 
     executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver);
-    executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver);
-    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM "
-        + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " (a,b) VALUES(2, 'bar')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName
+        + " UNION ALL SELECT a,b FROM " + tblName, driver);
     verifyFooBarResult(tblName, 3);
 
     executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES "
-      + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
-    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM "
-      + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
-
-    // Neither select nor compaction (which is a select) wil work after this.
-    if (isBrokenUntilMapreduce7086) return;
+        + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName
+        + " UNION ALL SELECT a,b FROM " + tblName, driver);
 
     verifyFooBarResult(tblName, 9);
 
     runMajorCompaction(dbName, tblName);
     verifyFooBarResult(tblName, 9);
-    verifyHasBase(table.getSd(), fs, "base_0000002_v0000023");
+    verifyHasBase(table.getSd(), fs, "base_0000002_v0000010");
+  }
 
+  @Test public void mmMajorOriginalsBaseOrc() throws Exception {
+    mmMajorOriginalsBase("orc", true);
+  }
+
+  @Test public void mmMajorOriginalsBaseText() throws Exception {
+    mmMajorOriginalsBase("textfile", false);
+  }
+
+  /**
+   * Insert overwrite and major compact an mm table with only original files.
+   *
+   * @param format file format for table
+   * @param allowOriginals value to set for hive.mm.allow.originals before conversion
+   * @throws Exception
+   */
+  private void mmMajorOriginalsBase(String format, boolean allowOriginals) throws Exception {
+    driver.getConf().setBoolVar(ConfVars.HIVE_MM_ALLOW_ORIGINALS, allowOriginals);
     // Try with an extra base.
+    String dbName = "default";
+    String tblName = "mm_nonpart";
+    FileSystem fs = FileSystem.get(conf);
     executeStatementOnDriver("drop table if exists " + tblName, driver);
-    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " +
-        format + " TBLPROPERTIES ('transactional'='false')", driver);
-    table = msClient.getTable(dbName, tblName);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + format
+        + " TBLPROPERTIES ('transactional'='false')", driver);
+    Table table = msClient.getTable(dbName, tblName);
 
     executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver);
-    executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver);
-    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM "
-      + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " (a,b) VALUES(2, 'bar')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName
+        + " UNION ALL SELECT a,b FROM " + tblName, driver);
     verifyFooBarResult(tblName, 3);
 
     executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES "
-      + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
-    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tblName + " SELECT a,b FROM "
-      + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+        + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
+    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tblName + " SELECT a,b FROM " + tblName
+        + " UNION ALL SELECT a,b FROM " + tblName, driver);
     verifyFooBarResult(tblName, 6);
 
     runMajorCompaction(dbName, tblName);
     verifyFooBarResult(tblName, 6);
     verifyHasBase(table.getSd(), fs, "base_0000002");
-
-    msClient.close();
   }
 
-
   @Test
   public void mmTableBucketed() throws Exception {
     String dbName = "default";
@@ -1067,8 +1102,8 @@ public void mmTablePartitioned() throws Exception {
     }
   }
 
-  private void verifyFooBarResult(String tblName, int count) throws Exception, IOException {
-    List<String> valuesReadFromHiveDriver = new ArrayList<String>();
+  private void verifyFooBarResult(String tblName, int count) throws Exception {
+    List<String> valuesReadFromHiveDriver = new ArrayList<>();
     executeStatementOnDriver("SELECT a,b FROM " + tblName, driver);
     driver.getResults(valuesReadFromHiveDriver);
     Assert.assertEquals(2 * count, valuesReadFromHiveDriver.size());
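The directory names these tests assert follow the ACID scheme base_<writeIdHighWatermark>_v<compactorVisibilityTxnId>, so base_0000001_v0000009 means "all rows through write ID 1, produced by compactor transaction 9". The expectation for the originals-plus-deltas case drops from _v0000023 to _v0000010, consistent with each scenario now running as its own test method with fewer preceding transactions. A rough, purely illustrative re-implementation of what the verifyHasBase helper (defined elsewhere in this class) must check:

    // Hypothetical sketch, for illustration only.
    static void verifyHasBase(StorageDescriptor sd, FileSystem fs, String baseName) throws IOException {
      boolean found = false;
      for (FileStatus stat : fs.listStatus(new Path(sd.getLocation()))) {
        found |= stat.getPath().getName().equals(baseName); // e.g. "base_0000002_v0000010"
      }
      Assert.assertTrue("no " + baseName + " under " + sd.getLocation(), found);
    }
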
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index 9659a3f048..957f72b665 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -948,12 +948,12 @@ protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
     hiveConf.set(ValidTxnList.VALID_TXNS_KEY, "8:9223372036854775807::");
 
     // Check for default case.
-    qc.runCompactionQueries(hiveConf, null, sdMock, null, null, emptyQueries, emptyQueries, emptyQueries);
+    qc.runCompactionQueries(hiveConf, null, sdMock, null, null, null, emptyQueries, emptyQueries, emptyQueries);
     Assert.assertEquals("all", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT));
 
     // Check for case where hive.llap.io.etl.skip.format is explicitly set to none - as to always use cache.
     hiveConf.setVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT, "none");
-    qc.runCompactionQueries(hiveConf, null, sdMock, null, null, emptyQueries, emptyQueries, emptyQueries);
+    qc.runCompactionQueries(hiveConf, null, sdMock, null, null, null, emptyQueries, emptyQueries, emptyQueries);
     Assert.assertEquals("none", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT));
   }
 }
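The only change in this test is the extra null argument: runCompactionQueries gains a resultDirs parameter (a List<Path> of the base/delta directories the compaction will write; see QueryCompactor.java below). Passing null here is a sketch of why the test stays safe: the query lists are empty and nothing fails, so the failure-cleanup path that reads resultDirs is never taken.

    // Updated call shape; the sixth argument is the new resultDirs list.
    qc.runCompactionQueries(hiveConf, null, sdMock, null, null, /* resultDirs */ null,
        emptyQueries, emptyQueries, emptyQueries);
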
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 543ec0b991..8698dfb28c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -25,6 +25,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.regex.Matcher;
 
 import org.apache.hadoop.conf.Configuration;
@@ -132,7 +133,7 @@ private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, Storag
     job.setOutputCommitter(CompactorOutputCommitter.class);
 
     job.set(FINAL_LOCATION, sd.getLocation());
-    job.set(TMP_LOCATION, QueryCompactor.Util.generateTmpPath(sd));
+    job.set(TMP_LOCATION, generateTmpPath(sd));
     job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat());
     job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat());
     job.setBoolean(IS_COMPRESSED, sd.isCompressed());
@@ -229,6 +230,7 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor
      */
     QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, conf, ci);
     if (queryCompactor != null) {
+      LOG.info("Will compact with " + queryCompactor.getClass().getName());
       queryCompactor.runCompaction(conf, t, p, sd, writeIds, ci);
       return;
     }
@@ -294,6 +296,15 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor
     su.gatherStats();
   }
 
+  /**
+   * Generate a random tmp path, under the provided storage.
+   * @param sd storage descriptor, must not be null.
+   * @return path, never null
+   */
+  static String generateTmpPath(StorageDescriptor sd) {
+    return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
+  }
+
   /**
    * @param baseDir if not null, it's either table/partition root folder or base_xxxx.
    *                If it's base_xxxx, it's in dirsToSearch, else the actual original files
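generateTmpPath moves here from QueryCompactor.Util because, after this patch, only the MR code path still stages output in a random scratch directory; the query-based compactors write directly into the final ACID directory instead. A sketch of the resulting location (TMPDIR is an existing constant in this class; the UUID suffix is what makes concurrent compactions collision-free):

    // Shape of the result: <sd.getLocation()>/<TMPDIR>_<random-uuid>
    String scratch = generateTmpPath(sd);
    job.set(TMP_LOCATION, scratch);
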
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index f47c23a6de..51b96eef7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -25,9 +25,9 @@
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+
 import java.io.IOException;
 import java.util.List;
 
@@ -53,20 +53,14 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD
     String tmpPrefix = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_";
     String tmpTableName = tmpPrefix + System.currentTimeMillis();
-
-    long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
-    long highWaterMark = writeIds.getHighWatermark();
-    long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
-    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true)
-        .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)
-        .maximumWriteId(highWaterMark).statementId(-1).visibilityTxnId(compactorTxnId);
-    Path tmpTablePath = AcidUtils.baseOrDeltaSubdirPath(new Path(storageDescriptor.getLocation()), options);
+    Path tmpTablePath = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds,
+        conf, true, false, false);
 
     List<String> createQueries = getCreateQueries(tmpTableName, table, tmpTablePath.toString());
     List<String> compactionQueries = getCompactionQueries(table, partition, tmpTableName);
     List<String> dropQueries = getDropQueries(tmpTableName);
-    runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries,
-        compactionQueries, dropQueries);
+    runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
+        Lists.newArrayList(tmpTablePath), createQueries, compactionQueries, dropQueries);
   }
 
   @Override
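The removed block and the new helper are equivalent: getCompactionResultDir builds the same AcidOutputFormat.Options (writingBase=true, no delete delta, min/max write IDs, the compactor's visibility txn) and resolves it through AcidUtils.baseOrDeltaSubdirPath. Assuming a write ID high watermark of 2 and compactor txn 10, the name follows the usual ACID scheme:

    Path tmpTablePath = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds,
        conf, true, false, false);
    // -> <table/partition location>/base_0000002_v0000010
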
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
index 1bf0beea40..afb4db570c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
@@ -25,7 +25,6 @@
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.common.util.Ref;
@@ -45,7 +44,7 @@
   @Override
   void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
-      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException {
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
     LOG.info("Running query based minor compaction");
     AcidUtils
         .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
@@ -61,14 +60,22 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD
     String tmpTableName = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_"
         + System.currentTimeMillis();
 
-    List<String> createQueries = getCreateQueries(table, tmpTableName, dir, writeIds, conf, storageDescriptor);
+    Path resultDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor,
+        writeIds, conf, false, false, false);
+    Path resultDeleteDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor,
+        writeIds, conf, false, true, false);
+
+    List<String> createQueries = getCreateQueries(table, tmpTableName, dir, writeIds,
+        resultDeltaDir, resultDeleteDeltaDir);
     List<String> compactionQueries = getCompactionQueries(tmpTableName, table, writeIds);
     List<String> dropQueries = getDropQueries(tmpTableName);
-    runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries,
+    runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
+        Lists.newArrayList(resultDeltaDir, resultDeleteDeltaDir), createQueries,
         compactionQueries, dropQueries);
   }
+
   @Override
   protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
       ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
@@ -89,16 +96,13 @@ protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
    * @param tempTableBase an unique identifier which is used to create delta/delete-delta temp tables
    * @param dir the directory, where the delta directories resides
    * @param writeIds list of valid write ids, used to filter out delta directories which are not relevant for compaction
-   * @param conf hive configuration
-   * @param storageDescriptor this is the resolved storage descriptor
+   * @param tmpTableResultLocation location of the result delta directory
+   * @param tmpTableDeleteResultLocation location of the result delete delta directory
    * @return list of create/alter queries, always non-null
    */
   private List<String> getCreateQueries(Table table, String tempTableBase, AcidUtils.Directory dir,
-      ValidWriteIdList writeIds, HiveConf conf, StorageDescriptor storageDescriptor) {
+      ValidWriteIdList writeIds, Path tmpTableResultLocation, Path tmpTableDeleteResultLocation) {
     List<String> queries = new ArrayList<>();
-    long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
-    long highWatermark = writeIds.getHighWatermark();
-    long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
     // create delta temp table
     String tmpTableName = AcidUtils.DELTA_PREFIX + tempTableBase;
     queries.add(buildCreateTableQuery(table, tmpTableName, true, false, null));
@@ -106,15 +110,9 @@ protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
     if (!alterQuery.isEmpty()) {
       queries.add(alterQuery);
     }
-    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(false)
-        .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)
-        .maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
-    Path location = new Path(storageDescriptor.getLocation());
-    String tmpTableResultLocation = AcidUtils.baseOrDeltaSubdirPath(location,
-        options).toString();
     // create delta result temp table
     queries.add(buildCreateTableQuery(table, tmpTableName + "_result", false, true,
-        tmpTableResultLocation));
+        tmpTableResultLocation.toString()));
 
     // create delete delta temp tables
     String tmpDeleteTableName = AcidUtils.DELETE_DELTA_PREFIX + tempTableBase;
@@ -124,13 +122,9 @@ protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
     if (!alterQuery.isEmpty()) {
       queries.add(alterQuery);
     }
-    options = new AcidOutputFormat.Options(conf).writingBase(false).writingDeleteDelta(true).isCompressed(false)
-        .minimumWriteId(minOpenWriteId).maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
-    String tmpTableDeleteResultLocation = AcidUtils.baseOrDeltaSubdirPath(location,
-        options).toString();
     // create delete delta result temp table
     queries.add(buildCreateTableQuery(table, tmpDeleteTableName + "_result", false, true,
-        tmpTableDeleteResultLocation));
+        tmpTableDeleteResultLocation.toString()));
     return queries;
   }
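Minor compaction needs two result directories because inserts and deletes are compacted by separate queries into separate temp tables. With the minimum open write ID defaulting to 1, a high watermark of 3, and compactor txn 10, the two helper calls above would resolve to names like these (assumed from the usual ACID scheme):

    // false/false -> delta, false/true -> delete delta.
    Path d = QueryCompactor.Util.getCompactionResultDir(sd, writeIds, conf, false, false, false);
    Path dd = QueryCompactor.Util.getCompactionResultDir(sd, writeIds, conf, false, true, false);
    // e.g. delta_0000001_0000003_v0000010 and delete_delta_0000001_0000003_v0000010
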
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index 114b6f7a74..4fa68ca6d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -26,9 +25,7 @@
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
@@ -53,9 +50,6 @@
         table.getParameters(), false);
     QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir);
 
-    String tmpLocation = Util.generateTmpPath(storageDescriptor);
-    Path baseLocation = new Path(tmpLocation, "_base");
-
     // Set up the session for driver.
     HiveConf driverConf = new HiveConf(hiveConf);
     driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
@@ -64,13 +58,15 @@
     // "insert overwrite directory" command if there were no bucketing or list bucketing.
     String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_";
     String tmpTableName = tmpPrefix + System.currentTimeMillis();
-    List<String> createTableQueries =
-        getCreateQueries(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(),
-            baseLocation.toString());
+    Path resultBaseDir = QueryCompactor.Util.getCompactionResultDir(
+        storageDescriptor, writeIds, driverConf, true, true, false);
+
+    List<String> createTableQueries = getCreateQueries(tmpTableName, table, storageDescriptor,
+        resultBaseDir.toString());
     List<String> compactionQueries = getCompactionQueries(table, partition, tmpTableName);
     List<String> dropQueries = getDropQueries(tmpTableName);
     runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
-        createTableQueries, compactionQueries, dropQueries);
+        Lists.newArrayList(resultBaseDir), createTableQueries, compactionQueries, dropQueries);
   }
 
   /**
@@ -81,26 +77,7 @@
   @Override
   protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
       ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
-    org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
-    String from = tempTable.getSd().getLocation();
-    Path fromPath = new Path(from), toPath = new Path(dest);
-    FileSystem fs = fromPath.getFileSystem(conf);
-    // Assume the high watermark can be used as maximum transaction ID.
-    //todo: is that true? can it be aborted? does it matter for compaction? probably OK since
-    //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs.
-    long maxTxn = actualWriteIds.getHighWatermark();
-    AcidOutputFormat.Options options =
-        new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0)
-            .statementId(-1).visibilityTxnId(compactorTxnId);
-    Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
-    if (!fs.exists(fromPath)) {
-      LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir);
-      fs.mkdirs(newBaseDir);
-      return;
-    }
-    LOG.info("Moving contents of " + from + " to " + dest);
-    fs.rename(fromPath, newBaseDir);
-    fs.delete(fromPath, true);
+    Util.cleanupEmptyDir(conf, tmpTableName);
  }
 
   private List<String> getCreateQueries(String tmpTableName, Table table,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
index 383891bfad..9dc9b9584a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -26,9 +25,7 @@
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
@@ -51,27 +48,25 @@
         "Going to delete directories for aborted transactions for MM table "
             + table.getDbName() + "." + table.getTableName());
-    AcidUtils.Directory dir = AcidUtils
-        .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds,
-            Ref.from(false), false, table.getParameters(), false);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null,
+        new Path(storageDescriptor.getLocation()), hiveConf, writeIds,
+        Ref.from(false), false, table.getParameters(), false);
     QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir);
-    String tmpLocation = Util.generateTmpPath(storageDescriptor);
-    Path sourceTabLocation = new Path(tmpLocation);
-    Path resultTabLocation = new Path(tmpLocation, "_result");
 
     HiveConf driverConf = setUpDriverSession(hiveConf);
 
     String tmpPrefix = table.getDbName() + ".tmp_minor_compactor_" + table.getTableName() + "_";
     String tmpTableName = tmpPrefix + System.currentTimeMillis();
     String resultTmpTableName = tmpTableName + "_result";
+    Path resultDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds, driverConf,
+        false, false, false);
 
-    List<String> createTableQueries =
-        getCreateQueries(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(),
-            sourceTabLocation.toString(), resultTabLocation.toString(), dir, writeIds);
+    List<String> createTableQueries = getCreateQueries(tmpTableName, table, storageDescriptor, dir,
+        writeIds, resultDeltaDir);
     List<String> compactionQueries = getCompactionQueries(tmpTableName, resultTmpTableName, table);
     List<String> dropQueries = getDropQueries(tmpTableName);
     runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
-        createTableQueries, compactionQueries, dropQueries);
+        Lists.newArrayList(resultDeltaDir), createTableQueries, compactionQueries, dropQueries);
   }
 
   /**
@@ -79,26 +74,7 @@
    */
   @Override
   protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
       ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
-    org.apache.hadoop.hive.ql.metadata.Table resultTable =
-        Hive.get().getTable(tmpTableName + "_result");
-    String from = resultTable.getSd().getLocation();
-    Path fromPath = new Path(from);
-    Path toPath = new Path(dest);
-    FileSystem fs = fromPath.getFileSystem(conf);
-    long maxTxn = actualWriteIds.getHighWatermark();
-    AcidOutputFormat.Options options =
-        new AcidOutputFormat.Options(conf).writingBase(false).isCompressed(false)
-            .minimumWriteId(1).maximumWriteId(maxTxn).bucket(0).statementId(-1)
-            .visibilityTxnId(compactorTxnId);
-    Path newDeltaDir = AcidUtils.createFilename(toPath, options).getParent();
-    if (!fs.exists(fromPath)) {
-      LOG.info(from + " not found. Assuming 0 splits. Creating " + newDeltaDir);
-      fs.mkdirs(newDeltaDir);
-      return;
-    }
-    LOG.info("Moving contents of " + from + " to " + dest);
-    fs.rename(fromPath, newDeltaDir);
-    fs.delete(fromPath, true);
+    Util.cleanupEmptyDir(conf, tmpTableName);
   }
 
   /**
@@ -114,32 +90,30 @@
    * @param tmpTableBase name of the first temp table (second will be $tmpTableBase_result)
    * @param t Table to compact
   * @param sd storage descriptor of table or partition to compact
-   * @param sourceTabLocation location the "source table" (temp table 1) should go
-   * @param resultTabLocation location the "result table (temp table 2) should go
    * @param dir the parent directory of delta directories
-   * @param validWriteIdList valid write ids for the table/partition to compact
+   * @param writeIds ValidWriteIdList for the table/partition we are compacting
+   * @param resultDeltaDir the final location of the compacted (result) delta directory
    * @return List of 3 query strings: 2 create table, 1 alter table
    */
   private List<String> getCreateQueries(String tmpTableBase, Table t, StorageDescriptor sd,
-      String sourceTabLocation, String resultTabLocation, AcidUtils.Directory dir,
-      ValidWriteIdList validWriteIdList) {
+      AcidUtils.Directory dir, ValidWriteIdList writeIds, Path resultDeltaDir) {
     List<String> queries = Lists.newArrayList(
-        getCreateQuery(tmpTableBase, t, sd, sourceTabLocation, true),
-        getCreateQuery(tmpTableBase + "_result", t, sd, resultTabLocation, false)
+        getCreateQuery(tmpTableBase, t, sd, null, true),
+        getCreateQuery(tmpTableBase + "_result", t, sd, resultDeltaDir.toString(), false)
     );
-    String alterQuery = buildAlterTableQuery(tmpTableBase, dir, validWriteIdList);
+    String alterQuery = buildAlterTableQuery(tmpTableBase, dir, writeIds);
     if (!alterQuery.isEmpty()) {
       queries.add(alterQuery);
     }
     return queries;
   }
 
-  private String getCreateQuery(String resultTableName, Table t, StorageDescriptor sd,
+  private String getCreateQuery(String newTableName, Table t, StorageDescriptor sd,
       String location, boolean isPartitioned) {
     return new CompactionQueryBuilder(
         CompactionQueryBuilder.CompactionType.MINOR_INSERT_ONLY,
         CompactionQueryBuilder.Operation.CREATE,
-        resultTableName)
+        newTableName)
         .setSourceTab(t)
         .setStorageDescriptor(sd)
         .setLocation(location)
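Note the asymmetry in the rewritten getCreateQueries: the source temp table is now created with a null location (an ordinary managed table whose partitions are wired to the existing deltas by the ALTER query), while only the _result table is pinned to the final delta directory:

    getCreateQuery(tmpTableBase, t, sd, null, true);                                   // source, no explicit location
    getCreateQuery(tmpTableBase + "_result", t, sd, resultDeltaDir.toString(), false); // result, final delta dir
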
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 7f3ccfa04e..1f732f906f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -38,7 +39,6 @@
 
 import java.io.IOException;
 import java.util.List;
-import java.util.UUID;
 
 /**
  * Common interface for query based compactions.
@@ -82,6 +82,7 @@ protected abstract void commitCompaction(String dest, String tmpTableName, HiveC
    * @param storageDescriptor this is the resolved storage descriptor.
    * @param writeIds valid write IDs used to filter rows while they're being read for compaction.
    * @param compactionInfo provides info about the type of compaction.
+   * @param resultDirs the delta/base directories created as the result of the compaction.
    * @param createQueries collection of queries which creates the temporary tables.
    * @param compactionQueries collection of queries which uses data from the original table and writes in temporary
    *                          tables.
@@ -89,8 +90,9 @@ protected abstract void commitCompaction(String dest, String tmpTableName, HiveC
    * @throws IOException error during the run of the compaction.
    */
   void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor storageDescriptor,
-      ValidWriteIdList writeIds, CompactionInfo compactionInfo, List<String> createQueries,
-      List<String> compactionQueries, List<String> dropQueries) throws IOException {
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo, List<Path> resultDirs,
+      List<String> createQueries, List<String> compactionQueries, List<String> dropQueries)
+      throws IOException {
     Util.disableLlapCaching(conf);
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
     SessionState sessionState = DriverUtils.setUpSessionState(conf, user, true);
@@ -118,6 +120,7 @@ void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor
       commitCompaction(storageDescriptor.getLocation(), tmpTableName, conf, writeIds, compactorTxnId);
     } catch (HiveException e) {
       LOG.error("Error doing query based {} compaction", compactionInfo.isMajorCompaction() ? "major" : "minor", e);
+      removeResultDirs(resultDirs, conf);
       throw new IOException(e);
     } finally {
       try {
@@ -134,17 +137,52 @@ void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor
     }
   }
 
+  /**
+   * Called when compaction fails. Removes the new, still empty compacted delta/base directory.
+   * The Cleaner would handle this later, but clean up now just in case.
+   */
+  private void removeResultDirs(List<Path> resultDirPaths, HiveConf conf) throws IOException {
+    for (Path path : resultDirPaths) {
+      LOG.info("Compaction failed, removing directory: " + path.toString());
+      FileSystem fs = path.getFileSystem(conf);
+      if (!fs.listFiles(path, false).hasNext()) {
+        fs.delete(path, true);
+      }
+    }
+  }
+
   /**
    * Collection of some helper functions.
    */
   static class Util {
+
     /**
-     * Generate a random tmp path, under the provided storage.
-     * @param sd storage descriptor, must be not null.
-     * @return path, always not null
+     * Get the path of the base, delta, or delete delta directory that will be the final
+     * destination of the files during compaction.
+     *
+     * @param sd storage descriptor of the table or partition to compact
+     * @param writeIds list of valid write IDs
+     * @param conf HiveConf
+     * @param writingBase if true, we are creating a base directory, otherwise a delta
+     * @param createDeleteDelta if true, the delta dir we are creating is a delete delta
+     * @param bucket0 whether to specify 0 as the bucket id
+     *
+     * @return Path of the new base/delta/delete delta directory
      */
-    static String generateTmpPath(StorageDescriptor sd) {
-      return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
+    static Path getCompactionResultDir(StorageDescriptor sd, ValidWriteIdList writeIds,
+        HiveConf conf, boolean writingBase, boolean createDeleteDelta, boolean bucket0) {
+      long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
+      long highWatermark = writeIds.getHighWatermark();
+      long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
+      AcidOutputFormat.Options options =
+          new AcidOutputFormat.Options(conf).isCompressed(false).minimumWriteId(minOpenWriteId)
+              .maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId)
+              .writingBase(writingBase).writingDeleteDelta(createDeleteDelta);
+      if (bucket0) {
+        options = options.bucket(0);
+      }
+      Path location = new Path(sd.getLocation());
+      return AcidUtils.baseOrDeltaSubdirPath(location, options);
     }
 
     /**
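removeResultDirs is deliberately conservative: it only deletes a result directory that is still empty, presumably because a directory that already received files is better left to the Cleaner, which can reason about visibility. The guard is just:

    FileSystem fs = path.getFileSystem(conf);
    if (!fs.listFiles(path, false).hasNext()) { // nothing was written under path
      fs.delete(path, true);
    }
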
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
index 6542eef58a..93dd85bd2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
@@ -40,7 +40,9 @@ private QueryCompactorFactory() {
    * <br>
    * {@link MinorQueryCompactor} - handles query based minor compaction
    * <br>
-   * {@link MmMajorQueryCompactor} - handles query based minor compaction for micro-managed tables
+   * {@link MmMajorQueryCompactor} - handles query based major compaction for micro-managed tables
+   * <br>
+   * {@link MmMinorQueryCompactor} - handles query based minor compaction for micro-managed tables
    * <br>
    *
    * @param table the table, on which the compaction should be running, must be not null.
@@ -55,7 +57,7 @@ static QueryCompactor getQueryCompactor(Table table, HiveConf configuration, Com
       return new MajorQueryCompactor();
     } else if (!compactionInfo.isMajorCompaction() && "tez"
         .equalsIgnoreCase(HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
-      // query based minor compactigenerateAddMmTaskson is only supported on tez
+      // query based minor compaction is only supported on tez
       return new MinorQueryCompactor();
     }
   }
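
For orientation, this is how the factory's result is consumed, matching the javadoc above (the surrounding variables are the ones in CompactorMR.run shown earlier in this patch):

    QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, conf, ci);
    if (queryCompactor != null) { // a query-based compactor was selected
      LOG.info("Will compact with " + queryCompactor.getClass().getName());
      queryCompactor.runCompaction(conf, t, p, sd, writeIds, ci);
      return;
    }
    // otherwise fall through to the MR-based compaction path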