diff --git common/src/java/org/apache/hadoop/hive/common/JavaUtils.java common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
index 45abd2fb30..e09dec1de5 100644
--- common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
+++ common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
@@ -28,8 +28,7 @@
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,12 +37,6 @@
  * Hive.
  */
 public final class JavaUtils {
-
-  public static final String BASE_PREFIX = "base";
-  public static final String DELTA_PREFIX = "delta";
-  public static final String DELTA_DIGITS = "%07d";
-  public static final int DELTA_DIGITS_LEN = 7;
-  public static final String STATEMENT_DIGITS = "%04d";
   private static final Logger LOG = LoggerFactory.getLogger(JavaUtils.class);
   private static final Method SUN_MISC_UTIL_RELEASE;
 
@@ -166,64 +159,4 @@ public static String txnIdsToString(List<Long> txnIds) {
   private JavaUtils() {
     // prevent instantiation
   }
-
-  public static Long extractWriteId(Path file) {
-    String fileName = file.getName();
-    String[] parts = fileName.split("_", 4); // e.g. delta_0000001_0000001_0000 or base_0000022
-    if (parts.length < 2 || !(DELTA_PREFIX.equals(parts[0]) || BASE_PREFIX.equals(parts[0]))) {
-      LOG.debug("Cannot extract write ID for a MM table: " + file
-          + " (" + Arrays.toString(parts) + ")");
-      return null;
-    }
-    long writeId = -1;
-    try {
-      writeId = Long.parseLong(parts[1]);
-    } catch (NumberFormatException ex) {
-      LOG.debug("Cannot extract write ID for a MM table: " + file
-          + "; parsing " + parts[1] + " got " + ex.getMessage());
-      return null;
-    }
-    return writeId;
-  }
-
-  public static class IdPathFilter implements PathFilter {
-    private String baseDirName, deltaDirName;
-    private final boolean isDeltaPrefix;
-
-    public IdPathFilter(long writeId, int stmtId) {
-      String deltaDirName = null;
-      deltaDirName = DELTA_PREFIX + "_" + String.format(DELTA_DIGITS, writeId) + "_" +
-          String.format(DELTA_DIGITS, writeId);
-      isDeltaPrefix = (stmtId < 0);
-      if (!isDeltaPrefix) {
-        deltaDirName += "_" + String.format(STATEMENT_DIGITS, stmtId);
-      }
-
-      this.baseDirName = BASE_PREFIX + "_" + String.format(DELTA_DIGITS, writeId);
-      this.deltaDirName = deltaDirName;
-    }
-
-    @Override
-    public boolean accept(Path path) {
-      String name = path.getName();
-      return name.equals(baseDirName) || (isDeltaPrefix && name.startsWith(deltaDirName))
-          || (!isDeltaPrefix && name.equals(deltaDirName));
-    }
-  }
-
-  public static class AnyIdDirFilter implements PathFilter {
-    @Override
-    public boolean accept(Path path) {
-      String name = path.getName();
-      //todo: what if this is a base?
-      if (!name.startsWith(DELTA_PREFIX + "_")) return false;
-      String idStr = name.substring(DELTA_PREFIX.length() + 1, DELTA_PREFIX.length() + 1 + DELTA_DIGITS_LEN);
-      try {
-        Long.parseLong(idStr);//what for? sanity check?
-      } catch (NumberFormatException ex) {
-        return false;
-      }
-      return true;
-    }
-  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index ce683c8a8d..b0ec5abcce 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
 import org.apache.hadoop.hive.ql.plan.CopyWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -112,7 +113,7 @@ protected int copyOnePath(Path fromPath, Path toPath) {
     if (!fs.exists(path)) return null;
     if (!isSourceMm) return matchFilesOneDir(fs, path, null);
     // Note: this doesn't handle list bucketing properly; neither does the original code.
-    FileStatus[] mmDirs = fs.listStatus(path, new JavaUtils.AnyIdDirFilter());
+    FileStatus[] mmDirs = fs.listStatus(path, new AcidUtils.AnyIdDirFilter());
     if (mmDirs == null || mmDirs.length == 0) return null;
     List<FileStatus> allFiles = new ArrayList<>();
     for (FileStatus mmDir : mmDirs) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 4e10649136..b8d40b3ae5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4384,47 +4384,6 @@ private void checkMmLb(Partition part) throws HiveException {
         + " to MM is not supported. Please re-create a table in the desired format.");
   }
 
-  private void handleRemoveMm(
-      Path path, ValidWriteIdList validWriteIdList, List<Path> result) throws HiveException {
-    // Note: doesn't take LB into account; that is not presently supported here (throws above).
-    try {
-      FileSystem fs = path.getFileSystem(conf);
-      for (FileStatus file : fs.listStatus(path)) {
-        Path childPath = file.getPath();
-        if (!file.isDirectory()) {
-          ensureDelete(fs, childPath, "a non-directory file");
-          continue;
-        }
-        Long writeId = JavaUtils.extractWriteId(childPath);
-        if (writeId == null) {
-          ensureDelete(fs, childPath, "an unknown directory");
-        } else if (!validWriteIdList.isWriteIdValid(writeId)) {
-          // Assume no concurrent active writes - we rely on locks here. We could check and fail.
-          ensureDelete(fs, childPath, "an uncommitted directory");
-        } else {
-          result.add(childPath);
-        }
-      }
-    } catch (IOException ex) {
-      throw new HiveException(ex);
-    }
-  }
-
-  private static void ensureDelete(FileSystem fs, Path path, String what) throws IOException {
-    if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
-      Utilities.FILE_OP_LOGGER.trace("Deleting " + what + " " + path);
-    }
-    try {
-      if (!fs.delete(path, true)) {
-        throw new IOException("delete returned false");
-      }
-    } catch (Exception ex) {
-      String error = "Couldn't delete " + path + "; cannot remove MM setting from the table";
-      LOG.error(error, ex);
-      throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
-    }
-  }
-
   private List<Task<?>> generateAddMmTasks(Table tbl, Long writeId) throws HiveException {
     // We will move all the files in the table/partition directories into the first MM
     // directory, then commit the first write ID.
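The CopyTask hunk above shows the caller-side shape of this move: MM directory listing now goes through AcidUtils instead of JavaUtils. A minimal sketch of that usage pattern, using only the relocated AcidUtils.AnyIdDirFilter and AcidUtils.extractWriteId from this patch (the class and method names below are illustrative, not part of the patch):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class MmDirListing {
      // Lists base/delta directories under a table path the way CopyTask now
      // does it; AnyIdDirFilter accepts any name extractWriteId can parse.
      public static void printWriteIds(FileSystem fs, Path tablePath) throws IOException {
        FileStatus[] mmDirs = fs.listStatus(tablePath, new AcidUtils.AnyIdDirFilter());
        for (FileStatus dir : mmDirs) {
          // Non-null for every path the filter accepted.
          Long writeId = AcidUtils.extractWriteId(dir.getPath());
          System.out.println(dir.getPath().getName() + " -> write ID " + writeId);
        }
      }
    }

Note that the rewritten AnyIdDirFilter (see the AcidUtils hunk below) delegates to extractWriteId, so it now accepts base_ directories as well as deltas, resolving the "//todo: what if this is a base?" left in the deleted JavaUtils version.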
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 6395c31ec7..25035433c7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -4076,7 +4076,7 @@ private static void tryDelete(FileSystem fs, Path path) {
       Boolean isBaseDir) throws IOException {
     int skipLevels = dpLevels + lbLevels;
     if (filter == null) {
-      filter = new JavaUtils.IdPathFilter(writeId, stmtId);
+      filter = new AcidUtils.IdPathFilter(writeId, stmtId);
     }
     if (skipLevels == 0) {
       return statusToPath(fs.listStatus(path, filter));
@@ -4084,7 +4084,7 @@ private static void tryDelete(FileSystem fs, Path path) {
     // TODO: for some reason, globStatus doesn't work for masks like "...blah/*/delta_0000007_0000007*"
     // the last star throws it off. So, for now, if stmtId is missing use recursion.
     // For the same reason, we cannot use it if we don't know isBaseDir. Currently, we don't
-    // /want/ to know isBaseDir because that is error prone; so, it ends up never being used. 
+    // /want/ to know isBaseDir because that is error prone; so, it ends up never being used.
     if (stmtId < 0 || isBaseDir == null
         || (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs))) {
       return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter);
@@ -4183,7 +4183,7 @@ private static boolean isS3(FileSystem fs) {
   }
 
   private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir,
-      int dpLevels, int lbLevels, JavaUtils.IdPathFilter filter, long writeId, int stmtId,
+      int dpLevels, int lbLevels, AcidUtils.IdPathFilter filter, long writeId, int stmtId,
       Configuration conf) throws IOException {
     Path[] files = getMmDirectoryCandidates(
         fs, specPath, dpLevels, lbLevels, filter, writeId, stmtId, conf, null);
@@ -4250,7 +4250,7 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con
     FileSystem fs = specPath.getFileSystem(hconf);
     Path manifestDir = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite);
     if (!success) {
-      JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(writeId, stmtId);
+      AcidUtils.IdPathFilter filter = new AcidUtils.IdPathFilter(writeId, stmtId);
       tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels,
           filter, writeId, stmtId, hconf);
       return;
@@ -4275,7 +4275,7 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con
     }
 
     Utilities.FILE_OP_LOGGER.debug("Looking for files in: {}", specPath);
-    JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(writeId, stmtId);
+    AcidUtils.IdPathFilter filter = new AcidUtils.IdPathFilter(writeId, stmtId);
     if (isMmCtas && !fs.exists(specPath)) {
       Utilities.FILE_OP_LOGGER.info("Creating table directory for CTAS with no output at {}", specPath);
       FileUtils.mkdir(fs, specPath, hconf);
@@ -4405,7 +4405,7 @@ private static void deleteUncommitedFile(Path childPath, FileSystem fs)
     for (int i = 0; i < children.length; ++i) {
       FileStatus file = children[i];
       Path childPath = file.getPath();
-      Long writeId = JavaUtils.extractWriteId(childPath);
+      Long writeId = AcidUtils.extractWriteId(childPath);
       if (!file.isDirectory() || writeId == null || !validWriteIdList.isWriteIdValid(writeId)) {
         Utilities.FILE_OP_LOGGER.debug("Skipping path {}", childPath);
         if (result == null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 212e0a60a8..72d76dcce0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -18,7 +18,21 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import com.google.common.annotations.VisibleForTesting;
+import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -27,8 +41,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -40,18 +52,14 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.Writer;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
-import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.common.util.Ref;
 import org.apache.orc.FileFormatException;
 import org.apache.orc.impl.OrcAcidUtils;
@@ -59,19 +67,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Utilities that are shared by all of the ACID input and output formats. They
@@ -1821,4 +1817,58 @@ public static boolean isAcidEnabled(HiveConf hiveConf) {
     }
     return false;
   }
+
+  public static class AnyIdDirFilter implements PathFilter {
+    @Override
+    public boolean accept(Path path) {
+      return extractWriteId(path) != null;
+    }
+  }
+
+  public static class IdPathFilter implements PathFilter {
+    private String baseDirName, deltaDirName;
+    private final boolean isDeltaPrefix;
+
+    public IdPathFilter(long writeId, int stmtId) {
+      String deltaDirName = DELTA_PREFIX + String.format(DELTA_DIGITS, writeId) + "_"
+          + String.format(DELTA_DIGITS, writeId);
+      isDeltaPrefix = (stmtId < 0);
+      if (!isDeltaPrefix) {
+        deltaDirName += "_" + String.format(STATEMENT_DIGITS, stmtId);
+      }
+
+      this.baseDirName = BASE_PREFIX + String.format(DELTA_DIGITS, writeId);
+      this.deltaDirName = deltaDirName;
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      String name = path.getName();
+      return name.equals(baseDirName) || (isDeltaPrefix && name.startsWith(deltaDirName))
+          || (!isDeltaPrefix && name.equals(deltaDirName));
+    }
+  }
+
+  public static Long extractWriteId(Path file) {
+    String fileName = file.getName();
+    if (!fileName.startsWith(DELTA_PREFIX) && !fileName.startsWith(BASE_PREFIX)) {
+      LOG.trace("Cannot extract write ID for a MM table: {}", file);
+      return null;
+    }
+    String[] parts = fileName.split("_", 4); // e.g. delta_0000001_0000001_0000 or base_0000022
+    if (parts.length < 2) {
+      LOG.debug("Cannot extract write ID for a MM table: " + file
+          + " (" + Arrays.toString(parts) + ")");
+      return null;
+    }
+    long writeId = -1;
+    try {
+      writeId = Long.parseLong(parts[1]);
+    } catch (NumberFormatException ex) {
+      LOG.debug("Cannot extract write ID for a MM table: " + file
+          + "; parsing " + parts[1] + " got " + ex.getMessage());
+      return null;
+    }
+    return writeId;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 7987c4e70e..611a4c346b 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -588,7 +588,7 @@ private static void processForWriteIds(Path dir, JobConf conf,
       }
       if (!file.isDirectory()) {
         Utilities.FILE_OP_LOGGER.warn("Ignoring a file not in MM directory " + path);
-      } else if (JavaUtils.extractWriteId(path) == null) {
+      } else if (AcidUtils.extractWriteId(path) == null) {
        subdirs.add(path);
      } else if (!hadAcidState) {
        AcidUtils.Directory dirInfo
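For reference, the directory-naming convention the relocated helpers parse: with write ID 7, IdPathFilter matches base_0000007 plus either any delta_0000007_0000007* name (stmtId < 0, prefix match) or exactly delta_0000007_0000007_0001 (stmtId = 1). The sketch below is illustrative, not part of the patch, and assumes the AcidUtils constants carry the trailing underscore (BASE_PREFIX = "base_", DELTA_PREFIX = "delta_"), which is implied by the moved code dropping the explicit "_" that the deleted JavaUtils version concatenated:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class IdPathFilterDemo {
      public static void main(String[] args) {
        // stmtId < 0: prefix match, so any statement suffix is accepted.
        AcidUtils.IdPathFilter anyStmt = new AcidUtils.IdPathFilter(7, -1);
        // stmtId >= 0: exact match on delta_0000007_0000007_0001.
        AcidUtils.IdPathFilter stmtOne = new AcidUtils.IdPathFilter(7, 1);

        Path d1 = new Path("/warehouse/t/delta_0000007_0000007_0001");
        System.out.println(anyStmt.accept(d1)); // true (starts with delta_0000007_0000007)
        System.out.println(stmtOne.accept(d1)); // true (exact name)
        System.out.println(stmtOne.accept(new Path("/warehouse/t/delta_0000007_0000007_0002"))); // false

        // extractWriteId parses the digits after the base_/delta_ prefix.
        System.out.println(AcidUtils.extractWriteId(new Path("/warehouse/t/base_0000022"))); // 22
        System.out.println(AcidUtils.extractWriteId(new Path("/warehouse/t/000000_0")));     // null
      }
    }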