diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 9b51847f1b..3961baa82a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -83,6 +83,7 @@ import com.google.common.annotations.VisibleForTesting; +import javax.annotation.concurrent.Immutable; import java.nio.charset.Charset; /** @@ -435,16 +436,16 @@ else if (filename.startsWith(BUCKET_PREFIX)) { } public static final class DirectoryImpl implements Directory { - private final List abortedDirectories; + private final List abortedDirectories; private final boolean isBaseInRawFormat; private final List original; - private final List obsolete; + private final List obsolete; private final List deltas; private final Path base; - public DirectoryImpl(List abortedDirectories, + public DirectoryImpl(List abortedDirectories, boolean isBaseInRawFormat, List original, - List obsolete, List deltas, Path base) { + List obsolete, List deltas, Path base) { this.abortedDirectories = abortedDirectories == null ? Collections.emptyList() : abortedDirectories; this.isBaseInRawFormat = isBaseInRawFormat; @@ -475,12 +476,12 @@ public boolean isBaseInRawFormat() { } @Override - public List getObsolete() { + public List getObsolete() { return obsolete; } @Override - public List getAbortedDirectories() { + public List getAbortedDirectories() { return abortedDirectories; } } @@ -740,7 +741,7 @@ public String toString() { /** * Get the list of base and delta directories that are valid and not * obsolete. Not {@code null}. List must be sorted in a specific way. - * See {@link org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta#compareTo(org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta)} + * See {@link org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight#compareTo(org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight)} * for details. * @return the minimal list of current directories */ @@ -752,13 +753,13 @@ public String toString() { * list of original files, bases, and deltas that have been replaced by * more up to date ones. Not {@code null}. */ - List getObsolete(); + List getObsolete(); /** * Get the list of directories that has nothing but aborted transactions. * @return the list of aborted directories */ - List getAbortedDirectories(); + List getAbortedDirectories(); } /** * Since version 3 but prior to version 4, format of a base is "base_X" where X is a writeId. @@ -804,18 +805,48 @@ public static ParsedBase parseBase(Path path) { Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length())), path); } } + /** - * Immutable + * In addition to {@link ParsedDeltaLight} this knows if the data is in raw format, i.e. doesn't + * have acid metadata columns embedded in the files. To determine this in some cases + * requires looking at the footer of the data file which can be expensive so if this info is + * not needed {@link ParsedDeltaLight} should be used. 
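The distinction drawn in the javadoc above is the heart of this refactor: ParsedDeltaLight is derived purely from the directory name, while ParsedDelta may additionally have to open an ORC footer to decide whether the data is in raw format. A minimal usage sketch of the two entry points (the lightweight parse() added here and the existing parsedDelta(Path, FileSystem)); the warehouse path is made up and error handling is omitted:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class DeltaParseSketch {
      public static void main(String[] args) throws Exception {
        Path deltaDir = new Path("/warehouse/t/delta_0000005_0000010_0000"); // illustrative path

        // Cheap: parses only the directory name, no FileSystem calls.
        AcidUtils.ParsedDeltaLight light = AcidUtils.ParsedDeltaLight.parse(deltaDir);
        System.out.println(light.getMinWriteId() + ".." + light.getMaxWriteId()
            + " stmt=" + light.getStatementId());

        // Heavier: may read a file footer to determine whether acid columns are embedded.
        FileSystem fs = deltaDir.getFileSystem(new Configuration());
        AcidUtils.ParsedDelta full = AcidUtils.parsedDelta(deltaDir, fs);
        System.out.println("raw format: " + full.isRawFormat());
      }
    }

Callers that only need write-id ranges, statement ids or visibility txn ids should prefer the light variant.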
*/ - public static final class ParsedDelta implements Comparable { - private final long minWriteId; - private final long maxWriteId; - private final FileStatus path; + @Immutable + public static final class ParsedDelta extends ParsedDeltaLight { + private final boolean isRawFormat; + /** + * for pre 1.3.x delta files + */ + private ParsedDelta(long min, long max, Path path, boolean isDeleteDelta, + boolean isRawFormat, long visibilityTxnId) { + this(min, max, path, -1, isDeleteDelta, isRawFormat, visibilityTxnId); + } + private ParsedDelta(long min, long max, Path path, int statementId, + boolean isDeleteDelta, boolean isRawFormat, long visibilityTxnId) { + super(min, max, path, statementId, isDeleteDelta, visibilityTxnId); + this.isRawFormat = isRawFormat; + } + /** + * Files w/o Acid meta columns embedded in the file. See {@link AcidBaseFileType#ORIGINAL_BASE} + */ + public boolean isRawFormat() { + return isRawFormat; + } + } + /** + * This encapsulates info obtained form the file path. + * See also {@link ParsedDelta}. + */ + @Immutable + public static class ParsedDeltaLight implements Comparable { + final long minWriteId; + final long maxWriteId; + final Path path; //-1 is for internal (getAcidState()) purposes and means the delta dir //had no statement ID - private final int statementId; - private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...' - private final boolean isRawFormat; + final int statementId; + final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...' /** * transaction Id of txn which created this delta. This dir should be considered * invisible unless this txn is committed @@ -823,23 +854,22 @@ public static ParsedBase parseBase(Path path) { * TODO: define TransactionallyVisible interface - add getVisibilityTxnId() etc and all comments * use in {@link ParsedBase}, {@link ParsedDelta}, {@link AcidInputFormat.Options}, AcidInputFormat.DeltaMetaData etc */ - private final long visibilityTxnId; - /** - * for pre 1.3.x delta files - */ - private ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta, - boolean isRawFormat, long visibilityTxnId) { - this(min, max, path, -1, isDeleteDelta, isRawFormat, visibilityTxnId); + final long visibilityTxnId; + + public static ParsedDeltaLight parse(Path deltaDir) { + //passing isRawFormat=false is bogus. This is just to parse the file name. + ParsedDelta pd = parsedDelta(deltaDir, false); + return new ParsedDeltaLight(pd.getMinWriteId(), pd.getMaxWriteId(), deltaDir, + pd.getStatementId(), pd.isDeleteDelta(), pd.getVisibilityTxnId()); } - private ParsedDelta(long min, long max, FileStatus path, int statementId, - boolean isDeleteDelta, boolean isRawFormat, long visibilityTxnId) { + + private ParsedDeltaLight(long min, long max, Path path, int statementId, + boolean isDeleteDelta, long visibilityTxnId) { this.minWriteId = min; this.maxWriteId = max; this.path = path; this.statementId = statementId; this.isDeleteDelta = isDeleteDelta; - this.isRawFormat = isRawFormat; - assert !isDeleteDelta || !isRawFormat : " deleteDelta should not be raw format"; this.visibilityTxnId = visibilityTxnId; } @@ -852,7 +882,7 @@ public long getMaxWriteId() { } public Path getPath() { - return path.getPath(); + return path; } public int getStatementId() { @@ -862,15 +892,17 @@ public int getStatementId() { public boolean isDeleteDelta() { return isDeleteDelta; } - /** - * Files w/o Acid meta columns embedded in the file. 
See {@link AcidBaseFileType#ORIGINAL_BASE} - */ - public boolean isRawFormat() { - return isRawFormat; - } public long getVisibilityTxnId() { return visibilityTxnId; } + /** + * Only un-compacted delta_x_y (x != y) (created by streaming ingest with batch size > 1) + * may contain a {@link OrcAcidUtils#getSideFile(Path)}. + * @return + */ + boolean mayContainSideFile() { + return !isDeleteDelta() && getMinWriteId() != getMaxWriteId() && getVisibilityTxnId() <= 0; + } /** * Compactions (Major/Minor) merge deltas/bases but delete of old files * happens in a different process; thus it's possible to have bases/deltas with @@ -879,7 +911,7 @@ public long getVisibilityTxnId() { * This sorts "wider" delta before "narrower" i.e. delta_5_20 sorts before delta_5_10 (and delta_11_20) */ @Override - public int compareTo(ParsedDelta parsedDelta) { + public int compareTo(ParsedDeltaLight parsedDelta) { if (minWriteId != parsedDelta.minWriteId) { if (minWriteId < parsedDelta.minWriteId) { return -1; @@ -990,9 +1022,9 @@ public static ParsedDelta parsedDelta(Path deltaDir, FileSystem fs) throws IOExc return parsedDelta(deltaDir, DELTA_PREFIX, fs); // default prefix is delta_prefix } - private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix, FileSystem fs) + private static ParsedDelta parseDelta(Path path, String deltaPrefix, FileSystem fs) throws IOException { - ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix, fs); + ParsedDelta p = parsedDelta(path, deltaPrefix, fs); boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX); return new ParsedDelta(p.getMinWriteId(), p.getMaxWriteId(), path, p.statementId, isDeleteDelta, p.isRawFormat(), p.visibilityTxnId); @@ -1132,9 +1164,9 @@ public static Directory getAcidState(Path directory, // The following 'deltas' includes all kinds of delta files including insert & delete deltas. final List deltas = new ArrayList(); List working = new ArrayList(); - List originalDirectories = new ArrayList(); - final List obsolete = new ArrayList(); - final List abortedDirectories = new ArrayList<>(); + List originalDirectories = new ArrayList<>(); + final List obsolete = new ArrayList<>(); + final List abortedDirectories = new ArrayList<>(); List childrenWithId = null; Boolean val = useFileIds.value; if (val == null || val) { @@ -1169,9 +1201,9 @@ public static Directory getAcidState(Path directory, if (bestBase.status != null) { // Add original files to obsolete list if any for (HdfsFileStatusWithId fswid : original) { - obsolete.add(fswid.getFileStatus()); + obsolete.add(fswid.getFileStatus().getPath()); } - // Add original direcotries to obsolete list if any + // Add original directories to obsolete list if any obsolete.addAll(originalDirectories); // remove the entries so we don't get confused later and think we should // use them. @@ -1180,7 +1212,7 @@ public static Directory getAcidState(Path directory, } else { // Okay, we're going to need these originals. Recurse through them and figure out what we // really need. 
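The compareTo() contract documented above (a "wider" delta sorts before the "narrower" deltas it spans) is what lets getAcidState() pick compacted output such as delta_5_20 first and push delta_5_10 and delta_11_20 onto the obsolete list. A self-contained illustration of that ordering, using the ParsedDeltaLight.parse() added in this patch; the directory names are invented:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight;

    public class DeltaOrderSketch {
      public static void main(String[] args) throws Exception {
        List<ParsedDeltaLight> deltas = new ArrayList<>();
        for (String name : new String[] {
            "delta_0000011_0000020", "delta_0000005_0000010", "delta_0000005_0000020"}) {
          deltas.add(ParsedDeltaLight.parse(new Path("/warehouse/t/p1", name)));
        }
        Collections.sort(deltas);
        // Prints delta_0000005_0000020, delta_0000005_0000010, delta_0000011_0000020:
        // the widest range sorts first, then the narrower deltas it covers.
        for (ParsedDeltaLight d : deltas) {
          System.out.println(d.getPath().getName());
        }
      }
    }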
- for (FileStatus origDir : originalDirectories) { + for (Path origDir : originalDirectories) { findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true); } } @@ -1308,9 +1340,9 @@ private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs) thr } private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, - ValidWriteIdList writeIdList, List working, List originalDirectories, - List original, List obsolete, TxnBase bestBase, - boolean ignoreEmptyFiles, List aborted, Map tblproperties, + ValidWriteIdList writeIdList, List working, List originalDirectories, + List original, List obsolete, TxnBase bestBase, + boolean ignoreEmptyFiles, List aborted, Map tblproperties, FileSystem fs, ValidTxnList validTxnList) throws IOException { Path p = child.getPath(); String fn = p.getName(); @@ -1322,7 +1354,7 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi } if (fn.startsWith(BASE_PREFIX)) { ParsedBase parsedBase = ParsedBase.parseBase(p); - if(!isDirUsable(child, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) { + if(!isDirUsable(child.getPath(), parsedBase.getVisibilityTxnId(), aborted, validTxnList)) { return; } final long writeId = parsedBase.getWriteId(); @@ -1338,22 +1370,22 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi } } else if (bestBase.writeId < writeId) { if(isValidBase(parsedBase, writeIdList, fs)) { - obsolete.add(bestBase.status); + obsolete.add(bestBase.status.getPath()); bestBase.status = child; bestBase.writeId = writeId; } } else { - obsolete.add(child); + obsolete.add(child.getPath()); } } else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) { String deltaPrefix = fn.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX; - ParsedDelta delta = parseDelta(child, deltaPrefix, fs); - if(!isDirUsable(child, delta.getVisibilityTxnId(), aborted, validTxnList)) { + ParsedDelta delta = parseDelta(child.getPath(), deltaPrefix, fs); + if(!isDirUsable(child.getPath(), delta.getVisibilityTxnId(), aborted, validTxnList)) { return; } if(ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) { - aborted.add(child); + aborted.add(child.getPath()); } else if (writeIdList.isWriteIdRangeValid( delta.minWriteId, delta.maxWriteId) != ValidWriteIdList.RangeResponse.NONE) { @@ -1364,16 +1396,16 @@ else if (writeIdList.isWriteIdRangeValid( // do this until we have determined there is no base. This saves time. Plus, // it is possible that the cleaner is running and removing these original files, // in which case recursing through them could cause us to get an error. 
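isDirUsable(), reworked above to take a Path, is what keeps directories carrying a visibility suffix (base_N_vX, delta_..._vX) hidden until transaction X is committed in the reader's snapshot, and routes them to the aborted list when X aborted. A rough sketch of that check against a ValidTxnList snapshot; the directory name and txn ids are invented, and the snapshot is built the same way the tests in this patch build theirs:

    import java.util.BitSet;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.ValidReadTxnList;
    import org.apache.hadoop.hive.common.ValidTxnList;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class VisibilitySketch {
      public static void main(String[] args) {
        // Snapshot with high-water mark 1000 and no open or aborted txns.
        ValidTxnList snapshot =
            new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE);

        // Pretend a compaction running in txn 1500 wrote this base (numbers invented).
        Path base = new Path("/warehouse/t/base_0000123_v1500");
        long visibilityTxnId = AcidUtils.ParsedBase.parseBase(base).getVisibilityTxnId();

        if (!snapshot.isTxnValid(visibilityTxnId)) {
          // Txn 1500 is above the snapshot's high-water mark, so a reader must ignore this base;
          // getAcidState() additionally reports the dir as aborted if isTxnAborted() is true.
          System.out.println("skip " + base
              + (snapshot.isTxnAborted(visibilityTxnId) ? " (aborted)" : " (not yet visible)"));
        }
      }
    }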
- originalDirectories.add(child); + originalDirectories.add(child.getPath()); } } /** * checks {@code visibilityTxnId} to see if {@code child} is committed in current snapshot */ - private static boolean isDirUsable(FileStatus child, long visibilityTxnId, - List aborted, ValidTxnList validTxnList) { + private static boolean isDirUsable(Path child, long visibilityTxnId, + List aborted, ValidTxnList validTxnList) { if(validTxnList == null) { - throw new IllegalArgumentException("No ValidTxnList for " + child.getPath()); + throw new IllegalArgumentException("No ValidTxnList for " + child); } if(!validTxnList.isTxnValid(visibilityTxnId)) { boolean isAborted = validTxnList.isTxnAborted(visibilityTxnId); @@ -1411,19 +1443,18 @@ public Long getFileId() { /** * Find the original files (non-ACID layout) recursively under the partition directory. * @param fs the file system - * @param stat the directory to add + * @param dir the directory to add * @param original the list of original files * @throws IOException */ - public static void findOriginals(FileSystem fs, FileStatus stat, + public static void findOriginals(FileSystem fs, Path dir, List original, Ref useFileIds, boolean ignoreEmptyFiles, boolean recursive) throws IOException { - assert stat.isDir(); List childrenWithId = null; Boolean val = useFileIds.value; if (val == null || val) { try { - childrenWithId = SHIMS.listLocatedHdfsStatus(fs, stat.getPath(), hiddenFileFilter); + childrenWithId = SHIMS.listLocatedHdfsStatus(fs, dir, hiddenFileFilter); if (val == null) { useFileIds.value = true; } @@ -1438,7 +1469,8 @@ public static void findOriginals(FileSystem fs, FileStatus stat, for (HdfsFileStatusWithId child : childrenWithId) { if (child.getFileStatus().isDirectory()) { if (recursive) { - findOriginals(fs, child.getFileStatus(), original, useFileIds, ignoreEmptyFiles, true); + findOriginals(fs, child.getFileStatus().getPath(), original, useFileIds, + ignoreEmptyFiles, true); } } else { if(!ignoreEmptyFiles || child.getFileStatus().getLen() > 0) { @@ -1447,11 +1479,11 @@ public static void findOriginals(FileSystem fs, FileStatus stat, } } } else { - List children = HdfsUtils.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter); + List children = HdfsUtils.listLocatedStatus(fs, dir, hiddenFileFilter); for (FileStatus child : children) { - if (child.isDir()) { + if (child.isDirectory()) { if (recursive) { - findOriginals(fs, child, original, useFileIds, ignoreEmptyFiles, true); + findOriginals(fs, child.getPath(), original, useFileIds, ignoreEmptyFiles, true); } } else { if(!ignoreEmptyFiles || child.getLen() > 0) { @@ -1494,7 +1526,9 @@ public static boolean isTablePropertyTransactional(Configuration conf) { public static boolean isDeleteDelta(Path p) { return p.getName().startsWith(DELETE_DELTA_PREFIX); } - + public static boolean isInsertDelta(Path p) { + return p.getName().startsWith(DELTA_PREFIX); + } public static boolean isTransactionalTable(CreateTableDesc table) { if (table == null || table.getTblProps() == null) { return false; @@ -1661,6 +1695,16 @@ public static AcidOperationalProperties getAcidOperationalProperties( * @param file - data file to read/compute splits on */ public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOException { + Path acidDir = file.getPath().getParent(); //should be base_x or delta_x_y_ + if(AcidUtils.isInsertDelta(acidDir)) { + ParsedDeltaLight pd = ParsedDeltaLight.parse(acidDir); + if(!pd.mayContainSideFile()) { + return file.getLen(); + } + } + else { + return file.getLen(); + 
} Path lengths = OrcAcidUtils.getSideFile(file.getPath()); if(!fs.exists(lengths)) { /** @@ -2009,11 +2053,33 @@ private static Path chooseFile(Path baseOrDeltaDir, FileSystem fs) throws IOExce } /** - * Checks if the files in base/delta dir are a result of Load Data statement and thus do not - * have ROW_IDs embedded in the data. + * Checks if the files in base/delta dir are a result of Load Data/Add Partition statement + * and thus do not have ROW_IDs embedded in the data. + * This is only meaningful for full CRUD tables - Insert-only tables have all their data + * in raw format by definition. * @param baseOrDeltaDir base or delta file. */ public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOException { + //todo: this could be optimized - for full CRUD table only base_x and delta_x_x could have + // files in raw format delta_x_y (x != y) whether from streaming ingested or compaction + // must be native Acid format by definition + if(isDeleteDelta(baseOrDeltaDir)) { + return false; + } + if(isInsertDelta(baseOrDeltaDir)) { + ParsedDeltaLight pd = ParsedDeltaLight.parse(baseOrDeltaDir); + if(pd.getMinWriteId() != pd.getMaxWriteId()) { + //must be either result of streaming or compaction + return false; + } + } + else { + //must be base_x + if(isCompactedBase(ParsedBase.parseBase(baseOrDeltaDir), fs)) { + return false; + } + } + //if here, have to check the files Path dataFile = chooseFile(baseOrDeltaDir, fs); if (dataFile == null) { //directory is empty or doesn't have any that could have been produced by load data diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 720dbe5284..ca254492a1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -1206,7 +1206,7 @@ private AcidDirInfo callInternal() throws IOException { context.conf.getBoolean("mapred.input.dir.recursive", false)); List originals = new ArrayList<>(); List baseFiles = new ArrayList<>(); - AcidUtils.findOriginals(fs, fs.getFileStatus(dir), originals, useFileIds, true, isRecursive); + AcidUtils.findOriginals(fs, dir, originals, useFileIds, true, isRecursive); for (HdfsFileStatusWithId fileId : originals) { baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 7c4bc4d4b6..62a1061dfd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -1107,6 +1107,8 @@ public Options clone() { throw new IllegalStateException(delta + " is not delete delta and is not compacting."); } ReaderKey key = new ReaderKey(); + //todo: only need to know isRawFormat if compacting for acid V2 and V2 should normally run + //in vectorized mode - i.e. 
this is not a significant perf overhead vs ParsedDeltaLight AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta, delta.getFileSystem(conf)); if(deltaDir.isRawFormat()) { assert !deltaDir.isDeleteDelta() : delta.toString(); @@ -1228,8 +1230,7 @@ static TransactionMetaData findWriteIDForSynthetcRowIDs(Path splitPath, Path roo parent); } else { - AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX, - parent.getFileSystem(conf)); + AcidUtils.ParsedDeltaLight pd = AcidUtils.ParsedDeltaLight.parse(parent); return new TransactionMetaData(pd.getMinWriteId(), parent, pd.getStatementId()); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 06b0209aa0..57eb506996 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -226,7 +226,7 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti throws IOException, NoSuchObjectException { Path locPath = new Path(location); AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList); - List obsoleteDirs = dir.getObsolete(); + List obsoleteDirs = dir.getObsolete(); /** * add anything in 'dir' that only has data from aborted transactions - no one should be * trying to read anything in that dir (except getAcidState() that only reads the name of @@ -239,11 +239,11 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti obsoleteDirs.addAll(dir.getAbortedDirectories()); List filesToDelete = new ArrayList<>(obsoleteDirs.size()); StringBuilder extraDebugInfo = new StringBuilder("["); - for (FileStatus stat : obsoleteDirs) { - filesToDelete.add(stat.getPath()); - extraDebugInfo.append(stat.getPath().getName()).append(","); - if(!FileUtils.isPathWithinSubtree(stat.getPath(), locPath)) { - LOG.info(idWatermark(ci) + " found unexpected file: " + stat.getPath()); + for (Path stat : obsoleteDirs) { + filesToDelete.add(stat); + extraDebugInfo.append(stat.getName()).append(","); + if(!FileUtils.isPathWithinSubtree(stat, locPath)) { + LOG.info(idWatermark(ci) + " found unexpected file: " + stat); } } extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']'); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index cde47da718..f52b023b86 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -339,13 +339,8 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor } /** - * - * @param conf - * @param t - * @param p * @param sd (this is the resolved StorageDescriptor, i.e. resolved to table or partition) * @param writeIds (valid write ids used to filter rows while they're being read for compaction) - * @param ci * @throws IOException */ private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds, @@ -497,8 +492,9 @@ private String generateTmpPath(StorageDescriptor sd) { * We need each final bucket file soreted by original write id (ascending), bucket (ascending) and row id (ascending). * (current write id will be the same as original write id). * We will be achieving the ordering via a custom split grouper for compactor. 
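Stepping back to the Cleaner hunk above: with Directory.getObsolete() and getAbortedDirectories() returning Path, callers that only delete or log directories no longer need FileStatus at all. A condensed sketch of that consumer pattern; the location, table name and snapshot strings are placeholders, and the defensive copy merely stands in for the Cleaner's own bookkeeping:

    import java.util.ArrayList;
    import java.util.BitSet;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.ValidReadTxnList;
    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidTxnList;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class ObsoleteDirSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // getAcidState() expects a txn snapshot in the conf, as the tests in this patch set up.
        conf.set(ValidTxnList.VALID_TXNS_KEY,
            new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());

        Path loc = new Path("/warehouse/t/p1");                          // placeholder location
        ValidReaderWriteIdList writeIds =
            new ValidReaderWriteIdList("t:100:" + Long.MAX_VALUE + ":"); // placeholder write ids

        AcidUtils.Directory dir = AcidUtils.getAcidState(loc, conf, writeIds);
        List<Path> toDelete = new ArrayList<>(dir.getObsolete());
        toDelete.addAll(dir.getAbortedDirectories());

        FileSystem fs = loc.getFileSystem(conf);
        for (Path p : toDelete) {
          fs.delete(p, true); // recursively drop each obsolete/aborted directory
        }
      }
    }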
- * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars.SPLIT_GROUPING_MODE} for the config description. - * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorGroups} for details on the mechanism. + * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars#SPLIT_GROUPING_MODE} for the config description. + * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)} + * for details on the mechanism. */ private String buildCrudMajorCompactionCreateTableQuery(String fullName, Table t, StorageDescriptor sd) { StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append(" ("); @@ -547,7 +543,8 @@ private String buildCrudMajorCompactionQuery(HiveConf conf, Table t, Partition p /** * Move and rename bucket files from the temp table (tmpTableName), to the new base path under the source table/ptn. * Since the temp table is a non-transactional table, it has file names in the "original" format. - * Also, due to split grouping in {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorGroups}, + * Also, due to split grouping in + * {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)}, * we will end up with one file per bucket. */ private void commitCrudMajorCompaction(Table t, String from, String tmpTableName, String to, Configuration conf, @@ -822,11 +819,7 @@ private void setColumnTypes(JobConf job, List cols) { // Remove the directories for aborted transactions only private void removeFilesForMmTable(HiveConf conf, Directory dir) throws IOException { // For MM table, we only want to delete delta dirs for aborted txns. - List abortedDirs = dir.getAbortedDirectories(); - List filesToDelete = new ArrayList<>(abortedDirs.size()); - for (FileStatus stat : abortedDirs) { - filesToDelete.add(stat.getPath()); - } + List filesToDelete = dir.getAbortedDirectories(); if (filesToDelete.size() < 1) { return; } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index 02fde2245c..c5faec5e95 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -115,7 +115,7 @@ public void testParsing() throws Exception { AcidUtils.ParsedBase.parseBase(new Path("/tmp/base_000123")).getWriteId()); assertEquals(0, AcidUtils.ParsedBase.parseBase(new Path("/tmp/base_000123")).getVisibilityTxnId()); - Path dir = new Path("/tmp/tbl"); + Path dir = new Path("mock:/tmp/"); AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "base_567/bucket_123"), conf); @@ -202,12 +202,12 @@ public void testOriginalDeltas() throws Exception { AcidUtils.getAcidState(new MockPath(fs, "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":")); assertEquals(null, dir.getBaseDirectory()); - List obsolete = dir.getObsolete(); + List obsolete = dir.getObsolete(); assertEquals(2, obsolete.size()); assertEquals("mock:/tbl/part1/delta_025_025", - obsolete.get(0).getPath().toString()); + obsolete.get(0).toString()); assertEquals("mock:/tbl/part1/delta_029_029", - obsolete.get(1).getPath().toString()); + obsolete.get(1).toString()); List result = dir.getOriginalFiles(); assertEquals(5, result.size()); assertEquals("mock:/tbl/part1/000000_0", result.get(0).getFileStatus().getPath().toString()); @@ -246,13 +246,13 @@ public void testBaseDeltas() throws Exception { 
AcidUtils.getAcidState(new MockPath(fs, "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":")); assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString()); - List obsolete = dir.getObsolete(); + List obsolete = dir.getObsolete(); assertEquals(5, obsolete.size()); - assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).getPath().toString()); - assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).getPath().toString()); - assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(2).getPath().toString()); - assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(3).getPath().toString()); - assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(4).getPath().toString()); + assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).toString()); + assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).toString()); + assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(2).toString()); + assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(3).toString()); + assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(4).toString()); assertEquals(0, dir.getOriginalFiles().size()); List deltas = dir.getCurrentDirectories(); assertEquals(1, deltas.size()); @@ -276,9 +276,9 @@ public void testObsoleteOriginals() throws Exception { AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:150:" + Long.MAX_VALUE + ":")); // Obsolete list should include the two original bucket files, and the old base dir - List obsolete = dir.getObsolete(); + List obsolete = dir.getObsolete(); assertEquals(3, obsolete.size()); - assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).toString()); assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString()); } @@ -299,10 +299,10 @@ public void testOverlapingDelta() throws Exception { AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":")); assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString()); - List obsolete = dir.getObsolete(); + List obsolete = dir.getObsolete(); assertEquals(2, obsolete.size()); - assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(0).getPath().toString()); - assertEquals("mock:/tbl/part1/delta_0060_60", obsolete.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(0).toString()); + assertEquals("mock:/tbl/part1/delta_0060_60", obsolete.get(1).toString()); List delts = dir.getCurrentDirectories(); assertEquals(4, delts.size()); assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString()); @@ -336,13 +336,13 @@ public void testOverlapingDelta2() throws Exception { AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":")); assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString()); - List obsolete = dir.getObsolete(); + List obsolete = dir.getObsolete(); assertEquals(5, obsolete.size()); - assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(0).getPath().toString()); - assertEquals("mock:/tbl/part1/delta_058_58", obsolete.get(1).getPath().toString()); - assertEquals("mock:/tbl/part1/delta_0060_60_1", obsolete.get(2).getPath().toString()); - assertEquals("mock:/tbl/part1/delta_0060_60_4", obsolete.get(3).getPath().toString()); - assertEquals("mock:/tbl/part1/delta_0060_60_7", obsolete.get(4).getPath().toString()); + 
assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(0).toString()); + assertEquals("mock:/tbl/part1/delta_058_58", obsolete.get(1).toString()); + assertEquals("mock:/tbl/part1/delta_0060_60_1", obsolete.get(2).toString()); + assertEquals("mock:/tbl/part1/delta_0060_60_4", obsolete.get(3).toString()); + assertEquals("mock:/tbl/part1/delta_0060_60_7", obsolete.get(4).toString()); List delts = dir.getCurrentDirectories(); assertEquals(5, delts.size()); assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString()); @@ -451,15 +451,15 @@ public void testBaseWithDeleteDeltas() throws Exception { AcidUtils.getAcidState(new MockPath(fs, "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":")); assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString()); - List obsolete = dir.getObsolete(); + List obsolete = dir.getObsolete(); assertEquals(7, obsolete.size()); - assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).getPath().toString()); - assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).getPath().toString()); - assertEquals("mock:/tbl/part1/delete_delta_025_030", obsolete.get(2).getPath().toString()); - assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(3).getPath().toString()); - assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(4).getPath().toString()); - assertEquals("mock:/tbl/part1/delete_delta_029_029", obsolete.get(5).getPath().toString()); - assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(6).getPath().toString()); + assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).toString()); + assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).toString()); + assertEquals("mock:/tbl/part1/delete_delta_025_030", obsolete.get(2).toString()); + assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(3).toString()); + assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(4).toString()); + assertEquals("mock:/tbl/part1/delete_delta_029_029", obsolete.get(5).toString()); + assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(6).toString()); assertEquals(0, dir.getOriginalFiles().size()); List deltas = dir.getCurrentDirectories(); assertEquals(2, deltas.size()); @@ -490,11 +490,11 @@ public void testOverlapingDeltaAndDeleteDelta() throws Exception { AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":")); assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString()); - List obsolete = dir.getObsolete(); + List obsolete = dir.getObsolete(); assertEquals(3, obsolete.size()); - assertEquals("mock:/tbl/part1/delete_delta_052_55", obsolete.get(0).getPath().toString()); - assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(1).getPath().toString()); - assertEquals("mock:/tbl/part1/delta_0060_60", obsolete.get(2).getPath().toString()); + assertEquals("mock:/tbl/part1/delete_delta_052_55", obsolete.get(0).toString()); + assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(1).toString()); + assertEquals("mock:/tbl/part1/delta_0060_60", obsolete.get(2).toString()); List delts = dir.getCurrentDirectories(); assertEquals(6, delts.size()); assertEquals("mock:/tbl/part1/delete_delta_40_60", delts.get(0).getPath().toString()); @@ -520,9 +520,9 @@ public void testMinorCompactedDeltaMakesInBetweenDelteDeltaObsolete() throws Exc new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString()); AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" 
+ Long.MAX_VALUE + ":")); - List obsolete = dir.getObsolete(); + List obsolete = dir.getObsolete(); assertEquals(1, obsolete.size()); - assertEquals("mock:/tbl/part1/delete_delta_50_50", obsolete.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/delete_delta_50_50", obsolete.get(0).toString()); List delts = dir.getCurrentDirectories(); assertEquals(1, delts.size()); assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString()); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 91458ea87a..50ebbface0 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -2808,11 +2808,9 @@ public void testSplitGenReadOps() throws Exception { } } // call-1: listLocatedStatus - mock:/mocktable - // call-2: check existence of side file for mock:/mocktable/0_0 - // call-3: open - mock:/mocktable/0_0 - // call-4: check existence of side file for mock:/mocktable/0_1 - // call-5: open - mock:/mocktable/0_1 - assertEquals(5, readOpsDelta); + // call-2: open - mock:/mocktable/0_0 + // call-3: open - mock:/mocktable/0_1 + assertEquals(3, readOpsDelta); assertEquals(2, splits.length); // revert back to local fs @@ -2868,11 +2866,9 @@ public void testSplitGenReadOpsLocalCache() throws Exception { } } // call-1: listLocatedStatus - mock:/mocktbl - // call-2: check existence of side file for mock:/mocktbl/0_0 - // call-3: open - mock:/mocktbl/0_0 - // call-4: check existence of side file for mock:/mocktbl/0_1 - // call-5: open - mock:/mocktbl/0_1 - assertEquals(5, readOpsDelta); + // call-2: open - mock:/mocktbl/0_0 + // call-3: open - mock:/mocktbl/0_1 + assertEquals(3, readOpsDelta); // force BI to avoid reading footers conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI"); @@ -2890,9 +2886,7 @@ public void testSplitGenReadOpsLocalCache() throws Exception { } } // call-1: listLocatedStatus - mock:/mocktbl - // call-2: check existence of side file for mock:/mocktbl/0_0 - // call-3: check existence of side file for mock:/mocktbl/0_1 - assertEquals(3, readOpsDelta); + assertEquals(1, readOpsDelta); // enable cache and use default strategy conf.set(ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, "10Mb"); @@ -2911,11 +2905,9 @@ public void testSplitGenReadOpsLocalCache() throws Exception { } } // call-1: listLocatedStatus - mock:/mocktbl - // call-2: check existence of side file for mock:/mocktbl/0_0 - // call-3: open - mock:/mocktbl/0_0 - // call-4: check existence of side file for mock:/mocktbl/0_1 - // call-5: open - mock:/mocktbl/0_1 - assertEquals(5, readOpsDelta); + // call-2: open - mock:/mocktbl/0_0 + // call-3: open - mock:/mocktbl/0_1 + assertEquals(3, readOpsDelta); for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { if (statistics.getScheme().equalsIgnoreCase("mock")) { @@ -2985,11 +2977,9 @@ public void testSplitGenReadOpsLocalCacheChangeFileLen() throws Exception { } } // call-1: listLocatedStatus - mock:/mocktable - // call-2: check side file for mock:/mocktbl1/0_0 - // call-3: open - mock:/mocktbl1/0_0 - // call-4: check side file for mock:/mocktbl1/0_1 - // call-5: open - mock:/mocktbl1/0_1 - assertEquals(5, readOpsDelta); + // call-2: open - mock:/mocktbl1/0_0 + // call-3: open - mock:/mocktbl1/0_1 + assertEquals(3, readOpsDelta); // change file length and look for cache misses @@ -3026,11 +3016,9 @@ public void 
testSplitGenReadOpsLocalCacheChangeFileLen() throws Exception { } } // call-1: listLocatedStatus - mock:/mocktable - // call-2: check side file for mock:/mocktbl1/0_0 - // call-3: open - mock:/mocktbl1/0_0 - // call-4: check side file for mock:/mocktbl1/0_1 - // call-5: open - mock:/mocktbl1/0_1 - assertEquals(5, readOpsDelta); + // call-2: open - mock:/mocktbl1/0_0 + // call-3: open - mock:/mocktbl1/0_1 + assertEquals(3, readOpsDelta); for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { if (statistics.getScheme().equalsIgnoreCase("mock")) { @@ -3101,11 +3089,9 @@ public void testSplitGenReadOpsLocalCacheChangeModificationTime() throws Excepti } } // call-1: listLocatedStatus - mock:/mocktbl2 - // call-2: check side file for mock:/mocktbl2/0_0 - // call-3: open - mock:/mocktbl2/0_0 - // call-4: check side file for mock:/mocktbl2/0_1 - // call-5: open - mock:/mocktbl2/0_1 - assertEquals(5, readOpsDelta); + // call-2: open - mock:/mocktbl2/0_0 + // call-3: open - mock:/mocktbl2/0_1 + assertEquals(3, readOpsDelta); // change file modification time and look for cache misses FileSystem fs1 = FileSystem.get(conf); @@ -3125,9 +3111,8 @@ public void testSplitGenReadOpsLocalCacheChangeModificationTime() throws Excepti } } // call-1: listLocatedStatus - mock:/mocktbl2 - // call-2: check side file for mock:/mocktbl2/0_1 - // call-3: open - mock:/mocktbl2/0_1 - assertEquals(3, readOpsDelta); + // call-2: open - mock:/mocktbl2/0_1 + assertEquals(2, readOpsDelta); // touch the next file fs1 = FileSystem.get(conf); @@ -3147,9 +3132,8 @@ public void testSplitGenReadOpsLocalCacheChangeModificationTime() throws Excepti } } // call-1: listLocatedStatus - mock:/mocktbl2 - // call-2: check side file for mock:/mocktbl2/0_0 - // call-3: open - mock:/mocktbl2/0_0 - assertEquals(3, readOpsDelta); + // call-2: open - mock:/mocktbl2/0_0 + assertEquals(2, readOpsDelta); for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { if (statistics.getScheme().equalsIgnoreCase("mock")) { @@ -3690,13 +3674,6 @@ public void testACIDReaderNoFooterSerializeWithDeltas() throws Exception { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: open(mock:/mocktable7/0_0) - // call-2: open(mock:/mocktable7/0_0) - // call-3: listLocatedFileStatuses(mock:/mocktable7) - // call-4: getFileStatus(mock:/mocktable7/delta_0000001_0000001_0000/_metadata_acid) - // call-5: open(mock:/mocktable7/delta_0000001_0000001_0000/bucket_00001) - // call-6: getFileStatus(mock:/mocktable7/delta_0000001_0000001_0000/_metadata_acid) - // call-7: open(mock:/mocktable7/delta_0000001_0000001_0000/bucket_00001) assertEquals(7, readOpsDelta); // revert back to local fs @@ -3771,11 +3748,6 @@ public void testACIDReaderFooterSerializeWithDeltas() throws Exception { readOpsDelta = statistics.getReadOps() - readOpsBefore; } } - // call-1: open to read data - split 1 => mock:/mocktable8/0_0 - // call-2: listLocatedFileStatus(mock:/mocktable8) - // call-3: getFileStatus(mock:/mocktable8/delta_0000001_0000001_0000/_metadata_acid) - // call-4: getFileStatus(mock:/mocktable8/delta_0000001_0000001_0000/_metadata_acid) - // call-5: open(mock:/mocktable8/delta_0000001_0000001_0000/bucket_00001) assertEquals(5, readOpsDelta); // revert back to local fs diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index e0dfeab985..1656a5b80e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java 
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -543,10 +543,10 @@ public void testGetLogicalLength() throws Exception { /*create delta_1_1_0/bucket0 with 1 row and close the file*/ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .inspector(inspector).bucket(BUCKET).writingBase(false).minimumWriteId(1) - .maximumWriteId(1).finalDestination(root); - Path delta1_1_0 = new Path(root, AcidUtils.deltaSubdir( + .maximumWriteId(2).finalDestination(root); + Path delta1_2_0 = new Path(root, AcidUtils.deltaSubdir( options.getMinimumWriteId(), options.getMaximumWriteId(), options.getStatementId())); - Path bucket0 = AcidUtils.createBucketFile(delta1_1_0, BUCKET); + Path bucket0 = AcidUtils.createBucketFile(delta1_2_0, BUCKET); Path bucket0SideFile = OrcAcidUtils.getSideFile(bucket0); RecordUpdater ru = of.getRecordUpdater(root, options);
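The testGetLogicalLength() adjustment above (writing delta_1_2 instead of delta_1_1) follows directly from the new mayContainSideFile() rule: only an un-compacted insert delta spanning more than one write id (e.g. from streaming ingest) can carry a side file, so getLogicalLength() no longer probes for one under base_x, delete deltas or delta_x_x. That is also why the expected read-op counts in TestInputOutputFormat drop. A small sketch of just that decision; needsSideFileCheck and the directory names are illustrative, while the AcidUtils calls are the ones this patch exposes:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class SideFileCheckSketch {
      /** Restates the fast path of getLogicalLength(): is a _flush_length side file even possible? */
      static boolean needsSideFileCheck(Path dir) {
        if (!AcidUtils.isInsertDelta(dir)) {
          return false; // base_x and delete_delta_x_y never have a side file
        }
        AcidUtils.ParsedDeltaLight pd = AcidUtils.ParsedDeltaLight.parse(dir);
        // mayContainSideFile(): multi write-id delta that was not produced by compaction
        return pd.getMinWriteId() != pd.getMaxWriteId() && pd.getVisibilityTxnId() <= 0;
      }

      public static void main(String[] args) {
        System.out.println(needsSideFileCheck(new Path("/tbl/delta_0000001_0000001_0000"))); // false
        System.out.println(needsSideFileCheck(new Path("/tbl/delta_0000001_0000002_0000"))); // true
      }
    }

Only when this returns true does getLogicalLength() still call OrcAcidUtils.getSideFile() and check for its existence before trusting the raw file length.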