+ List<String> queries = new ArrayList<>();
+ // create delta temp table
+ String tmpTableName = AcidUtils.DELTA_PREFIX + tempTableBase;
+ queries.add(buildCreateTableQuery(table, tmpTableName, true, true, false));
+ buildAlterTableQuery(tmpTableName, dir, writeIds, false).ifPresent(queries::add);
+ // create delta result temp table
+ queries.add(buildCreateTableQuery(table, tmpTableName + "_result", false, false, true));
+
+ // create delete delta temp tables
+ String tmpDeleteTableName = AcidUtils.DELETE_DELTA_PREFIX + tempTableBase;
+ queries.add(buildCreateTableQuery(table, tmpDeleteTableName, true, true, false));
+ buildAlterTableQuery(tmpDeleteTableName, dir, writeIds, true).ifPresent(queries::add);
+ // create delete delta result temp table
+ queries.add(buildCreateTableQuery(table, tmpDeleteTableName + "_result", false, false, true));
+ return queries;
+ }
+
+ /**
+ * Helper method, which builds a create table query. The create query is customized based on the input arguments, but
+ * the schema of the table is the same as an ORC ACID file schema.
+ * @param table the source table on which the compaction is running
+ * @param newTableName name of the table to be created
+ * @param isExternal true if the new table should be external
+ * @param isPartitioned true if the new table should be partitioned
+ * @param isBucketed true if the new table should be bucketed
+ * @return a create table statement, always non-null. Example:
+ *
+ * if source table schema is: (a:int, b:int)
+ *
+ * the corresponding create statement is:
+ *
+ * CREATE TEMPORARY EXTERNAL TABLE tmp_table (`operation` int, `originalTransaction` bigint, `bucket` int,
+ * `rowId` bigint, `currentTransaction` bigint, `row` struct<`a` :int, `b` :int>) PARTITIONED BY (`file_name` string)
+ * STORED AS ORC TBLPROPERTIES ('transactional'='false','queryminorcomp'='true');
+ *
+ */
+ private String buildCreateTableQuery(Table table, String newTableName, boolean isExternal, boolean isPartitioned,
+ boolean isBucketed) {
+ StringBuilder query = new StringBuilder("create temporary ");
+ if (isExternal) {
+ query.append("external ");
+ }
+ query.append("table ").append(newTableName).append(" (");
+ // Acid virtual columns
+ query.append(
+ "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, "
+ + "`row` struct<");
+ List<FieldSchema> cols = table.getSd().getCols();
+ boolean isFirst = true;
+ // Actual columns
+ for (FieldSchema col : cols) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
+ }
+ query.append(">)");
+ if (isPartitioned) {
+ query.append(" partitioned by (`file_name` string)");
+ }
+ int bucketingVersion = 0;
+ if (isBucketed) {
+ int numBuckets = 1;
+ try {
+ org.apache.hadoop.hive.ql.metadata.Table t = Hive.get().getTable(table.getDbName(), table.getTableName());
+ numBuckets = Math.max(t.getNumBuckets(), numBuckets);
+ bucketingVersion = t.getBucketingVersion();
+ } catch (HiveException e) {
+ LOG.info("Error finding table {}. Minor compaction result will use 0 buckets.", table.getTableName());
+ } finally {
+ query.append(" clustered by (`bucket`)").append(" sorted by (`bucket`, `originalTransaction`, `rowId`)")
+ .append(" into ").append(numBuckets).append(" buckets");
+ }
+ }
+
+ query.append(" stored as orc");
+ query.append(" tblproperties ('transactional'='false'");
+ query.append(", 'queryminorcomp'='true'");
+ if (isBucketed) {
+ query.append(", 'bucketing_version'='")
+ .append(bucketingVersion)
+ .append("')");
+ } else {
+ query.append(")");
+ }
+ return query.toString();
+ }
+
+ /**
+ * Builds an alter table query, which adds partitions pointing to location of delta directories.
+ * @param tableName name of the table to be altered
+ * @param dir the parent directory of delta directories
+ * @param validWriteIdList list of valid write IDs
+ * @param isDeleteDelta if true, only the delete delta directories will be mapped as new partitions, otherwise only
+ * the delta directories
+ * @return alter table statement wrapped in {@link Optional}, empty if there are no matching delta directories.
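+ *
+ * Example (table name, delta directory names and locations are illustrative):
+ *
+ * ALTER TABLE delta_tmp_compactor_tbl_1234 ADD
+ * PARTITION (file_name='delta_0000001_0000001') LOCATION '/warehouse/tbl/delta_0000001_0000001'
+ * PARTITION (file_name='delta_0000002_0000002') LOCATION '/warehouse/tbl/delta_0000002_0000002'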
+ */
+ private Optional<String> buildAlterTableQuery(String tableName, AcidUtils.Directory dir,
+ ValidWriteIdList validWriteIdList, boolean isDeleteDelta) {
+ // add partitions
+ if (!dir.getCurrentDirectories().isEmpty()) {
+ long minWriteID = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
+ long highWatermark = validWriteIdList.getHighWatermark();
+ List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories().stream().filter(
+ delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark
+ && delta.getMinWriteId() >= minWriteID)
+ .collect(Collectors.toList());
+ if (!deltas.isEmpty()) {
+ StringBuilder query = new StringBuilder().append("alter table ").append(tableName);
+ query.append(" add ");
+ deltas.forEach(
+ delta -> query.append("partition (file_name='").append(delta.getPath().getName()).append("') location '")
+ .append(delta.getPath()).append("' "));
+ return Optional.of(query.toString());
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Get the list of compaction queries which fill up the delta/delete-delta temporary result tables.
+ * @param tmpTableBase a unique identifier which helps to find all the temporary tables
+ * @param invalidWriteIds list of invalid write IDs. This list is used to filter out aborted/open transactions
+ * @return list of compaction queries, always non-null
+ */
+ private List<String> getCompactionQueries(String tmpTableBase, long[] invalidWriteIds) {
+ List<String> queries = new ArrayList<>();
+ String sourceTableName = AcidUtils.DELTA_PREFIX + tmpTableBase;
+ String resultTableName = sourceTableName + "_result";
+ queries.add(buildCompactionQuery(sourceTableName, resultTableName, invalidWriteIds));
+ String sourceDeleteTableName = AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase;
+ String resultDeleteTableName = sourceDeleteTableName + "_result";
+ queries.add(buildCompactionQuery(sourceDeleteTableName, resultDeleteTableName, invalidWriteIds));
+ return queries;
+ }
+
+ /**
+ * Build a minor compaction query. A compaction query selects the content of the source temporary table and inserts
+ * it into the result table, filtering out all rows which belong to open/aborted transactions.
+ * @param sourceTableName the name of the source table
+ * @param resultTableName the name of the result table
+ * @param invalidWriteIds list of invalid write IDs
+ * @return compaction query, always non-null
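+ *
+ * Example (table names and write IDs are illustrative):
+ *
+ * INSERT INTO TABLE delta_tmp_compactor_tbl_1234_result
+ * SELECT `operation`, `originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row`
+ * FROM delta_tmp_compactor_tbl_1234 WHERE `originalTransaction` NOT IN (5, 7)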
+ */
+ private String buildCompactionQuery(String sourceTableName, String resultTableName, long[] invalidWriteIds) {
+ StringBuilder query = new StringBuilder().append("insert into table ").append(resultTableName)
+ .append(" select `operation`, `originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row` from ")
+ .append(sourceTableName);
+ if (invalidWriteIds.length > 0) {
+ query.append(" where `originalTransaction` not in (")
+ .append(StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ",")).append(")");
+ }
+
+ return query.toString();
+ }
+
+ /**
+ * Get list of drop table statements.
+ * @param tmpTableBase a unique identifier which helps to find all the tables used in query based minor compaction
+ * @return list of drop table statements, always non-null
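+ *
+ * Example, assuming tmpTableBase is tmp_compactor_tbl_1234 and the AcidUtils delta prefixes resolve to
+ * "delta_" and "delete_delta_":
+ *
+ * DROP TABLE IF EXISTS delta_tmp_compactor_tbl_1234
+ * DROP TABLE IF EXISTS delete_delta_tmp_compactor_tbl_1234
+ * DROP TABLE IF EXISTS delta_tmp_compactor_tbl_1234_result
+ * DROP TABLE IF EXISTS delete_delta_tmp_compactor_tbl_1234_result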
+ */
+ private List<String> getDropQueries(String tmpTableBase) {
+ List<String> queries = new ArrayList<>();
+ String dropStm = "drop table if exists ";
+ queries.add(dropStm + AcidUtils.DELTA_PREFIX + tmpTableBase);
+ queries.add(dropStm + AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase);
+ queries.add(dropStm + AcidUtils.DELTA_PREFIX + tmpTableBase + "_result");
+ queries.add(dropStm + AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase + "_result");
+ return queries;
+ }
+
+ /**
+ * Creates the delta directory and moves the result files.
+ * @param deltaTableName name of the temporary table, where the results are stored
+ * @param dest destination path, where the result should be moved
+ * @param isDeleteDelta true if the destination is a delete delta directory
+ * @param conf hive configuration
+ * @param actualWriteIds list of valid write Ids
+ * @param compactorTxnId transaction Id of the compaction
+ * @throws HiveException if the result files cannot be moved
+ * @throws IOException if the destination delta directory cannot be created
+ */
+ private void commitCompaction(String deltaTableName, String dest, boolean isDeleteDelta, HiveConf conf,
+ ValidWriteIdList actualWriteIds, long compactorTxnId) throws HiveException, IOException {
+ org.apache.hadoop.hive.ql.metadata.Table deltaTable = Hive.get().getTable(deltaTableName);
+ Util.moveContents(new Path(deltaTable.getSd().getLocation()), new Path(dest), false, isDeleteDelta, conf,
+ actualWriteIds, compactorTxnId);
+ }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index 9b8420902f..7c92288b66 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.apache.hadoop.conf.Configuration;
+import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -25,7 +25,6 @@
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.Partition;
@@ -35,14 +34,12 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.util.DirectionUtils;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.common.util.Ref;
@@ -61,12 +58,11 @@
/**
* Class responsible to run query based major compaction on insert only tables.
*/
-class MmMajorQueryCompactor extends QueryCompactor {
+final class MmMajorQueryCompactor extends QueryCompactor {
private static final Logger LOG = LoggerFactory.getLogger(MmMajorQueryCompactor.class.getName());
- @Override
- void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
+ @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
LOG.debug("Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." + table
.getTableName());
@@ -82,52 +78,58 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD
return;
}
- try {
- String tmpLocation = Util.generateTmpPath(storageDescriptor);
- Path baseLocation = new Path(tmpLocation, "_base");
+ if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) {
+ return;
+ }
- // Set up the session for driver.
- HiveConf driverConf = new HiveConf(hiveConf);
- driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
- driverConf.unset(ValidTxnList.VALID_TXNS_KEY); //so Driver doesn't get confused
- //thinking it already has a txn opened
+ String tmpLocation = Util.generateTmpPath(storageDescriptor);
+ Path baseLocation = new Path(tmpLocation, "_base");
- String user = UserGroupInformation.getCurrentUser().getShortUserName();
- SessionState sessionState = DriverUtils.setUpSessionState(driverConf, user, true);
+ // Set up the session for driver.
+ HiveConf driverConf = new HiveConf(hiveConf);
+ driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
- // Note: we could skip creating the table and just add table type stuff directly to the
- // "insert overwrite directory" command if there were no bucketing or list bucketing.
- String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_";
- String tmpTableName;
- while (true) {
- tmpTableName = tmpPrefix + System.currentTimeMillis();
- String query =
- buildMmCompactionCtQuery(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(),
- baseLocation.toString());
- LOG.info("Compacting a MM table into " + query);
- try {
- DriverUtils.runOnDriver(driverConf, user, sessionState, query);
- break;
- } catch (Exception ex) {
- Throwable cause = ex;
- while (cause != null && !(cause instanceof AlreadyExistsException)) {
- cause = cause.getCause();
- }
- if (cause == null) {
- throw new IOException(ex);
- }
- }
- }
- String query = buildMmCompactionQuery(table, partition, tmpTableName);
- LOG.info("Compacting a MM table via " + query);
- long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(hiveConf);
- DriverUtils.runOnDriver(driverConf, user, sessionState, query, writeIds, compactorTxnId);
- commitMmCompaction(tmpLocation, storageDescriptor.getLocation(), hiveConf, writeIds, compactorTxnId);
- DriverUtils.runOnDriver(driverConf, user, sessionState, "drop table if exists " + tmpTableName);
- } catch (HiveException e) {
- LOG.error("Error compacting a MM table", e);
- throw new IOException(e);
+ // Note: we could skip creating the table and just add table type stuff directly to the
+ // "insert overwrite directory" command if there were no bucketing or list bucketing.
+ String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_";
+ String tmpTableName = tmpPrefix + System.currentTimeMillis();
+ List<String> createTableQueries =
+ getCreateQueries(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(),
+ baseLocation.toString());
+ List<String> compactionQueries = getCompactionQueries(table, partition, tmpTableName);
+ List<String> dropQueries = getDropQueries(tmpTableName);
+ runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
+ createTableQueries, compactionQueries, dropQueries);
+ }
+
+ /**
+ * Note: similar logic to the main committer; however, no ORC versions and stuff like that.
+ * @param dest The final directory; basically a SD directory. Not the actual base/delta.
+ * @param compactorTxnId txn that the compactor started
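+ * @param tmpTableName name of the temporary table holding the compaction result
+ * @param conf hive configuration
+ * @param actualWriteIds valid write Ids, used to fetch the high watermark Id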
+ */
+ @Override
+ protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
+ ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
+ org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
+ String from = tempTable.getSd().getLocation();
+ Path fromPath = new Path(from), toPath = new Path(dest);
+ FileSystem fs = fromPath.getFileSystem(conf);
+ // Assume the high watermark can be used as maximum transaction ID.
+ //todo: is that true? can it be aborted? does it matter for compaction? probably OK since
+ //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs.
+ long maxTxn = actualWriteIds.getHighWatermark();
+ AcidOutputFormat.Options options =
+ new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0)
+ .statementId(-1).visibilityTxnId(compactorTxnId);
+ Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
+ if (!fs.exists(fromPath)) {
+ LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir);
+ fs.mkdirs(newBaseDir);
+ return;
}
+ LOG.info("Moving contents of " + from + " to " + dest);
+ fs.rename(fromPath, newBaseDir);
+ fs.delete(fromPath, true);
}
// Remove the directories for aborted transactions only
@@ -145,7 +147,7 @@ private void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throw
}
}
- private String buildMmCompactionCtQuery(String fullName, Table t, StorageDescriptor sd, String location) {
+ private List<String> getCreateQueries(String fullName, Table t, StorageDescriptor sd, String location) {
StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append("(");
List<FieldSchema> cols = t.getSd().getCols();
boolean isFirst = true;
@@ -229,11 +231,11 @@ private String buildMmCompactionCtQuery(String fullName, Table t, StorageDescrip
query.append(", ");
}
query.append("'transactional'='false')");
- return query.toString();
+ return Lists.newArrayList(query.toString());
}
- private String buildMmCompactionQuery(Table t, Partition p, String tmpName) {
+ private List<String> getCompactionQueries(Table t, Partition p, String tmpName) {
String fullName = t.getDbName() + "." + t.getTableName();
// ideally we should make a special form of insert overwrite so that we:
// 1) Could use fast merge path for ORC and RC.
@@ -260,40 +262,11 @@ private String buildMmCompactionQuery(Table t, Partition p, String tmpName) {
query.append("select *");
}
query.append(" from ").append(fullName).append(filter);
- return query.toString();
+ return Lists.newArrayList(query.toString());
}
- /**
- * Note: similar logic to the main committer; however, no ORC versions and stuff like that.
- * @param from The temp directory used for compactor output. Not the actual base/delta.
- * @param to The final directory; basically a SD directory. Not the actual base/delta.
- * @param compactorTxnId txn that the compactor started
- */
- private void commitMmCompaction(String from, String to, Configuration conf, ValidWriteIdList actualWriteIds,
- long compactorTxnId) throws IOException {
- Path fromPath = new Path(from), toPath = new Path(to);
- FileSystem fs = fromPath.getFileSystem(conf);
- // Assume the high watermark can be used as maximum transaction ID.
- //todo: is that true? can it be aborted? does it matter for compaction? probably OK since
- //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs.
- long maxTxn = actualWriteIds.getHighWatermark();
- AcidOutputFormat.Options options =
- new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0)
- .statementId(-1).visibilityTxnId(compactorTxnId);
- Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
- if (!fs.exists(fromPath)) {
- LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir);
- fs.mkdirs(newBaseDir);
- return;
- }
- LOG.info("Moving contents of " + from + " to " + to);
- FileStatus[] children = fs.listStatus(fromPath);
- if (children.length != 1) {
- throw new IOException("Unexpected files in the source: " + Arrays.toString(children));
- }
- FileStatus dirPath = children[0];
- fs.rename(dirPath.getPath(), newBaseDir);
- fs.delete(fromPath, true);
+ private List<String> getDropQueries(String tmpTableName) {
+ return Lists.newArrayList("drop table if exists " + tmpTableName);
}
private static Set<String> getHiveMetastoreConstants() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 1eab5b888d..9896df3bb6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -17,18 +17,31 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -53,6 +66,78 @@
abstract void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException;
+ /**
+ * This is the final step of the compaction, which can vary based on compaction type. Usually this involves some file
+ * operation.
+ * @param dest The final directory; basically an SD directory.
+ * @param tmpTableName The name of the temporary table.
+ * @param conf hive configuration.
+ * @param actualWriteIds valid write Ids used to fetch the high watermark Id.
+ * @param compactorTxnId transaction that the compactor started.
+ * @throws IOException failed to execute a file system operation.
+ * @throws HiveException failed to execute a file operation within Hive.
+ */
+ protected abstract void commitCompaction(String dest, String tmpTableName, HiveConf conf,
+ ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException;
+
+ /**
+ * Run all the queries which perform the compaction.
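+ * The queries are executed in order: first the create queries, then the compaction queries, then
+ * {@link #commitCompaction(String, String, HiveConf, ValidWriteIdList, long)}, and finally the drop queries. The
+ * drop queries run in a finally block, so the temporary tables are cleaned up even if the compaction fails.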
+ * @param conf hive configuration, must be not null.
+ * @param tmpTableName The name of the temporary table.
+ * @param storageDescriptor this is the resolved storage descriptor.
+ * @param writeIds valid write IDs used to filter rows while they're being read for compaction.
+ * @param compactionInfo provides info about the type of compaction.
+ * @param createQueries collection of queries which create the temporary tables.
+ * @param compactionQueries collection of queries which read data from the original table and write it into the
+ * temporary result tables.
+ * @param dropQueries queries which drop the temporary tables.
+ * @throws IOException error during the run of the compaction.
+ */
+ protected void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor storageDescriptor,
+ ValidWriteIdList writeIds, CompactionInfo compactionInfo, List<String> createQueries,
+ List<String> compactionQueries, List<String> dropQueries) throws IOException {
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ SessionState sessionState = DriverUtils.setUpSessionState(conf, user, true);
+ long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
+ try {
+ for (String query : createQueries) {
+ try {
+ LOG.info("Running {} compaction query into temp table with query: {}",
+ compactionInfo.isMajorCompaction() ? "major" : "minor", query);
+ DriverUtils.runOnDriver(conf, user, sessionState, query);
+ } catch (Exception ex) {
+ Throwable cause = ex;
+ while (cause != null && !(cause instanceof AlreadyExistsException)) {
+ cause = cause.getCause();
+ }
+ if (cause == null) {
+ throw new IOException(ex);
+ }
+ }
+ }
+ for (String query : compactionQueries) {
+ LOG.info("Running {} compaction via query: {}", compactionInfo.isMajorCompaction() ? "major" : "minor", query);
+ DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId);
+ }
+ commitCompaction(storageDescriptor.getLocation(), tmpTableName, conf, writeIds, compactorTxnId);
+ } catch (HiveException e) {
+ LOG.error("Error doing query based {} compaction", compactionInfo.isMajorCompaction() ? "major" : "minor", e);
+ throw new IOException(e);
+ } finally {
+ try {
+ for (String query : dropQueries) {
+ LOG.info("Running {} compaction query into temp table with query: {}",
+ compactionInfo.isMajorCompaction() ? "major" : "minor", query);
+ DriverUtils.runOnDriver(conf, user, sessionState, query);
+ }
+ } catch (HiveException e) {
+ LOG.error("Unable to drop temp table {} which was created for running {} compaction", tmpTableName,
+ compactionInfo.isMajorCompaction() ? "major" : "minor");
+ LOG.error(ExceptionUtils.getStackTrace(e));
+ }
+ }
+ }
+
/**
* Collection of some helper functions.
*/
@@ -120,5 +205,76 @@ public static boolean needsCleaning(AcidUtils.Directory dir, StorageDescriptor s
static String generateTmpPath(StorageDescriptor sd) {
return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
}
+
+ /**
+ * Check whether the result directory exists and contains compacted result files. If no splits are found, create
+ * an empty directory at the destination path, matching a base/delta directory naming convention.
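+ * For example (write IDs are illustrative), when the source path is missing during a minor compaction with
+ * minOpenWriteId=1 and highWatermark=5, an empty delta_0000001_0000005 directory is created under destPath and
+ * false is returned.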
+ * @param sourcePath the checked source location
+ * @param destPath the destination, where the new directory should be created
+ * @param isMajorCompaction true if called from a major compaction
+ * @param isDeleteDelta true if the output is used as a delete delta directory
+ * @param conf hive configuration
+ * @param validWriteIdList list of valid write IDs, used to determine the minimum and maximum write IDs
+ * @return true if the source directory exists, false if it was missing and an empty destination directory was created
+ * @throws IOException the new directory cannot be created
+ */
+ private static boolean resultHasSplits(Path sourcePath, Path destPath, boolean isMajorCompaction,
+ boolean isDeleteDelta, HiveConf conf, ValidWriteIdList validWriteIdList) throws IOException {
+ FileSystem fs = sourcePath.getFileSystem(conf);
+ long minOpenWriteId = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
+ long highWatermark = validWriteIdList.getHighWatermark();
+ AcidOutputFormat.Options options =
+ new AcidOutputFormat.Options(conf).writingBase(isMajorCompaction).writingDeleteDelta(isDeleteDelta)
+ .isCompressed(false).minimumWriteId(minOpenWriteId)
+ .maximumWriteId(highWatermark).bucket(0).statementId(-1);
+ Path newDeltaDir = AcidUtils.createFilename(destPath, options).getParent();
+ if (!fs.exists(sourcePath)) {
+ LOG.info("{} not found. Assuming 0 splits. Creating {}", sourcePath, newDeltaDir);
+ fs.mkdirs(newDeltaDir);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Create the base/delta directory matching the naming conventions and move the result files of the compaction
+ * into it.
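+ * For example (write IDs, transaction id and bucket are illustrative), a result file 000000_0 carrying bucket id 0,
+ * produced by a minor compaction with minOpenWriteId=1, highWatermark=5 and compactorTxnId=9, would be moved to
+ * destPath/delta_0000001_0000005_v0000009/bucket_00000.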
+ * @param sourcePath location of the result files
+ * @param destPath destination path of the result files, without the base/delta directory
+ * @param isMajorCompaction true if called from a major compaction
+ * @param isDeleteDelta true if the destination is a delete delta directory
+ * @param conf hive configuration
+ * @param validWriteIdList list of valid write Ids
+ * @param compactorTxnId transaction Id of the compaction
+ * @throws IOException if the destination directory cannot be created
+ * @throws HiveException if the result files cannot be moved to the destination directory
+ */
+ static void moveContents(Path sourcePath, Path destPath, boolean isMajorCompaction, boolean isDeleteDelta,
+ HiveConf conf, ValidWriteIdList validWriteIdList, long compactorTxnId) throws IOException, HiveException {
+ if (!resultHasSplits(sourcePath, destPath, isMajorCompaction, isDeleteDelta, conf, validWriteIdList)) {
+ return;
+ }
+ LOG.info("Moving contents of {} to {}", sourcePath, destPath);
+ FileSystem fs = sourcePath.getFileSystem(conf);
+ long minOpenWriteId = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
+ long highWatermark = validWriteIdList.getHighWatermark();
+ for (FileStatus fileStatus : fs.listStatus(sourcePath)) {
+ String originalFileName = fileStatus.getPath().getName();
+ if (AcidUtils.ORIGINAL_PATTERN.matcher(originalFileName).matches()) {
+ Optional<Integer> bucketId = AcidUtils.parseBucketIdFromRow(fs, fileStatus.getPath());
+ if (bucketId.isPresent()) {
+ AcidOutputFormat.Options options =
+ new AcidOutputFormat.Options(conf).writingBase(isMajorCompaction).writingDeleteDelta(isDeleteDelta)
+ .isCompressed(false).minimumWriteId(minOpenWriteId)
+ .maximumWriteId(highWatermark).bucket(bucketId.get()).statementId(-1)
+ .visibilityTxnId(compactorTxnId);
+ Path finalBucketFile = AcidUtils.createFilename(destPath, options);
+ Hive.moveFile(conf, fileStatus.getPath(), finalBucketFile, true, false, false);
+ }
+ }
+ }
+ fs.delete(sourcePath, true);
+ }
+
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
index 41cb4b64fb..2f2bb21a13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
@@ -27,11 +27,22 @@
*/
final class QueryCompactorFactory {
+ /**
+ * Factory class, no need to expose constructor.
+ */
private QueryCompactorFactory() {
}
/**
- * Get an instance of {@link QueryCompactor}.
+ * Get an instance of {@link QueryCompactor}. At the moment the following implementations can be fetched:
+ *
+ * {@link MajorQueryCompactor} - handles query based major compaction
+ *
+ * {@link MinorQueryCompactor} - handles query based minor compaction
+ *
+ * {@link MmMajorQueryCompactor} - handles query based major compaction for micro-managed (insert-only) tables
+ *
+ *
* @param table the table, on which the compaction should be running, must be not null.
* @param configuration the hive configuration, must be not null.
* @param compactionInfo provides insight about the type of compaction, must be not null.
@@ -42,8 +53,10 @@ static QueryCompactor getQueryCompactor(Table table, HiveConf configuration, Com
.getBoolVar(configuration, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
if (compactionInfo.isMajorCompaction()) {
return new MajorQueryCompactor();
- } else {
- throw new RuntimeException("Query based compaction is not currently supported for minor compactions");
+ } else if (!compactionInfo.isMajorCompaction() && "tez"
+ .equalsIgnoreCase(HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
+ // query based minor compaction is only supported on tez
+ return new MinorQueryCompactor();
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
index d4c9121c9f..dfd63a1b38 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
@@ -506,11 +506,12 @@ public void testOperationsOnCompletedTxnComponentsForMmTable() throws Exception
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
- // Initiate a minor compaction request on the table.
+ // Initiate a major compaction request on the table.
runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MAJOR'");
// Run worker.
runWorker(hiveConf);
+ verifyDirAndResult(2, true);
// Run Cleaner.
runCleaner(hiveConf);
@@ -519,6 +520,7 @@ public void testOperationsOnCompletedTxnComponentsForMmTable() throws Exception
TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS"));
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
+ verifyDirAndResult(0, true);
}
@Test
@@ -612,19 +614,33 @@ public void testSnapshotIsolationWithAbortedTxnOnMmTable() throws Exception {
}
private void verifyDirAndResult(int expectedDeltas) throws Exception {
+ verifyDirAndResult(expectedDeltas, false);
+ }
+
+ private void verifyDirAndResult(int expectedDeltas, boolean expectBaseDir) throws Exception {
FileSystem fs = FileSystem.get(hiveConf);
// Verify the content of subdirs
FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(TableExtended.MMTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER);
int sawDeltaTimes = 0;
+ int sawBaseTimes = 0;
for (int i = 0; i < status.length; i++) {
- Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
- sawDeltaTimes++;
- FileStatus[] files = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
- Assert.assertEquals(1, files.length);
- Assert.assertTrue(files[0].getPath().getName().equals("000000_0"));
+ if (status[i].getPath().getName().matches("delta_.*")) {
+ sawDeltaTimes++;
+ FileStatus[] files = fs.listStatus(status[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
+ Assert.assertEquals(1, files.length);
+ Assert.assertEquals("000000_0", files[0].getPath().getName());
+ } else {
+ sawBaseTimes++;
+ }
}
+
Assert.assertEquals(expectedDeltas, sawDeltaTimes);
+ if (expectBaseDir) {
+ Assert.assertEquals("1 base directory expected", 1, sawBaseTimes);
+ } else {
+ Assert.assertEquals("0 base directories expected", 0, sawBaseTimes);
+ }
// Verify query result
int [][] resultData = new int[][] {{1,2}, {3,4}};
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForOrcMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForOrcMmTable.java
index d6435342aa..1599548da0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForOrcMmTable.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForOrcMmTable.java
@@ -18,9 +18,7 @@
package org.apache.hadoop.hive.ql;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.Before;
-import org.junit.Test;
/**
* Same as TestTxnCommands2 but tests ACID tables with vectorization turned on by