diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 0f1579aa54..ee2c0f3e23 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -17,31 +17,24 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.HashSet; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.regex.Matcher; import java.util.stream.Collectors; -import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.StringableMap; import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; import org.apache.hadoop.hive.common.ValidReadTxnList; @@ -50,19 +43,13 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.ql.DriverUtils; -import org.apache.hadoop.hive.ql.ddl.table.creation.ShowCreateTableOperation; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; @@ -71,10 +58,6 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordIdentifier; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.util.DirectionUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; @@ -95,9 +78,7 @@ import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.lib.NullOutputFormat; import org.apache.hadoop.mapreduce.security.TokenCache; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; -import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.Ref; import org.apache.parquet.Strings; import org.apache.thrift.TException; @@ -238,26 +219,16 @@ void run(HiveConf conf, 
String jobName, Table t, Partition p, StorageDescriptor } /** - * Run major compaction in a HiveQL query (compaction for MM tables handled in runMmCompaction method). - * TODO: + * Run major compaction in a HiveQL query (compaction for MM tables handled in {@link MmMajorQueryCompactor} + * class). + * Find a better way: * 1. A good way to run minor compaction (currently disabled when this config is enabled) * 2. More generic approach to collecting files in the same logical bucket to compact within the same task * (currently we're using Tez split grouping). */ - if (!AcidUtils.isInsertOnlyTable(t.getParameters()) && HiveConf.getBoolVar(conf, - ConfVars.COMPACTOR_CRUD_QUERY_BASED)) { - if (ci.isMajorCompaction()) { - runCrudCompaction(conf, t, p, sd, writeIds, ci); - return; - } else { - throw new RuntimeException("Query based compaction is not currently supported for minor compactions"); - } - } - - if (AcidUtils.isInsertOnlyTable(t.getParameters())) { - if (HiveConf.getBoolVar(conf, ConfVars.HIVE_COMPACTOR_COMPACT_MM)) { - runMmCompaction(conf, t, p, sd, writeIds, ci); - } + QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, conf, ci); + if (queryCompactor != null) { + queryCompactor.runCompaction(conf, t, p, sd, writeIds, ci); return; } @@ -334,82 +305,6 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor su.gatherStats(); } - /** - * @param sd (this is the resolved StorageDescriptor, i.e. resolved to table or partition) - * @param writeIds (valid write ids used to filter rows while they're being read for compaction) - * @throws IOException - */ - private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds, - CompactionInfo ci) throws IOException { - AcidUtils.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(t.getParameters())); - AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), hiveConf, writeIds, Ref.from( - false), false, t.getParameters(), false); - - if (!isEnoughToCompact(dir, sd)) { - return; - } - - String user = UserGroupInformation.getCurrentUser().getShortUserName(); - SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, true); - // Set up the session for driver. - HiveConf conf = new HiveConf(hiveConf); - conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); - /** - * For now, we will group splits on tez so that we end up with all bucket files, - * with same bucket number in one map task. 
- */ - conf.set(ConfVars.SPLIT_GROUPING_MODE.varname, "compactor"); - String tmpPrefix = t.getDbName() + "_tmp_compactor_" + t.getTableName() + "_"; - String tmpTableName = tmpPrefix + System.currentTimeMillis(); - long compactorTxnId = CompactorMap.getCompactorTxnId(conf); - try { - // Create a temporary table under the temp location --> db/tbl/ptn/_tmp_1234/db.tmp_compactor_tbl_1234 - String query = buildCrudMajorCompactionCreateTableQuery(tmpTableName, t, sd); - LOG.info("Running major compaction query into temp table with create definition: {}", query); - try { - DriverUtils.runOnDriver(conf, user, sessionState, query); - } catch (Exception ex) { - Throwable cause = ex; - while (cause != null && !(cause instanceof AlreadyExistsException)) { - cause = cause.getCause(); - } - if (cause == null) { - throw new IOException(ex); - } - } - query = buildCrudMajorCompactionQuery(conf, t, p, tmpTableName); - LOG.info("Running major compaction via query: {}", query); - /** - * This will create bucket files like: - * db/db_tmp_compactor_tbl_1234/00000_0 - * db/db_tmp_compactor_tbl_1234/00001_0 - */ - DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId); - /** - * This achieves a final layout like (wid is the highest valid write id for this major compaction): - * db/tbl/ptn/base_wid/bucket_00000 - * db/tbl/ptn/base_wid/bucket_00001 - */ - org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName); - String tmpLocation = tempTable.getSd().getLocation(); - commitCrudMajorCompaction(t, tmpLocation, tmpTableName, sd.getLocation(), conf, writeIds, compactorTxnId); - } catch (HiveException e) { - LOG.error("Error doing query based major compaction", e); - throw new IOException(e); - } finally { - try { - DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName); - } catch (HiveException e) { - LOG.error("Unable to delete drop temp table {} which was created for running major compaction", tmpTableName); - LOG.error(ExceptionUtils.getStackTrace(e)); - } - } - } - - private static boolean isEnoughToCompact(AcidUtils.Directory dir, StorageDescriptor sd) { - return isEnoughToCompact(true, dir, sd); - } - private static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, StorageDescriptor sd) { int deltaCount = dir.getCurrentDirectories().size(); int origCount = dir.getOriginalFiles().size(); @@ -442,308 +337,9 @@ private static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Di return isEnoughToCompact; } - private void runMmCompaction(HiveConf conf, Table t, Partition p, - StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException { - LOG.debug("Going to delete directories for aborted transactions for MM table " - + t.getDbName() + "." + t.getTableName()); - AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), - false, t.getParameters(), false); - removeFilesForMmTable(conf, dir); - - // Then, actually do the compaction. - if (!ci.isMajorCompaction()) { - // Not supported for MM tables right now. - LOG.info("Not compacting " + sd.getLocation() + "; not a major compaction"); - return; - } - - if (!isEnoughToCompact(dir, sd)) { - return; - } - - try { - String tmpLocation = generateTmpPath(sd); - Path baseLocation = new Path(tmpLocation, "_base"); - - // Set up the session for driver. 
- HiveConf driverConf = new HiveConf(conf); - driverConf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); - driverConf.unset(ValidTxnList.VALID_TXNS_KEY); //so Driver doesn't get confused - //thinking it already has a txn opened - - String user = UserGroupInformation.getCurrentUser().getShortUserName(); - SessionState sessionState = DriverUtils.setUpSessionState(driverConf, user, true); - - // Note: we could skip creating the table and just add table type stuff directly to the - // "insert overwrite directory" command if there were no bucketing or list bucketing. - String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() + "_"; - String tmpTableName = null; - while (true) { - tmpTableName = tmpPrefix + System.currentTimeMillis(); - String query = buildMmCompactionCtQuery(tmpTableName, t, - p == null ? t.getSd() : p.getSd(), baseLocation.toString()); - LOG.info("Compacting a MM table into " + query); - try { - DriverUtils.runOnDriver(driverConf, user, sessionState, query); - break; - } catch (Exception ex) { - Throwable cause = ex; - while (cause != null && !(cause instanceof AlreadyExistsException)) { - cause = cause.getCause(); - } - if (cause == null) { - throw new IOException(ex); - } - } - } - String query = buildMmCompactionQuery(conf, t, p, tmpTableName); - LOG.info("Compacting a MM table via " + query); - long compactorTxnId = CompactorMap.getCompactorTxnId(conf); - DriverUtils.runOnDriver(driverConf, user, sessionState, query, writeIds, compactorTxnId); - commitMmCompaction(tmpLocation, sd.getLocation(), conf, writeIds, compactorTxnId); - DriverUtils.runOnDriver(driverConf, user, sessionState, - "drop table if exists " + tmpTableName); - } catch (HiveException e) { - LOG.error("Error compacting a MM table", e); - throw new IOException(e); - } - } - private String generateTmpPath(StorageDescriptor sd) { return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString(); } - - /** - * Note on ordering of rows in the temp table: - * We need each final bucket file soreted by original write id (ascending), bucket (ascending) and row id (ascending). - * (current write id will be the same as original write id). - * We will be achieving the ordering via a custom split grouper for compactor. - * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars#SPLIT_GROUPING_MODE} for the config description. - * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)} - * for details on the mechanism. - */ - private String buildCrudMajorCompactionCreateTableQuery(String fullName, Table t, StorageDescriptor sd) { - StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append(" ("); - // Acid virtual columns - query.append( - "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<"); - List cols = t.getSd().getCols(); - boolean isFirst = true; - // Actual columns - for (FieldSchema col : cols) { - if (!isFirst) { - query.append(", "); - } - isFirst = false; - query.append("`").append(col.getName()).append("` ").append(":").append(col.getType()); - } - query.append(">)"); - query.append(" stored as orc"); - query.append(" tblproperties ('transactional'='false')"); - return query.toString(); - } - - private String buildCrudMajorCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) { - String fullName = t.getDbName() + "." 
+ t.getTableName(); - String query = "insert into table " + tmpName + " "; - String filter = ""; - if (p != null) { - filter = filter + " where "; - List vals = p.getValues(); - List keys = t.getPartitionKeys(); - assert keys.size() == vals.size(); - for (int i = 0; i < keys.size(); ++i) { - filter += (i == 0 ? "`" : " and `") + (keys.get(i).getName() + "`='" + vals.get(i) + "'"); - } - } - query += " select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), ROW__ID.writeId, " - + "ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT("; - List cols = t.getSd().getCols(); - for (int i = 0; i < cols.size(); ++i) { - query += (i == 0 ? "'" : ", '") + cols.get(i).getName() + "', " + cols.get(i).getName(); - } - query += ") from " + fullName + filter; - return query; - } - - /** - * Move and rename bucket files from the temp table (tmpTableName), to the new base path under the source table/ptn. - * Since the temp table is a non-transactional table, it has file names in the "original" format. - * Also, due to split grouping in - * {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)}, - * we will end up with one file per bucket. - */ - private void commitCrudMajorCompaction(Table t, String from, String tmpTableName, String to, HiveConf conf, - ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException { - Path fromPath = new Path(from); - Path toPath = new Path(to); - Path tmpTablePath = new Path(fromPath, tmpTableName); - FileSystem fs = fromPath.getFileSystem(conf); - // Assume the high watermark can be used as maximum transaction ID. - long maxTxn = actualWriteIds.getHighWatermark(); - // Get a base_wid path which will be the new compacted base - AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false) - .maximumWriteId(maxTxn).bucket(0).statementId(-1); - Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent(); - if (!fs.exists(fromPath)) { - LOG.info("{} not found. Assuming 0 splits. Creating {}", from, newBaseDir); - fs.mkdirs(newBaseDir); - return; - } - LOG.info("Moving contents of {} to {}", tmpTablePath, to); - /** - * Currently mapping file with name 0000_0 to bucket_00000, 0000_1 to bucket_00001 and so on - * TODO/ToThink: - * Q. Can file with name 0000_0 under temp table be deterministically renamed to bucket_00000 in the destination? - */ - // List buckCols = t.getSd().getBucketCols(); - FileStatus[] children = fs.listStatus(fromPath); - for (FileStatus filestatus : children) { - String originalFileName = filestatus.getPath().getName(); - // This if() may not be required I think... 
- if (AcidUtils.ORIGINAL_PATTERN.matcher(originalFileName).matches()) { - int bucketId = AcidUtils.parseBucketId(filestatus.getPath()); - options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn) - .bucket(bucketId).statementId(-1).visibilityTxnId(compactorTxnId); - Path finalBucketFile = AcidUtils.createFilename(toPath, options); - Hive.moveFile(conf, filestatus.getPath(), finalBucketFile, true, false, false); - } - } - fs.delete(fromPath, true); - } - - private String buildMmCompactionCtQuery( - String fullName, Table t, StorageDescriptor sd, String location) { - StringBuilder query = new StringBuilder("create temporary table ") - .append(fullName).append("("); - List cols = t.getSd().getCols(); - boolean isFirst = true; - for (FieldSchema col : cols) { - if (!isFirst) { - query.append(", "); - } - isFirst = false; - query.append("`").append(col.getName()).append("` ").append(col.getType()); - } - query.append(") "); - - // Bucketing. - List buckCols = t.getSd().getBucketCols(); - if (buckCols.size() > 0) { - query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") "); - List sortCols = t.getSd().getSortCols(); - if (sortCols.size() > 0) { - query.append("SORTED BY ("); - isFirst = true; - for (Order sortCol : sortCols) { - if (!isFirst) { - query.append(", "); - } - isFirst = false; - query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder())); - } - query.append(") "); - } - query.append("INTO ").append(t.getSd().getNumBuckets()).append(" BUCKETS"); - } - - // Stored as directories. We don't care about the skew otherwise. - if (t.getSd().isStoredAsSubDirectories()) { - SkewedInfo skewedInfo = t.getSd().getSkewedInfo(); - if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) { - query.append(" SKEWED BY (").append( - StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON "); - isFirst = true; - for (List colValues : skewedInfo.getSkewedColValues()) { - if (!isFirst) { - query.append(", "); - } - isFirst = false; - query.append("('").append(StringUtils.join("','", colValues)).append("')"); - } - query.append(") STORED AS DIRECTORIES"); - } - } - - SerDeInfo serdeInfo = sd.getSerdeInfo(); - Map serdeParams = serdeInfo.getParameters(); - query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand( - serdeInfo.getSerializationLib())).append("'"); - String sh = t.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE); - assert sh == null; // Not supposed to be a compactable table. - if (!serdeParams.isEmpty()) { - ShowCreateTableOperation.appendSerdeParams(query, serdeParams); - } - query.append("STORED AS INPUTFORMAT '").append( - HiveStringUtils.escapeHiveCommand(sd.getInputFormat())).append("' OUTPUTFORMAT '").append( - HiveStringUtils.escapeHiveCommand(sd.getOutputFormat())).append("' LOCATION '").append( - HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES ("); - // Exclude all standard table properties. 
- Set excludes = getHiveMetastoreConstants(); - excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS); - isFirst = true; - for (Map.Entry e : t.getParameters().entrySet()) { - if (e.getValue() == null) continue; - if (excludes.contains(e.getKey())) continue; - if (!isFirst) { - query.append(", "); - } - isFirst = false; - query.append("'").append(e.getKey()).append("'='").append( - HiveStringUtils.escapeHiveCommand(e.getValue())).append("'"); - } - if (!isFirst) { - query.append(", "); - } - query.append("'transactional'='false')"); - return query.toString(); - - } - - private static Set getHiveMetastoreConstants() { - HashSet result = new HashSet<>(); - for (Field f : hive_metastoreConstants.class.getDeclaredFields()) { - if (!Modifier.isStatic(f.getModifiers())) continue; - if (!Modifier.isFinal(f.getModifiers())) continue; - if (!String.class.equals(f.getType())) continue; - f.setAccessible(true); - try { - result.add((String)f.get(null)); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - } - return result; - } - - private String buildMmCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) { - String fullName = t.getDbName() + "." + t.getTableName(); - // TODO: ideally we should make a special form of insert overwrite so that we: - // 1) Could use fast merge path for ORC and RC. - // 2) Didn't have to create a table. - - String query = "insert overwrite table " + tmpName + " "; - String filter = ""; - if (p != null) { - filter = " where "; - List vals = p.getValues(); - List keys = t.getPartitionKeys(); - assert keys.size() == vals.size(); - for (int i = 0; i < keys.size(); ++i) { - filter += (i == 0 ? "`" : " and `") + (keys.get(i).getName() + "`='" + vals.get(i) + "'"); - } - query += " select "; - // Use table descriptor for columns. - List cols = t.getSd().getCols(); - for (int i = 0; i < cols.size(); ++i) { - query += (i == 0 ? "`" : ", `") + (cols.get(i).getName() + "`"); - } - } else { - query += "select *"; - } - query += " from " + fullName + filter; - return query; - } /** * @param baseDir if not null, it's either table/partition root folder or base_xxxx. @@ -1221,7 +817,7 @@ public void close() throws IOException { deleteEventWriter.close(false); } } - private static long getCompactorTxnId(Configuration jobConf) { + static long getCompactorTxnId(Configuration jobConf) { String snapshot = jobConf.get(ValidTxnList.VALID_TXNS_KEY); if(Strings.isNullOrEmpty(snapshot)) { throw new IllegalStateException(ValidTxnList.VALID_TXNS_KEY + " not found for writing to " @@ -1423,37 +1019,4 @@ public void abortJob(JobContext context, int status) throws IOException { fs.delete(tmpLocation, true); } } - - /** - * Note: similar logic to the main committer; however, no ORC versions and stuff like that. - * @param from The temp directory used for compactor output. Not the actual base/delta. - * @param to The final directory; basically a SD directory. Not the actual base/delta. - * @param compactorTxnId txn that the compactor started - */ - private void commitMmCompaction(String from, String to, Configuration conf, - ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException { - Path fromPath = new Path(from), toPath = new Path(to); - FileSystem fs = fromPath.getFileSystem(conf); - // Assume the high watermark can be used as maximum transaction ID. - //todo: is that true? can it be aborted? does it matter for compaction? probably OK since - //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs. 
- long maxTxn = actualWriteIds.getHighWatermark(); - AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) - .writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0).statementId(-1) - .visibilityTxnId(compactorTxnId); - Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent(); - if (!fs.exists(fromPath)) { - LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir); - fs.mkdirs(newBaseDir); - return; - } - LOG.info("Moving contents of " + from + " to " + to); - FileStatus[] children = fs.listStatus(fromPath); - if (children.length != 1) { - throw new IOException("Unexpected files in the source: " + Arrays.toString(children)); - } - FileStatus dirPath = children[0]; - fs.rename(dirPath.getPath(), newBaseDir); - fs.delete(fromPath, true); - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java new file mode 100644 index 0000000000..10681c0202 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.ql.DriverUtils; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.common.util.Ref; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** + * Class responsible of running query based major compaction. 
+ */ +class MajorQueryCompactor extends QueryCompactor { + + private static final Logger LOG = LoggerFactory.getLogger(MajorQueryCompactor.class.getName()); + + @Override + void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, + ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { + AcidUtils + .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters())); + AcidUtils.Directory dir = AcidUtils + .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, + table.getParameters(), false); + + if (!Util.isEnoughToCompact(true, dir, storageDescriptor)) { + return; + } + + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, true); + // Set up the session for driver. + HiveConf conf = new HiveConf(hiveConf); + conf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); + /* + * For now, we will group splits on tez so that we end up with all bucket files, + * with same bucket number in one map task. + */ + conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, "compactor"); + String tmpPrefix = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_"; + String tmpTableName = tmpPrefix + System.currentTimeMillis(); + long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf); + try { + // Create a temporary table under the temp location --> db/tbl/ptn/_tmp_1234/db.tmp_compactor_tbl_1234 + String query = buildCrudMajorCompactionCreateTableQuery(tmpTableName, table); + LOG.info("Running major compaction query into temp table with create definition: {}", query); + try { + DriverUtils.runOnDriver(conf, user, sessionState, query); + } catch (Exception ex) { + Throwable cause = ex; + while (cause != null && !(cause instanceof AlreadyExistsException)) { + cause = cause.getCause(); + } + if (cause == null) { + throw new IOException(ex); + } + } + query = buildCrudMajorCompactionQuery(table, partition, tmpTableName); + LOG.info("Running major compaction via query: {}", query); + /* + * This will create bucket files like: + * db/db_tmp_compactor_tbl_1234/00000_0 + * db/db_tmp_compactor_tbl_1234/00001_0 + */ + DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId); + /* + * This achieves a final layout like (wid is the highest valid write id for this major compaction): + * db/tbl/ptn/base_wid/bucket_00000 + * db/tbl/ptn/base_wid/bucket_00001 + */ + org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName); + String tmpLocation = tempTable.getSd().getLocation(); + commitCrudMajorCompaction(tmpLocation, tmpTableName, storageDescriptor.getLocation(), conf, writeIds, + compactorTxnId); + } catch (HiveException e) { + LOG.error("Error doing query based major compaction", e); + throw new IOException(e); + } finally { + try { + DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName); + } catch (HiveException e) { + LOG.error("Unable to delete drop temp table {} which was created for running major compaction", tmpTableName); + LOG.error(ExceptionUtils.getStackTrace(e)); + } + } + } + + /** + * Note on ordering of rows in the temp table: + * We need each final bucket file soreted by original write id (ascending), bucket (ascending) and row id (ascending). + * (current write id will be the same as original write id). 
+ * We will be achieving the ordering via a custom split grouper for compactor. + * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars#SPLIT_GROUPING_MODE} for the config description. + * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)} + * for details on the mechanism. + */ + private String buildCrudMajorCompactionCreateTableQuery(String fullName, Table t) { + StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append(" ("); + // Acid virtual columns + query.append( + "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, " + + "`row` struct<"); + List cols = t.getSd().getCols(); + boolean isFirst = true; + // Actual columns + for (FieldSchema col : cols) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append("`").append(col.getName()).append("` ").append(":").append(col.getType()); + } + query.append(">)"); + query.append(" stored as orc"); + query.append(" tblproperties ('transactional'='false')"); + return query.toString(); + } + + private String buildCrudMajorCompactionQuery(Table t, Partition p, String tmpName) { + String fullName = t.getDbName() + "." + t.getTableName(); + StringBuilder query = new StringBuilder("insert into table " + tmpName + " "); + StringBuilder filter = new StringBuilder(); + if (p != null) { + filter.append(" where "); + List vals = p.getValues(); + List keys = t.getPartitionKeys(); + assert keys.size() == vals.size(); + for (int i = 0; i < keys.size(); ++i) { + filter.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='").append(vals.get(i)) + .append("'"); + } + } + query.append(" select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), ROW__ID.writeId, " + + "ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT("); + List cols = t.getSd().getCols(); + for (int i = 0; i < cols.size(); ++i) { + query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', ").append(cols.get(i).getName()); + } + query.append(") from ").append(fullName).append(filter); + return query.toString(); + } + + /** + * Move and rename bucket files from the temp table (tmpTableName), to the new base path under the source table/ptn. + * Since the temp table is a non-transactional table, it has file names in the "original" format. + * Also, due to split grouping in + * {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)}, + * we will end up with one file per bucket. + */ + private void commitCrudMajorCompaction(String from, String tmpTableName, String to, HiveConf conf, + ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException { + Path fromPath = new Path(from); + Path toPath = new Path(to); + Path tmpTablePath = new Path(fromPath, tmpTableName); + FileSystem fs = fromPath.getFileSystem(conf); + // Assume the high watermark can be used as maximum transaction ID. + long maxTxn = actualWriteIds.getHighWatermark(); + // Get a base_wid path which will be the new compacted base + AcidOutputFormat.Options options = + new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0) + .statementId(-1); + Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent(); + if (!fs.exists(fromPath)) { + LOG.info("{} not found. Assuming 0 splits. 
Creating {}", from, newBaseDir); + fs.mkdirs(newBaseDir); + return; + } + LOG.info("Moving contents of {} to {}", tmpTablePath, to); + /* + * Currently mapping file with name 0000_0 to bucket_00000, 0000_1 to bucket_00001 and so on + * TODO/ToThink: + * Q. Can file with name 0000_0 under temp table be deterministically renamed to bucket_00000 in the destination? + */ + // List buckCols = t.getSd().getBucketCols(); + FileStatus[] children = fs.listStatus(fromPath); + for (FileStatus filestatus : children) { + String originalFileName = filestatus.getPath().getName(); + // This if() may not be required I think... + if (AcidUtils.ORIGINAL_PATTERN.matcher(originalFileName).matches()) { + int bucketId = AcidUtils.parseBucketId(filestatus.getPath()); + options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn) + .bucket(bucketId).statementId(-1).visibilityTxnId(compactorTxnId); + Path finalBucketFile = AcidUtils.createFilename(toPath, options); + Hive.moveFile(conf, filestatus.getPath(), finalBucketFile, true, false, false); + } + } + fs.delete(fromPath, true); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java new file mode 100644 index 0000000000..f7e0a85c1f --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.SkewedInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.ql.DriverUtils; +import org.apache.hadoop.hive.ql.ddl.table.creation.ShowCreateTableOperation; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.util.DirectionUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; +import org.apache.hive.common.util.HiveStringUtils; +import org.apache.hive.common.util.Ref; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Class responsible to run query based major compaction on insert only tables. + */ +class MmMajorQueryCompactor extends QueryCompactor { + + private static final Logger LOG = LoggerFactory.getLogger(MmMajorQueryCompactor.class.getName()); + + @Override + void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, + ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException { + LOG.debug("Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." + table + .getTableName()); + AcidUtils.Directory dir = AcidUtils + .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, + table.getParameters(), false); + removeFilesForMmTable(hiveConf, dir); + + // Then, actually do the compaction. + if (!compactionInfo.isMajorCompaction()) { + // Not supported for MM tables right now. + LOG.info("Not compacting " + storageDescriptor.getLocation() + "; not a major compaction"); + return; + } + + if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) { + return; + } + + try { + String tmpLocation = Util.generateTmpPath(storageDescriptor); + Path baseLocation = new Path(tmpLocation, "_base"); + + // Set up the session for driver. 
+ HiveConf driverConf = new HiveConf(hiveConf); + driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); + driverConf.unset(ValidTxnList.VALID_TXNS_KEY); //so Driver doesn't get confused + //thinking it already has a txn opened + + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + SessionState sessionState = DriverUtils.setUpSessionState(driverConf, user, true); + + // Note: we could skip creating the table and just add table type stuff directly to the + // "insert overwrite directory" command if there were no bucketing or list bucketing. + String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_"; + String tmpTableName; + while (true) { + tmpTableName = tmpPrefix + System.currentTimeMillis(); + String query = + buildMmCompactionCtQuery(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(), + baseLocation.toString()); + LOG.info("Compacting a MM table into " + query); + try { + DriverUtils.runOnDriver(driverConf, user, sessionState, query); + break; + } catch (Exception ex) { + Throwable cause = ex; + while (cause != null && !(cause instanceof AlreadyExistsException)) { + cause = cause.getCause(); + } + if (cause == null) { + throw new IOException(ex); + } + } + } + String query = buildMmCompactionQuery(table, partition, tmpTableName); + LOG.info("Compacting a MM table via " + query); + long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(hiveConf); + DriverUtils.runOnDriver(driverConf, user, sessionState, query, writeIds, compactorTxnId); + commitMmCompaction(tmpLocation, storageDescriptor.getLocation(), hiveConf, writeIds, compactorTxnId); + DriverUtils.runOnDriver(driverConf, user, sessionState, "drop table if exists " + tmpTableName); + } catch (HiveException e) { + LOG.error("Error compacting a MM table", e); + throw new IOException(e); + } + } + + // Remove the directories for aborted transactions only + private void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException { + // For MM table, we only want to delete delta dirs for aborted txns. + List filesToDelete = dir.getAbortedDirectories(); + if (filesToDelete.size() < 1) { + return; + } + LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir); + FileSystem fs = filesToDelete.get(0).getFileSystem(conf); + for (Path dead : filesToDelete) { + LOG.debug("Going to delete path " + dead.toString()); + fs.delete(dead, true); + } + } + + private String buildMmCompactionCtQuery(String fullName, Table t, StorageDescriptor sd, String location) { + StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append("("); + List cols = t.getSd().getCols(); + boolean isFirst = true; + for (FieldSchema col : cols) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append("`").append(col.getName()).append("` ").append(col.getType()); + } + query.append(") "); + + // Bucketing. 
+ List buckCols = t.getSd().getBucketCols(); + if (buckCols.size() > 0) { + query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") "); + List sortCols = t.getSd().getSortCols(); + if (sortCols.size() > 0) { + query.append("SORTED BY ("); + isFirst = true; + for (Order sortCol : sortCols) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder())); + } + query.append(") "); + } + query.append("INTO ").append(t.getSd().getNumBuckets()).append(" BUCKETS"); + } + + // Stored as directories. We don't care about the skew otherwise. + if (t.getSd().isStoredAsSubDirectories()) { + SkewedInfo skewedInfo = t.getSd().getSkewedInfo(); + if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) { + query.append(" SKEWED BY (").append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON "); + isFirst = true; + for (List colValues : skewedInfo.getSkewedColValues()) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append("('").append(StringUtils.join("','", colValues)).append("')"); + } + query.append(") STORED AS DIRECTORIES"); + } + } + + SerDeInfo serdeInfo = sd.getSerdeInfo(); + Map serdeParams = serdeInfo.getParameters(); + query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib())) + .append("'"); + String sh = t.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE); + assert sh == null; // Not supposed to be a compactable table. + if (!serdeParams.isEmpty()) { + ShowCreateTableOperation.appendSerdeParams(query, serdeParams); + } + query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat())) + .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat())) + .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES ("); + // Exclude all standard table properties. + Set excludes = getHiveMetastoreConstants(); + excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS); + isFirst = true; + for (Map.Entry e : t.getParameters().entrySet()) { + if (e.getValue() == null) { + continue; + } + if (excludes.contains(e.getKey())) { + continue; + } + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append("'").append(e.getKey()).append("'='").append(HiveStringUtils.escapeHiveCommand(e.getValue())) + .append("'"); + } + if (!isFirst) { + query.append(", "); + } + query.append("'transactional'='false')"); + return query.toString(); + + } + + private String buildMmCompactionQuery(Table t, Partition p, String tmpName) { + String fullName = t.getDbName() + "." + t.getTableName(); + // ideally we should make a special form of insert overwrite so that we: + // 1) Could use fast merge path for ORC and RC. + // 2) Didn't have to create a table. + + StringBuilder query = new StringBuilder("insert overwrite table " + tmpName + " "); + StringBuilder filter = new StringBuilder(); + if (p != null) { + filter = new StringBuilder(" where "); + List vals = p.getValues(); + List keys = t.getPartitionKeys(); + assert keys.size() == vals.size(); + for (int i = 0; i < keys.size(); ++i) { + filter.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='").append(vals.get(i)) + .append("'"); + } + query.append(" select "); + // Use table descriptor for columns. 
+ List cols = t.getSd().getCols(); + for (int i = 0; i < cols.size(); ++i) { + query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`"); + } + } else { + query.append("select *"); + } + query.append(" from ").append(fullName).append(filter); + return query.toString(); + } + + /** + * Note: similar logic to the main committer; however, no ORC versions and stuff like that. + * @param from The temp directory used for compactor output. Not the actual base/delta. + * @param to The final directory; basically a SD directory. Not the actual base/delta. + * @param compactorTxnId txn that the compactor started + */ + private void commitMmCompaction(String from, String to, Configuration conf, ValidWriteIdList actualWriteIds, + long compactorTxnId) throws IOException { + Path fromPath = new Path(from), toPath = new Path(to); + FileSystem fs = fromPath.getFileSystem(conf); + // Assume the high watermark can be used as maximum transaction ID. + //todo: is that true? can it be aborted? does it matter for compaction? probably OK since + //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs. + long maxTxn = actualWriteIds.getHighWatermark(); + AcidOutputFormat.Options options = + new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0) + .statementId(-1).visibilityTxnId(compactorTxnId); + Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent(); + if (!fs.exists(fromPath)) { + LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir); + fs.mkdirs(newBaseDir); + return; + } + LOG.info("Moving contents of " + from + " to " + to); + FileStatus[] children = fs.listStatus(fromPath); + if (children.length != 1) { + throw new IOException("Unexpected files in the source: " + Arrays.toString(children)); + } + FileStatus dirPath = children[0]; + fs.rename(dirPath.getPath(), newBaseDir); + fs.delete(fromPath, true); + } + + private static Set getHiveMetastoreConstants() { + Set result = new HashSet<>(); + for (Field f : hive_metastoreConstants.class.getDeclaredFields()) { + if (!Modifier.isStatic(f.getModifiers())) { + continue; + } + if (!Modifier.isFinal(f.getModifiers())) { + continue; + } + if (!String.class.equals(f.getType())) { + continue; + } + f.setAccessible(true); + try { + result.add((String) f.get(null)); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + return result; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java new file mode 100644 index 0000000000..80119de22f --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * Common interface for query based compactions. + */ +abstract class QueryCompactor { + + private static final Logger LOG = LoggerFactory.getLogger(QueryCompactor.class.getName()); + private static final String TMPDIR = "_tmp"; + + /** + * Start a query based compaction. + * @param hiveConf hive configuration + * @param table the table, where the compaction should run + * @param partition the partition, where the compaction should run + * @param storageDescriptor this is the resolved storage descriptor + * @param writeIds valid write IDs used to filter rows while they're being read for compaction + * @param compactionInfo provides info about the type of compaction + * @throws IOException compaction cannot be finished. + */ + abstract void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor, + ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException; + + /** + * Collection of some helper functions. + */ + static class Util { + /** + * Determine if compaction can run in a specified directory. + * @param isMajorCompaction type of compaction. + * @param dir the delta directory + * @param sd resolved storage descriptor + * @return true, if compaction can run. + */ + static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, StorageDescriptor sd) { + int deltaCount = dir.getCurrentDirectories().size(); + int origCount = dir.getOriginalFiles().size(); + + StringBuilder deltaInfo = new StringBuilder().append(deltaCount); + boolean isEnoughToCompact; + + if (isMajorCompaction) { + isEnoughToCompact = (origCount > 0 || deltaCount + (dir.getBaseDirectory() == null ? 0 : 1) > 1); + + } else { + isEnoughToCompact = (deltaCount > 1); + + if (deltaCount == 2) { + Map deltaByType = dir.getCurrentDirectories().stream().collect(Collectors + .groupingBy(delta -> (delta.isDeleteDelta() ? AcidUtils.DELETE_DELTA_PREFIX : AcidUtils.DELTA_PREFIX), + Collectors.counting())); + + isEnoughToCompact = (deltaByType.size() != deltaCount); + deltaInfo.append(" ").append(deltaByType); + } + } + + if (!isEnoughToCompact) { + LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}", sd.getLocation(), + dir.getBaseDirectory(), deltaInfo, origCount); + } + return isEnoughToCompact; + } + + /** + * Generate a random tmp path, under the provided storage. + * @param sd storage descriptor, must be not null. 
+ * @return path, always not null + */ + static String generateTmpPath(StorageDescriptor sd) { + return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString(); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java new file mode 100644 index 0000000000..8f3974962c --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.ql.io.AcidUtils; + +/** + * Simple factory class, which returns an instance of {@link QueryCompactor}. + */ +class QueryCompactorFactory { + + private QueryCompactorFactory() { + } + + /** + * Get an instance of {@link QueryCompactor}. + * @param table the table, on which the compaction should be running, must be not null. + * @param configuration the hive configuration, must be not null. + * @param compactionInfo provides insight about the type of compaction, must be not null. + * @return {@link QueryCompactor} or null. + */ + static QueryCompactor getQueryCompactor(Table table, HiveConf configuration, CompactionInfo compactionInfo) { + if (!AcidUtils.isInsertOnlyTable(table.getParameters()) && HiveConf + .getBoolVar(configuration, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) { + if (compactionInfo.isMajorCompaction()) { + return new MajorQueryCompactor(); + } else { + throw new RuntimeException("Query based compaction is not currently supported for minor compactions"); + } + } + + if (AcidUtils.isInsertOnlyTable(table.getParameters()) && HiveConf + .getBoolVar(configuration, HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM)) { + return new MmMajorQueryCompactor(); + } + + return null; + } + +}
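
With this change, CompactorMR.run() no longer branches inline on table type and configuration; it asks QueryCompactorFactory for a QueryCompactor and only falls back to the MR-based compaction when the factory returns null. Below is a standalone sketch of those dispatch rules, using plain booleans as stand-ins for the real Table, HiveConf and CompactionInfo arguments; it is illustrative only, not Hive code.

public class CompactorDispatchSketch {

  enum Chosen { CRUD_MAJOR_QUERY, MM_MAJOR_QUERY, MR_JOB }

  // Mirrors QueryCompactorFactory.getQueryCompactor(): the booleans stand in for
  // AcidUtils.isInsertOnlyTable(), ConfVars.COMPACTOR_CRUD_QUERY_BASED,
  // ConfVars.HIVE_COMPACTOR_COMPACT_MM and CompactionInfo.isMajorCompaction().
  static Chosen choose(boolean insertOnlyTable, boolean crudQueryBased, boolean compactMm, boolean major) {
    if (!insertOnlyTable && crudQueryBased) {
      if (major) {
        return Chosen.CRUD_MAJOR_QUERY;   // handled by MajorQueryCompactor
      }
      throw new RuntimeException("Query based compaction is not currently supported for minor compactions");
    }
    if (insertOnlyTable && compactMm) {
      return Chosen.MM_MAJOR_QUERY;       // handled by MmMajorQueryCompactor (which itself bails out of minor compactions)
    }
    return Chosen.MR_JOB;                 // factory returns null; CompactorMR keeps running the MR job
  }

  public static void main(String[] args) {
    System.out.println(choose(false, true, false, true));   // CRUD_MAJOR_QUERY
    System.out.println(choose(true, false, true, true));    // MM_MAJOR_QUERY
    System.out.println(choose(false, false, false, true));  // MR_JOB
  }
}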
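
MajorQueryCompactor.buildCrudMajorCompactionCreateTableQuery() creates a temporary, non-transactional ORC table whose columns are the five ACID virtual columns plus the original columns wrapped into a `row` struct. A minimal sketch of the statement it would emit for a hypothetical table default.acid_tbl(a int, b string) — table name, column names and the timestamp suffix are invented for illustration:

public class CrudTempTableDdlSketch {
  public static void main(String[] args) {
    // Hypothetical temp table name: <db>_tmp_compactor_<table>_<currentTimeMillis>
    String tmpTableName = "default_tmp_compactor_acid_tbl_1576168000000";
    String ddl = "create temporary table " + tmpTableName + " ("
        + "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, "
        + "`currentTransaction` bigint, `row` struct<`a` :int, `b` :string>)"
        + " stored as orc"
        + " tblproperties ('transactional'='false')";
    System.out.println(ddl);
  }
}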
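
buildCrudMajorCompactionQuery() then fills that temp table from the source table: the select list repeats the ROW__ID fields explicitly, packs the user columns into the struct via NAMED_STRUCT, and leads with a validate_acid_sort_order(...) call so the (writeId, bucketId, rowId) ordering expected from the compactor split grouping can be checked as rows are written. Roughly, for the same hypothetical table compacted for partition ds='2020-01-01' (all names invented):

public class CrudCompactionQuerySketch {
  public static void main(String[] args) {
    String tmpTableName = "default_tmp_compactor_acid_tbl_1576168000000";
    String query = "insert into table " + tmpTableName + " "
        + " select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), "
        + "ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, "
        + "NAMED_STRUCT('a', a, 'b', b) from default.acid_tbl where `ds`='2020-01-01'";
    System.out.println(query);
  }
}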
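
commitCrudMajorCompaction() turns the temp table's "original"-style file names (000000_0, 000001_0, ...) into bucket files under the new base directory that AcidUtils.createFilename() derives from the highest valid write id and the compactor's visibility txn id. A simplified, hypothetical version of that planning step follows; the real code relies on AcidUtils.ORIGINAL_PATTERN, AcidUtils.parseBucketId(), AcidUtils.createFilename() and Hive.moveFile(), and the directory/file name formats below are only indicative.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CompactionCommitSketch {
  // Illustrative stand-in for the "original" file name pattern, e.g. 000000_0, 000001_0, ...
  private static final Pattern ORIGINAL = Pattern.compile("([0-9]+)_[0-9]+");

  static Map<String, String> plan(String baseDirName, String... tempFiles) {
    Map<String, String> moves = new LinkedHashMap<>();
    for (String name : tempFiles) {
      Matcher m = ORIGINAL.matcher(name);
      if (!m.matches()) {
        continue;                 // non-bucket files are skipped; the whole temp dir is deleted afterwards
      }
      int bucketId = Integer.parseInt(m.group(1));
      moves.put(name, String.format("%s/bucket_%05d", baseDirName, bucketId));
    }
    return moves;
  }

  public static void main(String[] args) {
    // e.g. {000000_0=base_0000042_v0000077/bucket_00000, 000001_0=base_0000042_v0000077/bucket_00001}
    System.out.println(plan("base_0000042_v0000077", "000000_0", "000001_0"));
  }
}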
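
Util.isEnoughToCompact() is the guard both compactors share: a major compaction needs pre-acid original files or more than one base/delta directory in total, while a minor compaction needs at least two deltas and treats the common "one insert delta plus one delete delta" case as not worth compacting. The same logic restated over plain counts and directory names, with illustrative parameters in place of AcidUtils.Directory:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class EnoughToCompactSketch {

  static boolean isEnoughToCompact(boolean major, boolean hasBase, int originalFiles, List<String> deltaDirs) {
    int deltaCount = deltaDirs.size();
    if (major) {
      // Major: either pre-acid "original" files exist, or there is more than one base/delta dir overall.
      return originalFiles > 0 || deltaCount + (hasBase ? 1 : 0) > 1;
    }
    if (deltaCount <= 1) {
      return false;
    }
    if (deltaCount == 2) {
      // Exactly one insert delta plus one delete delta: nothing useful for a minor compaction to merge.
      Map<String, Long> byType = deltaDirs.stream()
          .collect(Collectors.groupingBy(d -> d.startsWith("delete_delta_") ? "delete_delta_" : "delta_",
              Collectors.counting()));
      return byType.size() != deltaCount;
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(isEnoughToCompact(true, true, 0, Arrays.asList("delta_1_1")));                    // true
    System.out.println(isEnoughToCompact(false, true, 0, Arrays.asList("delta_1_1", "delete_delta_2_2"))); // false
  }
}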
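
For insert-only tables, MmMajorQueryCompactor keeps the original two-step approach: build a temp table that mirrors the source layout (bucketing, skew, serde, and a location under the _tmp.../_base directory), run an "insert overwrite" into it, then swap the result in as the new base via commitMmCompaction(). The overwrite statement produced by buildMmCompactionQuery() is a plain column projection; for a hypothetical table default.mm_tbl(a int, b string) partitioned by ds it would look roughly like the strings below (names and timestamp invented):

public class MmCompactionQuerySketch {
  public static void main(String[] args) {
    String tmpTableName = "default.tmp_compactor_mm_tbl_1576168000000";
    // Partitioned source table: explicit column list plus a partition filter appended after the FROM clause.
    String partitioned = "insert overwrite table " + tmpTableName + " "
        + " select `a`, `b` from default.mm_tbl where `ds`='2020-01-01'";
    // Unpartitioned source table: a plain select *.
    String unpartitioned = "insert overwrite table " + tmpTableName + " select * from default.mm_tbl";
    System.out.println(partitioned);
    System.out.println(unpartitioned);
  }
}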