diff --git common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
index df77a4a2f2..09343e5616 100644
--- common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
+++ common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
@@ -54,17 +54,17 @@
    * @return array of FileStatus
    * @throws IOException
    */
-  public static FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs)
+  public static List<FileStatus> getFileStatusRecurse(Path path, int level, FileSystem fs)
       throws IOException {
     return getFileStatusRecurse(path, level, fs, FileUtils.HIDDEN_FILES_PATH_FILTER, false);
   }

-  public static FileStatus[] getFileStatusRecurse(
+  public static List<FileStatus> getFileStatusRecurse(
       Path path, int level, FileSystem fs, PathFilter filter) throws IOException {
     return getFileStatusRecurse(path, level, fs, filter, false);
   }

-  public static FileStatus[] getFileStatusRecurse(
+  public static List<FileStatus> getFileStatusRecurse(
       Path path, int level, FileSystem fs, PathFilter filter, boolean allLevelsBelow)
           throws IOException {
@@ -79,9 +79,9 @@
       // does not exist. But getFileStatus() throws IOException. To mimic that
       // behavior we return an empty list on exception. For external
       // tables, the path of the table will not exist during table creation
-      return new FileStatus[0];
+      return new ArrayList<>(0);
     }
-    return result.toArray(new FileStatus[result.size()]);
+    return result;
   }

   // construct a path pattern (e.g., /*/*) to find all dynamically generated paths
@@ -91,7 +91,7 @@
     }
     Path pathPattern = new Path(path, sb.toString());
     if (!allLevelsBelow) {
-      return fs.globStatus(pathPattern, filter);
+      return Lists.newArrayList(fs.globStatus(pathPattern, filter));
     }
     LinkedList<FileStatus> queue = new LinkedList<>();
     List<FileStatus> results = new ArrayList<FileStatus>();
@@ -114,7 +114,7 @@
         }
       }
     }
-    return results.toArray(new FileStatus[results.size()]);
+    return results;
   }

   public static int getNumBitVectorsForNDVEstimation(Configuration conf) throws Exception {
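A minimal, self-contained sketch of how a caller consumes the List-based API above (the class, path, and level are hypothetical, not part of the patch):

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.HiveStatsUtils;

    public class ListStatusSketch {
      // Sums file sizes under a two-level dynamic-partition layout, e.g. .../ds=x/hr=y.
      static long totalFileSize(Path tblPath, Configuration conf) throws Exception {
        FileSystem fs = tblPath.getFileSystem(conf);
        List<FileStatus> statuses = HiveStatsUtils.getFileStatusRecurse(tblPath, 2, fs);
        long total = 0;
        for (FileStatus status : statuses) {
          if (!status.isDirectory()) { // quick stats skip directories
            total += status.getLen();
          }
        }
        return total;
      }
    }

Callers that still need an array, such as Utilities.mvFileToFinalPath below, convert with statusList.toArray(new FileStatus[statusList.size()]).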
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index b490325091..7eba5e88d8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -687,7 +687,7 @@ private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table)
    * has done its job before the query ran.
    */
   WriteEntity.WriteType getWriteType(LoadTableDesc tbd, AcidUtils.Operation operation) {
-    if (tbd.getLoadFileType() == LoadFileType.REPLACE_ALL) {
+    if (tbd.getLoadFileType() == LoadFileType.REPLACE_ALL || tbd.isInsertOverwrite()) {
       return WriteEntity.WriteType.INSERT_OVERWRITE;
     }
     switch (operation) {
@@ -730,13 +730,13 @@ private void updatePartitionBucketSortColumns(Hive db, Table table, Partition pa
       // have the correct buckets. The existing code discards the inferred data when the
       // reducers don't produce enough files; we'll do the same for MM tables for now.
       FileSystem fileSys = partn.getDataLocation().getFileSystem(conf);
-      FileStatus[] fileStatus = HiveStatsUtils.getFileStatusRecurse(
+      List<FileStatus> fileStatus = HiveStatsUtils.getFileStatusRecurse(
           partn.getDataLocation(), 1, fileSys);
       // Verify the number of buckets equals the number of files
       // This will not hold for dynamic partitions where not every reducer produced a file for
       // those partitions. In this case the table is not bucketed, as Hive requires a file for
       // each bucket.
-      if (fileStatus.length == numBuckets) {
+      if (fileStatus.size() == numBuckets) {
         List<String> newBucketCols = new ArrayList<String>();
         updateBucketCols = true;
         for (BucketCol bucketCol : bucketCols) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index fd8423129f..eca3606b22 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1493,8 +1493,9 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf,
     }

     // Remove duplicates from tmpPath
-    FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
+    List<FileStatus> statusList = HiveStatsUtils.getFileStatusRecurse(
         tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
+    FileStatus[] statuses = statusList.toArray(new FileStatus[statusList.size()]);
     if(statuses != null && statuses.length > 0) {
       PerfLogger perfLogger = SessionState.getPerfLogger();
       Set<Path> filesKept = new HashSet<Path>();
@@ -1601,8 +1602,9 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, boolean
     if (path == null) {
       return null;
     }
-    FileStatus[] stats = HiveStatsUtils.getFileStatusRecurse(path,
+    List<FileStatus> statusList = HiveStatsUtils.getFileStatusRecurse(path,
         ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
+    FileStatus[] stats = statusList.toArray(new FileStatus[statusList.size()]);
     return removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf, isBaseDir);
   }
@@ -2675,9 +2677,9 @@ public boolean skipProcessing(Task task) {
     Path loadPath = dpCtx.getRootPath();
     FileSystem fs = loadPath.getFileSystem(conf);
     int numDPCols = dpCtx.getNumDPCols();
-    FileStatus[] status = HiveStatsUtils.getFileStatusRecurse(loadPath, numDPCols, fs);
+    List<FileStatus> status = HiveStatsUtils.getFileStatusRecurse(loadPath, numDPCols, fs);

-    if (status.length == 0) {
+    if (status.isEmpty()) {
       LOG.warn("No partition is generated by dynamic partitioning");
       return null;
     }
@@ -2690,9 +2692,9 @@ public boolean skipProcessing(Task task) {

     // for each dynamically created DP directory, construct a full partition spec
     // and load the partition based on that
-    for (int i = 0; i < status.length; ++i) {
+    for (int i = 0; i < status.size(); ++i) {
       // get the dynamically created directory
-      Path partPath = status[i].getPath();
+      Path partPath = status.get(i).getPath();
       assert fs.getFileStatus(partPath).isDir() : "partitions " + partPath
           + " is not a directory !";
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 0a82225d4a..eb9b1e5ff6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -240,6 +240,8 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti
    */
   private Task<?> movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec,
       Path tmpPath) {
+    // Note: this sets LoadFileType incorrectly for ACID; is that relevant for load?
+    // See setLoadFileType and setIsAcidIow calls elsewhere for an example.
     LoadTableDesc loadTableWork = new LoadTableDesc(
         tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
         event.replicationSpec().isReplace() ?
            LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 70fcd2c142..e1f22aa028 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -44,6 +45,7 @@
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
@@ -54,6 +56,7 @@
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.common.util.Ref;
 import org.apache.orc.FileFormatException;
 import org.apache.orc.impl.OrcAcidUtils;
@@ -1097,7 +1100,13 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi
       FileSystem fs) throws IOException {
     Path p = child.getPath();
     String fn = p.getName();
-    if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
+    if (!child.isDirectory()) {
+      if (!ignoreEmptyFiles || child.getLen() != 0) {
+        original.add(createOriginalObj(childWithId, child));
+      }
+      return;
+    }
+    if (fn.startsWith(BASE_PREFIX)) {
       long writeId = parseBase(p);
       if(bestBase.oldestBaseWriteId > writeId) {
         //keep track for error reporting
@@ -1118,28 +1127,25 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi
       } else {
         obsolete.add(child);
       }
-    } else if ((fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX))
-        && child.isDir()) {
-      String deltaPrefix =
-          (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
+    } else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) {
+      String deltaPrefix = fn.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
       ParsedDelta delta = parseDelta(child, deltaPrefix, fs);
+      // TODO: what does this mean? why is this specific to MM tables?
       if (tblproperties != null && AcidUtils.isInsertOnlyTable(tblproperties) &&
-          ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) {
+          ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(
+              delta.minWriteId, delta.maxWriteId)) {
         aborted.add(child);
       }
-      if (writeIdList.isWriteIdRangeValid(delta.minWriteId,
-          delta.maxWriteId) !=
-          ValidWriteIdList.RangeResponse.NONE) {
+      if (writeIdList.isWriteIdRangeValid(
+          delta.minWriteId, delta.maxWriteId) != ValidWriteIdList.RangeResponse.NONE) {
         working.add(delta);
       }
-    } else if (child.isDir()) {
+    } else {
       // This is just the directory. We need to recurse and find the actual files. But don't
       // do this until we have determined there is no base. This saves time.
 Plus,
       // it is possible that the cleaner is running and removing these original files,
       // in which case recursing through them could cause us to get an error.
       originalDirectories.add(child);
-    } else if (!ignoreEmptyFiles || child.getLen() != 0){
-      original.add(createOriginalObj(childWithId, child));
     }
   }
@@ -1659,4 +1665,42 @@ public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOE
       }
     }
   }
+
+  // TODO: when called from an insert or create, this should also account for the
+  // current transaction, treating it as if it is committed.
+  public static List<FileStatus> getAcidFilesForStats(
+      Table table, Path dir, Configuration jc, FileSystem fs) throws IOException {
+    List<FileStatus> fileList = new ArrayList<>();
+    ValidWriteIdList idList = AcidUtils.getTableValidWriteIdList(jc,
+        AcidUtils.getFullTableName(table.getDbName(), table.getTableName()));
+    if (idList == null) {
+      // TODO: ACID does not actually include the write ID for inserts. Why?
+      LOG.warn("Cannot get ACID state for " + table.getDbName() + "." + table.getTableName());
+      return null;
+    }
+    Directory acidInfo = AcidUtils.getAcidState(dir, jc, idList);
+    // Assume that for an MM table, or if there's only the base directory, we are good.
+    if (!acidInfo.getCurrentDirectories().isEmpty() && AcidUtils.isFullAcidTable(table)) {
+      Utilities.FILE_OP_LOGGER.warn(
+          "Computing stats for an ACID table; stats may be inaccurate");
+    }
+    if (fs == null) {
+      fs = dir.getFileSystem(jc);
+    }
+    for (HdfsFileStatusWithId hfs : acidInfo.getOriginalFiles()) {
+      fileList.add(hfs.getFileStatus());
+    }
+    for (ParsedDelta delta : acidInfo.getCurrentDirectories()) {
+      for (FileStatus f : HiveStatsUtils.getFileStatusRecurse(delta.getPath(), -1, fs)) {
+        fileList.add(f);
+      }
+    }
+    if (acidInfo.getBaseDirectory() != null) {
+      for (FileStatus f : HiveStatsUtils.getFileStatusRecurse(
+          acidInfo.getBaseDirectory(), -1, fs)) {
+        fileList.add(f);
+      }
+    }
+    return fileList;
+  }
 }
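A short usage sketch of getAcidFilesForStats, condensed from the Hive.loadPartition and BasicStatsTask call sites later in this patch (table, dir, conf, and params are assumed to be in scope):

    // Returns null when the ValidWriteIdList is absent from the config,
    // in which case the caller clears the quick stats instead.
    List<FileStatus> files = AcidUtils.getAcidFilesForStats(table, dir, conf, null);
    if (files != null) {
      MetaStoreUtils.populateQuickStats(files, params);
    } else {
      MetaStoreUtils.clearQuickStats(params);
    }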
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java
index 1a63d3f971..07abd378c5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java
@@ -161,15 +161,14 @@ public void resolveConcatenateMerge(HiveConf conf) {
       Path dirPath = inputPaths.get(0);
       try {
         FileSystem inpFs = dirPath.getFileSystem(conf);
-        FileStatus[] status =
-            HiveStatsUtils.getFileStatusRecurse(dirPath, listBucketingCtx
-                .getSkewedColNames().size(), inpFs);
+        List<FileStatus> status = HiveStatsUtils.getFileStatusRecurse(
+            dirPath, listBucketingCtx.getSkewedColNames().size(), inpFs);
         List<Path> newInputPath = new ArrayList<Path>();
         boolean succeed = true;
-        for (int i = 0; i < status.length; ++i) {
-          if (status[i].isDir()) {
+        for (FileStatus s : status) {
+          if (s.isDir()) {
             // Add the lb path to the list of input paths
-            newInputPath.add(status[i].getPath());
+            newInputPath.add(s.getPath());
           } else {
             // Found a file instead of a directory; don't change the input path.
             succeed = false;
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 8b0af3e5c8..5f7bfc34f2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1561,10 +1561,9 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par
     Path newPartPath = null;

     if (inheritTableSpecs) {
-      Path partPath = new Path(tbl.getDataLocation(),
-          Warehouse.makePartPath(partSpec));
-      newPartPath = new Path(tblDataLocationPath.toUri().getScheme(), tblDataLocationPath.toUri().getAuthority(),
-          partPath.toUri().getPath());
+      Path partPath = new Path(tbl.getDataLocation(), Warehouse.makePartPath(partSpec));
+      newPartPath = new Path(tblDataLocationPath.toUri().getScheme(),
+          tblDataLocationPath.toUri().getAuthority(), partPath.toUri().getPath());

       if(oldPart != null) {
         /*
@@ -1583,6 +1582,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par
     } else {
       newPartPath = oldPartPath;
     }
+    FileSystem newFs = newPartPath.getFileSystem(conf);
     List<Path> newFiles = null;
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin("MoveTask", "FileMoves");
@@ -1592,6 +1592,10 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) {
       newFiles = Collections.synchronizedList(new ArrayList<Path>());
     }
+
+    // Note: the stats for ACID tables are not part of ACID, so this may be racy.
+    // Note: this assumes both paths are qualified, which they currently are.
+
     if (isMmTableWrite && loadPath.equals(newPartPath)) {
       // MM insert query, move itself is a no-op.
@@ -1612,7 +1616,9 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par
       Path destPath = newPartPath;
       if (isMmTableWrite) {
         // We will load into MM directory, and delete from the parent if needed.
+        // TODO: this looks invalid after ACID integration. What about base dirs?
         destPath = new Path(destPath, AcidUtils.deltaSubdir(writeId, writeId, stmtId));
+        // TODO: loadFileType for MM table will no longer be REPLACE_ALL
         filter = (loadFileType == LoadFileType.REPLACE_ALL)
             ? new JavaUtils.IdPathFilter(writeId, stmtId, false, true) : filter;
       }
@@ -1627,6 +1633,7 @@ else if(!isAcidIUDoperation && isFullAcidTable) {
       //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new
       // base_x.  (there is Insert Overwrite and Load Data Overwrite)
       boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+      // TODO: this should never run for MM tables anymore. Remove the flag, and maybe the filter?
       replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal,
           isAutoPurge, newFiles, filter, isMmTableWrite, !tbl.isTemporary());
     } else {
@@ -1675,7 +1682,21 @@ else if(!isAcidIUDoperation && isFullAcidTable) {
         StatsSetupConst.setStatsStateForCreateTable(newTPart.getParameters(),
             MetaStoreUtils.getColumnNames(tbl.getCols()), StatsSetupConst.TRUE);
       }
-      MetaStoreUtils.populateQuickStats(HiveStatsUtils.getFileStatusRecurse(newPartPath, -1, newPartPath.getFileSystem(conf)), newTPart.getParameters());
+      // Note: we are creating a brand new partition, so this is going to be valid for ACID.
+      List<FileStatus> filesForStats = null;
+      if (isFullAcidTable || isMmTableWrite) {
+        filesForStats = AcidUtils.getAcidFilesForStats(
+            newTPart.getTable(), newPartPath, conf, null);
+      } else {
+        filesForStats = HiveStatsUtils.getFileStatusRecurse(
+            newPartPath, -1, newPartPath.getFileSystem(conf));
+      }
+      if (filesForStats != null) {
+        MetaStoreUtils.populateQuickStats(filesForStats, newTPart.getParameters());
+      } else {
+        // The ACID state is probably absent. A warning is logged in the get method.
+        MetaStoreUtils.clearQuickStats(newTPart.getParameters());
+      }
       try {
         LOG.debug("Adding new partition " + newTPart.getSpec());
         getSynchronizedMSC().add_partition(newTPart.getTPartition());
@@ -1932,7 +1953,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     try {
       FileSystem fs = loadPath.getFileSystem(conf);
       if (!isMmTable) {
-        FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
+        List<FileStatus> leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
         // Check for empty partitions
         for (FileStatus s : leafStatus) {
           if (!s.isDirectory()) {
@@ -2154,9 +2175,13 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
       newFiles = Collections.synchronizedList(new ArrayList<Path>());
     }
+
+    // Note: this assumes both paths are qualified, which they currently are.
     if (isMmTable && loadPath.equals(tbl.getPath())) {
-      Utilities.FILE_OP_LOGGER.debug("not moving " + loadPath + " to " + tbl.getPath());
+      if (Utilities.FILE_OP_LOGGER.isDebugEnabled()) {
+        Utilities.FILE_OP_LOGGER.debug("not moving " + loadPath + " to " + tbl.getPath() + " (MM)");
+      }
       newFiles = listFilesCreatedByQuery(loadPath, writeId, stmtId);
     } else {
       // Either a non-MM query, or a load into MM table from an external source.
@@ -2166,7 +2191,9 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType
       if (isMmTable) {
         assert !isAcidIUDoperation;
         // We will load into MM directory, and delete from the parent if needed.
+        // TODO: this looks invalid after ACID integration. What about base dirs?
         destPath = new Path(destPath, AcidUtils.deltaSubdir(writeId, writeId, stmtId));
+        // TODO: loadFileType for MM table will no longer be REPLACE_ALL
         filter = loadFileType == LoadFileType.REPLACE_ALL
             ? new JavaUtils.IdPathFilter(writeId, stmtId, false, true) : filter;
       }
@@ -2179,6 +2206,7 @@ else if(!isAcidIUDoperation && isFullAcidTable) {
       //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361
       //todo: should probably do the same for MM IOW
       boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+      // TODO: this should never run for MM tables anymore. Remove the flag, and maybe the filter?
       replaceFiles(tblPath, loadPath, destPath, tblPath, sessionConf, isSrcLocal,
           isAutopurge, newFiles, filter, isMmTable?true:false, !tbl.isTemporary());
     } else {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 67d05e65dd..1b1a687a67 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -520,6 +520,8 @@ private static boolean isAcid(Long writeId) {
     Task<?> addPartTask = TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(),
         addPartitionDesc), x.getConf());
+    // Note: this sets LoadFileType incorrectly for ACID; is that relevant for import?
+    // See setLoadFileType and setIsAcidIow calls elsewhere for an example.
     LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table),
         partSpec.getPartSpec(),
         replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index 7d2de75315..fb3bfdacb1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -329,8 +329,9 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
       stmtId = SessionState.get().getTxnMgr().getStmtIdAndIncrement();
     }

-    LoadTableDesc loadTableWork;
-    loadTableWork = new LoadTableDesc(new Path(fromURI),
+    // Note: this sets LoadFileType incorrectly for ACID; is that relevant for load?
+    // See setLoadFileType and setIsAcidIow calls elsewhere for an example.
+    LoadTableDesc loadTableWork = new LoadTableDesc(new Path(fromURI),
         Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite ?
         LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING, writeId);
     loadTableWork.setStmtId(stmtId);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index cd6f1ee692..feb4bfc8b5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6906,10 +6906,12 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, writeId);
       // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
       // deltas and base and leave them up to the cleaner to clean up
-      LoadFileType loadType = (!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
-          dest_tab.getTableName()) && !destTableIsTransactional)
+      boolean isInsertInto = qb.getParseInfo().isInsertIntoTable(
+          dest_tab.getDbName(), dest_tab.getTableName());
+      LoadFileType loadType = (!isInsertInto && !destTableIsTransactional)
           ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
       ltd.setLoadFileType(loadType);
+      ltd.setInsertOverwrite(!isInsertInto);
       ltd.setLbCtx(lbCtx);
       loadTableWork.add(ltd);
     } else {
@@ -6995,10 +6997,12 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, writeId);
       // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
       // deltas and base and leave them up to the cleaner to clean up
-      LoadFileType loadType = (!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
-          dest_tab.getTableName()) && !destTableIsTransactional) // // Both Full-acid and MM tables are excluded.
+      boolean isInsertInto = qb.getParseInfo().isInsertIntoTable(
+          dest_tab.getDbName(), dest_tab.getTableName());
+      LoadFileType loadType = (!isInsertInto && !destTableIsTransactional) ?
+          LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
       ltd.setLoadFileType(loadType);
+      ltd.setInsertOverwrite(!isInsertInto);
       ltd.setLbCtx(lbCtx);
       loadTableWork.add(ltd);
@@ -7008,7 +7012,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
         throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
             .getMsg(dest_tab.getTableName() + "@" + dest_part.getName()));
       }
-        break;
+      break;
     }
     case QBMetaData.DEST_LOCAL_FILE:
       isLocal = true;
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsWork.java
index a4e770ce95..55d05a1dd9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsWork.java
@@ -172,9 +172,11 @@ public boolean isTargetRewritten() {
       return true;
     }
     // INSERT OVERWRITE
-    if (getLoadTableDesc() != null && getLoadTableDesc().getLoadFileType() == LoadFileType.REPLACE_ALL) {
+    LoadTableDesc ltd = getLoadTableDesc();
+    if (ltd != null && (ltd.getLoadFileType() == LoadFileType.REPLACE_ALL || ltd.isInsertOverwrite())) {
       return true;
     }
+    // CREATE TABLE ... AS
     if (getLoadFileDesc() != null && getLoadFileDesc().getCtasCreateTableDesc() != null) {
       return true;
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
index 8ce0cb05b6..80f77b9f0c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
@@ -231,7 +231,8 @@ private void generateActualTasks(HiveConf conf, List
-      FileStatus[] status = HiveStatsUtils.getFileStatusRecurse(dirPath, dpLbLevel, inpFs);
+      List<FileStatus> statusList = HiveStatsUtils.getFileStatusRecurse(dirPath, dpLbLevel, inpFs);
+      FileStatus[] status = statusList.toArray(new FileStatus[statusList.size()]);
       // cleanup pathToPartitionInfo
       Map<Path, PartitionDesc> ptpi = work.getPathToPartitionInfo();
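To summarize the new flag's flow, here is a condensed sketch of the SemanticAnalyzer change above together with its consumers (ltd, qb, dest_tab, and destTableIsTransactional come from the surrounding compiler code):

    boolean isInsertInto = qb.getParseInfo().isInsertIntoTable(
        dest_tab.getDbName(), dest_tab.getTableName());
    // Transactional targets keep their existing deltas/base even on OVERWRITE,
    // so REPLACE_ALL alone can no longer signal an insert-overwrite.
    ltd.setLoadFileType((!isInsertInto && !destTableIsTransactional)
        ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING);
    ltd.setInsertOverwrite(!isInsertInto);

    // Consumers such as MoveTask.getWriteType and BasicStatsWork.isTargetRewritten
    // now check both signals:
    boolean targetRewritten = ltd.getLoadFileType() == LoadFileType.REPLACE_ALL
        || ltd.isInsertOverwrite();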
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
index 946c300750..d4d46a3671 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
@@ -36,6 +36,8 @@
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -161,11 +163,18 @@ public void run() {
         long rawDataSize = 0;
         long fileSize = 0;
         long numFiles = 0;
-        LOG.debug("Aggregating stats for {}", dir);
-        FileStatus[] fileList = HiveStatsUtils.getFileStatusRecurse(dir, -1, fs);
+        // Note: this code would be invalid for transactional tables of any kind.
+        Utilities.FILE_OP_LOGGER.debug("Aggregating stats for {}", dir);
+        List<FileStatus> fileList = null;
+        if (partish.getTable() != null
+            && AcidUtils.isTransactionalTable(partish.getTable())) {
+          fileList = AcidUtils.getAcidFilesForStats(partish.getTable(), dir, jc, fs);
+        } else {
+          fileList = HiveStatsUtils.getFileStatusRecurse(dir, -1, fs);
+        }

         for (FileStatus file : fileList) {
-          LOG.debug("Computing stats for {}", file);
+          Utilities.FILE_OP_LOGGER.debug("Computing stats for {}", file);
           if (!file.isDirectory()) {
             InputFormat<?, ?> inputFormat = ReflectionUtil.newInstance(partish.getInputFormatClass(), jc);
             InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0, new String[] { partish.getLocation() });
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java
index 1d7660e8b2..635bc41f88 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.stats;

+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -111,7 +113,8 @@ public String getName() {
   private static class BasicStatsProcessor {

     private Partish partish;
-    private FileStatus[] partfileStatus;
+    private List<FileStatus> partfileStatus;
+    private boolean isMissingAcidState = false;
     private BasicStatsWork work;
     private boolean followedColStats1;
@@ -124,11 +127,10 @@ public BasicStatsProcessor(Partish partish, BasicStatsWork work, HiveConf conf,
     public Object process(StatsAggregator statsAggregator) throws HiveException, MetaException {
       Partish p = partish;
       Map<String, String> parameters = p.getPartParameters();
-      if (p.isAcid()) {
+      if (p.isTransactionalTable()) {
+        // TODO: this should also happen on any error. Right now this task will just fail.
         StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
-      }
-
-      if (work.isTargetRewritten()) {
+      } else if (work.isTargetRewritten()) {
         StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
       }
@@ -140,8 +142,15 @@ public Object process(StatsAggregator statsAggregator) throws HiveException, Met
         StatsSetupConst.clearColumnStatsState(parameters);
       }

-      if(partfileStatus == null){
-        LOG.warn("Partition/partfiles is null for: " + partish.getPartition().getSpec());
+      if (partfileStatus == null) {
+        // This may happen if the ACID state is absent from the config.
+        String spec = partish.getPartition() == null ?
+            partish.getTable().getTableName() : partish.getPartition().getSpec().toString();
+        LOG.warn("Partition/partfiles is null for: " + spec);
+        if (isMissingAcidState) {
+          MetaStoreUtils.clearQuickStats(parameters);
+          return p.getOutput();
+        }
         return null;
       }
@@ -153,7 +162,8 @@ public Object process(StatsAggregator statsAggregator) throws HiveException, Met
         StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
       }

-      updateQuickStats(parameters, partfileStatus);
+      MetaStoreUtils.populateQuickStats(partfileStatus, parameters);
+
       if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
         if (statsAggregator != null) {
           String prefix = getAggregationPrefix(p.getTable(), p.getPartition());
@@ -164,12 +174,14 @@ public Object process(StatsAggregator statsAggregator) throws HiveException, Met
       return p.getOutput();
     }

-    public void collectFileStatus(Warehouse wh) throws MetaException {
-      partfileStatus = wh.getFileStatusesForSD(partish.getPartSd());
-    }
-
-    private void updateQuickStats(Map<String, String> parameters, FileStatus[] partfileStatus) throws MetaException {
-      MetaStoreUtils.populateQuickStats(partfileStatus, parameters);
+    public void collectFileStatus(Warehouse wh, HiveConf conf) throws MetaException, IOException {
+      if (!partish.isTransactionalTable()) {
+        partfileStatus = wh.getFileStatusesForSD(partish.getPartSd());
+      } else {
+        Path path = new Path(partish.getPartSd().getLocation());
+        partfileStatus = AcidUtils.getAcidFilesForStats(partish.getTable(), path, conf, null);
+        isMissingAcidState = true;
+      }
     }

     private String getAggregationPrefix(Table table, Partition partition) throws MetaException {
@@ -247,7 +259,7 @@ private int aggregateStats(Hive db) {
           partishes.add(p = new Partish.PTable(table));

           BasicStatsProcessor basicStatsProcessor = new BasicStatsProcessor(p, work, conf, followedColStats);
-          basicStatsProcessor.collectFileStatus(wh);
+          basicStatsProcessor.collectFileStatus(wh, conf);
           Table res = (Table) basicStatsProcessor.process(statsAggregator);
           if (res == null) {
             return 0;
@@ -280,7 +292,7 @@ private int aggregateStats(Hive db) {
             futures.add(pool.submit(new Callable<Void>() {
               @Override
               public Void call() throws Exception {
-                bsp.collectFileStatus(wh);
+                bsp.collectFileStatus(wh, conf);
                 return null;
               }
             }));
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
index 7591c0681b..d4cfd0ad62 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
@@ -184,4 +184,4 @@ public int persistColumnStats(Hive db, Table tbl) throws HiveException, MetaExce
   public void setDpPartSpecs(Collection<Partition> dpPartSpecs) {
   }

-}
\ No newline at end of file
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java
index 05b0474e90..47810e2c34 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java
@@ -51,6 +51,10 @@ public final boolean isAcid() {
     return AcidUtils.isFullAcidTable(getTable());
   }

+  public final boolean isTransactionalTable() {
+    return AcidUtils.isTransactionalTable(getTable());
+  }
+
   public abstract Table getTable();

   public abstract Map<String, String> getPartParameters();
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
index d84cf136d5..ed593e5e1a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
@@ -65,6 +65,7 @@ public boolean accept(Path file) {
         }
       });
       for (FileStatus file : status) {
+        Utilities.FILE_OP_LOGGER.trace("About to read stats file {} ", file.getPath());
         Input in = new Input(fs.open(file.getPath()));
         Kryo kryo = SerializationUtilities.borrowKryo();
         try {
@@ -72,6 +73,7 @@ public boolean accept(Path file) {
         } finally {
           SerializationUtilities.releaseKryo(kryo);
         }
+        Utilities.FILE_OP_LOGGER.trace("Read: {}", statsMap);
         statsList.add(statsMap);
         in.close();
       }
@@ -86,7 +88,7 @@ public boolean accept(Path file) {
   @Override
   public String aggregateStats(String partID, String statType) {
     long counter = 0;
-    LOG.debug("Part ID: " + partID + "\t" + statType);
+    Utilities.FILE_OP_LOGGER.debug("Part ID: " + partID + "\t" + statType);
     for (Map<String, Map<String, String>> statsMap : statsList) {
       Map<String, String> partStat = statsMap.get(partID);
       if (null == partStat) { // not all partitions are scanned in all mappers, so this could be null.
@@ -98,7 +100,7 @@ public String aggregateStats(String partID, String statType) {
       }
       counter += Long.parseLong(statVal);
     }
-    LOG.info("Read stats for : " + partID + "\t" + statType + "\t" + counter);
+    Utilities.FILE_OP_LOGGER.info("Read stats for : " + partID + "\t" + statType + "\t" + counter);
     return String.valueOf(counter);
   }
diff --git ql/src/test/results/clientpositive/autoColumnStats_4.q.out ql/src/test/results/clientpositive/autoColumnStats_4.q.out
index 9c0e020351..e40994b79d 100644
--- ql/src/test/results/clientpositive/autoColumnStats_4.q.out
+++ ql/src/test/results/clientpositive/autoColumnStats_4.q.out
@@ -194,10 +194,10 @@ Retention:          0
 #### A masked pattern was here ####
 Table Type:         MANAGED_TABLE
 Table Parameters:
-	numFiles            2
+	numFiles
 	numRows             0
 	rawDataSize         0
-	totalSize           1798
+	totalSize
 	transactional       true
 	transactional_properties	default
 #### A masked pattern was here ####
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/common/StatsSetupConst.java standalone-metastore/src/main/java/org/apache/hadoop/hive/common/StatsSetupConst.java
index 59190893e6..78ea01d968 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/common/StatsSetupConst.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/common/StatsSetupConst.java
@@ -270,6 +270,7 @@ public static void clearColumnStatsState(Map<String, String> params) {
     if (params == null) {
       return;
     }
+
     ColumnStatsAccurate stats = parseStatsAcc(params.get(COLUMN_STATS_ACCURATE));
     stats.columnStats.clear();
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index 89354a2d34..0dd3eb1017 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -297,7 +297,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String dbname,
         !isPartitionedTable) {
       Database db = msdb.getDatabase(newDbName);
      // Update table stats. For partitioned table, we update stats in alterPartition()
-      MetaStoreUtils.updateTableStatsFast(db, newt, wh, false, true, environmentContext);
+      MetaStoreUtils.updateTableStatsFast(db, newt, wh, false, true, environmentContext, false);
     }

     if (isPartitionedTable) {
@@ -436,23 +436,25 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String
           .currentTimeMillis() / 1000));
     }

-    Table tbl = msdb.getTable(dbname, name);
-    if (tbl == null) {
-      throw new InvalidObjectException(
-          "Unable to alter partition because table or database does not exist.");
-    }
     //alter partition
     if (part_vals == null || part_vals.size() == 0) {
       try {
         msdb.openTransaction();
+
+        Table tbl = msdb.getTable(dbname, name);
+        if (tbl == null) {
+          throw new InvalidObjectException(
+              "Unable to alter partition because table or database does not exist.");
+        }
         oldPart = msdb.getPartition(dbname, name, new_part.getValues());
         if (MetaStoreUtils.requireCalStats(oldPart, new_part, tbl, environmentContext)) {
           // if stats are same, no need to update
           if (MetaStoreUtils.isFastStatsSame(oldPart, new_part)) {
             MetaStoreUtils.updateBasicState(environmentContext, new_part.getParameters());
           } else {
-            MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true, environmentContext);
+            MetaStoreUtils.updatePartitionStatsFast(
+                new_part, tbl, wh, false, true, environmentContext, false);
           }
         }
@@ -494,6 +496,11 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String
     boolean dataWasMoved = false;
     try {
       msdb.openTransaction();
+      Table tbl = msdb.getTable(dbname, name);
+      if (tbl == null) {
+        throw new InvalidObjectException(
+            "Unable to alter partition because table or database does not exist.");
+      }
       try {
         oldPart = msdb.getPartition(dbname, name, part_vals);
       } catch (NoSuchObjectException e) {
@@ -581,7 +588,8 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String
       }

       if (MetaStoreUtils.requireCalStats(oldPart, new_part, tbl, environmentContext)) {
-        MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true, environmentContext);
+        MetaStoreUtils.updatePartitionStatsFast(
+            new_part, tbl, wh, false, true, environmentContext, false);
       }

       String newPartName = Warehouse.makePartName(tbl.getPartitionKeys(), new_part.getValues());
@@ -650,15 +658,16 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String
       transactionalListeners = handler.getTransactionalListeners();
     }

-    Table tbl = msdb.getTable(dbname, name);
-    if (tbl == null) {
-      throw new InvalidObjectException(
-          "Unable to alter partitions because table or database does not exist.");
-    }
     boolean success = false;
     try {
       msdb.openTransaction();
+
+      Table tbl = msdb.getTable(dbname, name);
+      if (tbl == null) {
+        throw new InvalidObjectException(
+            "Unable to alter partitions because table or database does not exist.");
+      }
       for (Partition tmpPart: new_parts) {
         // Set DDL time to now if not specified
         if (tmpPart.getParameters() == null ||
@@ -677,7 +686,8 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String
           if (MetaStoreUtils.isFastStatsSame(oldTmpPart, tmpPart)) {
             MetaStoreUtils.updateBasicState(environmentContext, tmpPart.getParameters());
           } else {
-            MetaStoreUtils.updatePartitionStatsFast(tmpPart, wh, false, true, environmentContext);
+            MetaStoreUtils.updatePartitionStatsFast(
+                tmpPart, tbl, wh, false, true, environmentContext, false);
           }
         }
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index c6e34a8a22..72aa5adc52 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -1490,7 +1490,7 @@ private void create_table_core(final RawStore ms, final Table tbl,
         }
         if (MetastoreConf.getBoolVar(conf, ConfVars.STATS_AUTO_GATHER) &&
             !MetaStoreUtils.isView(tbl)) {
-          MetaStoreUtils.updateTableStatsFast(db, tbl, wh, madeDir, envContext);
+          MetaStoreUtils.updateTableStatsFast(db, tbl, wh, madeDir, false, envContext, true);
         }

         // set create time
@@ -2633,7 +2633,7 @@ private Partition append_partition_common(RawStore ms, String dbName, String tab
         if (MetastoreConf.getBoolVar(conf, ConfVars.STATS_AUTO_GATHER) &&
             !MetaStoreUtils.isView(tbl)) {
-          MetaStoreUtils.updatePartitionStatsFast(part, wh, madeDir, envContext);
+          MetaStoreUtils.updatePartitionStatsFast(part, tbl, wh, madeDir, false, envContext, true);
         }

         if (ms.addPartition(part)) {
@@ -3191,7 +3191,7 @@ private void initializeAddedPartition(
         final Table tbl, final PartitionSpecProxy.PartitionIterator part, boolean madeDir) throws MetaException {
       if (MetastoreConf.getBoolVar(conf, ConfVars.STATS_AUTO_GATHER) &&
           !MetaStoreUtils.isView(tbl)) {
-        MetaStoreUtils.updatePartitionStatsFast(part, wh, madeDir, false, null);
+        MetaStoreUtils.updatePartitionStatsFast(part, tbl, wh, madeDir, false, null, true);
       }

       // set create time
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 20c10607bb..445a7b8ad2 100755
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -543,7 +543,7 @@ public static String makePartName(List<FieldSchema> partCols,
    * @return array of FileStatus objects corresponding to the files
    * making up the passed storage description
    */
-  public FileStatus[] getFileStatusesForSD(StorageDescriptor desc)
+  public List<FileStatus> getFileStatusesForSD(StorageDescriptor desc)
       throws MetaException {
     return getFileStatusesForLocation(desc.getLocation());
   }
@@ -553,7 +553,7 @@ public static String makePartName(List<FieldSchema> partCols,
    * @return array of FileStatus objects corresponding to the files
    * making up the passed storage description
    */
-  public FileStatus[] getFileStatusesForLocation(String location)
+  public List<FileStatus> getFileStatusesForLocation(String location)
       throws MetaException {
     try {
       Path path = new Path(location);
@@ -571,7 +571,7 @@ public static String makePartName(List<FieldSchema> partCols,
    * @return array of FileStatus objects corresponding to the files making up the passed
    * unpartitioned table
    */
-  public FileStatus[] getFileStatusesForUnpartitionedTable(Database db, Table table)
+  public List<FileStatus> getFileStatusesForUnpartitionedTable(Database db, Table table)
       throws MetaException {
     Path tablePath = getDnsPath(new Path(table.getSd().getLocation()));
     try {
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
index b44ff8ce47..4138fa5b70 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.metastore.utils;

+import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
@@ -314,11 +315,11 @@ public static String unescapePathName(String path) {
    * @return array of FileStatus
    * @throws IOException
    */
-  public static FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs)
+  public static List<FileStatus> getFileStatusRecurse(Path path, int level, FileSystem fs)
       throws IOException {

    // if level is <0, then return all files/directories under the specified path
-    if ( level < 0) {
+    if (level < 0) {
      List<FileStatus> result = new ArrayList<>();
      try {
        FileStatus fileStatus = fs.getFileStatus(path);
@@ -328,9 +329,9 @@ public static String unescapePathName(String path) {
       // does not exist. But getFileStatus() throws IOException. To mimic that
       // behavior we return an empty list on exception. For external
       // tables, the path of the table will not exist during table creation
-      return new FileStatus[0];
+      return new ArrayList<>(0);
     }
-    return result.toArray(new FileStatus[result.size()]);
+    return result;
   }

   // construct a path pattern (e.g., /*/*) to find all dynamically generated paths
@@ -339,7 +340,7 @@ public static String unescapePathName(String path) {
     sb.append(Path.SEPARATOR).append("*");
   }
   Path pathPattern = new Path(path, sb.toString());
-  return fs.globStatus(pathPattern, FileUtils.HIDDEN_FILES_PATH_FILTER);
+  return Lists.newArrayList(fs.globStatus(pathPattern, FileUtils.HIDDEN_FILES_PATH_FILTER));
 }

 /**
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
index 50f873a013..1a23111ee0 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -604,20 +604,14 @@ public static boolean isFastStatsSame(Partition oldPart, Partition newPart) {
     return false;
   }

-  public static boolean updateTableStatsFast(Database db, Table tbl, Warehouse wh,
-      boolean madeDir, EnvironmentContext environmentContext) throws MetaException {
-    return updateTableStatsFast(db, tbl, wh, madeDir, false, environmentContext);
-  }
-
-  public static boolean updateTableStatsFast(Database db, Table tbl, Warehouse wh,
-      boolean madeDir, boolean forceRecompute, EnvironmentContext environmentContext) throws MetaException {
-    if (tbl.getPartitionKeysSize() == 0) {
-      // Update stats only when unpartitioned
-      FileStatus[] fileStatuses = wh.getFileStatusesForUnpartitionedTable(db, tbl);
-      return updateTableStatsFast(tbl, fileStatuses, madeDir, forceRecompute, environmentContext);
-    } else {
-      return false;
-    }
+  public static boolean updateTableStatsFast(Database db, Table tbl, Warehouse wh, boolean madeDir,
+      boolean forceRecompute, EnvironmentContext environmentContext, boolean isCreate) throws MetaException {
+    if (tbl.getPartitionKeysSize() != 0) return false;
+    // Update stats only when unpartitioned
+    // TODO: this is also invalid for ACID tables, except for the create case by coincidence
+    List<FileStatus> fileStatuses = wh.getFileStatusesForUnpartitionedTable(db, tbl);
+    return updateTableStatsFast(
+        tbl, fileStatuses, madeDir, forceRecompute, environmentContext, isCreate);
   }

   /**
@@ -630,8 +624,9 @@ public static boolean updateTableStatsFast(Database db, Table tbl, Warehouse wh,
    * these parameters set
    * @return true if the stats were updated, false otherwise
    */
-  public static boolean updateTableStatsFast(Table tbl, FileStatus[] fileStatus, boolean newDir,
-      boolean forceRecompute, EnvironmentContext environmentContext) throws MetaException {
+  private static boolean updateTableStatsFast(Table tbl, List<FileStatus> fileStatus,
+      boolean newDir, boolean forceRecompute, EnvironmentContext environmentContext,
+      boolean isCreate) throws MetaException {

     Map<String, String> params = tbl.getParameters();
@@ -644,39 +639,43 @@ public static boolean updateTableStatsFast(Table tbl, FileStatus[] fileStatus, b
       }
     }

-    boolean updated = false;
-    if (forceRecompute ||
-        params == null ||
-        !containsAllFastStats(params)) {
-      if (params == null) {
-        params = new HashMap<>();
-      }
-      if (!newDir) {
-        // The table location already exists and may contain data.
-        // Let's try to populate those stats that don't require full scan.
-        LOG.info("Updating table stats fast for " + tbl.getTableName());
-        populateQuickStats(fileStatus, params);
-        LOG.info("Updated size of table " + tbl.getTableName() +" to "+ params.get(StatsSetupConst.TOTAL_SIZE));
-        if (environmentContext != null
-            && environmentContext.isSetProperties()
-            && StatsSetupConst.TASK.equals(environmentContext.getProperties().get(
-                StatsSetupConst.STATS_GENERATED))) {
-          StatsSetupConst.setBasicStatsState(params, StatsSetupConst.TRUE);
-        } else {
-          StatsSetupConst.setBasicStatsState(params, StatsSetupConst.FALSE);
-        }
-      }
+    if (!forceRecompute && params != null && containsAllFastStats(params)) return false;
+    if (params == null) {
+      params = new HashMap<>();
+    }
+    if (!isCreate && MetaStoreUtils.isTransactionalTable(tbl.getParameters())) {
+      // TODO: we should use AcidUtils.getAcidFilesForStats, but cannot access it from the metastore.
+      LOG.warn("Not updating fast stats for a transactional table " + tbl.getTableName());
       tbl.setParameters(params);
-      updated = true;
+      return true;
     }
-    return updated;
+    if (!newDir) {
+      // The table location already exists and may contain data.
+      // Let's try to populate those stats that don't require a full scan.
+      LOG.info("Updating table stats fast for " + tbl.getTableName());
+      populateQuickStats(fileStatus, params);
+      LOG.info("Updated size of table " + tbl.getTableName() + " to " + params.get(StatsSetupConst.TOTAL_SIZE));
+      if (environmentContext != null
+          && environmentContext.isSetProperties()
+          && StatsSetupConst.TASK.equals(environmentContext.getProperties().get(
+              StatsSetupConst.STATS_GENERATED))) {
+        StatsSetupConst.setBasicStatsState(params, StatsSetupConst.TRUE);
+      } else {
+        StatsSetupConst.setBasicStatsState(params, StatsSetupConst.FALSE);
+      }
+    }
+    tbl.setParameters(params);
+    return true;
   }

-  public static void populateQuickStats(FileStatus[] fileStatus, Map<String, String> params) {
+  /** This method is invalid for MM and ACID tables unless fileStatus comes from AcidUtils. */
+  public static void populateQuickStats(List<FileStatus> fileStatus, Map<String, String> params) {
+    // Why is this even in the metastore?
+    LOG.trace("Populating quick stats based on {} files", fileStatus.size());
     int numFiles = 0;
     long tableSize = 0L;
     for (FileStatus status : fileStatus) {
-      // don't take directories into account for quick stats
+      // don't take directories into account for quick stats. TODO: why not?
       if (!status.isDir()) {
         tableSize += status.getLen();
         numFiles += 1;
@@ -685,6 +684,12 @@ public static void populateQuickStats(FileStatus[] fileStatus, Map
     }
   }

+  public static void clearQuickStats(Map<String, String> params) {
+    params.put(StatsSetupConst.NUM_FILES, "");
+    params.put(StatsSetupConst.TOTAL_SIZE, "");
+  }
+
   public static boolean areSameColumns(List<FieldSchema> oldCols, List<FieldSchema> newCols) {
     return ListUtils.isEqualList(oldCols, newCols);
   }
@@ -705,16 +710,6 @@ public static void updateBasicState(EnvironmentContext environmentContext, Map param

     Map<String, String> params = part.getParameters();

-    boolean updated = false;
-    if (forceRecompute ||
-        params == null ||
-        !containsAllFastStats(params)) {
-      if (params == null) {
-        params = new HashMap<>();
-      }
-      if (!madeDir) {
-        // The partition location already existed and may contain data. Lets try to
-        // populate those statistics that don't require a full scan of the data.
-        LOG.warn("Updating partition stats fast for: " + part.getTableName());
-        FileStatus[] fileStatus = wh.getFileStatusesForLocation(part.getLocation());
-        populateQuickStats(fileStatus, params);
-        LOG.warn("Updated size to " + params.get(StatsSetupConst.TOTAL_SIZE));
-        updateBasicState(environmentContext, params);
-      }
+    if (!forceRecompute && params != null && containsAllFastStats(params)) return false;
+    if (params == null) {
+      params = new HashMap<>();
+    }
+    if (!isCreate && MetaStoreUtils.isTransactionalTable(table.getParameters())) {
+      // TODO: implement?
+      LOG.warn("Not updating fast stats for a transactional table " + table.getTableName());
       part.setParameters(params);
-      updated = true;
+      return true;
     }
-    return updated;
+    if (!madeDir) {
+      // The partition location already existed and may contain data. Let's try to
+      // populate those statistics that don't require a full scan of the data.
+      LOG.warn("Updating partition stats fast for: " + part.getTableName());
+      List<FileStatus> fileStatus = wh.getFileStatusesForLocation(part.getLocation());
+      // TODO: this is invalid for ACID tables, and we cannot access AcidUtils here.
+      populateQuickStats(fileStatus, params);
+      LOG.warn("Updated size to " + params.get(StatsSetupConst.TOTAL_SIZE));
+      updateBasicState(environmentContext, params);
+    }
+    part.setParameters(params);
+    return true;
   }

   /*
@@ -790,6 +789,12 @@ public static boolean columnsIncludedByNameType(List<FieldSchema> oldCols,
   }

   /** Duplicates AcidUtils; used in a couple places in metastore. */
+  public static boolean isTransactionalTable(Map<String, String> params) {
+    String transactionalProp = params.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    return (transactionalProp != null && "true".equalsIgnoreCase(transactionalProp));
+  }
+
+  /** Duplicates AcidUtils; used in a couple places in metastore. */
   public static boolean isInsertOnlyTableParam(Map<String, String> params) {
     String transactionalProp = params.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
     return (transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp));
diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
index 2599ab103e..7091c5b2f5 100644
--- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
+++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
@@ -489,7 +489,7 @@ private static Partition makePartitionObject(String dbName, String tblName,
     part4.setSd(tbl.getSd().deepCopy());
     part4.getSd().setSerdeInfo(tbl.getSd().getSerdeInfo().deepCopy());
     part4.getSd().setLocation(tbl.getSd().getLocation() + ptnLocationSuffix);
-    MetaStoreUtils.updatePartitionStatsFast(part4, warehouse, null);
+    MetaStoreUtils.updatePartitionStatsFast(part4, tbl, warehouse, false, false, null, true);
     return part4;
   }
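For reference, a small sketch of what the quick-stats helpers in MetaStoreUtils above leave in a parameter map (fileStatuses is an assumed input; for transactional tables it should come from AcidUtils.getAcidFilesForStats):

    Map<String, String> params = new HashMap<>();
    MetaStoreUtils.populateQuickStats(fileStatuses, params);
    // params now holds numFiles and totalSize; directories are not counted.
    MetaStoreUtils.clearQuickStats(params);
    // params now maps numFiles and totalSize to empty strings, which is why the
    // masked autoColumnStats_4.q.out output above shows blank values for both.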