commit 9a89b6be264993a881361592cbe3e21f66735913
Author: Sahil Takiar
Date:   Thu May 31 16:51:33 2018 -0500

    HIVE-19752: PerfLogger integration for critical Hive-on-S3 paths

diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index 3d6315c7dd..111e614ab8 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -46,6 +46,7 @@
   public static final String DO_AUTHORIZATION = "doAuthorization";
   public static final String DRIVER_EXECUTE = "Driver.execute";
   public static final String INPUT_SUMMARY = "getInputSummary";
+  public static final String INPUT_PATHS = "getInputPaths";
   public static final String GET_SPLITS = "getSplits";
   public static final String RUN_TASKS = "runTasks";
   public static final String SERIALIZE_PLAN = "serializePlan";
@@ -85,6 +86,11 @@
   public static final String SPARK_OPTIMIZE_TASK_TREE = "SparkOptimizeTaskTree";
   public static final String SPARK_FLUSH_HASHTABLE = "SparkFlushHashTable.";
 
+  public static final String FILE_MOVES = "FileMoves";
+  public static final String LOAD_TABLE = "LoadTable";
+  public static final String LOAD_PARTITION = "LoadPartition";
+  public static final String LOAD_DYNAMIC_PARTITIONS = "LoadDynamicPartitions";
+
   protected final Map<String, Long> startTimes = new HashMap<String, Long>();
   protected final Map<String, Long> endTimes = new HashMap<String, Long>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index dbda5fdef4..f80a945be5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -60,6 +61,7 @@
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,6 +90,9 @@ public MoveTask() {
 
   private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir)
       throws HiveException {
     try {
+      PerfLogger perfLogger = SessionState.getPerfLogger();
+      perfLogger.PerfLogBegin("MoveTask", PerfLogger.FILE_MOVES);
+
       String mesg = "Moving data to " + (isDfsDir ? "" : "local ") + "directory "
           + targetPath.toString();
       String mesg_detail = " from " + sourcePath.toString();
@@ -101,6 +106,8 @@ private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir)
         FileSystem dstFs = FileSystem.getLocal(conf);
         moveFileFromDfsToLocal(sourcePath, targetPath, fs, dstFs);
       }
+
+      perfLogger.PerfLogEnd("MoveTask", PerfLogger.FILE_MOVES);
     } catch (Exception e) {
       throw new HiveException("Unable to move source " + sourcePath + " to destination "
           + targetPath, e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 406bea011d..2177c33787 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -3285,6 +3285,9 @@ public static String getVertexCounterName(String counter, String vertexName) {
   public static List<Path> getInputPaths(JobConf job, MapWork work, Path hiveScratchDir,
       Context ctx, boolean skipDummy) throws Exception {
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.INPUT_PATHS);
+
     Set<Path> pathsProcessed = new HashSet<Path>();
     List<Path> pathsToAdd = new LinkedList<Path>();
     LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
@@ -3373,6 +3376,8 @@ public static String getVertexCounterName(String counter, String vertexName) {
       }
     }
 
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_PATHS);
+
     return finalPathsToAdd;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 107d032eb7..ef7be0361c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1655,6 +1655,9 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
     boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
     boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
     try {
+      PerfLogger perfLogger = SessionState.getPerfLogger();
+      perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_PARTITION);
+
       // Get the partition object if it already exists
       Partition oldPart = getPartition(tbl, partSpec, false);
       /**
@@ -1690,8 +1693,8 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
         newPartPath = oldPartPath;
       }
       List<Path> newFiles = null;
-      PerfLogger perfLogger = SessionState.getPerfLogger();
-      perfLogger.PerfLogBegin("MoveTask", "FileMoves");
+
+      perfLogger.PerfLogBegin("MoveTask", PerfLogger.FILE_MOVES);
       // If config is set, table is not temporary and partition being inserted exists, capture
       // the list of files added. For not yet existing partitions (insert overwrite to new partition
       // or dynamic partition inserts), the add partition event will capture the list of files added.
@@ -1749,7 +1752,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
             tbl.getNumBuckets() > 0, isFullAcidTable);
         }
       }
-      perfLogger.PerfLogEnd("MoveTask", "FileMoves");
+      perfLogger.PerfLogEnd("MoveTask", PerfLogger.FILE_MOVES);
       Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath);
       alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs,
           newPartPath.toString());
       validatePartition(newTPart);
@@ -1833,6 +1836,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
       } else {
         setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart);
       }
+      perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_PARTITION);
       return newTPart;
     } catch (IOException e) {
       LOG.error(StringUtils.stringifyException(e));
@@ -2125,6 +2129,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
       final boolean hasFollowingStatsTask, final AcidUtils.Operation operation,
       boolean isInsertOverwrite) throws HiveException {
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+    perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS);
+
     final Map<Map<String, String>, Partition> partitionsMap =
         Collections.synchronizedMap(new LinkedHashMap<Map<String, String>, Partition>());
@@ -2234,6 +2241,9 @@ public Void call() throws Exception {
             AcidUtils.toDataOperationType(operation));
       }
       LOG.info("Loaded " + partitionsMap.size() + " partitions");
+
+      perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS);
+
      return partitionsMap;
    } catch (TException te) {
      throw new HiveException("Exception updating metastore for acid table "
@@ -2267,6 +2277,10 @@ public Void call() throws Exception {
   public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType,
       boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation,
       boolean hasFollowingStatsTask, Long writeId, int stmtId, boolean isInsertOverwrite)
       throws HiveException {
+
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+    perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_TABLE);
+
     List<Path> newFiles = null;
     Table tbl = getTable(tableName);
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
@@ -2304,6 +2318,9 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType,
     }
     Utilities.FILE_OP_LOGGER.debug("moving " + loadPath + " to " + tblPath
         + " (replace = " + loadFileType + ")");
+
+    perfLogger.PerfLogBegin("MoveTask", PerfLogger.FILE_MOVES);
+
     if (loadFileType == LoadFileType.REPLACE_ALL && !isTxnTable) {
       //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361
       boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
@@ -2321,6 +2338,7 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType,
         throw new HiveException("addFiles: filesystem error in check phase", e);
       }
     }
+      perfLogger.PerfLogEnd("MoveTask", PerfLogger.FILE_MOVES);
     }
     if (!this.getConf().getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
       StatsSetupConst.setBasicStatsState(tbl.getParameters(), StatsSetupConst.FALSE);
@@ -2354,6 +2372,8 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType,
     alterTable(tbl, environmentContext);
 
     fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
+
+    perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_TABLE);
   }
 
   /**
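
Annotation (not part of the patch): every instrumented region above uses the same
pattern, a PerfLogBegin/PerfLogEnd pair keyed by one of the new string constants,
against the session-scoped logger from SessionState.getPerfLogger(). A minimal sketch
of that pattern, where doFileMoves() is a hypothetical stand-in for the timed work and
the snippet assumes it runs inside an active Hive session:

    import org.apache.hadoop.hive.ql.log.PerfLogger;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class FileMovesTimingSketch {
      void timedFileMoves() {
        // Session-scoped logger shared by all instrumentation in this query.
        PerfLogger perfLogger = SessionState.getPerfLogger();
        // Opens the FILE_MOVES span and logs a <PERFLOG method=FileMoves ...> line.
        perfLogger.PerfLogBegin("MoveTask", PerfLogger.FILE_MOVES);
        try {
          doFileMoves(); // hypothetical stand-in for the rename/copy work being timed
        } finally {
          // Closes the span and logs a </PERFLOG ... duration=...> line.
          perfLogger.PerfLogEnd("MoveTask", PerfLogger.FILE_MOVES);
        }
      }

      private void doFileMoves() { /* no-op placeholder */ }
    }

Note that the patch itself calls PerfLogEnd on the success path rather than in a
finally block, so a region that throws leaves its span open; the try/finally form
above is one defensive variant, not what the commit does.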
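
Annotation: each begin/end pair brackets its region with a pair of PERFLOG lines in
the query log, so time spent in these paths shows up directly. Illustrative output for
the getInputPaths span (the timestamps and duration below are invented):

    <PERFLOG method=getInputPaths from=org.apache.hadoop.hive.ql.exec.Utilities>
    </PERFLOG method=getInputPaths start=1527803493000 end=1527803495250 duration=2250 from=org.apache.hadoop.hive.ql.exec.Utilities>

CLASS_NAME in Utilities is the fully qualified class name, which is what the from=
field carries; the MoveTask and Hive call sites pass the literal "MoveTask" instead.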
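
Annotation: because the logger is session-scoped, the recorded spans can also be read
back programmatically after the MoveTask finishes, e.g. to feed metrics. A sketch,
assuming PerfLogger's getStartTime/getEndTime accessors return the recorded wall-clock
millisecond timestamps for a given key:

    import org.apache.hadoop.hive.ql.log.PerfLogger;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class LoadTableTimingSketch {
      // Assumes loadTable() already ran in this session and closed its span.
      long loadTableMillis() {
        PerfLogger perfLogger = SessionState.getPerfLogger();
        long start = perfLogger.getStartTime(PerfLogger.LOAD_TABLE);
        long end = perfLogger.getEndTime(PerfLogger.LOAD_TABLE);
        return end - start; // wall-clock duration of the whole loadTable() call
      }
    }

On S3-backed tables, the FileMoves, LoadTable, LoadPartition, LoadDynamicPartitions
and getInputPaths spans cover the rename- and listing-heavy steps that tend to
dominate commit time, which is what the subject line means by critical Hive-on-S3
paths.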