diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 5dbf634825..922634b77b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -83,6 +83,7 @@ import com.google.common.annotations.VisibleForTesting; +import javax.annotation.concurrent.Immutable; import java.nio.charset.Charset; /** @@ -740,7 +741,7 @@ public String toString() { /** * Get the list of base and delta directories that are valid and not * obsolete. Not {@code null}. List must be sorted in a specific way. - * See {@link org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta#compareTo(org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta)} + * See {@link org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight#compareTo(org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight)} * for details. * @return the minimal list of current directories */ @@ -804,26 +805,16 @@ public static ParsedBase parseBase(Path path) { Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length())), path); } } + /** - * Immutable + * In addition to {@link ParsedDeltaLight} this knows if the data is in raw format, i.e. doesn't + * have acid metadata columns embedded in the files. To determine this in some cases + * requires looking at the footer of the data file which can be expensive so if this info is + * not needed {@link ParsedDeltaLight} should be used. */ - public static final class ParsedDelta implements Comparable { - private final long minWriteId; - private final long maxWriteId; - private final FileStatus path; - //-1 is for internal (getAcidState()) purposes and means the delta dir - //had no statement ID - private final int statementId; - private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...' + @Immutable + public static final class ParsedDelta extends ParsedDeltaLight { private final boolean isRawFormat; - /** - * transaction Id of txn which created this delta. This dir should be considered - * invisible unless this txn is committed - * - * TODO: define TransactionallyVisible interface - add getVisibilityTxnId() etc and all comments - * use in {@link ParsedBase}, {@link ParsedDelta}, {@link AcidInputFormat.Options}, AcidInputFormat.DeltaMetaData etc - */ - private final long visibilityTxnId; /** * for pre 1.3.x delta files */ @@ -833,13 +824,52 @@ private ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta, } private ParsedDelta(long min, long max, FileStatus path, int statementId, boolean isDeleteDelta, boolean isRawFormat, long visibilityTxnId) { + super(min, max, path, statementId, isDeleteDelta, visibilityTxnId); + this.isRawFormat = isRawFormat; + } + /** + * Files w/o Acid meta columns embedded in the file. See {@link AcidBaseFileType#ORIGINAL_BASE} + */ + public boolean isRawFormat() { + return isRawFormat; + } + } + /** + * This encapsulates info obtained form the file path. + * See also {@link ParsedDelta}. + */ + @Immutable + public static class ParsedDeltaLight implements Comparable { + final long minWriteId; + final long maxWriteId; + final FileStatus path; + //-1 is for internal (getAcidState()) purposes and means the delta dir + //had no statement ID + final int statementId; + final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...' + /** + * transaction Id of txn which created this delta. This dir should be considered + * invisible unless this txn is committed + * + * TODO: define TransactionallyVisible interface - add getVisibilityTxnId() etc and all comments + * use in {@link ParsedBase}, {@link ParsedDelta}, {@link AcidInputFormat.Options}, AcidInputFormat.DeltaMetaData etc + */ + final long visibilityTxnId; + + public static ParsedDeltaLight parse(FileStatus deltaDir) { + //passing isRawFormat=false is bogus. This is just to parse the file name. + ParsedDelta pd = parsedDelta(deltaDir.getPath(), false); + return new ParsedDeltaLight(pd.getMinWriteId(), pd.getMaxWriteId(), deltaDir, + pd.getStatementId(), pd.isDeleteDelta(), pd.getVisibilityTxnId()); + } + + private ParsedDeltaLight(long min, long max, FileStatus path, int statementId, + boolean isDeleteDelta, long visibilityTxnId) { this.minWriteId = min; this.maxWriteId = max; this.path = path; this.statementId = statementId; this.isDeleteDelta = isDeleteDelta; - this.isRawFormat = isRawFormat; - assert !isDeleteDelta || !isRawFormat : " deleteDelta should not be raw format"; this.visibilityTxnId = visibilityTxnId; } @@ -862,15 +892,17 @@ public int getStatementId() { public boolean isDeleteDelta() { return isDeleteDelta; } - /** - * Files w/o Acid meta columns embedded in the file. See {@link AcidBaseFileType#ORIGINAL_BASE} - */ - public boolean isRawFormat() { - return isRawFormat; - } public long getVisibilityTxnId() { return visibilityTxnId; } + /** + * Only un-compacted delta_x_y (x != y) (created by streaming ingest with batch size > 1) + * may contain a {@link OrcAcidUtils#getSideFile(Path)}. + * @return + */ + boolean mayContainSideFile() { + return !isDeleteDelta() && getMinWriteId() != getMaxWriteId() && getVisibilityTxnId() <= 0; + } /** * Compactions (Major/Minor) merge deltas/bases but delete of old files * happens in a different process; thus it's possible to have bases/deltas with @@ -879,7 +911,7 @@ public long getVisibilityTxnId() { * This sorts "wider" delta before "narrower" i.e. delta_5_20 sorts before delta_5_10 (and delta_11_20) */ @Override - public int compareTo(ParsedDelta parsedDelta) { + public int compareTo(ParsedDeltaLight parsedDelta) { if (minWriteId != parsedDelta.minWriteId) { if (minWriteId < parsedDelta.minWriteId) { return -1; @@ -1494,7 +1526,9 @@ public static boolean isTablePropertyTransactional(Configuration conf) { public static boolean isDeleteDelta(Path p) { return p.getName().startsWith(DELETE_DELTA_PREFIX); } - + public static boolean isInsertDelta(Path p) { + return p.getName().startsWith(DELTA_PREFIX); + } public static boolean isTransactionalTable(CreateTableDesc table) { if (table == null || table.getTblProps() == null) { return false; @@ -1657,6 +1691,16 @@ public static AcidOperationalProperties getAcidOperationalProperties( * @param file - data file to read/compute splits on */ public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOException { + Path acidDir = file.getPath().getParent(); //should be base_x or delta_x_y_ + if(AcidUtils.isInsertDelta(acidDir)) { + ParsedDeltaLight pd = ParsedDeltaLight.parse(fs.getFileStatus(acidDir)); + if(!pd.mayContainSideFile()) { + return file.getLen(); + } + } + else { + return file.getLen(); + } Path lengths = OrcAcidUtils.getSideFile(file.getPath()); if(!fs.exists(lengths)) { /** @@ -2005,11 +2049,33 @@ private static Path chooseFile(Path baseOrDeltaDir, FileSystem fs) throws IOExce } /** - * Checks if the files in base/delta dir are a result of Load Data statement and thus do not - * have ROW_IDs embedded in the data. + * Checks if the files in base/delta dir are a result of Load Data/Add Partition statement + * and thus do not have ROW_IDs embedded in the data. + * This is only meaningful for full CRUD tables - Insert-only tables have all their data + * in raw format by definition. * @param baseOrDeltaDir base or delta file. */ public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOException { + //todo: this could be optimized - for full CRUD table only base_x and delta_x_x could have + // files in raw format delta_x_y (x != y) whether from streaming ingested or compaction + // must be native Acid format by definition + if(isDeleteDelta(baseOrDeltaDir)) { + return false; + } + if(isInsertDelta(baseOrDeltaDir)) { + ParsedDeltaLight pd = ParsedDeltaLight.parse(fs.getFileStatus(baseOrDeltaDir)); + if(pd.getMinWriteId() != pd.getMaxWriteId()) { + //must be either result of streaming or compaction + return false; + } + } + else { + //must be base_x + if(isCompactedBase(ParsedBase.parseBase(baseOrDeltaDir), fs)) { + return false; + } + } + //if here, have to check the files Path dataFile = chooseFile(baseOrDeltaDir, fs); if (dataFile == null) { //directory is empty or doesn't have any that could have been produced by load data diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index fbb931cbcd..4578140b29 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -1107,6 +1107,8 @@ public Options clone() { throw new IllegalStateException(delta + " is not delete delta and is not compacting."); } ReaderKey key = new ReaderKey(); + //todo: only need to know isRawFormat if compacting for acid V2 and V2 should normally run + //in vectorized mode - i.e. this is not a significant perf overhead vs ParsedDeltaLight AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta, delta.getFileSystem(conf)); if(deltaDir.isRawFormat()) { assert !deltaDir.isDeleteDelta() : delta.toString(); @@ -1228,8 +1230,8 @@ static TransactionMetaData findWriteIDForSynthetcRowIDs(Path splitPath, Path roo parent); } else { - AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX, - parent.getFileSystem(conf)); + AcidUtils.ParsedDeltaLight pd = AcidUtils.ParsedDeltaLight + .parse(parent.getFileSystem(conf).getFileStatus(parent)); assert pd.getMinWriteId() == pd.getMaxWriteId() : "This a delta with raw non acid schema, must be result of single write, no compaction: " + splitPath;