diff --git pom.xml pom.xml
index bd79ede870..57a30e5dd2 100644
--- pom.xml
+++ pom.xml
@@ -184,7 +184,8 @@
     0.9.3
     2.10.0
     2.3
-    1.5.2
+
+    1.5.3-SNAPSHOT
     1.10.19
     2.0.0-M5
     4.1.17.Final
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 7818efbbf5..44dd6ec0e7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -876,6 +876,11 @@ else if(statementId != parsedDelta.statementId) {
     return results.toArray(new Path[results.size()]);
   }
 
+  /**
+   * This will look at a footer of one of the files in the delta to see if the file is in Acid
+   * format, i.e. has acid metadata columns. The assumption is that for any dir, either all files
+   * are acid or all are not.
+   */
   public static ParsedDelta parsedDelta(Path deltaDir, FileSystem fs) throws IOException {
     String deltaDirName = deltaDir.getName();
     if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) {
@@ -899,23 +904,38 @@ public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSys
     if (filename.startsWith(deltaPrefix)) {
       //small optimization - delete delta can't be in raw format
       boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs);
-      String rest = filename.substring(deltaPrefix.length());
-      int split = rest.indexOf('_');
-      int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
-      long min = Long.parseLong(rest.substring(0, split));
-      long max = split2 == -1 ?
-        Long.parseLong(rest.substring(split + 1)) :
-        Long.parseLong(rest.substring(split + 1, split2));
-      if(split2 == -1) {
-        return new ParsedDelta(min, max, null, isDeleteDelta, isRawFormat);
-      }
-      int statementId = Integer.parseInt(rest.substring(split2 + 1));
-      return new ParsedDelta(min, max, null, statementId, isDeleteDelta, isRawFormat);
+      return parsedDelta(deltaDir, isRawFormat);
     }
     throw new IllegalArgumentException(deltaDir + " does not start with " +
                                        deltaPrefix);
   }
 
+  /**
+   * This method just parses the file name. It relies on the caller to figure out whether the
+   * file is in Acid format (i.e. has acid metadata columns) or not.
+   * {@link #parsedDelta(Path, FileSystem)}
+   */
+  public static ParsedDelta parsedDelta(Path deltaDir, boolean isRawFormat) {
+    String filename = deltaDir.getName();
+    boolean isDeleteDelta = filename.startsWith(DELETE_DELTA_PREFIX);
+    //make sure raw format is false for delete deltas no matter what was passed in - raw format
+    //doesn't apply to delete deltas
+    isRawFormat = isDeleteDelta ? false : isRawFormat;
+    String rest = filename.substring((isDeleteDelta ? DELETE_DELTA_PREFIX : DELTA_PREFIX).length());
+    int split = rest.indexOf('_');
+    int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
+    long min = Long.parseLong(rest.substring(0, split));
+    long max = split2 == -1 ?
+      Long.parseLong(rest.substring(split + 1)) :
+      Long.parseLong(rest.substring(split + 1, split2));
+    if(split2 == -1) {
+      return new ParsedDelta(min, max, null, isDeleteDelta, isRawFormat);
+    }
+    int statementId = Integer.parseInt(rest.substring(split2 + 1));
+    return new ParsedDelta(min, max, null, statementId, isDeleteDelta, isRawFormat);
+
+  }
+
   /**
    * Is the given directory in ACID format?
   * @param directory the partition directory to check
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 1841cfaa2e..f58dc92f59 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -87,6 +87,12 @@
    * something further in the data pipeline wants {@link VirtualColumn#ROWID}
    */
   private final boolean rowIdProjected;
+  /**
+   * If false, we don't need any acid metadata columns from the file because we
+   * know all data in the split is valid (wrt visible writeIDs/delete events)
+   * and ROW_ID is not needed higher up in the operator pipeline.
+   */
+  private final boolean includeAcidColumns;
   /**
    * partition/table root
    */
@@ -230,6 +236,38 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporte
     rootPath = orcSplit.getRootDir();
     //why even compute syntheticProps if !isOriginal???
     syntheticProps = computeOffsetAndBucket(orcSplit, conf, validWriteIdList);
+
+    /* Figure out if we can skip reading acid metadata columns:
+     * isOriginal - doesn't have meta columns - nothing to skip;
+     * there are no relevant delete events && ROW__ID is not needed higher up (e.g.
+     * this is not a delete statement) */
+    if(!isOriginal && deleteEventRegistry.isEmpty() && !rowIdProjected) {
+      Path parent = orcSplit.getPath().getParent();
+      while(parent != null && !rootPath.equals(parent)) {
+        if(parent.getName().startsWith(AcidUtils.BASE_PREFIX)) {
+          /**
+           * The assumption here is that any base_x is filtered out by
+           * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} so if we see it
+           * here it's valid.
+           * {@link AcidUtils#isValidBase(long, ValidWriteIdList, Path, FileSystem)} can check but
+           * it makes a {@link FileSystem} call. Should really move all this to split-generation...
+           */
+          readerOptions.includeAcidColumns(false);
+          break;
+        }
+        else {
+          AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, isOriginal);
+          if(validWriteIdList.isWriteIdRangeValid(pd.getMinWriteId(), pd.getMaxWriteId()) ==
+              ValidWriteIdList.RangeResponse.ALL) {
+            //all write IDs in range are committed (and visible in the current snapshot)
+            readerOptions.includeAcidColumns(false);
+            break;
+          }
+        }
+        parent = parent.getParent();
+      }
+    }
+    includeAcidColumns = readerOptions.getIncludeAcidColumns();
   }
 
   public void setBaseAndInnerReader(
@@ -409,7 +447,16 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti
     } catch (Exception e) {
       throw new IOException("error iterating", e);
     }
-
+    if(!includeAcidColumns) {
+      //if here, we don't need to filter anything wrt acid metadata columns;
+      //in fact, they are not even read from the file
+      value.size = vectorizedRowBatchBase.size;
+      value.selected = vectorizedRowBatchBase.selected;
+      value.selectedInUse = vectorizedRowBatchBase.selectedInUse;
+      copyFromBase(value);
+      progress = baseReader.getProgress();
+      return true;
+    }
     // Once we have read the VectorizedRowBatchBase from the file, there are two kinds of cases
     // for which we might have to discard rows from the batch:
     // Case 1- when the row is created by a transaction that is not valid, or
@@ -465,22 +512,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti
       /* Just copy the payload. {@link recordIdColumnVector} has already been populated */
        System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0, value.getDataColumnCount());
      } else {
-      int payloadCol = OrcRecordUpdater.ROW;
-      if (isFlatPayload) {
-        // Ignore the struct column and just copy all the following data columns.
-        System.arraycopy(vectorizedRowBatchBase.cols, payloadCol + 1, value.cols, 0,
-            vectorizedRowBatchBase.cols.length - payloadCol - 1);
-      } else {
-        StructColumnVector payloadStruct =
-            (StructColumnVector) vectorizedRowBatchBase.cols[payloadCol];
-        // Transfer columnVector objects from base batch to outgoing batch.
-        System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());
-      }
-      if (rowIdProjected) {
-        recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_WRITEID];
-        recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET];
-        recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID];
-      }
+      copyFromBase(value);
     }
     if (rowIdProjected) {
       int ix = rbCtx.findVirtualColumnNum(VirtualColumn.ROWID);
@@ -490,6 +522,25 @@
     return true;
   }
 
+  private void copyFromBase(VectorizedRowBatch value) {
+    assert !isOriginal;
+    int payloadCol = OrcRecordUpdater.ROW;
+    if (isFlatPayload) {
+      // Ignore the struct column and just copy all the following data columns.
+      System.arraycopy(vectorizedRowBatchBase.cols, payloadCol + 1, value.cols, 0,
+          vectorizedRowBatchBase.cols.length - payloadCol - 1);
+    } else {
+      StructColumnVector payloadStruct =
+          (StructColumnVector) vectorizedRowBatchBase.cols[payloadCol];
+      // Transfer columnVector objects from base batch to outgoing batch.
+      System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());
+    }
+    if (rowIdProjected) {
+      recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_WRITEID];
+      recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET];
+      recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID];
+    }
+  }
   private ColumnVector[] handleOriginalFile(
       BitSet selectedBitSet, ColumnVector[] innerRecordIdColumnVector) throws IOException {
     /*