diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index b490325091..7a398afb33 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -687,7 +687,7 @@ private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table) * has done it's job before the query ran. */ WriteEntity.WriteType getWriteType(LoadTableDesc tbd, AcidUtils.Operation operation) { - if (tbd.getLoadFileType() == LoadFileType.REPLACE_ALL) { + if (tbd.getLoadFileType() == LoadFileType.REPLACE_ALL || tbd.isInsertOverwrite()) { return WriteEntity.WriteType.INSERT_OVERWRITE; } switch (operation) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 0a82225d4a..eb9b1e5ff6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -240,6 +240,8 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti */ private Task movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec, Path tmpPath) { + // Note: this sets LoadFileType incorrectly for ACID; is that relevant for load? + // See setLoadFileType and setIsAcidIow calls elsewhere for an example. LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 70fcd2c142..a371887d98 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; @@ -44,6 +45,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.TransactionalValidationListener; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; @@ -54,6 +56,7 @@ import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.mapred.JobConf; import org.apache.hive.common.util.Ref; import org.apache.orc.FileFormatException; import org.apache.orc.impl.OrcAcidUtils; @@ -1659,4 +1662,35 @@ public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOE } } } + + // TODO: when called from an insert or create, this should also account for the + // current transaction, treating it as if it is committed. + public static List getAcidFilesForStats( + Table table, Path dir, JobConf jc, FileSystem fs) throws IOException { + List fileList = new ArrayList<>(); + ValidWriteIdList idList = AcidUtils.getTableValidWriteIdList(jc, + AcidUtils.getFullTableName(table.getDbName(), table.getTableName())); + Directory acidInfo = AcidUtils.getAcidState(dir, jc, idList); + // Assume that for an MM table, or if there's only the base directory, we are good. + if (!acidInfo.getCurrentDirectories().isEmpty() + && AcidUtils.isFullAcidTable(table)) { + Utilities.FILE_OP_LOGGER.warn( + "Computing stats for an ACID table; stats may be inaccurate"); + } + for (HdfsFileStatusWithId hfs : acidInfo.getOriginalFiles()) { + fileList.add(hfs.getFileStatus()); + } + for (ParsedDelta delta : acidInfo.getCurrentDirectories()) { + for (FileStatus f : HiveStatsUtils.getFileStatusRecurse(delta.getPath(), -1, fs)) { + fileList.add(f); + } + } + if (acidInfo.getBaseDirectory() != null) { + for (FileStatus f : HiveStatsUtils.getFileStatusRecurse( + acidInfo.getBaseDirectory(), -1, fs)) { + fileList.add(f); + } + } + return fileList; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 8b0af3e5c8..f87f7545e7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1561,10 +1561,9 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par Path newPartPath = null; if (inheritTableSpecs) { - Path partPath = new Path(tbl.getDataLocation(), - Warehouse.makePartPath(partSpec)); - newPartPath = new Path(tblDataLocationPath.toUri().getScheme(), tblDataLocationPath.toUri().getAuthority(), - partPath.toUri().getPath()); + Path partPath = new Path(tbl.getDataLocation(), Warehouse.makePartPath(partSpec)); + newPartPath = new Path(tblDataLocationPath.toUri().getScheme(), + tblDataLocationPath.toUri().getAuthority(), partPath.toUri().getPath()); if(oldPart != null) { /* @@ -1583,6 +1582,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par } else { newPartPath = oldPartPath; } + FileSystem newFs = newPartPath.getFileSystem(conf); List newFiles = null; PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin("MoveTask", "FileMoves"); @@ -1592,6 +1592,24 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) { newFiles = Collections.synchronizedList(new ArrayList()); } + + + // Note: the stats for ACID tables are not part of ACID, so this may be racy. + Path fastStatsPath = newPartPath; + // This means we inserted into ACID table so we need ACID state to do proper file stats. + // TODO: we should actually use + boolean isMultiDirStats = false; + if (isMmTableWrite) { + // TODO: do the same for ACID? This is not pretty... + fastStatsPath = new Path(newPartPath, AcidUtils.deltaSubdir(writeId, writeId, stmtId)); + if (newFs.exists(fastStatsPath)) { + fastStatsPath = newPartPath; + isMultiDirStats = true; + } else { + fastStatsPath = new Path(newPartPath, AcidUtils.baseDir(writeId)); + } + } + // Note: this assumes both paths are qualified; which they are, currently. if (isMmTableWrite && loadPath.equals(newPartPath)) { // MM insert query, move itself is a no-op. @@ -1612,7 +1630,9 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par Path destPath = newPartPath; if (isMmTableWrite) { // We will load into MM directory, and delete from the parent if needed. + // TODO: this looks invalid after ACID integration. What about base dirs? destPath = new Path(destPath, AcidUtils.deltaSubdir(writeId, writeId, stmtId)); + // TODO: loadFileType for MM table will no longer be REPLACE_ALL filter = (loadFileType == LoadFileType.REPLACE_ALL) ? new JavaUtils.IdPathFilter(writeId, stmtId, false, true) : filter; } @@ -1627,6 +1647,7 @@ else if(!isAcidIUDoperation && isFullAcidTable) { //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new // base_x. (there is Insert Overwrite and Load Data Overwrite) boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); + // TODO: this should never run for MM tables anymore. Remove the flag, and maybe the filter? replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal, isAutoPurge, newFiles, filter, isMmTableWrite, !tbl.isTemporary()); } else { @@ -1675,7 +1696,10 @@ else if(!isAcidIUDoperation && isFullAcidTable) { StatsSetupConst.setStatsStateForCreateTable(newTPart.getParameters(), MetaStoreUtils.getColumnNames(tbl.getCols()), StatsSetupConst.TRUE); } - MetaStoreUtils.populateQuickStats(HiveStatsUtils.getFileStatusRecurse(newPartPath, -1, newPartPath.getFileSystem(conf)), newTPart.getParameters()); + // Note: we are creating a brand new the partition, so this is going to be valid for ACID. + FileStatus[] filesForStats = HiveStatsUtils.getFileStatusRecurse( + fastStatsPath, -1, fastStatsPath.getFileSystem(conf)); + MetaStoreUtils.populateQuickStats(filesForStats, newTPart.getParameters()); try { LOG.debug("Adding new partition " + newTPart.getSpec()); getSynchronizedMSC().add_partition(newTPart.getTPartition()); @@ -1691,6 +1715,11 @@ else if(!isAcidIUDoperation && isFullAcidTable) { // insert into table T partition (ds) values ('Joe', 'today'); -- will fail with AlreadyExistsException // In that case, we want to retry with alterPartition. LOG.debug("Caught AlreadyExistsException, trying to alter partition instead"); + if (isMultiDirStats) { + // TODO: we should use AcidUtils.getAcidFilesForStats, but it has to account for the + // current transaction, treating it as if it is committed. + MetaStoreUtils.clearQuickStats(newTPart.getParameters()); + } setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart); } catch (Exception e) { try { @@ -2154,9 +2183,13 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) { newFiles = Collections.synchronizedList(new ArrayList()); } + // Note: this assumes both paths are qualified; which they are, currently. if (isMmTable && loadPath.equals(tbl.getPath())) { - Utilities.FILE_OP_LOGGER.debug("not moving " + loadPath + " to " + tbl.getPath()); + if (Utilities.FILE_OP_LOGGER.isDebugEnabled()) { + Utilities.FILE_OP_LOGGER.debug( + "not moving " + loadPath + " to " + tbl.getPath() + " (MM)"); + } newFiles = listFilesCreatedByQuery(loadPath, writeId, stmtId); } else { // Either a non-MM query, or a load into MM table from an external source. @@ -2166,7 +2199,9 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType if (isMmTable) { assert !isAcidIUDoperation; // We will load into MM directory, and delete from the parent if needed. + // TODO: this looks invalid after ACID integration. What about base dirs? destPath = new Path(destPath, AcidUtils.deltaSubdir(writeId, writeId, stmtId)); + // TODO: loadFileType for MM table will no longer be REPLACE_ALL filter = loadFileType == LoadFileType.REPLACE_ALL ? new JavaUtils.IdPathFilter(writeId, stmtId, false, true) : filter; } @@ -2179,6 +2214,7 @@ else if(!isAcidIUDoperation && isFullAcidTable) { //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361 //todo: should probably do the same for MM IOW boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); + // TODO: this should never run for MM tables anymore. Remove the flag, and maybe the filter? replaceFiles(tblPath, loadPath, destPath, tblPath, sessionConf, isSrcLocal, isAutopurge, newFiles, filter, isMmTable?true:false, !tbl.isTemporary()); } else { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 67d05e65dd..1b1a687a67 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -520,6 +520,8 @@ private static boolean isAcid(Long writeId) { Task addPartTask = TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf()); + // Note: this sets LoadFileType incorrectly for ACID; is that relevant for import? + // See setLoadFileType and setIsAcidIow calls elsewhere for an example. LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table), partSpec.getPartSpec(), replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index 7d2de75315..fb3bfdacb1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -329,8 +329,9 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { stmtId = SessionState.get().getTxnMgr().getStmtIdAndIncrement(); } - LoadTableDesc loadTableWork; - loadTableWork = new LoadTableDesc(new Path(fromURI), + // Note: this sets LoadFileType incorrectly for ACID; is that relevant for load? + // See setLoadFileType and setIsAcidIow calls elsewhere for an example. + LoadTableDesc loadTableWork = new LoadTableDesc(new Path(fromURI), Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING, writeId); loadTableWork.setStmtId(stmtId); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index cd6f1ee692..feb4bfc8b5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6906,10 +6906,12 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, writeId); // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old // deltas and base and leave them up to the cleaner to clean up - LoadFileType loadType = (!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), - dest_tab.getTableName()) && !destTableIsTransactional) + boolean isInsertInto = qb.getParseInfo().isInsertIntoTable( + dest_tab.getDbName(), dest_tab.getTableName()); + LoadFileType loadType = (!isInsertInto && !destTableIsTransactional) ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING; ltd.setLoadFileType(loadType); + ltd.setInsertOverwrite(!isInsertInto); ltd.setLbCtx(lbCtx); loadTableWork.add(ltd); } else { @@ -6995,10 +6997,12 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, writeId); // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old // deltas and base and leave them up to the cleaner to clean up - LoadFileType loadType = (!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), - dest_tab.getTableName()) && !destTableIsTransactional) // // Both Full-acid and MM tables are excluded. + boolean isInsertInto = qb.getParseInfo().isInsertIntoTable( + dest_tab.getDbName(), dest_tab.getTableName()); + LoadFileType loadType = (!isInsertInto && !destTableIsTransactional) ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING; ltd.setLoadFileType(loadType); + ltd.setInsertOverwrite(!isInsertInto); ltd.setLbCtx(lbCtx); loadTableWork.add(ltd); @@ -7008,7 +7012,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES .getMsg(dest_tab.getTableName() + "@" + dest_part.getName())); } - break; + break; } case QBMetaData.DEST_LOCAL_FILE: isLocal = true; diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsWork.java index a4e770ce95..004537002c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsWork.java @@ -172,9 +172,15 @@ public boolean isTargetRewritten() { return true; } // INSERT OVERWRITE - if (getLoadTableDesc() != null && getLoadTableDesc().getLoadFileType() == LoadFileType.REPLACE_ALL) { + LoadTableDesc ltd = getLoadTableDesc(); + if (ltd != null && (ltd.getLoadFileType() == LoadFileType.REPLACE_ALL || ltd.isInsertOverwrite())) { return true; } + + return isTargetCreated(); + } + + public boolean isTargetCreated() { // CREATE TABLE ... AS if (getLoadFileDesc() != null && getLoadFileDesc().getCtasCreateTableDesc() != null) { return true; diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java index 946c300750..92772ad759 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.StatsTask; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -161,11 +163,18 @@ public void run() { long rawDataSize = 0; long fileSize = 0; long numFiles = 0; - LOG.debug("Aggregating stats for {}", dir); - FileStatus[] fileList = HiveStatsUtils.getFileStatusRecurse(dir, -1, fs); + // Note: this code would be invalid for transactional tables of any kind. + Utilities.FILE_OP_LOGGER.debug("Aggregating stats for {}", dir); + List fileList = null; + if (partish.getTable() != null + && AcidUtils.isTransactionalTable(partish.getTable())) { + fileList = AcidUtils.getAcidFilesForStats(partish.getTable(), dir, jc, fs); + } else { + fileList = Lists.newArrayList(HiveStatsUtils.getFileStatusRecurse(dir, -1, fs)); + } for (FileStatus file : fileList) { - LOG.debug("Computing stats for {}", file); + Utilities.FILE_OP_LOGGER.debug("Computing stats for {}", file); if (!file.isDirectory()) { InputFormat inputFormat = ReflectionUtil.newInstance(partish.getInputFormatClass(), jc); InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0, new String[] { partish.getLocation() }); diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java index 1d7660e8b2..1c14b01cb2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java @@ -124,11 +124,10 @@ public BasicStatsProcessor(Partish partish, BasicStatsWork work, HiveConf conf, public Object process(StatsAggregator statsAggregator) throws HiveException, MetaException { Partish p = partish; Map parameters = p.getPartParameters(); - if (p.isAcid()) { - StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE); - } - - if (work.isTargetRewritten()) { + if (p.isTransactionalTable()) { + // Only consider ACID stats valid for a brand new table. Even with IOW, there will still be old directories to throw us off. + StatsSetupConst.setBasicStatsState(parameters, work.isTargetCreated() ? StatsSetupConst.TRUE : StatsSetupConst.FALSE); + } else if (work.isTargetRewritten()) { StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE); } @@ -153,7 +152,15 @@ public Object process(StatsAggregator statsAggregator) throws HiveException, Met StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE); } - updateQuickStats(parameters, partfileStatus); + if (!p.isTransactionalTable() || work.isTargetCreated()) { + // Only update file stats for ACID when first creating the table/partition. + // Currently partfileStatus is not valid for ACID inserts. + // TODO: we should use AcidUtils.getAcidFilesForStats, but it has to account for the + // current transaction, treating it as if it is committed. + MetaStoreUtils.populateQuickStats(partfileStatus, parameters); + } else if (p.isTransactionalTable()) { + MetaStoreUtils.clearQuickStats(parameters); + } if (StatsSetupConst.areBasicStatsUptoDate(parameters)) { if (statsAggregator != null) { String prefix = getAggregationPrefix(p.getTable(), p.getPartition()); @@ -165,13 +172,10 @@ public Object process(StatsAggregator statsAggregator) throws HiveException, Met } public void collectFileStatus(Warehouse wh) throws MetaException { + // TODO: this is invalid for any transactional tables... partfileStatus = wh.getFileStatusesForSD(partish.getPartSd()); } - private void updateQuickStats(Map parameters, FileStatus[] partfileStatus) throws MetaException { - MetaStoreUtils.populateQuickStats(partfileStatus, parameters); - } - private String getAggregationPrefix(Table table, Partition partition) throws MetaException { String prefix = getAggregationPrefix0(table, partition); String aggKey = prefix.endsWith(Path.SEPARATOR) ? prefix : prefix + Path.SEPARATOR; diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java index 05b0474e90..47810e2c34 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java @@ -51,6 +51,10 @@ public final boolean isAcid() { return AcidUtils.isFullAcidTable(getTable()); } + public final boolean isTransactionalTable() { + return AcidUtils.isTransactionalTable(getTable()); + } + public abstract Table getTable(); public abstract Map getPartParameters(); diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java index d84cf136d5..ed593e5e1a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java @@ -65,6 +65,7 @@ public boolean accept(Path file) { } }); for (FileStatus file : status) { + Utilities.FILE_OP_LOGGER.trace("About to read stats file {} ", file.getPath()); Input in = new Input(fs.open(file.getPath())); Kryo kryo = SerializationUtilities.borrowKryo(); try { @@ -72,6 +73,7 @@ public boolean accept(Path file) { } finally { SerializationUtilities.releaseKryo(kryo); } + Utilities.FILE_OP_LOGGER.trace("Read : {}", statsMap); statsList.add(statsMap); in.close(); } @@ -86,7 +88,7 @@ public boolean accept(Path file) { @Override public String aggregateStats(String partID, String statType) { long counter = 0; - LOG.debug("Part ID: " + partID + "\t" + statType); + Utilities.FILE_OP_LOGGER.debug("Part ID: " + partID + "\t" + statType); for (Map> statsMap : statsList) { Map partStat = statsMap.get(partID); if (null == partStat) { // not all partitions are scanned in all mappers, so this could be null. @@ -98,7 +100,7 @@ public String aggregateStats(String partID, String statType) { } counter += Long.parseLong(statVal); } - LOG.info("Read stats for : " + partID + "\t" + statType + "\t" + counter); + Utilities.FILE_OP_LOGGER.info("Read stats for : " + partID + "\t" + statType + "\t" + counter); return String.valueOf(counter); } diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 89354a2d34..0dd3eb1017 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -297,7 +297,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String dbname, !isPartitionedTable) { Database db = msdb.getDatabase(newDbName); // Update table stats. For partitioned table, we update stats in alterPartition() - MetaStoreUtils.updateTableStatsFast(db, newt, wh, false, true, environmentContext); + MetaStoreUtils.updateTableStatsFast(db, newt, wh, false, true, environmentContext, false); } if (isPartitionedTable) { @@ -436,23 +436,25 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String .currentTimeMillis() / 1000)); } - Table tbl = msdb.getTable(dbname, name); - if (tbl == null) { - throw new InvalidObjectException( - "Unable to alter partition because table or database does not exist."); - } //alter partition if (part_vals == null || part_vals.size() == 0) { try { msdb.openTransaction(); + + Table tbl = msdb.getTable(dbname, name); + if (tbl == null) { + throw new InvalidObjectException( + "Unable to alter partition because table or database does not exist."); + } oldPart = msdb.getPartition(dbname, name, new_part.getValues()); if (MetaStoreUtils.requireCalStats(oldPart, new_part, tbl, environmentContext)) { // if stats are same, no need to update if (MetaStoreUtils.isFastStatsSame(oldPart, new_part)) { MetaStoreUtils.updateBasicState(environmentContext, new_part.getParameters()); } else { - MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true, environmentContext); + MetaStoreUtils.updatePartitionStatsFast( + new_part, tbl, wh, false, true, environmentContext, false); } } @@ -494,6 +496,11 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String boolean dataWasMoved = false; try { msdb.openTransaction(); + Table tbl = msdb.getTable(dbname, name); + if (tbl == null) { + throw new InvalidObjectException( + "Unable to alter partition because table or database does not exist."); + } try { oldPart = msdb.getPartition(dbname, name, part_vals); } catch (NoSuchObjectException e) { @@ -581,7 +588,8 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String } if (MetaStoreUtils.requireCalStats(oldPart, new_part, tbl, environmentContext)) { - MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true, environmentContext); + MetaStoreUtils.updatePartitionStatsFast( + new_part, tbl, wh, false, true, environmentContext, false); } String newPartName = Warehouse.makePartName(tbl.getPartitionKeys(), new_part.getValues()); @@ -650,15 +658,16 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String transactionalListeners = handler.getTransactionalListeners(); } - Table tbl = msdb.getTable(dbname, name); - if (tbl == null) { - throw new InvalidObjectException( - "Unable to alter partitions because table or database does not exist."); - } boolean success = false; try { msdb.openTransaction(); + + Table tbl = msdb.getTable(dbname, name); + if (tbl == null) { + throw new InvalidObjectException( + "Unable to alter partitions because table or database does not exist."); + } for (Partition tmpPart: new_parts) { // Set DDL time to now if not specified if (tmpPart.getParameters() == null || @@ -677,7 +686,8 @@ public Partition alterPartition(final RawStore msdb, Warehouse wh, final String if (MetaStoreUtils.isFastStatsSame(oldTmpPart, tmpPart)) { MetaStoreUtils.updateBasicState(environmentContext, tmpPart.getParameters()); } else { - MetaStoreUtils.updatePartitionStatsFast(tmpPart, wh, false, true, environmentContext); + MetaStoreUtils.updatePartitionStatsFast( + tmpPart, tbl, wh, false, true, environmentContext, false); } } diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index c6e34a8a22..72aa5adc52 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -1490,7 +1490,7 @@ private void create_table_core(final RawStore ms, final Table tbl, } if (MetastoreConf.getBoolVar(conf, ConfVars.STATS_AUTO_GATHER) && !MetaStoreUtils.isView(tbl)) { - MetaStoreUtils.updateTableStatsFast(db, tbl, wh, madeDir, envContext); + MetaStoreUtils.updateTableStatsFast(db, tbl, wh, madeDir, false, envContext, true); } // set create time @@ -2633,7 +2633,7 @@ private Partition append_partition_common(RawStore ms, String dbName, String tab if (MetastoreConf.getBoolVar(conf, ConfVars.STATS_AUTO_GATHER) && !MetaStoreUtils.isView(tbl)) { - MetaStoreUtils.updatePartitionStatsFast(part, wh, madeDir, envContext); + MetaStoreUtils.updatePartitionStatsFast(part, tbl, wh, madeDir, false, envContext, true); } if (ms.addPartition(part)) { @@ -3191,7 +3191,7 @@ private void initializeAddedPartition( final Table tbl, final PartitionSpecProxy.PartitionIterator part, boolean madeDir) throws MetaException { if (MetastoreConf.getBoolVar(conf, ConfVars.STATS_AUTO_GATHER) && !MetaStoreUtils.isView(tbl)) { - MetaStoreUtils.updatePartitionStatsFast(part, wh, madeDir, false, null); + MetaStoreUtils.updatePartitionStatsFast(part, tbl, wh, madeDir, false, null, true); } // set create time diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java index 50f873a013..3d007e09e7 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java @@ -604,17 +604,14 @@ public static boolean isFastStatsSame(Partition oldPart, Partition newPart) { return false; } - public static boolean updateTableStatsFast(Database db, Table tbl, Warehouse wh, - boolean madeDir, EnvironmentContext environmentContext) throws MetaException { - return updateTableStatsFast(db, tbl, wh, madeDir, false, environmentContext); - } - - public static boolean updateTableStatsFast(Database db, Table tbl, Warehouse wh, - boolean madeDir, boolean forceRecompute, EnvironmentContext environmentContext) throws MetaException { + public static boolean updateTableStatsFast(Database db, Table tbl, Warehouse wh, boolean madeDir, + boolean forceRecompute, EnvironmentContext environmentContext, boolean isCreate) throws MetaException { if (tbl.getPartitionKeysSize() == 0) { // Update stats only when unpartitioned + // TODO: this is also invalid for ACID tables, except for the create case by coincidence FileStatus[] fileStatuses = wh.getFileStatusesForUnpartitionedTable(db, tbl); - return updateTableStatsFast(tbl, fileStatuses, madeDir, forceRecompute, environmentContext); + return updateTableStatsFast( + tbl, fileStatuses, madeDir, forceRecompute, environmentContext, isCreate); } else { return false; } @@ -631,7 +628,8 @@ public static boolean updateTableStatsFast(Database db, Table tbl, Warehouse wh, * @return true if the stats were updated, false otherwise */ public static boolean updateTableStatsFast(Table tbl, FileStatus[] fileStatus, boolean newDir, - boolean forceRecompute, EnvironmentContext environmentContext) throws MetaException { + boolean forceRecompute, EnvironmentContext environmentContext, + boolean isCreate) throws MetaException { Map params = tbl.getParameters(); @@ -644,35 +642,42 @@ public static boolean updateTableStatsFast(Table tbl, FileStatus[] fileStatus, b } } - boolean updated = false; - if (forceRecompute || - params == null || - !containsAllFastStats(params)) { - if (params == null) { - params = new HashMap<>(); - } - if (!newDir) { - // The table location already exists and may contain data. - // Let's try to populate those stats that don't require full scan. - LOG.info("Updating table stats fast for " + tbl.getTableName()); - populateQuickStats(fileStatus, params); - LOG.info("Updated size of table " + tbl.getTableName() +" to "+ params.get(StatsSetupConst.TOTAL_SIZE)); - if (environmentContext != null - && environmentContext.isSetProperties() - && StatsSetupConst.TASK.equals(environmentContext.getProperties().get( - StatsSetupConst.STATS_GENERATED))) { - StatsSetupConst.setBasicStatsState(params, StatsSetupConst.TRUE); - } else { - StatsSetupConst.setBasicStatsState(params, StatsSetupConst.FALSE); - } - } + if (!forceRecompute && params != null && containsAllFastStats(params)) return false; + if (params == null) { + params = new HashMap<>(); + } + if (!isCreate && MetaStoreUtils.isTransactionalTable(tbl.getParameters())) { + // TODO: we should use AcidUtils.getAcidFilesForStats, but it has to account for the + // current transaction, treating it as if it is committed. + LOG.warn("Not updating fast stats for a transactional table " + tbl.getTableName()); tbl.setParameters(params); - updated = true; + return true; + } + if (!newDir) { + // The table location already exists and may contain data. + // Let's try to populate those stats that don't require full scan. + LOG.info("Updating table stats fast for " + tbl.getTableName()); + populateQuickStats(fileStatus, params); + LOG.info("Updated size of table " + tbl.getTableName() +" to "+ params.get(StatsSetupConst.TOTAL_SIZE)); + if (environmentContext != null + && environmentContext.isSetProperties() + && StatsSetupConst.TASK.equals(environmentContext.getProperties().get( + StatsSetupConst.STATS_GENERATED))) { + StatsSetupConst.setBasicStatsState(params, StatsSetupConst.TRUE); + } else { + StatsSetupConst.setBasicStatsState(params, StatsSetupConst.FALSE); + } } - return updated; + tbl.setParameters(params); + return true; } + /** This method is invalid for MM and ACID tables unless fileStatus comes from AcidUtils. */ + // TODO: use AcidUtils.getAcidFilesForStats, accounting for current transaction where necessary, + // treating it as if it is committed, wherever this is called. public static void populateQuickStats(FileStatus[] fileStatus, Map params) { + // Why is this even in metastore? + LOG.trace("Populating quick stats based on {} files", fileStatus.length); int numFiles = 0; long tableSize = 0L; for (FileStatus status : fileStatus) { @@ -686,6 +691,11 @@ public static void populateQuickStats(FileStatus[] fileStatus, Map params) { + params.remove(StatsSetupConst.NUM_FILES); + params.remove(StatsSetupConst.TOTAL_SIZE); + } + public static boolean areSameColumns(List oldCols, List newCols) { return ListUtils.isEqualList(oldCols, newCols); } @@ -705,16 +715,6 @@ public static void updateBasicState(EnvironmentContext environmentContext, Map params = part.getParameters(); - boolean updated = false; - if (forceRecompute || - params == null || - !containsAllFastStats(params)) { - if (params == null) { - params = new HashMap<>(); - } - if (!madeDir) { - // The partition location already existed and may contain data. Lets try to - // populate those statistics that don't require a full scan of the data. - LOG.warn("Updating partition stats fast for: " + part.getTableName()); - FileStatus[] fileStatus = wh.getFileStatusesForLocation(part.getLocation()); - populateQuickStats(fileStatus, params); - LOG.warn("Updated size to " + params.get(StatsSetupConst.TOTAL_SIZE)); - updateBasicState(environmentContext, params); - } + if (!forceRecompute && params != null && containsAllFastStats(params)) return false; + if (params == null) { + params = new HashMap<>(); + } + if (!isCreate && MetaStoreUtils.isTransactionalTable(table.getParameters())) { + // TODO: implement? + LOG.warn("Not updating fast stats for a transactional table " + table.getTableName()); part.setParameters(params); - updated = true; + return true; } - return updated; + if (!madeDir) { + // The partition location already existed and may contain data. Lets try to + // populate those statistics that don't require a full scan of the data. + LOG.warn("Updating partition stats fast for: " + part.getTableName()); + FileStatus[] fileStatus = wh.getFileStatusesForLocation(part.getLocation()); + populateQuickStats(fileStatus, params); + LOG.warn("Updated size to " + params.get(StatsSetupConst.TOTAL_SIZE)); + updateBasicState(environmentContext, params); + } + part.setParameters(params); + return true; } /* @@ -790,6 +793,12 @@ public static boolean columnsIncludedByNameType(List oldCols, } /** Duplicates AcidUtils; used in a couple places in metastore. */ + public static boolean isTransactionalTable(Map params) { + String transactionalProp = params.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + return (transactionalProp != null && "true".equalsIgnoreCase(transactionalProp)); + } + + /** Duplicates AcidUtils; used in a couple places in metastore. */ public static boolean isInsertOnlyTableParam(Map params) { String transactionalProp = params.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); return (transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp)); diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java index 2599ab103e..7091c5b2f5 100644 --- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java +++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java @@ -489,7 +489,7 @@ private static Partition makePartitionObject(String dbName, String tblName, part4.setSd(tbl.getSd().deepCopy()); part4.getSd().setSerdeInfo(tbl.getSd().getSerdeInfo().deepCopy()); part4.getSd().setLocation(tbl.getSd().getLocation() + ptnLocationSuffix); - MetaStoreUtils.updatePartitionStatsFast(part4, warehouse, null); + MetaStoreUtils.updatePartitionStatsFast(part4, tbl, warehouse, false, false, null, true); return part4; }