diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 270c5909fce..bf332bc0b84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -2610,12 +2610,15 @@ public static boolean isRawFormatFile(Path dataFile, FileSystem fs) throws IOExc * All data files produced by Acid write should have this (starting with Hive 3.0), including * those written by compactor. This is more for sanity checking in case someone moved the files * around or something like that. + * + * Methods for getting/reading the version from files were moved to test class TestTxnCommands + * which is the only place they are used, in order to keep devs out of temptation, since they + * access the FileSystem which is expensive. */ public static final class OrcAcidVersion { - private static final String ACID_VERSION_KEY = "hive.acid.version"; + public static final String ACID_VERSION_KEY = "hive.acid.version"; public static final String ACID_FORMAT = "_orc_acid_version"; private static final Charset UTF8 = Charset.forName("UTF-8"); - public static final int ORC_ACID_VERSION_DEFAULT = 0; /** * 2 is the version of Acid released in Hive 3.0. */ @@ -2628,28 +2631,7 @@ public static void setAcidVersionInDataFile(Writer writer) { //so that we know which version wrote the file writer.addUserMetadata(ACID_VERSION_KEY, UTF8.encode(String.valueOf(ORC_ACID_VERSION))); } - /** - * This is smart enough to handle streaming ingest where there could be a - * {@link OrcAcidUtils#DELTA_SIDE_FILE_SUFFIX} side file. - * @param dataFile - ORC acid data file - * @return version property from file if there, - * {@link #ORC_ACID_VERSION_DEFAULT} otherwise - */ - @VisibleForTesting - public static int getAcidVersionFromDataFile(Path dataFile, FileSystem fs) throws IOException { - FileStatus fileStatus = fs.getFileStatus(dataFile); - Reader orcReader = OrcFile.createReader(dataFile, - OrcFile.readerOptions(fs.getConf()) - .filesystem(fs) - //make sure to check for side file in case streaming ingest died - .maxLength(getLogicalLength(fs, fileStatus))); - if (orcReader.hasMetadataValue(ACID_VERSION_KEY)) { - char[] versionChar = UTF8.decode(orcReader.getMetadataValue(ACID_VERSION_KEY)).array(); - String version = new String(versionChar); - return Integer.valueOf(version); - } - return ORC_ACID_VERSION_DEFAULT; - } + /** * This creates a version file in {@code deltaOrBaseDir} * @param deltaOrBaseDir - where to create the version file @@ -2668,28 +2650,6 @@ public static void writeVersionFile(Path deltaOrBaseDir, FileSystem fs) throws public static Path getVersionFilePath(Path deltaOrBase) { return new Path(deltaOrBase, ACID_FORMAT); } - @VisibleForTesting - public static int getAcidVersionFromMetaFile(Path deltaOrBaseDir, FileSystem fs) - throws IOException { - Path formatFile = getVersionFilePath(deltaOrBaseDir); - if(!fs.exists(formatFile)) { - LOG.debug(formatFile + " not found, returning default: " + ORC_ACID_VERSION_DEFAULT); - return ORC_ACID_VERSION_DEFAULT; - } - try (FSDataInputStream inputStream = fs.open(formatFile)) { - byte[] bytes = new byte[1]; - int read = inputStream.read(bytes); - if (read != -1) { - String version = new String(bytes, UTF8); - return Integer.valueOf(version); - } - return ORC_ACID_VERSION_DEFAULT; - } - catch(IOException ex) { - LOG.error(formatFile + " is unreadable due to: " + ex.getMessage(), ex); - throw ex; - } - } } public static List getAcidFilesForStats( diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index a1f59a89277..c7b41862ec9 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hive.ql; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -34,6 +36,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -63,6 +66,8 @@ import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.BucketCodec; +import org.apache.hadoop.hive.ql.io.orc.OrcFile; +import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -1253,7 +1258,7 @@ public void testVersioning() throws Exception { FileSystem fs = FileSystem.get(hiveConf); Assert.assertTrue(rs != null && rs.size() == 1 && rs.get(0).contains(AcidUtils.DELTA_PREFIX)); Path filePath = new Path(rs.get(0)); - int version = AcidUtils.OrcAcidVersion.getAcidVersionFromDataFile(filePath, fs); + int version = getAcidVersionFromDataFile(filePath, fs); //check it has expected version marker Assert.assertEquals("Unexpected version marker in " + filePath, AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, version); @@ -1261,8 +1266,7 @@ public void testVersioning() throws Exception { //check that delta dir has a version file with expected value filePath = filePath.getParent(); Assert.assertTrue(filePath.getName().startsWith(AcidUtils.DELTA_PREFIX)); - int versionFromMetaFile = AcidUtils.OrcAcidVersion - .getAcidVersionFromMetaFile(filePath, fs); + int versionFromMetaFile = getAcidVersionFromMetaFile(filePath, fs); Assert.assertEquals("Unexpected version marker in " + filePath, AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, versionFromMetaFile); @@ -1282,7 +1286,7 @@ public void testVersioning() throws Exception { Assert.assertTrue(rs != null && rs.size() == 1 && rs.get(0).contains(AcidUtils.BASE_PREFIX)); filePath = new Path(rs.get(0)); - version = AcidUtils.OrcAcidVersion.getAcidVersionFromDataFile(filePath, fs); + version = getAcidVersionFromDataFile(filePath, fs); //check that files produced by compaction still have the version marker Assert.assertEquals("Unexpected version marker in " + filePath, AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, version); @@ -1290,9 +1294,53 @@ public void testVersioning() throws Exception { //check that compacted base dir has a version file with expected value filePath = filePath.getParent(); Assert.assertTrue(filePath.getName().startsWith(AcidUtils.BASE_PREFIX)); - versionFromMetaFile = AcidUtils.OrcAcidVersion.getAcidVersionFromMetaFile( - filePath, fs); + versionFromMetaFile = getAcidVersionFromMetaFile(filePath, fs); Assert.assertEquals("Unexpected version marker in " + filePath, AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, versionFromMetaFile); } + + private static final Charset UTF8 = Charset.forName("UTF-8"); + private static final int ORC_ACID_VERSION_DEFAULT = 0; + /** + * This is smart enough to handle streaming ingest where there could be a + * {@link AcidUtils#DELTA_SIDE_FILE_SUFFIX} side file. + * @param dataFile - ORC acid data file + * @return version property from file if there, + * {@link #ORC_ACID_VERSION_DEFAULT} otherwise + */ + private static int getAcidVersionFromDataFile(Path dataFile, FileSystem fs) throws IOException { + FileStatus fileStatus = fs.getFileStatus(dataFile); + Reader orcReader = OrcFile.createReader(dataFile, + OrcFile.readerOptions(fs.getConf()) + .filesystem(fs) + //make sure to check for side file in case streaming ingest died + .maxLength(AcidUtils.getLogicalLength(fs, fileStatus))); + if (orcReader.hasMetadataValue(AcidUtils.OrcAcidVersion.ACID_VERSION_KEY)) { + char[] versionChar = + UTF8.decode(orcReader.getMetadataValue(AcidUtils.OrcAcidVersion.ACID_VERSION_KEY)).array(); + String version = new String(versionChar); + return Integer.valueOf(version); + } + return ORC_ACID_VERSION_DEFAULT; + } + + private static int getAcidVersionFromMetaFile(Path deltaOrBaseDir, FileSystem fs) + throws IOException { + Path formatFile = AcidUtils.OrcAcidVersion.getVersionFilePath(deltaOrBaseDir); + try (FSDataInputStream inputStream = fs.open(formatFile)) { + byte[] bytes = new byte[1]; + int read = inputStream.read(bytes); + if (read != -1) { + String version = new String(bytes, UTF8); + return Integer.valueOf(version); + } + return ORC_ACID_VERSION_DEFAULT; + } catch (FileNotFoundException fnf) { + LOG.debug(formatFile + " not found, returning default: " + ORC_ACID_VERSION_DEFAULT); + return ORC_ACID_VERSION_DEFAULT; + } catch(IOException ex) { + LOG.error(formatFile + " is unreadable due to: " + ex.getMessage(), ex); + throw ex; + } + } }