diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 65264f323f..447babf2e6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2681,6 +2681,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVE_COMPACTOR_COMPACT_MM("hive.compactor.compact.insert.only", true,
         "Whether the compactor should compact insert-only tables. A safety switch."),
+    COMPACTOR_CRUD_QUERY_BASED("hive.compactor.crud.query.based", false,
+        "Whether to run major compaction on full CRUD (ACID) tables as a HiveQL query."),
+    SPLIT_GROUPING_MODE("hive.split.grouping.mode", "query", new StringSet("query", "compactor"),
+        "How the Tez SplitGrouper groups splits: 'query' (default) groups for regular query execution, 'compactor' groups all splits of a logical bucket together for query-based compaction."),
     /**
      * @deprecated Use MetastoreConf.COMPACTOR_HISTORY_RETENTION_SUCCEEDED
      */
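For orientation, a minimal sketch of how the new properties above could be read and toggled from Java. The HiveConf setters/getters are existing API; note that hive.split.grouping.mode is normally left at its default, since CompactorMR (later in this patch) switches it to "compactor" on its own copy of the configuration.

import org.apache.hadoop.hive.conf.HiveConf;

public class QueryBasedCompactionConfigSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Opt in to running major compaction of full CRUD tables as a HiveQL query.
    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
    // Stays at its default "query"; CompactorMR sets it to "compactor" internally.
    System.out.println(conf.getVar(HiveConf.ConfVars.SPLIT_GROUPING_MODE));
  }
}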
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index 40dd992455..6278a6c605 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.orc.OrcConf;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.junit.After;
 import org.junit.Assert;
diff --git a/pom.xml b/pom.xml
index 26b662e4c3..4bb304ba31 100644
--- a/pom.xml
+++ b/pom.xml
@@ -186,7 +186,7 @@
     0.9.3
     2.10.0
     2.3
-    1.5.3
+    1.6.0-SNAPSHOT
     1.10.19
     1.7.4
     2.0.0-M5
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
index 7f8bd229a6..85b36e3c6b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -19,9 +19,11 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -33,9 +35,12 @@
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.mapred.FileSplit;
@@ -172,7 +177,7 @@
     MapWork work = populateMapWork(jobConf, inputName);
     // ArrayListMultimap is important here to retain the ordering for the splits.
-    Multimap<Integer, InputSplit> bucketSplitMultiMap =
+    Multimap<Integer, InputSplit> schemaSplitMultiMap =
         ArrayListMultimap.<Integer, InputSplit> create();
 
     int i = 0;
@@ -184,17 +189,112 @@
         ++i;
         prevSplit = s;
       }
-      bucketSplitMultiMap.put(i, s);
+      schemaSplitMultiMap.put(i, s);
     }
     LOG.info("# Src groups for split generation: " + (i + 1));
+    // Split grouping for query based compaction
+    if (HiveConf.getVar(jobConf, HiveConf.ConfVars.SPLIT_GROUPING_MODE).equalsIgnoreCase("compactor")) {
+      /**
+       * For each key in schemaSplitMultiMap (for ACID there should be just one key, since only the ORC
+       * input format is supported), the expectation is that each InputSplit is a
+       * {@link org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit} wrapping an OrcSplit.
+       * So group these splits by bucketId and, within each bucketId, sort by writeId, stmtId,
+       * rowIdOffset, splitStart. This achieves the sorting invariant required for ACID tables.
+       * See: {@link org.apache.hadoop.hive.ql.io.AcidInputFormat}
+       * Create a TezGroupedSplit for each bucketId and return.
+       */
+      if (!AcidUtils.isFullAcidScan(conf)) {
+        String errorMessage = "Compactor split grouping is enabled only for transactional tables";
+        LOG.error(errorMessage);
+        throw new RuntimeException(errorMessage);
+      }
+      // Create grouped splits per schema id.
+      // Splits are grouped such that each writer is responsible for the entire logical bucket,
+      // across the different base/delta directories that are being compacted.
+      Multimap<Integer, InputSplit> bucketSplitMultiMap = ArrayListMultimap.<Integer, InputSplit> create();
+      for (int schemaGroupId : schemaSplitMultiMap.keySet()) {
+        List<TezGroupedSplit> perWriterIdGroups = getCompactorGroups(schemaSplitMultiMap.get(schemaGroupId), conf);
+        for (TezGroupedSplit tgs : perWriterIdGroups) {
+          bucketSplitMultiMap.put(schemaGroupId, tgs);
+        }
+      }
+      return bucketSplitMultiMap;
+    }
+
     // group them into the chunks we want
     Multimap<Integer, InputSplit> groupedSplits =
-        this.group(jobConf, bucketSplitMultiMap, availableSlots, waves, locationProvider);
-
+        this.group(jobConf, schemaSplitMultiMap, availableSlots, waves, locationProvider);
     return groupedSplits;
   }
-
+
+  /**
+   * Takes a per-schema collection of {@link org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit}s
+   * and groups them for the ACID compactor.
+   * @param rawSplits the per-schema splits to regroup
+   * @param conf configuration used to parse the wrapped OrcSplits
+   * @return one TezGroupedSplit per logical bucket, holding that bucket's splits in ACID sort order
+   */
+  List<TezGroupedSplit> getCompactorGroups(Collection<InputSplit> rawSplits, Configuration conf) {
+    HiveInputFormat.HiveInputSplit[] splits = new HiveInputFormat.HiveInputSplit[rawSplits.size()];
+    int i = 0;
+    for (InputSplit is : rawSplits) {
+      splits[i++] = (HiveInputFormat.HiveInputSplit) is;
+    }
+    Arrays.sort(splits, new ComparatorCompactor(conf));
+    List<TezGroupedSplit> groupedSplits = new ArrayList<>();
+    TezGroupedSplit tgs = null;
+    int writerId = Integer.MIN_VALUE;
+    for (i = 0; i < splits.length; i++) {
+      if (writerId != ((OrcSplit) splits[i].getInputSplit()).getBucketId()) {
+        tgs = new TezGroupedSplit(1, "org.apache.hadoop.hive.ql.io.HiveInputFormat", null, null);
+        groupedSplits.add(tgs);
+        writerId = ((OrcSplit) splits[i].getInputSplit()).getBucketId();
+      }
+      tgs.addSplit(splits[i]);
+    }
+    return groupedSplits;
+  }
+
+  static class ComparatorCompactor implements Comparator<HiveInputFormat.HiveInputSplit> {
+    private Configuration conf;
+
+    private ComparatorCompactor(Configuration conf) {
+      this.conf = conf;
+    }
+
+    @Override
+    public int compare(HiveInputFormat.HiveInputSplit h1, HiveInputFormat.HiveInputSplit h2) {
+      if (h1 == h2) {
+        return 0; // sort: bucketId, writeId, stmtId, rowIdOffset, splitStart
+      }
+      // todo: compare by wrapped OrcSplit
+      OrcSplit o1 = (OrcSplit) h1.getInputSplit();
+      OrcSplit o2 = (OrcSplit) h2.getInputSplit();
+      try {
+        o1.parse(conf);
+        o2.parse(conf);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+      if (o1.getBucketId() != o2.getBucketId()) {
+        return o1.getBucketId() < o2.getBucketId() ? -1 : 1;
+      }
+      if (o1.getWriteId() != o2.getWriteId()) {
+        return o1.getWriteId() < o2.getWriteId() ? -1 : 1;
+      }
+      if (o1.getStatementId() != o2.getStatementId()) {
+        return o1.getStatementId() < o2.getStatementId() ? -1 : 1;
+      }
+      long rowOffset1 = o1.getSyntheticAcidProps() == null ? 0 : o1.getSyntheticAcidProps().getRowIdOffset();
+      long rowOffset2 = o2.getSyntheticAcidProps() == null ? 0 : o2.getSyntheticAcidProps().getRowIdOffset();
+      if (rowOffset1 != rowOffset2) {
+        // If two splits are from the same file (delta/base), either both have syntheticAcidProps or neither does.
+        return rowOffset1 < rowOffset2 ? -1 : 1;
+      }
+      if (o1.getStart() != o2.getStart()) {
+        return o1.getStart() < o2.getStart() ? -1 : 1;
+      }
+      throw new RuntimeException("Found 2 equal splits: " + o1 + " and " + o2);
+    }
+  }
 
   /**
    * get the size estimates for each bucket in tasks. This is used to make sure
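To make the grouping invariant above concrete, here is a small self-contained toy (plain Java, no Hive classes, all values made up) that mirrors what getCompactorGroups does: sort by (bucketId, writeId, stmtId, offset) and start a new group whenever the bucketId changes, so one task sees every file of a logical bucket in ACID sort order.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class CompactorGroupingToy {
  // (bucketId, writeId, stmtId, offset) stand-ins for the values ComparatorCompactor reads from OrcSplit.
  static final int[][] SPLITS = {
      {1, 5, 0, 0}, {0, 5, 0, 0}, {0, 3, 0, 0}, {1, 3, 0, 268435456}, {1, 3, 0, 0}
  };

  public static void main(String[] args) {
    int[][] splits = SPLITS.clone();
    Arrays.sort(splits, Comparator
        .<int[]>comparingInt(s -> s[0])   // bucketId
        .thenComparingInt(s -> s[1])      // writeId
        .thenComparingInt(s -> s[2])      // stmtId
        .thenComparingInt(s -> s[3]));    // rowIdOffset / split start
    List<List<int[]>> groups = new ArrayList<>();
    int currentBucket = Integer.MIN_VALUE;
    for (int[] s : splits) {
      if (s[0] != currentBucket) {        // new logical bucket -> new TezGroupedSplit in the real code
        groups.add(new ArrayList<>());
        currentBucket = s[0];
      }
      groups.get(groups.size() - 1).add(s);
    }
    // Expect two groups: bucket 0 -> [w3, w5], bucket 1 -> [w3@0, w3@256MB, w5].
    for (List<int[]> g : groups) {
      System.out.println(g.size() + " splits for bucket " + g.get(0)[0]);
    }
  }
}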
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 4d55592b63..429920ff66 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.ColumnarSplit;
 import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
@@ -64,6 +65,9 @@
   private long projColsUncompressedSize;
   private transient Object fileKey;
   private long fileLen;
+  private long writeId = 0;
+  private int bucketId = 0;
+  private int stmtId = 0;
 
   /**
    * This contains the synthetic ROW__ID offset and bucket properties for original file splits in an ACID table.
@@ -306,7 +310,7 @@ public boolean canUseLlapIo(Configuration conf) {
   /**
    * Used for generating synthetic ROW__IDs for reading "original" files.
    */
-  static final class OffsetAndBucketProperty {
+  public static final class OffsetAndBucketProperty {
     private final long rowIdOffset;
     private final int bucketProperty;
     private final long syntheticWriteId;
@@ -328,6 +332,27 @@ public long getSyntheticWriteId() {
       return syntheticWriteId;
     }
   }
+
+  public long getWriteId() {
+    return writeId;
+  }
+
+  public int getStatementId() {
+    return stmtId;
+  }
+
+  public int getBucketId() {
+    return bucketId;
+  }
+
+  public void parse(Configuration conf) throws IOException {
+    OrcRawRecordMerger.TransactionMetaData tmd =
+        OrcRawRecordMerger.TransactionMetaData.findWriteIDForSynthetcRowIDs(getPath(), rootDir, conf);
+    writeId = tmd.syntheticWriteId;
+    stmtId = tmd.statementId;
+    AcidOutputFormat.Options opt = AcidUtils.parseBaseOrDeltaBucketFilename(getPath(), conf);
+    bucketId = opt.getBucketId();
+  }
 
   @Override
   public String toString() {
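A short sketch of the intended call pattern for the new accessors above (it mirrors how SplitGrouper.ComparatorCompactor uses them; the helper class and the idea of packing the fields into one sort key are illustrative, not part of the patch).

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.orc.OrcSplit;

public final class AcidSortKeySketch {
  private AcidSortKeySketch() {}

  /** Returns {bucketId, writeId, stmtId, rowIdOffset, splitStart} for an already-created OrcSplit. */
  static long[] acidSortKey(OrcSplit split, Configuration conf) throws IOException {
    split.parse(conf); // fills writeId, stmtId and bucketId from the base/delta file path
    long rowIdOffset =
        split.getSyntheticAcidProps() == null ? 0 : split.getSyntheticAcidProps().getRowIdOffset();
    return new long[] {
        split.getBucketId(), split.getWriteId(), split.getStatementId(), rowIdOffset, split.getStart() };
  }
}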
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 92c74e1d06..27ec093765 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -42,14 +42,12 @@
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
-import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -59,10 +57,7 @@
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.DriverUtils;
-import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
@@ -74,8 +69,6 @@
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
@@ -235,6 +228,18 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
     }
+
+    /**
+     * Run major compaction as a HiveQL query.
+     * TODO:
+     * 1. A good way to run minor compaction.
+     * 2. A more generic approach to collecting files in the same logical bucket to compact within the same task
+     *    (currently we are using Tez split grouping).
+     */
+    if (HiveConf.getBoolVar(conf, ConfVars.COMPACTOR_CRUD_QUERY_BASED) && ci.isMajorCompaction()) {
+      runCrudCompaction(conf, t, p, sd, writeIds, ci);
+      return;
+    }
 
     if (AcidUtils.isInsertOnlyTable(t.getParameters())) {
       if (HiveConf.getBoolVar(conf, ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
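For context, a hedged sketch of how this new branch gets exercised end to end. DriverUtils.setUpSessionState and runOnDriver are the same helpers the patch itself uses below; the table name is made up. An ALTER TABLE ... COMPACT statement only queues the request; the compactor Worker later invokes CompactorMR.run(), which takes the query-based path when the flag is set.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;

public class TriggerQueryBasedCompactionSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
    String user = UserGroupInformation.getCurrentUser().getShortUserName();
    SessionState sessionState = DriverUtils.setUpSessionState(conf, user, false);
    // Queues a compaction request; the Worker picks it up and calls CompactorMR.run().
    DriverUtils.runOnDriver(conf, user, sessionState, "alter table acid_tbl compact 'major'", null);
  }
}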
@@ -321,6 +326,78 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor
     su.gatherStats();
   }
 
+  /**
+   * @param conf
+   * @param t
+   * @param p
+   * @param sd the resolved StorageDescriptor, i.e. resolved to the table or partition
+   * @param writeIds valid write ids, used to filter rows while they are being read for compaction
+   * @param ci
+   * @throws IOException
+   */
+  private void runCrudCompaction(HiveConf conf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
+      CompactionInfo ci) throws IOException {
+    AcidUtils.setAcidOperationalProperties(conf, true, AcidUtils.getAcidOperationalProperties(t.getParameters()));
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds, Ref.from(false), false, t.getParameters());
+    int deltaCount = dir.getCurrentDirectories().size();
+    int origCount = dir.getOriginalFiles().size();
+    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) {
+      LOG.debug("Not compacting {}; current base is {} and there are {} deltas and {} originals", sd.getLocation(),
+          dir.getBaseDirectory(), deltaCount, origCount);
+      return;
+    }
+    try {
+      // Generate a temp path under db/tbl/ptn -> db/tbl/ptn/_tmp_1234
+      String tmpLocation = generateTmpPath(sd);
+      // Set up the session for the driver.
+      conf = new HiveConf(conf);
+      conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+      /**
+       * For now, we group splits on Tez so that all bucket files with the same bucket number
+       * end up in one map task.
+       */
+      conf.set(ConfVars.SPLIT_GROUPING_MODE.varname, "compactor");
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      SessionState sessionState = DriverUtils.setUpSessionState(conf, user, false);
+      String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() + "_";
+      String tmpTableName = tmpPrefix + System.currentTimeMillis();
+      // Create a temporary table under the temp location --> db/tbl/ptn/_tmp_1234/db.tmp_compactor_tbl_1234
+      String query = buildCrudMajorCompactionCreateTableQuery(tmpTableName, t, sd, tmpLocation.toString());
+      LOG.info("Running major compaction query into temp table with create definition: {}", query);
+      try {
+        DriverUtils.runOnDriver(conf, user, sessionState, query, null);
+      } catch (Exception ex) {
+        Throwable cause = ex;
+        while (cause != null && !(cause instanceof AlreadyExistsException)) {
+          cause = cause.getCause();
+        }
+        if (cause == null) {
+          throw new IOException(ex);
+        }
+      }
+      query = buildCrudMajorCompactionQuery(conf, t, p, tmpTableName);
+      LOG.info("Running major compaction via query: {}", query);
+      /**
+       * This will create bucket files like:
+       * db/tbl/ptn/_tmp_1234/db.tmp_compactor_tbl_1234/0000_0
+       * db/tbl/ptn/_tmp_1234/db.tmp_compactor_tbl_1234/0000_1
+       */
+      DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds);
+      /**
+       * This achieves a final layout like (wid is the highest valid write id for this major compaction):
+       * db/tbl/ptn/base_wid/bucket_00000
+       * db/tbl/ptn/base_wid/bucket_00001
+       */
+      commitCrudMajorCompaction(t, tmpLocation, tmpTableName, sd.getLocation(), conf, writeIds);
+      DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName, null);
+    } catch (HiveException e) {
+      LOG.error("Error doing query based major compaction", e);
+      throw new IOException(e);
+    }
+  }
+
   private void runMmCompaction(HiveConf conf, Table t, Partition p,
       StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
     LOG.debug("Going to delete directories for aborted transactions for MM table "
@@ -393,6 +470,104 @@ private void runMmCompaction(HiveConf conf, Table t, Partition p,
   private String generateTmpPath(StorageDescriptor sd) {
     return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
   }
+
+  /**
+   * Note on the ordering of rows in the temp table:
+   * We need each final bucket file sorted by original write id (ascending), bucket (ascending), row id (ascending)
+   * and current write id (descending). We achieve this ordering via a custom split grouper for the compactor.
+   * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars#SPLIT_GROUPING_MODE} for the config description.
+   * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorGroups} for details on the mechanism.
+   */
+  private String buildCrudMajorCompactionCreateTableQuery(String fullName, Table t, StorageDescriptor sd,
+      String location) {
+    StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append(" (");
+    // Acid virtual columns
+    query.append(
+        "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<");
+    List<FieldSchema> cols = t.getSd().getCols();
+    boolean isFirst = true;
+    // Actual columns
+    for (FieldSchema col : cols) {
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
+    }
+    query.append(">)");
+    query.append(" stored as orc");
+    query.append(" location '").append(HiveStringUtils.escapeHiveCommand(location)).append("'");
+    query.append(" tblproperties ('transactional'='false')");
+    return query.toString();
+  }
+
+  private String buildCrudMajorCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) {
+    String fullName = t.getDbName() + "." + t.getTableName();
+    String query = "insert overwrite table " + tmpName + " ";
+    String filter = "";
+    if (p != null) {
+      filter = " where ";
+      List<String> vals = p.getValues();
+      List<FieldSchema> keys = t.getPartitionKeys();
+      assert keys.size() == vals.size();
+      for (int i = 0; i < keys.size(); ++i) {
+        filter += (i == 0 ? "`" : " and `") + (keys.get(i).getName() + "`='" + vals.get(i) + "'");
+      }
+    }
+    query += " select 0, ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT(";
+    List<FieldSchema> cols = t.getSd().getCols();
+    for (int i = 0; i < cols.size(); ++i) {
+      query += (i == 0 ? "`" : ", `") + cols.get(i).getName() + "`, " + cols.get(i).getName();
+    }
+    query += ") from " + fullName + filter;
+    return query;
+  }
+
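To make the two builders above concrete, here is roughly what they would emit for a hypothetical table db1.t(a int, b string) partitioned by (p string), compacting partition p='1' into a temp table named db1.tmp_compactor_t_1234. The temp location and timestamps are made up; whitespace and quoting follow the code above.

public class CompactionQuerySketch {
  // Roughly what buildCrudMajorCompactionCreateTableQuery(...) would return: ACID virtual columns
  // first, with the user columns nested inside the `row` struct.
  static final String CREATE =
      "create temporary table db1.tmp_compactor_t_1234 ("
      + "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, "
      + "`currentTransaction` bigint, `row` struct<`a` :int, `b` :string>)"
      + " stored as orc location '/warehouse/db1/t/p=1/_tmp_1234'"
      + " tblproperties ('transactional'='false')";

  // Roughly what buildCrudMajorCompactionQuery(...) would return for partition p='1'.
  static final String INSERT =
      "insert overwrite table db1.tmp_compactor_t_1234 "
      + " select 0, ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId,"
      + " NAMED_STRUCT(`a`, a, `b`, b) from db1.t where `p`='1'";

  public static void main(String[] args) {
    System.out.println(CREATE);
    System.out.println(INSERT);
  }
}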
+  /**
+   * Move and rename bucket files from the temp table (tmpTableName) to the new base path under the source table/ptn.
+   */
+  private void commitCrudMajorCompaction(Table t, String from, String tmpTableName, String to, Configuration conf,
+      ValidWriteIdList actualWriteIds) throws IOException {
+    Path fromPath = new Path(from);
+    Path toPath = new Path(to);
+    Path tmpTablePath = new Path(fromPath, tmpTableName);
+    FileSystem fs = fromPath.getFileSystem(conf);
+    // Assume the high watermark can be used as the maximum write ID.
+    long maxTxn = actualWriteIds.getHighWatermark();
+    // Get a base_wid path which will be the new compacted base
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false)
+        .maximumWriteId(maxTxn).bucket(0).statementId(-1);
+    Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
+    if (!fs.exists(fromPath)) {
+      LOG.info("{} not found. Assuming 0 splits. Creating {}", from, newBaseDir);
+      fs.mkdirs(newBaseDir);
+      AcidUtils.MetaDataFile.createCompactorMarker(newBaseDir, fs);
+      return;
+    }
+    LOG.info("Moving contents of {} to {}", tmpTablePath, to);
+    /**
+     * TODO / open questions:
+     * Q. Should the number of files in the temp table match the number of buckets?
+     * A. Perhaps not: there could be a case where a delete removes all rows in a bucket
+     *    (would there be an empty file created anyway?)
+     *
+     * Q. Can a file named 0000_0 under the temp table be renamed to bucket_00000 in the destination?
+     * A. This needs more investigation into how the naming happens in FSOP.
+     *    Perhaps we should override it with our custom name (bucket_00000 and so on)?
+     */
+    // List<String> buckCols = t.getSd().getBucketCols();
+    FileStatus[] children = fs.listStatus(fromPath);
+    // if (buckCols.size() != children.length) {
+    //   throw new IOException("The number of compacted bucket files does not match the number of buckets for the table");
+    // }
+    for (int i = 0; i < children.length; i++) {
+      options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn)
+          .bucket(i).statementId(-1);
+      Path finalBucketFile = AcidUtils.createFilename(toPath, options);
+      fs.rename(children[i].getPath(), finalBucketFile);
+    }
+    AcidUtils.MetaDataFile.createCompactorMarker(newBaseDir, fs);
+    fs.delete(fromPath, true);
+  }
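The final bucket file names come from the same AcidOutputFormat.Options / AcidUtils.createFilename chain the method above uses. A hedged sketch of that naming step in isolation; the warehouse path is made up, and the exact zero-padded output is my reading of AcidUtils rather than something the patch asserts.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class CompactedBaseNamingSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Mirrors commitCrudMajorCompaction: highest valid write id 17, second bucket file (i = 1).
    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
        .writingBase(true).isCompressed(false).maximumWriteId(17).bucket(1).statementId(-1);
    // Expected to print something like /warehouse/db1/t/p=1/base_0000017/bucket_00001
    System.out.println(AcidUtils.createFilename(new Path("/warehouse/db1/t/p=1"), options));
  }
}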
 
   private String buildMmCompactionCtQuery(
       String fullName, Table t, StorageDescriptor sd, String location) {