diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 7451ea4..6897336 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -86,6 +86,8 @@
   private final IncludesImpl includes;
   private final SearchArgument sarg;
   private final VectorizedRowBatchCtx rbCtx;
+  private final boolean isVectorized;
+  private VectorizedOrcAcidRowBatchReader acidReader;
   private final Object[] partitionValues;

   private final LinkedBlockingQueue queue;
@@ -174,6 +176,12 @@ private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split,
       partitionValues = null;
     }

+    this.isVectorized = HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
+    if (isAcidScan) {
+      this.acidReader = new VectorizedOrcAcidRowBatchReader((OrcSplit) split, jobConf, Reporter.NULL, null, rbCtx,
+          true);
+    }
+
     // Create the consumer of encoded data; it will coordinate decoding to CVBs.
     feedback = rp = cvp.createReadPipeline(this, split, includes, sarg, counters, includes,
         sourceInputFormat, sourceSerDe, reporter, job, mapWork.getPathToPartitionInfo());
@@ -309,8 +317,6 @@ public boolean next(NullWritable key, VectorizedRowBatch vrb) throws IOException
       counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
       return false;
     }
-    final boolean isVectorized = HiveConf.getBoolVar(jobConf,
-        HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
     if (isAcidScan) {
       vrb.selectedInUse = true;
       if (isVectorized) {
@@ -329,10 +335,7 @@ public boolean next(NullWritable key, VectorizedRowBatch vrb) throws IOException
           inputVrb.cols[ixInVrb] = cvb.cols[ixInReadSet];
         }
         inputVrb.size = cvb.size;
-        // TODO: reuse between calls
-        @SuppressWarnings("resource")
-        VectorizedOrcAcidRowBatchReader acidReader = new VectorizedOrcAcidRowBatchReader(
-            (OrcSplit)split, jobConf, Reporter.NULL, new AcidWrapper(inputVrb), rbCtx, true);
+        acidReader.setBaseAndInnerReader(new AcidWrapper(inputVrb));
         acidReader.next(NullWritable.get(), vrb);
       } else {
         // TODO: WTF? The old code seems to just drop the ball here.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index d84d0ee..7fce67f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -315,6 +315,21 @@ public static long parseBase(Path path) {
   }

   /**
+   * Get the bucket id from a bucket file path.
+   * @param bucketFile - bucket file path
+   * @return - bucket id, or -1 if the filename does not match any known bucket file pattern
+   */
+  public static int parseBucketId(Path bucketFile) {
+    String filename = bucketFile.getName();
+    if (ORIGINAL_PATTERN.matcher(filename).matches() || ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
+      return Integer.parseInt(filename.substring(0, filename.indexOf('_')));
+    } else if (filename.startsWith(BUCKET_PREFIX)) {
+      return Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
+    }
+    return -1;
+  }
+
+  /**
    * Parse a bucket filename back into the options that would have created
    * the file.
    * @param bucketFile the path to a bucket file
@@ -326,9 +341,8 @@ public static long parseBase(Path path) {
                                                        Configuration conf) throws IOException {
     AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf);
     String filename = bucketFile.getName();
+    int bucket = parseBucketId(bucketFile);
     if (ORIGINAL_PATTERN.matcher(filename).matches()) {
-      int bucket =
-          Integer.parseInt(filename.substring(0, filename.indexOf('_')));
       result
         .setOldStyle(true)
         .minimumWriteId(0)
@@ -338,8 +352,6 @@ public static long parseBase(Path path) {
     }
     else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
       //todo: define groups in regex and use parseInt(Matcher.group(2))....
-      int bucket =
-          Integer.parseInt(filename.substring(0, filename.indexOf('_')));
       int copyNumber = Integer.parseInt(filename.substring(filename.lastIndexOf('_') + 1));
       result
         .setOldStyle(true)
@@ -350,8 +362,6 @@ else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
         .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX));
     }
     else if (filename.startsWith(BUCKET_PREFIX)) {
-      int bucket =
-          Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
       if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) {
         result
           .setOldStyle(false)
@@ -377,7 +387,7 @@ else if (filename.startsWith(BUCKET_PREFIX)) {
           .bucket(bucket);
       }
     } else {
-      result.setOldStyle(true).bucket(-1).minimumWriteId(0)
+      result.setOldStyle(true).bucket(bucket).minimumWriteId(0)
           .maximumWriteId(0);
     }
     return result;
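
Note (reviewer illustration, not part of the patch): the sketch below shows how the new AcidUtils.parseBucketId() is expected to behave for the three filename shapes handled above -- "original" files, their "_copy_N" variants, and ACID bucket_NNNNN files. The paths are made-up examples.

    // Illustration only: expected results of AcidUtils.parseBucketId() for hypothetical paths.
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class ParseBucketIdSketch {
      public static void main(String[] args) {
        // "original" file: the bucket id is the digit run before the first '_'
        System.out.println(AcidUtils.parseBucketId(new Path("/warehouse/t/000001_0")));        // 1
        // copy of an original file: the trailing "_copy_N" does not change the bucket id
        System.out.println(AcidUtils.parseBucketId(new Path("/warehouse/t/000001_0_copy_2"))); // 1
        // ACID bucket file: the bucket id is the digit run after the "bucket_" prefix
        System.out.println(AcidUtils.parseBucketId(
            new Path("/warehouse/t/delta_0000001_0000001_0000/bucket_00003")));                // 3
        // anything else is not a recognized bucket file, so -1 is returned
        System.out.println(AcidUtils.parseBucketId(new Path("/warehouse/t/_metadata")));       // -1
      }
    }
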
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 049dbd3..f461364 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -2311,8 +2311,7 @@ private static boolean isStripeSatisfyPredicate(
       boolean isOriginal, List parsedDeltas, List readerTypes,
-      UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs)
-      throws IOException {
+      UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs) {
     List deltas = AcidUtils.serializeDeltas(parsedDeltas);
     boolean[] covered = new boolean[context.numBuckets];
@@ -2321,9 +2320,7 @@ private static boolean isStripeSatisfyPredicate(
     long totalFileSize = 0;
     for (HdfsFileStatusWithId child : baseFiles) {
       totalFileSize += child.getFileStatus().getLen();
-      AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename
-          (child.getFileStatus().getPath(), context.conf);
-      int b = opts.getBucketId();
+      int b = AcidUtils.parseBucketId(child.getFileStatus().getPath());
       // If the bucket is in the valid range, mark it as covered.
       // I wish Hive actually enforced bucketing all of the time.
       if (b >= 0 && b < covered.length) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 5655ee9..8c7c72e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -459,9 +459,8 @@ static int encodeBucketId(Configuration conf, int bucketId, int statementId) {
       AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
           conf, validWriteIdList, false, true);
       for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
-        AcidOutputFormat.Options bucketOptions =
-            AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
-        if (bucketOptions.getBucketId() != bucketId) {
+        int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
+        if (bucketIdFromPath != bucketId) {
           continue;//todo: HIVE-16952
         }
         if (haveSeenCurrentFile) {
@@ -632,9 +631,8 @@ public void next(OrcStruct next) throws IOException {
    */
  private Reader advanceToNextFile() throws IOException {
    while(nextFileIndex < originalFiles.size()) {
-      AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(
-          originalFiles.get(nextFileIndex).getFileStatus().getPath(), conf);
-      if (bucketOptions.getBucketId() == bucketId) {
+      int bucketIdFromPath = AcidUtils.parseBucketId(originalFiles.get(nextFileIndex).getFileStatus().getPath());
+      if (bucketIdFromPath == bucketId) {
        break;
      }
      //the the bucket we care about here
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 8caa265..d2e1a68 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -150,12 +150,12 @@ public float getProgress() throws IOException {
    * LLAP IO c'tor
    */
   public VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, Reporter reporter,
-      org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader,
-      VectorizedRowBatchCtx rbCtx, boolean isFlatPayload) throws IOException {
+    org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader,
+    VectorizedRowBatchCtx rbCtx, boolean isFlatPayload) throws IOException {
     this(conf, inputSplit, reporter, rbCtx, isFlatPayload);
-    this.baseReader = baseReader;
-    this.innerReader = null;
-    this.vectorizedRowBatchBase = baseReader.createValue();
+    if (baseReader != null) {
+      setBaseAndInnerReader(baseReader);
+    }
   }

   private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporter reporter,
@@ -224,6 +224,13 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporte
     syntheticProps = computeOffsetAndBucket(orcSplit, conf, validWriteIdList);
   }

+  public void setBaseAndInnerReader(
+    final org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader) {
+    this.baseReader = baseReader;
+    this.innerReader = null;
+    this.vectorizedRowBatchBase = baseReader.createValue();
+  }
+
   /**
    * Used for generating synthetic ROW__IDs for reading "original" files
    */
@@ -268,16 +275,15 @@ private OffsetAndBucketProperty computeOffsetAndBucket(
     long rowIdOffset = 0;
     OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo =
         OrcRawRecordMerger.TransactionMetaData.findWriteIDForSynthetcRowIDs(split.getPath(), split.getRootDir(), conf);
-    int bucketId =
-        AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf).getBucketId();
+    int bucketId = AcidUtils.parseBucketId(split.getPath());
     int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf)
       //statementId is from directory name (or 0 if there is none)
       .statementId(syntheticTxnInfo.statementId).bucket(bucketId));
     AcidUtils.Directory directoryState = AcidUtils.getAcidState(
         syntheticTxnInfo.folder, conf, validWriteIdList, false, true);
     for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
-      AcidOutputFormat.Options bucketOptions =
-          AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
-      if (bucketOptions.getBucketId() != bucketId) {
+      int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
+      if (bucketIdFromPath != bucketId) {
        continue;//HIVE-16952
      }
      if (f.getFileStatus().getPath().equals(split.getPath())) {
@@ -653,7 +659,7 @@ DeleteEventRegistry getDeleteEventRegistry() {
       throws IOException {
     final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit);
     if (deleteDeltas.length > 0) {
-      int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId();
+      int bucket = AcidUtils.parseBucketId(orcSplit.getPath());
       String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
       this.validWriteIdList = (txnString == null) ?
           new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString);
@@ -1001,7 +1007,7 @@ public int compareTo(CompressedOwid other) {
     ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit,
         Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException {
-      int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId();
+      int bucket = AcidUtils.parseBucketId(orcSplit.getPath());
       String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
       this.validWriteIdList = (txnString == null) ?
           new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString);
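
Note (reviewer illustration, not part of the patch): the LLAP changes above boil down to the call sequence sketched below -- build one VectorizedOrcAcidRowBatchReader up front with no base reader yet, then re-point it at each wrapped batch instead of constructing a new reader per call. The variable names (orcSplit, jobConf, rbCtx, inputVrb, outputVrb) are placeholders for the corresponding fields inside LlapRecordReader.

    // Sketch of the reuse pattern; assumes the fields available in LlapRecordReader.
    // Constructed once, e.g. in the constructor, when the scan is ACID:
    VectorizedOrcAcidRowBatchReader acidReader = new VectorizedOrcAcidRowBatchReader(
        orcSplit, jobConf, Reporter.NULL, null /* base reader supplied later */, rbCtx, true);

    // Then, once per decoded ColumnVectorBatch inside next():
    acidReader.setBaseAndInnerReader(new AcidWrapper(inputVrb)); // swap in the current batch
    acidReader.next(NullWritable.get(), outputVrb);              // merge ACID metadata / apply deletes
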
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index b28c126..fb2335a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -115,6 +115,7 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
+import org.apache.orc.FileFormatException;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcProto;
 import org.apache.orc.StripeInformation;
@@ -737,6 +738,70 @@ public void testACIDSplitStrategyForSplitUpdate() throws Exception {
   }

   @Test
+  public void testFSCallsVectorizedOrcAcidRowBatchReader() throws IOException {
+    try {
+      MockFileSystem fs = new MockFileSystem(conf);
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
+
+      conf.set("bucket_count", "4");
+      //set up props for read
+      conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+      AcidUtils.setAcidOperationalProperties(conf, true, null);
+      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS,
+          TestVectorizedOrcAcidRowBatchReader.DummyRow.getColumnNamesProperty());
+      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES,
+          TestVectorizedOrcAcidRowBatchReader.DummyRow.getColumnTypesProperty());
+      conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, true);
+      MockPath mockPath = new MockPath(fs, "mock:/a");
+      conf.set("mapred.input.dir", mockPath.toString());
+      conf.set("fs.defaultFS", "mock:///");
+      conf.set("fs.mock.impl", MockFileSystem.class.getName());
+      OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+
+      OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"),
+          false, null);
+      List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+      assertEquals(1, splitStrategies.size());
+      assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+      List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy) splitStrategies.get(0)).getSplits();
+      // marker comment to look at stats read ops in target/surefire-reports/*-output.txt
+      System.out.println("STATS TRACE START - " + testCaseName.getMethodName());
+      // when creating the reader below there are 2 read ops per bucket file (listStatus and open).
+      // HIVE-19588 removes listStatus from the code path so there should only be one read op (open) after HIVE-19588
+      int readsBefore = fs.statistics.getReadOps();
+      for (OrcSplit split : splits) {
+        try {
+          new VectorizedOrcAcidRowBatchReader(split, conf, Reporter.NULL, new VectorizedRowBatchCtx());
+        } catch (FileFormatException e) {
+          // this is expected as these mock files are not valid ORC files
+        }
+      }
+      int readsAfter = fs.statistics.getReadOps();
+      System.out.println("STATS TRACE END - " + testCaseName.getMethodName());
+      int delta = readsAfter - readsBefore;
+      // 16 without HIVE-19588, 8 with HIVE-19588
+      assertEquals(8, delta);
+    } finally {
+      MockFileSystem.clearGlobalFiles();
+    }
+  }
+
+  @Test
   public void testBIStrategySplitBlockBoundary() throws Exception {
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
@@ -1182,6 +1247,7 @@ public void touch(MockFile file) {
     @Override
     public FSDataInputStream open(Path path, int i) throws IOException {
       statistics.incrementReadOps(1);
+      System.out.println("STATS: open - " + path);
       checkAccess();
       MockFile file = findFile(path);
       if (file != null) return new FSDataInputStream(new MockInputStream(file));
@@ -1293,6 +1359,7 @@ public LocatedFileStatus next() throws IOException {

     private List listLocatedFileStatuses(Path path) throws IOException {
       statistics.incrementReadOps(1);
+      System.out.println("STATS: listLocatedFileStatuses - " + path);
       checkAccess();
       path = path.makeQualified(this);
       List result = new ArrayList<>();
@@ -1316,6 +1383,7 @@ public LocatedFileStatus next() throws IOException {
     @Override
     public FileStatus[] listStatus(Path path) throws IOException {
       statistics.incrementReadOps(1);
+      System.out.println("STATS: listStatus - " + path);
       checkAccess();
       path = path.makeQualified(this);
       List result = new ArrayList();
@@ -1415,6 +1483,7 @@ private LocatedFileStatus createLocatedDirectory(Path dir) throws IOException {
     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
       statistics.incrementReadOps(1);
+      System.out.println("STATS: getFileStatus - " + path);
       checkAccess();
       path = path.makeQualified(this);
       String pathnameAsDir = path.toString() + "/";
@@ -1444,6 +1513,7 @@ public FileStatus getFileStatus(Path path) throws IOException {
         final boolean updateStats) throws IOException {
       if (updateStats) {
         statistics.incrementReadOps(1);
+        System.out.println("STATS: getFileBlockLocationsImpl - " + stat.getPath());
       }
       checkAccess();
       List result = new ArrayList();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 3acc085..e478371 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -63,7 +63,7 @@
   private FileSystem fs;
   private Path root;

-  static class DummyRow {
+  public static class DummyRow {
     LongWritable field;
     RecordIdentifier ROW__ID;
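
Note (reviewer illustration, not part of the patch): the assertEquals(8, delta) in testFSCallsVectorizedOrcAcidRowBatchReader above follows from the mock layout created by the test -- two delta directories with four bucket files each:

    // Read-op accounting for the 8 mock bucket files created in the test.
    int bucketFiles = 2 * 4;           // delta_0000001_... and delta_0000002_..., 4 buckets each
    int opsBefore = bucketFiles * 2;   // listStatus + open per file without HIVE-19588 -> 16
    int opsAfter  = bucketFiles * 1;   // only open per file once the extra listing is gone -> 8
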