diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 95fa6641f2..a1ce4ea581 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -84,6 +84,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -826,96 +827,122 @@ public void mmTable() throws Exception { verifyHasBase(table.getSd(), fs, "base_0000002_v0000006"); } - @Test - public void mmTableOriginalsOrc() throws Exception { - mmTableOriginals("ORC"); + @Test public void mmTableOriginalsMajorOrc() throws Exception { + mmTableOriginalsMajor("orc"); } - @Test - public void mmTableOriginalsText() throws Exception { - mmTableOriginals("TEXTFILE"); + @Test public void mmTableOriginalsMajorText() throws Exception { + mmTableOriginalsMajor("textfile"); } - private void mmTableOriginals(String format) throws Exception { - // Originals split won't work due to MAPREDUCE-7086 issue in FileInputFormat. - boolean isBrokenUntilMapreduce7086 = "TEXTFILE".equals(format); + /** + * Major compact an mm table that contains original files. + */ + private void mmTableOriginalsMajor(String format) throws Exception { String dbName = "default"; String tblName = "mm_nonpart"; executeStatementOnDriver("drop table if exists " + tblName, driver); - executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + - format + " TBLPROPERTIES ('transactional'='false')", driver); - IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + format + + " TBLPROPERTIES ('transactional'='false')", driver); Table table = msClient.getTable(dbName, tblName); executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver); - executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver); - executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " - + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + executeStatementOnDriver("INSERT INTO " + tblName + " (a,b) VALUES(2, 'bar')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName + + " UNION ALL SELECT a,b FROM " + tblName, driver); verifyFooBarResult(tblName, 3); FileSystem fs = FileSystem.get(conf); executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES " - + "('transactional'='true', 'transactional_properties'='insert_only')", driver); + + "('transactional'='true', 'transactional_properties'='insert_only')", driver); verifyFooBarResult(tblName, 3); runMajorCompaction(dbName, tblName); verifyFooBarResult(tblName, 3); verifyHasBase(table.getSd(), fs, "base_0000001_v0000009"); + } + + @Test public void mmMajorOriginalsDeltasOrc() throws Exception { + mmMajorOriginalsDeltas("orc"); + } + + @Ignore("Compaction will fail until HIVE-23072 is resolved") + @Test public void mmMajorOriginalsDeltasText() throws Exception { + mmMajorOriginalsDeltas("textfile"); + } - // Try with an extra delta. + /** + * Major compact an mm table with both original and delta files. 
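For orientation, the layout this test drives looks roughly like the sketch below before and after the major compaction (only the base directory name is asserted by the test; the original file and delta names are illustrative):

    mm_nonpart/
      000000_0, 000001_0, ...        <- original files from the inserts done while 'transactional'='false'
      delta_xxxxxxx_xxxxxxx_0000/    <- insert-only delta written after the ALTER TABLE conversion
      base_0000002_v0000009/         <- produced by the query-based major compaction (write id high
                                        watermark 2, compactor visibility txn id 9)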
+ */ + private void mmMajorOriginalsDeltas(String format) throws Exception { + String dbName = "default"; + String tblName = "mm_nonpart"; + + FileSystem fs = FileSystem.get(conf); executeStatementOnDriver("drop table if exists " + tblName, driver); - executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + - format + " TBLPROPERTIES ('transactional'='false')", driver); - table = msClient.getTable(dbName, tblName); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + format + + " TBLPROPERTIES ('transactional'='false')", driver); + Table table = msClient.getTable(dbName, tblName); executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver); - executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver); - executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " - + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + executeStatementOnDriver("INSERT INTO " + tblName + " (a,b) VALUES(2, 'bar')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName + + " UNION ALL SELECT a,b FROM " + tblName, driver); verifyFooBarResult(tblName, 3); executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES " - + "('transactional'='true', 'transactional_properties'='insert_only')", driver); - executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " - + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); - - // Neither select nor compaction (which is a select) wil work after this. - if (isBrokenUntilMapreduce7086) return; - - verifyFooBarResult(tblName, 9); - + + "('transactional'='true', 'transactional_properties'='insert_only')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName + + " UNION ALL SELECT a,b FROM " + tblName, driver); + runMajorCompaction(dbName, tblName); verifyFooBarResult(tblName, 9); - verifyHasBase(table.getSd(), fs, "base_0000002_v0000023"); + verifyHasBase(table.getSd(), fs, "base_0000002_v0000009"); + } + @Test public void mmMajorOriginalsBaseOrc() throws Exception { + mmMajorOriginalsBase("orc"); + } + + @Test public void mmMajorOriginalsBaseText() throws Exception { + mmMajorOriginalsBase("textfile"); + } + + /** + * Insert overwrite and major compact an mm table with only original files. + * + * @param format file format for table + * @throws Exception + */ + private void mmMajorOriginalsBase(String format) throws Exception { // Try with an extra base. 
+ String dbName = "default"; + String tblName = "mm_nonpart"; + FileSystem fs = FileSystem.get(conf); executeStatementOnDriver("drop table if exists " + tblName, driver); - executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + - format + " TBLPROPERTIES ('transactional'='false')", driver); - table = msClient.getTable(dbName, tblName); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + format + + " TBLPROPERTIES ('transactional'='false')", driver); + Table table = msClient.getTable(dbName, tblName); executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver); - executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver); - executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " - + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + executeStatementOnDriver("INSERT INTO " + tblName + " (a,b) VALUES(2, 'bar')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName + + " UNION ALL SELECT a,b FROM " + tblName, driver); verifyFooBarResult(tblName, 3); executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES " - + "('transactional'='true', 'transactional_properties'='insert_only')", driver); - executeStatementOnDriver("INSERT OVERWRITE TABLE " + tblName + " SELECT a,b FROM " - + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + + "('transactional'='true', 'transactional_properties'='insert_only')", driver); + executeStatementOnDriver("INSERT OVERWRITE TABLE " + tblName + " SELECT a,b FROM " + tblName + + " UNION ALL SELECT a,b FROM " + tblName, driver); verifyFooBarResult(tblName, 6); runMajorCompaction(dbName, tblName); verifyFooBarResult(tblName, 6); verifyHasBase(table.getSd(), fs, "base_0000002"); - - msClient.close(); } - @Test public void mmTableBucketed() throws Exception { String dbName = "default"; @@ -1067,8 +1094,8 @@ public void mmTablePartitioned() throws Exception { } - private void verifyFooBarResult(String tblName, int count) throws Exception, IOException { - List valuesReadFromHiveDriver = new ArrayList(); + private void verifyFooBarResult(String tblName, int count) throws Exception { + List valuesReadFromHiveDriver = new ArrayList<>(); executeStatementOnDriver("SELECT a,b FROM " + tblName, driver); driver.getResults(valuesReadFromHiveDriver); Assert.assertEquals(2 * count, valuesReadFromHiveDriver.size()); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java index 9659a3f048..957f72b665 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java @@ -948,12 +948,12 @@ protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, hiveConf.set(ValidTxnList.VALID_TXNS_KEY, "8:9223372036854775807::"); // Check for default case. - qc.runCompactionQueries(hiveConf, null, sdMock, null, null, emptyQueries, emptyQueries, emptyQueries); + qc.runCompactionQueries(hiveConf, null, sdMock, null, null, null, emptyQueries, emptyQueries, emptyQueries); Assert.assertEquals("all", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT)); // Check for case where hive.llap.io.etl.skip.format is explicitly set to none - as to always use cache. 
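The extra null argument that this patch adds to the runCompactionQueries calls in this test lines up with the new resultDirs parameter introduced on QueryCompactor.runCompactionQueries later in the patch; it carries the pre-computed compaction output directories so they can be cleaned up if the compaction queries fail. For reference, the reworked signature (generic type arguments are assumed, as they are not visible in this diff):

    void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor storageDescriptor,
        ValidWriteIdList writeIds, CompactionInfo compactionInfo, List<Path> resultDirs,
        List<String> createQueries, List<String> compactionQueries, List<String> dropQueries)
        throws IOException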
hiveConf.setVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT, "none"); - qc.runCompactionQueries(hiveConf, null, sdMock, null, null, emptyQueries, emptyQueries, emptyQueries); + qc.runCompactionQueries(hiveConf, null, sdMock, null, null, null, emptyQueries, emptyQueries, emptyQueries); Assert.assertEquals("none", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT)); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 543ec0b991..102304287a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -227,8 +227,9 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor * 2. More generic approach to collecting files in the same logical bucket to compact within the same task * (currently we're using Tez split grouping). */ - QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, conf, ci); + QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, conf, ci, dir); if (queryCompactor != null) { + LOG.info("Will compact with " + queryCompactor.getClass().getName()); queryCompactor.runCompaction(conf, t, p, sd, writeIds, ci); return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java index 9385080713..069f6dd07a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java @@ -26,10 +26,10 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; + import java.io.IOException; import java.util.List; @@ -55,20 +55,14 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD String tmpPrefix = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_"; String tmpTableName = tmpPrefix + System.currentTimeMillis(); - - long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 
1 : writeIds.getMinOpenWriteId(); - long highWaterMark = writeIds.getHighWatermark(); - long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf); - AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true) - .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId) - .maximumWriteId(highWaterMark).statementId(-1).visibilityTxnId(compactorTxnId); - Path tmpTablePath = AcidUtils.baseOrDeltaSubdirPath(new Path(storageDescriptor.getLocation()), options); + Path tmpTablePath = + QueryCompactor.Util.getResultDir(storageDescriptor, writeIds, conf, true, false); List createQueries = getCreateQueries(tmpTableName, table, tmpTablePath.toString()); List compactionQueries = getCompactionQueries(table, partition, tmpTableName); List dropQueries = getDropQueries(tmpTableName); - runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries, - compactionQueries, dropQueries); + runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, + Lists.newArrayList(tmpTablePath), createQueries, compactionQueries, dropQueries); } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java index 01cd2fc93d..228d71d3f1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import com.google.common.collect.Lists; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; @@ -65,14 +66,26 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD String tmpTableName = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_" + System.currentTimeMillis(); - List createQueries = getCreateQueries(table, tmpTableName, dir, writeIds, conf, storageDescriptor); + long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 
1 : writeIds.getMinOpenWriteId(); + long highWatermark = writeIds.getHighWatermark(); + long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf); + Path tabLocation = new Path(storageDescriptor.getLocation()); + Path resultDeltaDir = getResultDeltaLocation(conf, minOpenWriteId, highWatermark, + compactorTxnId, tabLocation, false); + Path resultDeleteDeltaDir = getResultDeltaLocation(conf, minOpenWriteId, highWatermark, + compactorTxnId, tabLocation, true); + + List createQueries = getCreateQueries(table, tmpTableName, dir, writeIds, + resultDeltaDir, resultDeleteDeltaDir); List compactionQueries = getCompactionQueries(tmpTableName, writeIds.getInvalidWriteIds()); List dropQueries = getDropQueries(tmpTableName); - runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries, + runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, + Lists.newArrayList(resultDeltaDir, resultDeleteDeltaDir), createQueries, compactionQueries, dropQueries); } + @Override protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException { @@ -80,6 +93,27 @@ protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, Util.cleanupEmptyDir(conf, AcidUtils.DELETE_DELTA_PREFIX + tmpTableName + "_result"); } + /** + * Create the compaction delta or delete delta directory. + * + * @param conf hive configuration + * @param minOpenWriteId minimum open write id + * @param highWatermark high watermark txn id + * @param compactorTxnId transaction id of this compaction + * @param tableLocation directory where the table to compact is stored + * @param writingDeleteDelta whether a compaction delta or delete delta path should be created + * @return Path of the compaction delta or delete delta + */ + private Path getResultDeltaLocation(HiveConf conf, long minOpenWriteId, long highWatermark, + long compactorTxnId, Path tableLocation, boolean writingDeleteDelta) { + + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(false) + .writingDeleteDelta(writingDeleteDelta).isCompressed(false).minimumWriteId(minOpenWriteId) + .maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId); + + return AcidUtils.baseOrDeltaSubdirPath(tableLocation, options); + } + /** * Get a list of create/alter table queries. These tables serves as temporary data source for query based * minor compaction. 
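As a quick illustration of the getResultDeltaLocation helper above (the numbers are made up; the name pattern follows from the AcidOutputFormat options used and matches the base_..._v... names asserted in the tests):

    // With minOpenWriteId = 1, highWatermark = 5 and a compactor txn id of 9, the two result
    // directories under the table location are delta_0000001_0000005_v0000009 and
    // delete_delta_0000001_0000005_v0000009; the *_result temp tables created below point straight
    // at them, so a successful minor compaction no longer needs any file moves in commitCompaction.
    Path resultDeltaDir = getResultDeltaLocation(conf, 1L, 5L, 9L, tabLocation, false);
    Path resultDeleteDeltaDir = getResultDeltaLocation(conf, 1L, 5L, 9L, tabLocation, true);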
The following tables are created: @@ -93,41 +127,29 @@ protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, * @param tempTableBase an unique identifier which is used to create delta/delete-delta temp tables * @param dir the directory, where the delta directories resides * @param writeIds list of valid write ids, used to filter out delta directories which are not relevant for compaction - * @param conf hive configuration - * @param storageDescriptor this is the resolved storage descriptor + * @param tmpTableResultLocation result delta dir + * @param tmpTableDeleteResultLocation result delete delta dir * @return list of create/alter queries, always non-null */ private List getCreateQueries(Table table, String tempTableBase, AcidUtils.Directory dir, - ValidWriteIdList writeIds, HiveConf conf, StorageDescriptor storageDescriptor) throws HiveException { + ValidWriteIdList writeIds, Path tmpTableResultLocation, Path tmpTableDeleteResultLocation) + throws HiveException { List queries = new ArrayList<>(); - long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId(); - long highWatermark = writeIds.getHighWatermark(); - long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf); // create delta temp table String tmpTableName = AcidUtils.DELTA_PREFIX + tempTableBase; queries.add(buildCreateTableQuery(table, tmpTableName, true, false, null)); buildAlterTableQuery(tmpTableName, dir, writeIds, false).ifPresent(queries::add); - AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(false) - .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId) - .maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId); - Path location = new Path(storageDescriptor.getLocation()); - String tmpTableResultLocation = AcidUtils.baseOrDeltaSubdirPath(location, - options).toString(); // create delta result temp table queries.add(buildCreateTableQuery(table, tmpTableName + "_result", false, true, - tmpTableResultLocation)); + tmpTableResultLocation.toString())); // create delete delta temp tables String tmpDeleteTableName = AcidUtils.DELETE_DELTA_PREFIX + tempTableBase; queries.add(buildCreateTableQuery(table, tmpDeleteTableName, true, false, null)); buildAlterTableQuery(tmpDeleteTableName, dir, writeIds, true).ifPresent(queries::add); - options = new AcidOutputFormat.Options(conf).writingBase(false).writingDeleteDelta(true).isCompressed(false) - .minimumWriteId(minOpenWriteId).maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId); - String tmpTableDeleteResultLocation = AcidUtils.baseOrDeltaSubdirPath(location, - options).toString(); // create delete delta result temp table queries.add(buildCreateTableQuery(table, tmpDeleteTableName + "_result", false, true, - tmpTableDeleteResultLocation)); + tmpTableDeleteResultLocation.toString())); return queries; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorOriginalFileQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorOriginalFileQueryCompactor.java new file mode 100644 index 0000000000..3fc0a24dbf --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorOriginalFileQueryCompactor.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hive.common.util.Ref; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Runs query based major compaction on insert only tables, without skipping file renames (as + * opposed to {@link MmMajorQueryCompactor}). + * This is necessary because the compaction query (insert into tmp_table select * + * from source_table) would fail if the table contains both original files and a new base + * directory. + * This class can be deleted when HIVE-23072 is resolved. + */ +final class MmMajorOriginalFileQueryCompactor extends QueryCompactor { + + private static final Logger LOG = LoggerFactory.getLogger(MmMajorOriginalFileQueryCompactor.class.getName()); + + @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, + ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { + LOG.debug("Going to delete directories for aborted transactions for MM table " + + table.getDbName() + "." + table.getTableName()); + AcidUtils.Directory dir = AcidUtils + .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, + Ref.from(false), false, table.getParameters(), false); + MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir); + + String tmpLocation = Util.generateTmpPath(storageDescriptor); + Path baseLocation = new Path(tmpLocation, "_base"); + + // Set up the session for driver. + HiveConf driverConf = new HiveConf(hiveConf); + driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); + + // Note: we could skip creating the table and just add table type stuff directly to the + // "insert overwrite directory" command if there were no bucketing or list bucketing. + String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_"; + String tmpTableName = tmpPrefix + System.currentTimeMillis(); + List createTableQueries = + getCreateQueries(tmpTableName, table, partition == null ?
table.getSd() : partition.getSd(), + baseLocation.toString()); + List compactionQueries = getCompactionQueries(table, partition, tmpTableName); + List dropQueries = getDropQueries(tmpTableName); + runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo, + new ArrayList<>(), createTableQueries, compactionQueries, dropQueries); + } + + /** + * Note: similar logic to the main committer; however, no ORC versions and stuff like that. + * @param dest The final directory; basically a SD directory. Not the actual base/delta. + * @param compactorTxnId txn that the compactor started + */ + @Override + protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, + ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException { + org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName); + String from = tempTable.getSd().getLocation(); + Path fromPath = new Path(from), toPath = new Path(dest); + FileSystem fs = fromPath.getFileSystem(conf); + long maxTxn = actualWriteIds.getHighWatermark(); + AcidOutputFormat.Options options = + new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0) + .statementId(-1).visibilityTxnId(compactorTxnId); + Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent(); + if (!fs.exists(fromPath)) { + LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir); + fs.mkdirs(newBaseDir); + return; + } + LOG.info("Moving contents of " + from + " to " + dest); + fs.rename(fromPath, newBaseDir); + fs.delete(fromPath, true); + } + + private List getCreateQueries(String tmpTableName, Table table, + StorageDescriptor storageDescriptor, String baseLocation) { + return Lists.newArrayList(MmQueryCompactorUtils + .getCreateQuery(tmpTableName, table, storageDescriptor, baseLocation, false)); + } + + private List getCompactionQueries(Table t, Partition p, String tmpName) { + String fullName = t.getDbName() + "." + t.getTableName(); + StringBuilder query = new StringBuilder("insert into table " + tmpName + " "); + StringBuilder filter = new StringBuilder(); + if (p != null) { + filter = new StringBuilder(" where "); + List vals = p.getValues(); + List keys = t.getPartitionKeys(); + assert keys.size() == vals.size(); + for (int i = 0; i < keys.size(); ++i) { + filter.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='").append(vals.get(i)) + .append("'"); + } + query.append(" select "); + // Use table descriptor for columns. + List cols = t.getSd().getCols(); + for (int i = 0; i < cols.size(); ++i) { + query.append(i == 0 ? 
"`" : ", `").append(cols.get(i).getName()).append("`"); + } + } else { + query.append("select *"); + } + query.append(" from ").append(fullName).append(filter); + return Lists.newArrayList(query.toString()); + } + + private List getDropQueries(String tmpTableName) { + return Lists.newArrayList(MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableName); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java index 41fdd7e210..b51cf5b7f8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.txn.compactor; import com.google.common.collect.Lists; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; @@ -27,9 +26,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hive.common.util.Ref; import org.slf4j.Logger; @@ -54,9 +51,6 @@ table.getParameters(), false); MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir); - String tmpLocation = Util.generateTmpPath(storageDescriptor); - Path baseLocation = new Path(tmpLocation, "_base"); - // Set up the session for driver. HiveConf driverConf = new HiveConf(hiveConf); driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); @@ -65,13 +59,15 @@ // "insert overwrite directory" command if there were no bucketing or list bucketing. String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_"; String tmpTableName = tmpPrefix + System.currentTimeMillis(); - List createTableQueries = - getCreateQueries(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(), - baseLocation.toString()); + Path resultBaseDir = QueryCompactor.Util.getResultDir( + storageDescriptor, writeIds, driverConf, true, true); + + List createTableQueries = getCreateQueries(tmpTableName, table, + partition == null ? table.getSd() : partition.getSd(), resultBaseDir); List compactionQueries = getCompactionQueries(table, partition, tmpTableName); List dropQueries = getDropQueries(tmpTableName); runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo, - createTableQueries, compactionQueries, dropQueries); + Lists.newArrayList(resultBaseDir), createTableQueries, compactionQueries, dropQueries); } /** @@ -82,32 +78,13 @@ @Override protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException { - org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName); - String from = tempTable.getSd().getLocation(); - Path fromPath = new Path(from), toPath = new Path(dest); - FileSystem fs = fromPath.getFileSystem(conf); - // Assume the high watermark can be used as maximum transaction ID. - //todo: is that true? can it be aborted? does it matter for compaction? 
probably OK since - //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs. - long maxTxn = actualWriteIds.getHighWatermark(); - AcidOutputFormat.Options options = - new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0) - .statementId(-1).visibilityTxnId(compactorTxnId); - Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent(); - if (!fs.exists(fromPath)) { - LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir); - fs.mkdirs(newBaseDir); - return; - } - LOG.info("Moving contents of " + from + " to " + dest); - fs.rename(fromPath, newBaseDir); - fs.delete(fromPath, true); + Util.cleanupEmptyDir(conf, tmpTableName); } private List getCreateQueries(String tmpTableName, Table table, - StorageDescriptor storageDescriptor, String baseLocation) { - return Lists.newArrayList(MmQueryCompactorUtils - .getCreateQuery(tmpTableName, table, storageDescriptor, baseLocation, false, false)); + StorageDescriptor storageDescriptor, Path resultBaseDir) { + return Lists.newArrayList(MmQueryCompactorUtils.getCreateQuery(tmpTableName, table, + storageDescriptor, resultBaseDir.toString(), false)); } private List getCompactionQueries(Table t, Partition p, String tmpName) { @@ -116,7 +93,7 @@ protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, // 1) Could use fast merge path for ORC and RC. // 2) Didn't have to create a table. - StringBuilder query = new StringBuilder("insert overwrite table " + tmpName + " "); + StringBuilder query = new StringBuilder("insert into table " + tmpName + " "); StringBuilder filter = new StringBuilder(); if (p != null) { filter = new StringBuilder(" where "); @@ -143,5 +120,4 @@ protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, private List getDropQueries(String tmpTableName) { return Lists.newArrayList(MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableName); } - } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java index feb667cba9..bb4fdbe79c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.txn.compactor; import com.google.common.collect.Lists; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; @@ -27,9 +26,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hive.common.util.Ref; import org.slf4j.Logger; @@ -59,22 +56,20 @@ .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, table.getParameters(), false); MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir); - String tmpLocation = Util.generateTmpPath(storageDescriptor); - Path sourceTabLocation = new Path(tmpLocation); - Path resultTabLocation = new Path(tmpLocation, "_result"); HiveConf driverConf = setUpDriverSession(hiveConf); String 
tmpPrefix = table.getDbName() + ".tmp_minor_compactor_" + table.getTableName() + "_"; String tmpTableBase = tmpPrefix + System.currentTimeMillis(); + StorageDescriptor sd = partition == null ? table.getSd() : partition.getSd(); + Path resultDeltaDir = QueryCompactor.Util.getResultDir(sd, writeIds, driverConf, false, false); List createTableQueries = - getCreateQueries(tmpTableBase, table, partition == null ? table.getSd() : partition.getSd(), - sourceTabLocation.toString(), resultTabLocation.toString(), dir, writeIds); + getCreateQueries(tmpTableBase, table, sd, dir, writeIds, resultDeltaDir); List compactionQueries = getCompactionQueries(tmpTableBase, table.getSd()); List dropQueries = getDropQueries(tmpTableBase); runCompactionQueries(driverConf, tmpTableBase, storageDescriptor, writeIds, compactionInfo, - createTableQueries, compactionQueries, dropQueries); + Lists.newArrayList(resultDeltaDir), createTableQueries, compactionQueries, dropQueries); } /** @@ -82,26 +77,7 @@ */ @Override protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException { - org.apache.hadoop.hive.ql.metadata.Table resultTable = - Hive.get().getTable(tmpTableName + "_result"); - String from = resultTable.getSd().getLocation(); - Path fromPath = new Path(from); - Path toPath = new Path(dest); - FileSystem fs = fromPath.getFileSystem(conf); - long maxTxn = actualWriteIds.getHighWatermark(); - AcidOutputFormat.Options options = - new AcidOutputFormat.Options(conf).writingBase(false).isCompressed(false) - .minimumWriteId(1).maximumWriteId(maxTxn).bucket(0).statementId(-1) - .visibilityTxnId(compactorTxnId); - Path newDeltaDir = AcidUtils.createFilename(toPath, options).getParent(); - if (!fs.exists(fromPath)) { - LOG.info(from + " not found. Assuming 0 splits. 
Creating " + newDeltaDir); - fs.mkdirs(newDeltaDir); - return; - } - LOG.info("Moving contents of " + from + " to " + dest); - fs.rename(fromPath, newDeltaDir); - fs.delete(fromPath, true); + Util.cleanupEmptyDir(conf, tmpTableName); } /** @@ -117,21 +93,18 @@ * @param tmpTableBase name of the first temp table (second will be $tmpTableBase_result) * @param t Table to compact * @param sd storage descriptor of table or partition to compact - * @param sourceTabLocation location the "source table" (temp table 1) should go - * @param resultTabLocation location the "result table (temp table 2) should go * @param dir the parent directory of delta directories - * @param validWriteIdList valid write ids for the table/partition to compact + * @param writeIds ValidWriteIdList for the table/partition we are compacting * @return List of 3 query strings: 2 create table, 1 alter table */ private List getCreateQueries(String tmpTableBase, Table t, StorageDescriptor sd, - String sourceTabLocation, String resultTabLocation, AcidUtils.Directory dir, - ValidWriteIdList validWriteIdList) { + AcidUtils.Directory dir, ValidWriteIdList writeIds, Path resultDeltaDir) { List queries = new ArrayList<>(); - queries.add( - MmQueryCompactorUtils.getCreateQuery(tmpTableBase, t, sd, sourceTabLocation, true, true)); - buildAlterTableQuery(tmpTableBase, dir, validWriteIdList).ifPresent(queries::add); + queries.add(MmQueryCompactorUtils.getCreateQuery(tmpTableBase, t, sd, null, true)); + buildAlterTableQuery(tmpTableBase, dir, writeIds).ifPresent(queries::add); + queries.add(MmQueryCompactorUtils - .getCreateQuery(tmpTableBase + "_result", t, sd, resultTabLocation, false, false)); + .getCreateQuery(tmpTableBase + "_result", t, sd, resultDeltaDir.toString(), false)); return queries; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java index 891696dba7..064e4a76ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java @@ -59,16 +59,12 @@ private MmQueryCompactorUtils() {} * @param sd StorageDescriptor of the table or partition we are modeling the new table on * @param location of the new table * @param isPartitioned should the new table be partitioned - * @param isExternal should the new table be external * @return query string creating the new table */ static String getCreateQuery(String fullName, Table sourceTab, StorageDescriptor sd, - String location, boolean isPartitioned, boolean isExternal) { - StringBuilder query = new StringBuilder("create temporary "); - if (isExternal) { - query.append("external "); - } - query.append("table ").append(fullName).append("("); + String location, boolean isPartitioned) { + StringBuilder query = new StringBuilder("create temporary external table ") + .append(fullName).append("("); List cols = sourceTab.getSd().getCols(); boolean isFirst = true; for (FieldSchema col : cols) { @@ -132,8 +128,13 @@ static String getCreateQuery(String fullName, Table sourceTab, StorageDescriptor ShowCreateTableOperation.appendSerdeParams(query, serdeParams); } query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat())) - .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat())) - .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES ("); + 
.append("' OUTPUTFORMAT '") + .append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat())).append("'"); + + if (location != null && !location.isEmpty()) { + query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'"); + } + query.append(" TBLPROPERTIES ("); // Exclude all standard table properties. Set excludes = getHiveMetastoreConstants(); excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java index 7a9e48ff1e..431b0c0f11 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.ql.DriverUtils; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.SessionState; @@ -89,8 +91,9 @@ protected abstract void commitCompaction(String dest, String tmpTableName, HiveC * @throws IOException error during the run of the compaction. */ void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor storageDescriptor, - ValidWriteIdList writeIds, CompactionInfo compactionInfo, List createQueries, - List compactionQueries, List dropQueries) throws IOException { + ValidWriteIdList writeIds, CompactionInfo compactionInfo, List resultDirs, + List createQueries, List compactionQueries, List dropQueries) + throws IOException { Util.disableLlapCaching(conf); String user = UserGroupInformation.getCurrentUser().getShortUserName(); SessionState sessionState = DriverUtils.setUpSessionState(conf, user, true); @@ -118,6 +121,7 @@ void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor commitCompaction(storageDescriptor.getLocation(), tmpTableName, conf, writeIds, compactorTxnId); } catch (HiveException e) { LOG.error("Error doing query based {} compaction", compactionInfo.isMajorCompaction() ? "major" : "minor", e); + removeResultDirs(resultDirs, conf); throw new IOException(e); } finally { try { @@ -134,6 +138,20 @@ void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor } } + /** + * todo rename and javadoc + * Cleaner would handle this later but clean up now just in case + */ + private void removeResultDirs(List resultDirPaths, HiveConf conf) throws IOException { + for (Path path : resultDirPaths) { + LOG.info("Compaction failed, removing directory: " + path.toString()); + FileSystem fs = path.getFileSystem(conf); + if (!fs.listFiles(path, false).hasNext()) { + fs.delete(path, true); + } + } + } + /** * Collection of some helper functions. */ @@ -147,6 +165,22 @@ static String generateTmpPath(StorageDescriptor sd) { return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString(); } + static Path getResultDir(StorageDescriptor sd, ValidWriteIdList writeIds, HiveConf conf, + boolean writingBase, boolean bucket0) { + long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 
1 : writeIds.getMinOpenWriteId(); + long highWatermark = writeIds.getHighWatermark(); + long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf); + AcidOutputFormat.Options options = + new AcidOutputFormat.Options(conf).writingBase(writingBase).isCompressed(false) + .minimumWriteId(minOpenWriteId).maximumWriteId(highWatermark).statementId(-1) + .visibilityTxnId(compactorTxnId); + if (bucket0) { + options = options.bucket(0); + } + Path location = new Path(sd.getLocation()); + return AcidUtils.baseOrDeltaSubdirPath(location, options); + } + /** * Unless caching is explicitly required for ETL queries this method disables it. * LLAP cache content lookup is file based, and since compaction alters the file structure it is not beneficial to diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java index 6542eef58a..1b563ff5a8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java @@ -40,15 +40,23 @@ private QueryCompactorFactory() { *
* {@link MinorQueryCompactor} - handles query based minor compaction *
- * {@link MmMajorQueryCompactor} - handles query based minor compaction for micro-managed tables + * {@link MmMajorQueryCompactor} - handles query based major compaction for micro-managed tables + * with no original files. Contains no file renames (which is less costly for object storage). + *
+ * {@link MmMajorOriginalFileQueryCompactor} - handles query based major compaction for + * micro-managed tables with only original files. Contains file renames. + *
+ * {@link MmMinorQueryCompactor} - handles query based minor compaction for micro-managed tables. Contains no file renames (which is less costly for object storage). *
*

* @param table the table, on which the compaction should be running, must be not null. * @param configuration the hive configuration, must be not null. * @param compactionInfo provides insight about the type of compaction, must be not null. + * @param dir AcidDirectory of the table * @return {@link QueryCompactor} or null. */ - static QueryCompactor getQueryCompactor(Table table, HiveConf configuration, CompactionInfo compactionInfo) { + static QueryCompactor getQueryCompactor(Table table, HiveConf configuration, + CompactionInfo compactionInfo, AcidUtils.Directory dir) { if (!AcidUtils.isInsertOnlyTable(table.getParameters()) && HiveConf .getBoolVar(configuration, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) { if (compactionInfo.isMajorCompaction()) { @@ -63,7 +71,19 @@ static QueryCompactor getQueryCompactor(Table table, HiveConf configuration, Com if (AcidUtils.isInsertOnlyTable(table.getParameters()) && HiveConf .getBoolVar(configuration, HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM)) { if (compactionInfo.isMajorCompaction()) { - return new MmMajorQueryCompactor(); + // todo once HIVE-23072 is fixed, replace block with: return new MmMajorQueryCompactor(); + // todo and remove MmMajorOriginalFileQueryCompactor as it won't be needed + // no original files + if (dir.getOriginalFiles().size() == 0 || isStoredAsOrc(table)) { + return new MmMajorQueryCompactor(); + } + // only original files, non-orc table + if (dir.getCurrentDirectories().size() == 0) { + return new MmMajorOriginalFileQueryCompactor(); + } + // mix of original and delta dirs, non-orc table + throw new RuntimeException("Major compaction cannot be run on mix of delta dirs and " + + "original files because of HIVE-23072"); } else { return new MmMinorQueryCompactor(); } @@ -72,4 +92,8 @@ static QueryCompactor getQueryCompactor(Table table, HiveConf configuration, Com return null; } + private static boolean isStoredAsOrc(Table table) { + return table.getSd().getOutputFormat() + .equalsIgnoreCase("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"); + } }
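Putting the CompactorMR and factory changes together, the compactor choice for insert-only tables now depends on what is physically in the table directory. A condensed sketch of the call path (assuming dir is the AcidUtils.Directory that CompactorMR already resolved for this compaction; local variable names are illustrative):

    AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds,
        Ref.from(false), false, table.getParameters(), false);
    // getQueryCompactor picks MmMajorQueryCompactor when there are no original files (or the table
    // is ORC), MmMajorOriginalFileQueryCompactor when there are only original files, and throws for
    // the mixed non-ORC case until HIVE-23072 is resolved.
    QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(table, conf, ci, dir);
    if (queryCompactor != null) {
      LOG.info("Will compact with " + queryCompactor.getClass().getName());
      queryCompactor.runCompaction(conf, table, partition, sd, writeIds, ci);
    }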