diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index badcc55c91..aa7c47fdb5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.io; import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD; +import static org.apache.hadoop.hive.ql.exec.Utilities.reduceFieldNameList; import java.io.IOException; import java.io.Serializable; @@ -774,12 +775,14 @@ public String toString() { public static final class ParsedBase { private final long writeId; private final long visibilityTxnId; - ParsedBase(long writeId) { - this(writeId, 0); + private final Path baseDirPath; + ParsedBase(long writeId, Path baseDirPath) { + this(writeId, 0, baseDirPath); } - ParsedBase(long writeId, long visibilityTxnId) { + ParsedBase(long writeId, long visibilityTxnId, Path baseDirPath) { this.writeId = writeId; this.visibilityTxnId = visibilityTxnId; + this.baseDirPath = baseDirPath; } public long getWriteId() { return writeId; @@ -787,6 +790,9 @@ public long getWriteId() { public long getVisibilityTxnId() { return visibilityTxnId; } + public Path getBaseDirPath() { + return baseDirPath; + } public static ParsedBase parseBase(Path path) { String filename = path.getName(); if(!filename.startsWith(BASE_PREFIX)) { @@ -794,10 +800,10 @@ public static ParsedBase parseBase(Path path) { } int idxOfv = filename.indexOf(VISIBILITY_PREFIX); if(idxOfv < 0) { - return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length()))); + return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length())), path); } return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length(), idxOfv)), - Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length()))); + Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length())), path); } } /** @@ -1229,7 +1235,7 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId } if(bestBase.oldestBase != null && bestBase.status == null && - MetaDataFile.isCompacted(bestBase.oldestBase, fs)) { + isCompactedBase(ParsedBase.parseBase(bestBase.oldestBase), fs)) { /** * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given * {@link writeIdList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus @@ -1279,20 +1285,29 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId * causes anything written previously is ignored (hence the overwrite). In this case, base_x * is visible if writeid:x is committed for current reader. */ - private static boolean isValidBase(long baseWriteId, ValidWriteIdList writeIdList, Path baseDir, + private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList writeIdList, FileSystem fs) throws IOException { - if(baseWriteId == Long.MIN_VALUE) { + if(parsedBase.getWriteId() == Long.MIN_VALUE) { //such base is created by 1st compaction in case of non-acid to acid table conversion //By definition there are no open txns with id < 1. return true; } - if(!MetaDataFile.isCompacted(baseDir, fs)) { - //this is the IOW case - return writeIdList.isWriteIdValid(baseWriteId); + if(isCompactedBase(parsedBase, fs)) { + return writeIdList.isValidBase(parsedBase.getWriteId()); } - return writeIdList.isValidBase(baseWriteId); + return writeIdList.isWriteIdValid(parsedBase.getWriteId()); } + /** + * Returns {@code true} if {@code parsedBase} was created by compaction. + * As of Hive 4.0 we can tell if a directory is a result of compaction based on the + * presence of {@link AcidUtils#VISIBILITY_PATTERN} suffix. Base directories written prior to + * that, have to rely on the {@link MetaDataFile} in the directory. So parse the filename first + * since that is the cheaper test.*/ + private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs) throws IOException { + return parsedBase.getVisibilityTxnId() > 0 || + MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs); + } private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, ValidWriteIdList writeIdList, List working, List originalDirectories, List original, List obsolete, TxnBase bestBase, @@ -1307,23 +1322,23 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi return; } if (fn.startsWith(BASE_PREFIX)) { - ParsedBase parsedBase = ParsedBase.parseBase(p); + ParsedBase parsedBase = ParsedBase.parseBase(p); if(!isDirUsable(child, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) { return; } - long writeId = parsedBase.getWriteId(); + final long writeId = parsedBase.getWriteId(); if(bestBase.oldestBaseWriteId > writeId) { //keep track for error reporting bestBase.oldestBase = p; bestBase.oldestBaseWriteId = writeId; } if (bestBase.status == null) { - if(isValidBase(writeId, writeIdList, p, fs)) { + if(isValidBase(parsedBase, writeIdList, fs)) { bestBase.status = child; bestBase.writeId = writeId; } } else if (bestBase.writeId < writeId) { - if(isValidBase(writeId, writeIdList, p, fs)) { + if(isValidBase(parsedBase, writeIdList, fs)) { obsolete.add(bestBase.status); bestBase.status = child; bestBase.writeId = writeId; @@ -1947,6 +1962,11 @@ public static String getFullTableName(String dbName, String tableName) { * @param baseOrDeltaDir detla or base dir, must exist */ public static void createCompactorMarker(Path baseOrDeltaDir, FileSystem fs) throws IOException { + if(true) { + //keeping the method itself here as documentation of what was written there before 4.0 + //i.e. what MetaDataFile.isCompacted() is expecting + throw new IllegalStateException("Should not be used since Hive 4.0"); + } /** * create _meta_data json file in baseOrDeltaDir * write thisFileVersion, dataFormat diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 7d5ee4a59f..6e4e1c05b5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -1002,7 +1002,7 @@ public void close() throws IOException { deleteEventWriter.close(false); } } - private long getCompactorTxnId() { + private static long getCompactorTxnId(Configuration jobConf) { String snapshot = jobConf.get(ValidTxnList.VALID_TXNS_KEY); if(Strings.isNullOrEmpty(snapshot)) { throw new IllegalStateException(ValidTxnList.VALID_TXNS_KEY + " not found for writing to " @@ -1026,7 +1026,7 @@ private void getWriter(Reporter reporter, ObjectInspector inspector, .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) .bucket(bucket) .statementId(-1)//setting statementId == -1 makes compacted delta files use - .visibilityTxnId(getCompactorTxnId()); + .visibilityTxnId(getCompactorTxnId(jobConf)); //delta_xxxx_yyyy format // Instantiate the underlying output format @@ -1050,7 +1050,7 @@ private void getDeleteEventWriter(Reporter reporter, ObjectInspector inspector, .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)).bucket(bucket) .statementId(-1)//setting statementId == -1 makes compacted delta files use // delta_xxxx_yyyy format - .visibilityTxnId(getCompactorTxnId()); + .visibilityTxnId(getCompactorTxnId(jobConf)); // Instantiate the underlying output format @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class @@ -1171,12 +1171,12 @@ public void commitJob(JobContext context) throws IOException { .minimumWriteId(conf.getLong(MIN_TXN, Long.MAX_VALUE)) .maximumWriteId(conf.getLong(MAX_TXN, Long.MIN_VALUE)) .bucket(0) - .statementId(-1); + .statementId(-1) + .visibilityTxnId(CompactorMap.getCompactorTxnId(conf)); Path newDeltaDir = AcidUtils.createFilename(finalLocation, options).getParent(); LOG.info(context.getJobID() + ": " + tmpLocation + " not found. Assuming 0 splits. Creating " + newDeltaDir); fs.mkdirs(newDeltaDir); - createCompactorMarker(conf, newDeltaDir, fs); AcidUtils.OrcAcidVersion.writeVersionFile(newDeltaDir, fs); return; } @@ -1193,16 +1193,9 @@ public void commitJob(JobContext context) throws IOException { * meta files which will create base_x/ (i.e. B)...*/ fs.rename(fileStatus.getPath(), newPath); AcidUtils.OrcAcidVersion.writeVersionFile(newPath, fs); - createCompactorMarker(conf, newPath, fs); } fs.delete(tmpLocation, true); } - private void createCompactorMarker(JobConf conf, Path finalLocation, FileSystem fs) - throws IOException { - if(conf.getBoolean(IS_MAJOR, false)) { - AcidUtils.MetaDataFile.createCompactorMarker(finalLocation, fs); - } - } @Override public void abortJob(JobContext context, int status) throws IOException { @@ -1228,12 +1221,12 @@ private void commitMmCompaction(String from, String to, Configuration conf, // Assume the high watermark can be used as maximum transaction ID. long maxTxn = actualWriteIds.getHighWatermark(); AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) - .writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0).statementId(-1); + .writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0).statementId(-1) + .visibilityTxnId(CompactorMap.getCompactorTxnId(conf)); Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent(); if (!fs.exists(fromPath)) { LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir); fs.mkdirs(newBaseDir); - AcidUtils.MetaDataFile.createCompactorMarker(toPath, fs); return; } LOG.info("Moving contents of " + from + " to " + to); @@ -1243,7 +1236,6 @@ private void commitMmCompaction(String from, String to, Configuration conf, } FileStatus dirPath = children[0]; fs.rename(dirPath.getPath(), newBaseDir); - AcidUtils.MetaDataFile.createCompactorMarker(newBaseDir, fs); fs.delete(fromPath, true); } }