diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 183515a..e7288f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -313,6 +313,21 @@ public static long parseBase(Path path) {
   }
 
   /**
+   * Get the bucket id from the file path
+   * @param bucketFile - bucket file path
+   * @return - bucket id
+   */
+  public static int parseBucketId(Path bucketFile) {
+    String filename = bucketFile.getName();
+    if (ORIGINAL_PATTERN.matcher(filename).matches() || ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
+      return Integer.parseInt(filename.substring(0, filename.indexOf('_')));
+    } else if (filename.startsWith(BUCKET_PREFIX)) {
+      return Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
+    }
+    return -1;
+  }
+
+  /**
    * Parse a bucket filename back into the options that would have created
    * the file.
    * @param bucketFile the path to a bucket file
@@ -324,9 +339,8 @@ public static long parseBase(Path path) {
                                                  Configuration conf) throws IOException {
     AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf);
     String filename = bucketFile.getName();
+    int bucket = parseBucketId(bucketFile);
     if (ORIGINAL_PATTERN.matcher(filename).matches()) {
-      int bucket =
-          Integer.parseInt(filename.substring(0, filename.indexOf('_')));
       result
         .setOldStyle(true)
         .minimumWriteId(0)
@@ -336,8 +350,6 @@ public static long parseBase(Path path) {
     }
     else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
       //todo: define groups in regex and use parseInt(Matcher.group(2))....
-      int bucket =
-          Integer.parseInt(filename.substring(0, filename.indexOf('_')));
       int copyNumber = Integer.parseInt(filename.substring(filename.lastIndexOf('_') + 1));
       result
         .setOldStyle(true)
@@ -348,8 +360,6 @@ else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
         .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX));
     }
     else if (filename.startsWith(BUCKET_PREFIX)) {
-      int bucket =
-          Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
       if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) {
         result
           .setOldStyle(false)
@@ -375,7 +385,7 @@ else if (filename.startsWith(BUCKET_PREFIX)) {
           .bucket(bucket);
       }
     } else {
-      result.setOldStyle(true).bucket(-1).minimumWriteId(0)
+      result.setOldStyle(true).bucket(bucket).minimumWriteId(0)
           .maximumWriteId(0);
     }
     return result;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 2337a35..51d72fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -2290,8 +2290,7 @@ private static boolean isStripeSatisfyPredicate(
       boolean isOriginal, List parsedDeltas, List readerTypes,
-      UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs)
-      throws IOException {
+      UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs) {
     List deltas = AcidUtils.serializeDeltas(parsedDeltas);
     boolean[] covered = new boolean[context.numBuckets];
@@ -2300,9 +2299,7 @@ private static boolean isStripeSatisfyPredicate(
       long totalFileSize = 0;
       for (HdfsFileStatusWithId child : baseFiles) {
         totalFileSize += child.getFileStatus().getLen();
-        AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename
-            (child.getFileStatus().getPath(), context.conf);
-        int b = opts.getBucketId();
+        int b = AcidUtils.parseBucketId(child.getFileStatus().getPath());
         // If the bucket is in the valid range, mark it as covered.
         // I wish Hive actually enforced bucketing all of the time.
         if (b >= 0 && b < covered.length) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 5655ee9..8c7c72e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -459,9 +459,8 @@ static int encodeBucketId(Configuration conf, int bucketId, int statementId) {
     AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf,
         validWriteIdList, false, true);
     for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
-      AcidOutputFormat.Options bucketOptions =
-          AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
-      if (bucketOptions.getBucketId() != bucketId) {
+      int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
+      if (bucketIdFromPath != bucketId) {
         continue;//todo: HIVE-16952
       }
       if (haveSeenCurrentFile) {
@@ -632,9 +631,8 @@ public void next(OrcStruct next) throws IOException {
    */
   private Reader advanceToNextFile() throws IOException {
     while(nextFileIndex < originalFiles.size()) {
-      AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(
-          originalFiles.get(nextFileIndex).getFileStatus().getPath(), conf);
-      if (bucketOptions.getBucketId() == bucketId) {
+      int bucketIdFromPath = AcidUtils.parseBucketId(originalFiles.get(nextFileIndex).getFileStatus().getPath());
+      if (bucketIdFromPath == bucketId) {
         break;
       }
       //the the bucket we care about here
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 8caa265..9fbc8bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -268,16 +268,15 @@ private OffsetAndBucketProperty computeOffsetAndBucket(
     long rowIdOffset = 0;
     OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo =
         OrcRawRecordMerger.TransactionMetaData.findWriteIDForSynthetcRowIDs(split.getPath(), split.getRootDir(), conf);
-    int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf).getBucketId();
+    int bucketId = AcidUtils.parseBucketId(split.getPath());
     int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf)
       //statementId is from directory name (or 0 if there is none)
       .statementId(syntheticTxnInfo.statementId).bucket(bucketId));
     AcidUtils.Directory directoryState = AcidUtils.getAcidState(
         syntheticTxnInfo.folder, conf, validWriteIdList, false, true);
     for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
-      AcidOutputFormat.Options bucketOptions =
-          AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
-      if (bucketOptions.getBucketId() != bucketId) {
+      int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
+      if (bucketIdFromPath != bucketId) {
         continue;//HIVE-16952
       }
       if (f.getFileStatus().getPath().equals(split.getPath())) {
@@ -653,7 +652,7 @@ DeleteEventRegistry getDeleteEventRegistry() {
         throws IOException {
       final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit);
       if (deleteDeltas.length > 0) {
-        int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId();
+        int bucket = AcidUtils.parseBucketId(orcSplit.getPath());
         String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
         this.validWriteIdList = (txnString == null)
             ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString);
@@ -1001,7 +1000,7 @@ public int compareTo(CompressedOwid other) {
     ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit,
                                   Reader.Options readerOptions)
         throws IOException, DeleteEventsOverflowMemoryException {
-      int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId();
+      int bucket = AcidUtils.parseBucketId(orcSplit.getPath());
       String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
       this.validWriteIdList = (txnString == null)
           ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index b28c126..fb2335a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -115,6 +115,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 
+import org.apache.orc.FileFormatException;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcProto;
 import org.apache.orc.StripeInformation;
@@ -737,6 +738,70 @@ public void testACIDSplitStrategyForSplitUpdate() throws Exception {
   }
 
   @Test
+  public void testFSCallsVectorizedOrcAcidRowBatchReader() throws IOException {
+    try {
+      MockFileSystem fs = new MockFileSystem(conf);
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+          new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
+
+      conf.set("bucket_count", "4");
+      //set up props for read
+      conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+      AcidUtils.setAcidOperationalProperties(conf, true, null);
+      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS,
+          TestVectorizedOrcAcidRowBatchReader.DummyRow.getColumnNamesProperty());
+      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES,
+          TestVectorizedOrcAcidRowBatchReader.DummyRow.getColumnTypesProperty());
+      conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, true);
+      MockPath mockPath = new MockPath(fs, "mock:/a");
+      conf.set("mapred.input.dir", mockPath.toString());
+      conf.set("fs.defaultFS", "mock:///");
+      conf.set("fs.mock.impl", MockFileSystem.class.getName());
+      OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+
+      OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"),
+          false, null);
+      List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+      assertEquals(1, splitStrategies.size());
+      assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+      List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy) splitStrategies.get(0)).getSplits();
+      // marker comment to look at stats read ops in target/surefire-reports/*-output.txt
+      System.out.println("STATS TRACE START - " + testCaseName.getMethodName());
+      // when creating the reader below there are 2 read ops per bucket file (listStatus and open).
+      // HIVE-19588 removes listStatus from the code path so there should only be one read op (open) after HIVE-19588
+      int readsBefore = fs.statistics.getReadOps();
+      for (OrcSplit split : splits) {
+        try {
+          new VectorizedOrcAcidRowBatchReader(split, conf, Reporter.NULL, new VectorizedRowBatchCtx());
+        } catch (FileFormatException e) {
+          // this is expected as these mock files are not valid orc files
+        }
+      }
+      int readsAfter = fs.statistics.getReadOps();
+      System.out.println("STATS TRACE END - " + testCaseName.getMethodName());
+      int delta = readsAfter - readsBefore;
+      // 16 without HIVE-19588, 8 with HIVE-19588
+      assertEquals(8, delta);
+    } finally {
+      MockFileSystem.clearGlobalFiles();
+    }
+  }
+
   @Test
   public void testBIStrategySplitBlockBoundary() throws Exception {
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
@@ -1182,6 +1247,7 @@ public void touch(MockFile file) {
     @Override
     public FSDataInputStream open(Path path, int i) throws IOException {
       statistics.incrementReadOps(1);
+      System.out.println("STATS: open - " + path);
       checkAccess();
       MockFile file = findFile(path);
       if (file != null) return new FSDataInputStream(new MockInputStream(file));
@@ -1293,6 +1359,7 @@ public LocatedFileStatus next() throws IOException {
 
     private List<LocatedFileStatus> listLocatedFileStatuses(Path path) throws IOException {
       statistics.incrementReadOps(1);
+      System.out.println("STATS: listLocatedFileStatuses - " + path);
       checkAccess();
       path = path.makeQualified(this);
       List<LocatedFileStatus> result = new ArrayList<>();
@@ -1316,6 +1383,7 @@ public LocatedFileStatus next() throws IOException {
     @Override
     public FileStatus[] listStatus(Path path) throws IOException {
       statistics.incrementReadOps(1);
+      System.out.println("STATS: listStatus - " + path);
       checkAccess();
       path = path.makeQualified(this);
       List<FileStatus> result = new ArrayList<FileStatus>();
@@ -1415,6 +1483,7 @@ private LocatedFileStatus createLocatedDirectory(Path dir) throws IOException {
     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
       statistics.incrementReadOps(1);
+      System.out.println("STATS: getFileStatus - " + path);
       checkAccess();
       path = path.makeQualified(this);
       String pathnameAsDir = path.toString() + "/";
@@ -1444,6 +1513,7 @@ public FileStatus getFileStatus(Path path) throws IOException {
         final boolean updateStats) throws IOException {
       if (updateStats) {
         statistics.incrementReadOps(1);
+        System.out.println("STATS: getFileBlockLocationsImpl - " + stat.getPath());
       }
       checkAccess();
       List<BlockLocation> result = new ArrayList<BlockLocation>();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 3acc085..e478371 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -63,7 +63,7 @@
   private FileSystem fs;
   private Path root;
 
-  static class DummyRow {
+  public static class DummyRow {
     LongWritable field;
     RecordIdentifier ROW__ID;
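
Note (not part of the patch): a minimal usage sketch of the new AcidUtils.parseBucketId(Path) helper added above. The class name and file paths are made up for illustration; the expected values assume the filename patterns the helper checks (ORIGINAL_PATTERN, ORIGINAL_PATTERN_COPY, BUCKET_PREFIX) behave as described in this patch. Unlike parseBaseOrDeltaBucketFilename(path, conf).getBucketId(), the helper looks only at the file name, which is why the callers changed here no longer trigger the extra listStatus counted by testFSCallsVectorizedOrcAcidRowBatchReader.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

// Hypothetical illustration class, not part of the Hive source tree.
public class ParseBucketIdSketch {
  public static void main(String[] args) {
    // ACID bucket file inside a delta directory: "bucket_00001" -> 1
    System.out.println(AcidUtils.parseBucketId(
        new Path("mock:/a/delta_0000001_0000001_0000/bucket_00001")));
    // Pre-ACID "original" file: "00002_0" -> 2
    System.out.println(AcidUtils.parseBucketId(new Path("/warehouse/t/00002_0")));
    // Copy of an original file: "00002_0_copy_1" -> 2
    System.out.println(AcidUtils.parseBucketId(new Path("/warehouse/t/00002_0_copy_1")));
    // Any other file name -> -1
    System.out.println(AcidUtils.parseBucketId(new Path("/warehouse/t/_orc_acid_version")));
  }
}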