diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 5dbf634825..922634b77b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -83,6 +83,7 @@ import com.google.common.annotations.VisibleForTesting;
+import javax.annotation.concurrent.Immutable;
 import java.nio.charset.Charset;

 /**
@@ -740,7 +741,7 @@ public String toString() {
   /**
    * Get the list of base and delta directories that are valid and not
    * obsolete. Not {@code null}. List must be sorted in a specific way.
-   * See {@link org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta#compareTo(org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta)}
+   * See {@link org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight#compareTo(org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight)}
    * for details.
    * @return the minimal list of current directories
    */
@@ -804,26 +805,16 @@ public static ParsedBase parseBase(Path path) {
           Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length())), path);
     }
   }
+
   /**
-   * Immutable
+   * In addition to {@link ParsedDeltaLight} this knows if the data is in raw format, i.e. doesn't
+   * have acid metadata columns embedded in the files. Determining this in some cases requires
+   * looking at the footer of the data file, which can be expensive, so if this info is not
+   * needed, {@link ParsedDeltaLight} should be used.
    */
-  public static final class ParsedDelta implements Comparable<ParsedDelta> {
-    private final long minWriteId;
-    private final long maxWriteId;
-    private final FileStatus path;
-    //-1 is for internal (getAcidState()) purposes and means the delta dir
-    //had no statement ID
-    private final int statementId;
-    private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
+  @Immutable
+  public static final class ParsedDelta extends ParsedDeltaLight {
     private final boolean isRawFormat;
-    /**
-     * transaction Id of txn which created this delta. This dir should be considered
-     * invisible unless this txn is committed
-     *
-     * TODO: define TransactionallyVisible interface - add getVisibilityTxnId() etc and all comments
-     * use in {@link ParsedBase}, {@link ParsedDelta}, {@link AcidInputFormat.Options}, AcidInputFormat.DeltaMetaData etc
-     */
-    private final long visibilityTxnId;
     /**
      * for pre 1.3.x delta files
      */
@@ -833,13 +824,52 @@ private ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta,
     }
     private ParsedDelta(long min, long max, FileStatus path, int statementId,
         boolean isDeleteDelta, boolean isRawFormat, long visibilityTxnId) {
+      super(min, max, path, statementId, isDeleteDelta, visibilityTxnId);
+      this.isRawFormat = isRawFormat;
+    }
+    /**
+     * Files w/o Acid meta columns embedded in the file. See {@link AcidBaseFileType#ORIGINAL_BASE}
+     */
+    public boolean isRawFormat() {
+      return isRawFormat;
+    }
+  }
+  /**
+   * This encapsulates info obtained from the file path.
+   * See also {@link ParsedDelta}.
+   */
+  @Immutable
+  public static class ParsedDeltaLight implements Comparable<ParsedDeltaLight> {
+    final long minWriteId;
+    final long maxWriteId;
+    final FileStatus path;
+    //-1 is for internal (getAcidState()) purposes and means the delta dir
+    //had no statement ID
+    final int statementId;
+    final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
+    /**
+     * transaction Id of txn which created this delta. This dir should be considered
+     * invisible unless this txn is committed
+     *
+     * TODO: define TransactionallyVisible interface - add getVisibilityTxnId() etc and all comments
+     * use in {@link ParsedBase}, {@link ParsedDelta}, {@link AcidInputFormat.Options}, AcidInputFormat.DeltaMetaData etc
+     */
+    final long visibilityTxnId;
+
+    public static ParsedDeltaLight parse(FileStatus deltaDir) {
+      //passing isRawFormat=false is bogus. This is just to parse the file name.
+      ParsedDelta pd = parsedDelta(deltaDir.getPath(), false);
+      return new ParsedDeltaLight(pd.getMinWriteId(), pd.getMaxWriteId(), deltaDir,
+          pd.getStatementId(), pd.isDeleteDelta(), pd.getVisibilityTxnId());
+    }
+
+    private ParsedDeltaLight(long min, long max, FileStatus path, int statementId,
+        boolean isDeleteDelta, long visibilityTxnId) {
       this.minWriteId = min;
       this.maxWriteId = max;
       this.path = path;
       this.statementId = statementId;
       this.isDeleteDelta = isDeleteDelta;
-      this.isRawFormat = isRawFormat;
-      assert !isDeleteDelta || !isRawFormat : " deleteDelta should not be raw format";
       this.visibilityTxnId = visibilityTxnId;
     }

@@ -862,15 +892,17 @@ public int getStatementId() {
     public boolean isDeleteDelta() {
       return isDeleteDelta;
     }
-    /**
-     * Files w/o Acid meta columns embedded in the file. See {@link AcidBaseFileType#ORIGINAL_BASE}
-     */
-    public boolean isRawFormat() {
-      return isRawFormat;
-    }
     public long getVisibilityTxnId() {
       return visibilityTxnId;
     }
+    /**
+     * Only un-compacted delta_x_y (x != y) (created by streaming ingest with batch size > 1)
+     * may contain a {@link OrcAcidUtils#getSideFile(Path)}.
+     * @return true if this delta may have a side file
+     */
+    boolean mayContainSideFile() {
+      return !isDeleteDelta() && getMinWriteId() != getMaxWriteId() && getVisibilityTxnId() <= 0;
+    }
     /**
      * Compactions (Major/Minor) merge deltas/bases but delete of old files
      * happens in a different process; thus it's possible to have bases/deltas with
@@ -879,7 +911,7 @@ public long getVisibilityTxnId() {
      * This sorts "wider" delta before "narrower" i.e. delta_5_20 sorts before delta_5_10 (and delta_11_20)
      */
     @Override
-    public int compareTo(ParsedDelta parsedDelta) {
+    public int compareTo(ParsedDeltaLight parsedDelta) {
       if (minWriteId != parsedDelta.minWriteId) {
         if (minWriteId < parsedDelta.minWriteId) {
           return -1;
@@ -1494,7 +1526,9 @@ public static boolean isTablePropertyTransactional(Configuration conf) {
   public static boolean isDeleteDelta(Path p) {
     return p.getName().startsWith(DELETE_DELTA_PREFIX);
   }
-
+  public static boolean isInsertDelta(Path p) {
+    return p.getName().startsWith(DELTA_PREFIX);
+  }
   public static boolean isTransactionalTable(CreateTableDesc table) {
     if (table == null || table.getTblProps() == null) {
       return false;
@@ -1657,6 +1691,16 @@ public static AcidOperationalProperties getAcidOperationalProperties(
    * @param file - data file to read/compute splits on
    */
   public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOException {
+    Path acidDir = file.getPath().getParent(); //should be base_x or delta_x_y_
+    if(AcidUtils.isInsertDelta(acidDir)) {
+      ParsedDeltaLight pd = ParsedDeltaLight.parse(fs.getFileStatus(acidDir));
+      if(!pd.mayContainSideFile()) {
+        return file.getLen();
+      }
+    }
+    else {
+      return file.getLen();
+    }
     Path lengths = OrcAcidUtils.getSideFile(file.getPath());
     if(!fs.exists(lengths)) {
       /**
@@ -2005,11 +2049,33 @@ private static Path chooseFile(Path baseOrDeltaDir, FileSystem fs) throws IOExce
   }

   /**
-   * Checks if the files in base/delta dir are a result of Load Data statement and thus do not
-   * have ROW_IDs embedded in the data.
+   * Checks if the files in base/delta dir are a result of Load Data/Add Partition statement
+   * and thus do not have ROW_IDs embedded in the data.
+   * This is only meaningful for full CRUD tables - Insert-only tables have all their data
+   * in raw format by definition.
    * @param baseOrDeltaDir base or delta file.
    */
   public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOException {
+    //todo: this could be optimized - for a full CRUD table only base_x and delta_x_x could have
+    //files in raw format; delta_x_y (x != y), whether from streaming ingest or compaction,
+    //must be in native Acid format by definition
+    if(isDeleteDelta(baseOrDeltaDir)) {
+      return false;
+    }
+    if(isInsertDelta(baseOrDeltaDir)) {
+      ParsedDeltaLight pd = ParsedDeltaLight.parse(fs.getFileStatus(baseOrDeltaDir));
+      if(pd.getMinWriteId() != pd.getMaxWriteId()) {
+        //must be either result of streaming or compaction
+        return false;
+      }
+    }
+    else {
+      //must be base_x
+      if(isCompactedBase(ParsedBase.parseBase(baseOrDeltaDir), fs)) {
+        return false;
+      }
+    }
+    //if here, have to check the files
     Path dataFile = chooseFile(baseOrDeltaDir, fs);
     if (dataFile == null) {
       //directory is empty or doesn't have any that could have been produced by load data
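
Note (illustration, not part of the patch): the fast path added to getLogicalLength() above reduces to a single predicate over the directory name and the ids parsed from it. A minimal standalone sketch; the class and method names are made up, only the naming conventions (the delta_ prefix and the _v visibility suffix) come from the patch:

public class SideFileFastPath {
  // Mirrors mayContainSideFile(): only an un-compacted insert delta that
  // spans more than one write id (streaming ingest, batch size > 1) can
  // carry a side file; for everything else getLen() is authoritative.
  static boolean mayHaveSideFile(String dirName, long minWriteId,
      long maxWriteId, long visibilityTxnId) {
    boolean insertDelta = dirName.startsWith("delta_"); // excludes base_x and delete_delta_x_y
    return insertDelta
        && minWriteId != maxWriteId   // delta_x_x is a single-batch write
        && visibilityTxnId <= 0;      // a _vN suffix means the dir was not written by streaming
  }

  public static void main(String[] args) {
    System.out.println(mayHaveSideFile("delta_0000005_0000010", 5, 10, 0));            // true
    System.out.println(mayHaveSideFile("delta_0000005_0000010_v0000123", 5, 10, 123)); // false
    System.out.println(mayHaveSideFile("base_0000010", 10, 10, 0));                    // false
  }
}

This is why the else branch can return file.getLen() immediately: for base directories the per-bucket-file fs.exists() probe for a side file was pure overhead.
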
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index fbb931cbcd..4578140b29 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -1107,6 +1107,8 @@ public Options clone() {
       throw new IllegalStateException(delta + " is not delete delta and is not compacting.");
     }
     ReaderKey key = new ReaderKey();
+    //todo: only need to know isRawFormat if compacting for acid V2, and V2 should normally run
+    //in vectorized mode - i.e. this is not a significant perf overhead vs ParsedDeltaLight
     AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta, delta.getFileSystem(conf));
     if(deltaDir.isRawFormat()) {
       assert !deltaDir.isDeleteDelta() : delta.toString();
@@ -1228,8 +1230,8 @@ static TransactionMetaData findWriteIDForSynthetcRowIDs(Path splitPath, Path roo
           parent);
     }
     else {
-      AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX,
-          parent.getFileSystem(conf));
+      AcidUtils.ParsedDeltaLight pd = AcidUtils.ParsedDeltaLight
+          .parse(parent.getFileSystem(conf).getFileStatus(parent));
       assert pd.getMinWriteId() == pd.getMaxWriteId() :
         "This a delta with raw non acid schema, must be result of single write, no compaction: "
           + splitPath;
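
Note (illustration, not part of the patch): this is the trade-off the todo comment above describes. ParsedDeltaLight is derived purely from the directory name, while the full ParsedDelta may have to open a bucket file footer to decide isRawFormat(). A sketch of the caller-side choice; the class name and sample path are mine:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class ParsedDeltaCostDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path deltaDir = new Path(args[0]); // e.g. /warehouse/t/delta_0000005_0000010
    FileSystem fs = deltaDir.getFileSystem(conf);

    // Cheap: one getFileStatus() call, everything else comes from the name.
    AcidUtils.ParsedDeltaLight light =
        AcidUtils.ParsedDeltaLight.parse(fs.getFileStatus(deltaDir));

    // Potentially expensive: may read a data file footer to decide raw format,
    // which is why findWriteIDForSynthetcRowIDs() switched to the light variant.
    AcidUtils.ParsedDelta full = AcidUtils.parsedDelta(deltaDir, fs);

    System.out.println("writeIds=" + light.getMinWriteId() + ".." + light.getMaxWriteId()
        + " rawFormat=" + full.isRawFormat());
  }
}
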
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 02fde2245c..432ce9be60 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -115,7 +115,7 @@ public void testParsing() throws Exception {
         AcidUtils.ParsedBase.parseBase(new Path("/tmp/base_000123")).getWriteId());
     assertEquals(0,
         AcidUtils.ParsedBase.parseBase(new Path("/tmp/base_000123")).getVisibilityTxnId());
-    Path dir = new Path("/tmp/tbl");
+    Path dir = new Path("mock:/tmp/");

     AcidOutputFormat.Options opts =
         AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "base_567/bucket_123"), conf);
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 91458ea87a..29b4740a66 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -2808,11 +2808,9 @@ public void testSplitGenReadOps() throws Exception {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktable
-    // call-2: check existence of side file for mock:/mocktable/0_0
-    // call-3: open - mock:/mocktable/0_0
-    // call-4: check existence of side file for mock:/mocktable/0_1
-    // call-5: open - mock:/mocktable/0_1
-    assertEquals(5, readOpsDelta);
+    // call-2: open - mock:/mocktable/0_0
+    // call-3: open - mock:/mocktable/0_1
+    assertEquals(3, readOpsDelta);
     assertEquals(2, splits.length);

     // revert back to local fs
@@ -2868,11 +2866,9 @@ public void testSplitGenReadOpsLocalCache() throws Exception {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl
-    // call-2: check existence of side file for mock:/mocktbl/0_0
-    // call-3: open - mock:/mocktbl/0_0
-    // call-4: check existence of side file for mock:/mocktbl/0_1
-    // call-5: open - mock:/mocktbl/0_1
-    assertEquals(5, readOpsDelta);
+    // call-2: open - mock:/mocktbl/0_0
+    // call-3: open - mock:/mocktbl/0_1
+    assertEquals(3, readOpsDelta);

     // force BI to avoid reading footers
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
@@ -2890,9 +2886,7 @@
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl
-    // call-2: check existence of side file for mock:/mocktbl/0_0
-    // call-3: check existence of side file for mock:/mocktbl/0_1
-    assertEquals(3, readOpsDelta);
+    assertEquals(1, readOpsDelta);

     // enable cache and use default strategy
     conf.set(ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, "10Mb");
@@ -2911,11 +2905,9 @@
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl
-    // call-2: check existence of side file for mock:/mocktbl/0_0
-    // call-3: open - mock:/mocktbl/0_0
-    // call-4: check existence of side file for mock:/mocktbl/0_1
-    // call-5: open - mock:/mocktbl/0_1
-    assertEquals(5, readOpsDelta);
+    // call-2: open - mock:/mocktbl/0_0
+    // call-3: open - mock:/mocktbl/0_1
+    assertEquals(3, readOpsDelta);

     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -2985,11 +2977,9 @@ public void testSplitGenReadOpsLocalCacheChangeFileLen() throws Exception {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktable
-    // call-2: check side file for mock:/mocktbl1/0_0
-    // call-3: open - mock:/mocktbl1/0_0
-    // call-4: check side file for mock:/mocktbl1/0_1
-    // call-5: open - mock:/mocktbl1/0_1
-    assertEquals(5, readOpsDelta);
+    // call-2: open - mock:/mocktbl1/0_0
+    // call-3: open - mock:/mocktbl1/0_1
+    assertEquals(3, readOpsDelta);

     // change file length and look for cache misses

@@ -3026,11 +3016,9 @@
       }
     }
     // call-1: listLocatedStatus - mock:/mocktable
-    // call-2: check side file for mock:/mocktbl1/0_0
-    // call-3: open - mock:/mocktbl1/0_0
-    // call-4: check side file for mock:/mocktbl1/0_1
-    // call-5: open - mock:/mocktbl1/0_1
-    assertEquals(5, readOpsDelta);
+    // call-2: open - mock:/mocktbl1/0_0
+    // call-3: open - mock:/mocktbl1/0_1
+    assertEquals(3, readOpsDelta);

     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -3101,11 +3089,9 @@ public void testSplitGenReadOpsLocalCacheChangeModificationTime() throws Excepti
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl2
-    // call-2: check side file for mock:/mocktbl2/0_0
-    // call-3: open - mock:/mocktbl2/0_0
-    // call-4: check side file for mock:/mocktbl2/0_1
-    // call-5: open - mock:/mocktbl2/0_1
-    assertEquals(5, readOpsDelta);
+    // call-2: open - mock:/mocktbl2/0_0
+    // call-3: open - mock:/mocktbl2/0_1
+    assertEquals(3, readOpsDelta);

     // change file modification time and look for cache misses
     FileSystem fs1 = FileSystem.get(conf);
@@ -3125,9 +3111,8 @@
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl2
-    // call-2: check side file for mock:/mocktbl2/0_1
-    // call-3: open - mock:/mocktbl2/0_1
-    assertEquals(3, readOpsDelta);
+    // call-2: open - mock:/mocktbl2/0_1
+    assertEquals(2, readOpsDelta);

     // touch the next file
     fs1 = FileSystem.get(conf);
@@ -3147,9 +3132,8 @@
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl2
-    // call-2: check side file for mock:/mocktbl2/0_0
-    // call-3: open - mock:/mocktbl2/0_0
-    assertEquals(3, readOpsDelta);
+    // call-2: open - mock:/mocktbl2/0_0
+    assertEquals(2, readOpsDelta);

     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -3690,14 +3674,7 @@ public void testACIDReaderNoFooterSerializeWithDeltas() throws Exception {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: open(mock:/mocktable7/0_0)
-    // call-2: open(mock:/mocktable7/0_0)
-    // call-3: listLocatedFileStatuses(mock:/mocktable7)
-    // call-4: getFileStatus(mock:/mocktable7/delta_0000001_0000001_0000/_metadata_acid)
-    // call-5: open(mock:/mocktable7/delta_0000001_0000001_0000/bucket_00001)
-    // call-6: getFileStatus(mock:/mocktable7/delta_0000001_0000001_0000/_metadata_acid)
-    // call-7: open(mock:/mocktable7/delta_0000001_0000001_0000/bucket_00001)
-    assertEquals(7, readOpsDelta);
+    assertEquals(9, readOpsDelta);

     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3771,12 +3748,7 @@ public void testACIDReaderFooterSerializeWithDeltas() throws Exception {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: open to read data - split 1 => mock:/mocktable8/0_0
-    // call-2: listLocatedFileStatus(mock:/mocktable8)
-    // call-3: getFileStatus(mock:/mocktable8/delta_0000001_0000001_0000/_metadata_acid)
-    // call-4: getFileStatus(mock:/mocktable8/delta_0000001_0000001_0000/_metadata_acid)
-    // call-5: open(mock:/mocktable8/delta_0000001_0000001_0000/bucket_00001)
-    assertEquals(5, readOpsDelta);
+    assertEquals(7, readOpsDelta);

     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
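
Note (context for the assertion changes above, not part of the patch): these tests count raw filesystem calls through Hadoop's per-scheme statistics for the "mock" scheme, so every dropped "check existence of side file" comment corresponds to one fewer fs.exists() read op after the getLogicalLength() fast path. The counting idiom the tests repeat, condensed into a helper (the helper name is mine):

import org.apache.hadoop.fs.FileSystem;

public class MockReadOps {
  static long readOpsDelta(long readOpsBefore) {
    long delta = 0;
    for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
      if (statistics.getScheme().equalsIgnoreCase("mock")) {
        // read ops accumulated since the snapshot taken before split generation
        delta = statistics.getReadOps() - readOpsBefore;
      }
    }
    return delta;
  }
}
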
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index e0dfeab985..1656a5b80e 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -543,10 +543,10 @@ public void testGetLogicalLength() throws Exception {
     /*create delta_1_1_0/bucket0 with 1 row and close the file*/
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
         .inspector(inspector).bucket(BUCKET).writingBase(false).minimumWriteId(1)
-        .maximumWriteId(1).finalDestination(root);
-    Path delta1_1_0 = new Path(root, AcidUtils.deltaSubdir(
+        .maximumWriteId(2).finalDestination(root);
+    Path delta1_2_0 = new Path(root, AcidUtils.deltaSubdir(
         options.getMinimumWriteId(), options.getMaximumWriteId(), options.getStatementId()));
-    Path bucket0 = AcidUtils.createBucketFile(delta1_1_0, BUCKET);
+    Path bucket0 = AcidUtils.createBucketFile(delta1_2_0, BUCKET);
     Path bucket0SideFile = OrcAcidUtils.getSideFile(bucket0);

     RecordUpdater ru = of.getRecordUpdater(root, options);
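
Note (illustration, not part of the patch): the test change above follows directly from the new fast path - a delta_1_1 directory (single write id) no longer consults a side file at all, so the test must create delta_1_2 to keep the side-file code path covered. A sketch of the side-file lookup itself, using the ORC helper the patch relies on; the sample path is mine:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.impl.OrcAcidUtils;

public class SideFileDemo {
  public static void main(String[] args) throws Exception {
    Path bucket = new Path("/warehouse/t/delta_0000001_0000002_0000/bucket_00000");
    FileSystem fs = bucket.getFileSystem(new Configuration());

    // the side file sits next to the bucket file (bucket_00000_flush_length)
    Path sideFile = OrcAcidUtils.getSideFile(bucket);
    if (fs.exists(sideFile)) {
      // last length recorded by the streaming writer's flush; bytes past it
      // belong to an uncommitted batch and must not be read
      long logicalLength = OrcAcidUtils.getLastFlushLength(fs, bucket);
      System.out.println("logical length = " + logicalLength);
    }
  }
}
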