diff --git pom.xml pom.xml
index bd79ede870..57a30e5dd2 100644
--- pom.xml
+++ pom.xml
@@ -184,7 +184,8 @@
0.9.3
2.10.0
2.3
- 1.5.2
+
+ 1.5.3-SNAPSHOT
1.10.19
2.0.0-M5
4.1.17.Final
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 7818efbbf5..44dd6ec0e7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -876,6 +876,11 @@ else if(statementId != parsedDelta.statementId) {
return results.toArray(new Path[results.size()]);
}
+ /**
+ * This looks at the footer of one of the files in the delta directory to determine whether
+ * the file is in Acid format, i.e. has acid metadata columns. The assumption is that within
+ * any directory either all files are acid or none are.
+ */
public static ParsedDelta parsedDelta(Path deltaDir, FileSystem fs) throws IOException {
String deltaDirName = deltaDir.getName();
if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) {
@@ -899,23 +904,38 @@ public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSys
if (filename.startsWith(deltaPrefix)) {
//small optimization - delete delta can't be in raw format
boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs);
- String rest = filename.substring(deltaPrefix.length());
- int split = rest.indexOf('_');
- int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
- long min = Long.parseLong(rest.substring(0, split));
- long max = split2 == -1 ?
- Long.parseLong(rest.substring(split + 1)) :
- Long.parseLong(rest.substring(split + 1, split2));
- if(split2 == -1) {
- return new ParsedDelta(min, max, null, isDeleteDelta, isRawFormat);
- }
- int statementId = Integer.parseInt(rest.substring(split2 + 1));
- return new ParsedDelta(min, max, null, statementId, isDeleteDelta, isRawFormat);
+ return parsedDelta(deltaDir, isRawFormat);
}
throw new IllegalArgumentException(deltaDir + " does not start with " +
deltaPrefix);
}
+ /**
+ * This method only parses the delta directory name. It relies on the caller to determine
+ * whether the files are in Acid format (i.e. have acid metadata columns) or not.
+ * {@link #parsedDelta(Path, FileSystem)}
+ */
+ public static ParsedDelta parsedDelta(Path deltaDir, boolean isRawFormat) {
+ String filename = deltaDir.getName();
+ boolean isDeleteDelta = filename.startsWith(DELETE_DELTA_PREFIX);
+ //make sure it's false for a delete delta no matter what was passed in - raw format
+ //doesn't apply to delete deltas
+ isRawFormat = isDeleteDelta ? false : isRawFormat;
+ String rest = filename.substring((isDeleteDelta ? DELETE_DELTA_PREFIX : DELTA_PREFIX).length());
+ int split = rest.indexOf('_');
+ int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
+ long min = Long.parseLong(rest.substring(0, split));
+ long max = split2 == -1 ?
+ Long.parseLong(rest.substring(split + 1)) :
+ Long.parseLong(rest.substring(split + 1, split2));
+ if(split2 == -1) {
+ return new ParsedDelta(min, max, null, isDeleteDelta, isRawFormat);
+ }
+ int statementId = Integer.parseInt(rest.substring(split2 + 1));
+ return new ParsedDelta(min, max, null, statementId, isDeleteDelta, isRawFormat);
+ }
+
/**
* Is the given directory in ACID format?
* @param directory the partition directory to check
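For reviewers, a small usage sketch of the new name-only overload added above. The paths and
write IDs are invented for illustration; getMinWriteId()/getMaxWriteId() are the ParsedDelta
accessors this patch relies on below, and getStatementId() is assumed here to be the existing
accessor for the optional third name component.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class ParsedDeltaNameExample {
  public static void main(String[] args) {
    // delta_<minWriteId>_<maxWriteId>[_<statementId>]: only the name is parsed, no FileSystem call
    Path delta = new Path("/warehouse/t/delta_0000012_0000015_0003");
    AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(delta, /* isRawFormat */ false);
    System.out.println(pd.getMinWriteId());   // 12
    System.out.println(pd.getMaxWriteId());   // 15
    System.out.println(pd.getStatementId());  // 3

    // delete_delta_* follows the same naming pattern; raw format never applies to delete
    // deltas, so the flag passed in is forced to false by the new overload.
    Path deleteDelta = new Path("/warehouse/t/delete_delta_0000020_0000020");
    AcidUtils.ParsedDelta dd = AcidUtils.parsedDelta(deleteDelta, /* isRawFormat */ true);
    System.out.println(dd.getMaxWriteId());   // 20; no statementId in this name
  }
}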
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 1841cfaa2e..f58dc92f59 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -87,6 +87,12 @@
* something further in the data pipeline wants {@link VirtualColumn#ROWID}
*/
private final boolean rowIdProjected;
+ /**
+ * If false, we don't need any acid metadata columns from the file because we
+ * know all data in the split is valid (with respect to visible writeIds/delete events)
+ * and ROW__ID is not needed higher up in the operator pipeline.
+ */
+ private final boolean includeAcidColumns;
/**
* partition/table root
*/
@@ -230,6 +236,38 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporte
rootPath = orcSplit.getRootDir();
//why even compute syntheticProps if !isOriginal???
syntheticProps = computeOffsetAndBucket(orcSplit, conf, validWriteIdList);
+
+ /* Figure out if we can skip reading the acid metadata columns:
+ * isOriginal - original files have no acid metadata columns, so there is nothing to skip;
+ * otherwise skip them when there are no relevant delete events && ROW__ID is not needed
+ * higher up (e.g. this is not a delete statement). */
+ if(!isOriginal && deleteEventRegistry.isEmpty() && !rowIdProjected) {
+ Path parent = orcSplit.getPath().getParent();
+ while(parent != null && !rootPath.equals(parent)) {
+ if(parent.getName().startsWith(AcidUtils.BASE_PREFIX)) {
+ /**
+ * The assumption here is that any invalid base_x has already been filtered out by
+ * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)}, so if we see
+ * one here it's valid.
+ * {@link AcidUtils#isValidBase(long, ValidWriteIdList, Path, FileSystem)} could verify this
+ * but it makes a {@link FileSystem} call. Should really move all this to split generation...
+ */
+ readerOptions.includeAcidColumns(false);
+ break;
+ }
+ else {
+ AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, isOriginal);
+ if(validWriteIdList.isWriteIdRangeValid(pd.getMinWriteId(), pd.getMaxWriteId()) ==
+ ValidWriteIdList.RangeResponse.ALL) {
+ //all write IDs in range are committed (and visible in current snapshot)
+ readerOptions.includeAcidColumns(false);
+ break;
+ }
+ }
+ parent = parent.getParent();
+ }
+ }
+ includeAcidColumns = readerOptions.getIncludeAcidColumns();
}
public void setBaseAndInnerReader(
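The constructor change above boils down to the following decision; the standalone helper below
is only an illustrative restatement (the wrapper class, the method, and its parameter names are
invented here), while the AcidUtils and ValidWriteIdList calls are the ones the patch itself makes.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class SkipAcidColumnsSketch {
  /**
   * True when the acid metadata columns of a split carry no information: there are no
   * relevant delete events, ROW__ID is not projected, and the split sits under a base_x
   * or under a delta whose entire writeId range is committed and visible.
   */
  static boolean canSkipAcidColumns(Path splitPath, Path rootPath,
      ValidWriteIdList validWriteIdList, boolean isOriginal,
      boolean hasDeleteEvents, boolean rowIdProjected) {
    if (isOriginal || hasDeleteEvents || rowIdProjected) {
      return false; // originals have no acid columns anyway; deletes/ROW__ID need them
    }
    Path parent = splitPath.getParent();
    while (parent != null && !rootPath.equals(parent)) {
      if (parent.getName().startsWith(AcidUtils.BASE_PREFIX)) {
        return true; // a base_x that survived getAcidState() is assumed valid
      }
      AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, false);
      if (validWriteIdList.isWriteIdRangeValid(pd.getMinWriteId(), pd.getMaxWriteId())
          == ValidWriteIdList.RangeResponse.ALL) {
        return true; // every writeId covered by the delta is committed and visible
      }
      parent = parent.getParent();
    }
    return false;
  }
}

In the patch itself the result is not returned but pushed into the ORC reader options via
readerOptions.includeAcidColumns(false) and then cached in the includeAcidColumns field.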
@@ -409,7 +447,16 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti
} catch (Exception e) {
throw new IOException("error iterating", e);
}
-
+ if(!includeAcidColumns) {
+ //if we get here, nothing needs to be filtered with respect to acid metadata columns -
+ //in fact, they were not even read from the file
+ value.size = vectorizedRowBatchBase.size;
+ value.selected = vectorizedRowBatchBase.selected;
+ value.selectedInUse = vectorizedRowBatchBase.selectedInUse;
+ copyFromBase(value);
+ progress = baseReader.getProgress();
+ return true;
+ }
// Once we have read the VectorizedRowBatchBase from the file, there are two kinds of cases
// for which we might have to discard rows from the batch:
// Case 1- when the row is created by a transaction that is not valid, or
@@ -465,22 +512,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti
/* Just copy the payload. {@link recordIdColumnVector} has already been populated */
System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0, value.getDataColumnCount());
} else {
- int payloadCol = OrcRecordUpdater.ROW;
- if (isFlatPayload) {
- // Ignore the struct column and just copy all the following data columns.
- System.arraycopy(vectorizedRowBatchBase.cols, payloadCol + 1, value.cols, 0,
- vectorizedRowBatchBase.cols.length - payloadCol - 1);
- } else {
- StructColumnVector payloadStruct =
- (StructColumnVector) vectorizedRowBatchBase.cols[payloadCol];
- // Transfer columnVector objects from base batch to outgoing batch.
- System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());
- }
- if (rowIdProjected) {
- recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_WRITEID];
- recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET];
- recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID];
- }
+ copyFromBase(value);
}
if (rowIdProjected) {
int ix = rbCtx.findVirtualColumnNum(VirtualColumn.ROWID);
@@ -490,6 +522,25 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti
return true;
}
+ private void copyFromBase(VectorizedRowBatch value) {
+ assert !isOriginal;
+ int payloadCol = OrcRecordUpdater.ROW;
+ if (isFlatPayload) {
+ // Ignore the struct column and just copy all the following data columns.
+ System.arraycopy(vectorizedRowBatchBase.cols, payloadCol + 1, value.cols, 0,
+ vectorizedRowBatchBase.cols.length - payloadCol - 1);
+ } else {
+ StructColumnVector payloadStruct =
+ (StructColumnVector) vectorizedRowBatchBase.cols[payloadCol];
+ // Transfer columnVector objects from base batch to outgoing batch.
+ System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());
+ }
+ if (rowIdProjected) {
+ recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_WRITEID];
+ recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET];
+ recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID];
+ }
+ }
private ColumnVector[] handleOriginalFile(
BitSet selectedBitSet, ColumnVector[] innerRecordIdColumnVector) throws IOException {
/*
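For context on what copyFromBase() shuffles around, a minimal sketch of where the user payload
lives inside an acid batch. The helper class and method are invented for illustration, and the
conceptual field names come from the standard Hive acid row schema; only OrcRecordUpdater.ROW,
ORIGINAL_WRITEID, BUCKET and ROW_ID are taken from the patch itself.

// Placed in the reader's package purely for illustration, in case the OrcRecordUpdater
// column-index constants are not visible from outside it.
package org.apache.hadoop.hive.ql.io.orc;

import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

class AcidPayloadSketch {
  // Conceptual schema of rows in an acid ORC file:
  //   struct<operation:int, originalTransaction:bigint, bucket:int, rowId:bigint,
  //          currentTransaction:bigint, row:struct<...user columns...>>
  // The user data is the trailing "row" struct at cols[OrcRecordUpdater.ROW];
  // (originalTransaction, bucket, rowId) is what ROW__ID is rebuilt from when projected.
  static ColumnVector[] payloadColumns(VectorizedRowBatch acidBatch) {
    StructColumnVector payload =
        (StructColumnVector) acidBatch.cols[OrcRecordUpdater.ROW];
    return payload.fields; // the same fields copyFromBase() transfers to the outgoing batch
  }
}

On the new includeAcidColumns == false fast path in next(), copyFromBase() is still used to hand
the payload to the outgoing batch (so the base batch evidently keeps the same column layout even
when the metadata columns are not read); what is skipped is the per-row delete-event and writeId
filtering, and rowIdProjected is guaranteed false on that path.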