diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
index 1e0a749..15dbe77 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinPersistableTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
@@ -44,6 +45,8 @@ public class SparkHashTableSinkOperator
     extends TerminalOperator<SparkHashTableSinkDesc> implements Serializable {
   private static final long serialVersionUID = 1L;
+  private final String CLASS_NAME = this.getClass().getName();
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   protected static final Log LOG = LogFactory.getLog(SparkHashTableSinkOperator.class.getName());
 
   private HashTableSinkOperator htsOperator;
@@ -90,6 +93,7 @@ public void closeOp(boolean abort) throws HiveException {
 
   protected void flushToFile(MapJoinPersistableTableContainer tableContainer,
       byte tag) throws IOException, HiveException {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_FLUSH_HASHTABLE + this.getName());
     MapredLocalWork localWork = getExecContext().getLocalWork();
     BucketMapJoinContext mapJoinCtx = localWork.getBucketMapjoinContext();
     Path inputPath = getExecContext().getCurrentInputPath();
@@ -151,6 +155,7 @@ protected void flushToFile(MapJoinPersistableTableContainer tableContainer,
     FileStatus status = fs.getFileStatus(path);
     htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path
       + " (" + status.getLen() + " bytes)");
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_FLUSH_HASHTABLE + this.getName());
   }
 
   public void setTag(byte tag) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index 46894ac..5a3eff9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -56,7 +57,6 @@
  *
  */
 public class SparkMapRecordHandler extends SparkRecordHandler {
-  private static final String PLAN_KEY = "__MAP_PLAN__";
   private MapOperator mo;
   public static final Log l4j = LogFactory.getLog(SparkMapRecordHandler.class);
 
@@ -67,6 +67,7 @@
   private ExecMapperContext execContext;
 
   public void init(JobConf job, OutputCollector output, Reporter reporter) {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
     super.init(job, output, reporter);
 
     isLogInfoEnabled = l4j.isInfoEnabled();
@@ -136,6 +137,7 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) {
         throw new RuntimeException("Map operator initialization failed", e);
       }
     }
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index 3f23541..fb08950 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -27,12 +27,16 @@
 import java.util.Set;
 
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
 import com.google.common.base.Preconditions;
 
 public class SparkPlan {
+  private final String CLASS_NAME = SparkPlan.class.getName();
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+
   private final Set<SparkTran> rootTrans = new HashSet<SparkTran>();
   private final Set<SparkTran> leafTrans = new HashSet<SparkTran>();
   private final Map<SparkTran, List<SparkTran>> transGraph = new HashMap<SparkTran, List<SparkTran>>();
@@ -40,6 +44,7 @@
   private final Set<Integer> cachedRDDIds = new HashSet<Integer>();
 
   public JavaPairRDD<HiveKey, BytesWritable> generateGraph() throws IllegalStateException {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
     Map<SparkTran, JavaPairRDD<HiveKey, BytesWritable>> tranToOutputRDDMap
         = new HashMap<SparkTran, JavaPairRDD<HiveKey, BytesWritable>>();
     for (SparkTran tran : getAllTrans()) {
@@ -75,6 +80,7 @@
       }
     }
 
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
     return finalRDD;
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 215d53f..61b3a2f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -22,9 +22,16 @@
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -32,9 +39,6 @@
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -45,15 +49,14 @@
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
-import com.google.common.base.Preconditions;
 
 public class SparkPlanGenerator {
+  private final String CLASS_NAME = SparkPlanGenerator.class.getName();
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   private static final Log LOG = LogFactory.getLog(SparkPlanGenerator.class);
 
   private JavaSparkContext sc;
@@ -82,25 +85,29 @@ public SparkPlanGenerator(
   }
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
     SparkPlan sparkPlan = new SparkPlan();
     cloneToWork = sparkWork.getCloneToWork();
     workToTranMap.clear();
     workToParentWorkTranMap.clear();
 
     for (BaseWork work : sparkWork.getAllWork()) {
-      SparkTran tran;
+      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
+      SparkTran tran = generate(work);
       SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work);
-      tran = generate(work);
       sparkPlan.addTran(tran);
       sparkPlan.connect(parentTran, tran);
       workToTranMap.put(work, tran);
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
     }
 
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
     return sparkPlan;
   }
 
   // Generate (possibly get from a cached result) parent SparkTran
-  private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, BaseWork work) throws Exception {
+  private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork,
+      BaseWork work) throws Exception {
     if (cloneToWork.containsKey(work)) {
       BaseWork originalWork = cloneToWork.get(work);
       if (workToParentWorkTranMap.containsKey(originalWork)) {
@@ -208,15 +215,17 @@ private JobConf cloneJobConf(BaseWork work) throws Exception {
     // Make sure we'll use a different plan path from the original one
     HiveConf.setVar(cloned, HiveConf.ConfVars.PLAN, "");
     try {
-      cloned.setPartitionerClass((Class<? extends Partitioner>) (Class.forName(HiveConf.getVar(cloned,
-          HiveConf.ConfVars.HIVEPARTITIONER))));
+      cloned.setPartitionerClass((Class<? extends Partitioner>)
+        (Class.forName(HiveConf.getVar(cloned, HiveConf.ConfVars.HIVEPARTITIONER))));
     } catch (ClassNotFoundException e) {
-      String msg = "Could not find partitioner class: " + e.getMessage() + " which is specified by: " +
+      String msg = "Could not find partitioner class: " + e.getMessage() +
+        " which is specified by: " +
         HiveConf.ConfVars.HIVEPARTITIONER.varname;
       throw new IllegalArgumentException(msg, e);
     }
     if (work instanceof MapWork) {
-      List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work, scratchDir, context, false);
+      List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work,
+          scratchDir, context, false);
       Utilities.setInputPaths(cloned, inputPaths);
       Utilities.setMapWork(cloned, (MapWork) work, scratchDir, false);
       Utilities.createTmpDirs(cloned, (MapWork) work);
@@ -224,7 +233,8 @@ private JobConf cloneJobConf(BaseWork work) throws Exception {
         MergeFileWork mergeFileWork = (MergeFileWork) work;
         cloned.set(Utilities.MAPRED_MAPPER_CLASS, MergeFileMapper.class.getName());
         cloned.set("mapred.input.format.class", mergeFileWork.getInputformat());
-        cloned.setClass("mapred.output.format.class", MergeFileOutputFormat.class, FileOutputFormat.class);
+        cloned.setClass("mapred.output.format.class", MergeFileOutputFormat.class,
+            FileOutputFormat.class);
       } else {
         cloned.set(Utilities.MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
index a5d73a7..08eb4fb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
@@ -21,6 +21,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
@@ -33,6 +34,8 @@ import java.util.Iterator;
 
 public abstract class SparkRecordHandler {
+  protected final String CLASS_NAME = this.getClass().getName();
+  protected final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   private final Log LOG = LogFactory.getLog(this.getClass());
 
   // used to log memory usage periodically
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index a9fbf6c..12cf5df 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -34,11 +34,11 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
-import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -106,6 +106,7 @@
   private MapredLocalWork localWork = null;
 
   public void init(JobConf job, OutputCollector output, Reporter reporter) {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
     super.init(job, output, reporter);
 
     rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
@@ -230,7 +231,7 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) {
         throw new RuntimeException("Reduce operator initialization failed", e);
       }
     }
-
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 362072f..78b5a1f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
@@ -75,6 +76,8 @@ import com.google.common.collect.Lists;
 
 public class SparkTask extends Task<SparkWork> {
+  private final String CLASS_NAME = SparkTask.class.getName();
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   private static final long serialVersionUID = 1L;
   private transient JobConf job;
   private transient ContentSummary inputSummary;
@@ -100,7 +103,10 @@ public int execute(DriverContext driverContext) {
       SparkWork sparkWork = getWork();
       sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
 
+      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
+
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
       if (sparkJobStatus != null) {
         SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
@@ -198,6 +204,7 @@ public String getName() {
       if (w instanceof MapWork) {
         List<BaseWork> parents = work.getParents(w);
         boolean candidate = true;
+        // TODO: since we don't have UnionWork anymore, can we simplify this?
         for (BaseWork parent: parents) {
           if (!(parent instanceof UnionWork)) {
             candidate = false;
@@ -290,7 +297,7 @@ private String getTablePrefix(StatsWork work) throws HiveException {
     } else {
       tableName = work.getLoadFileDesc().getDestinationCreateTable();
     }
-    Table table = null;
+    Table table;
     try {
       table = db.getTable(tableName);
     } catch (HiveException e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 90a2f9e..ff65418 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -28,6 +28,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.spark.JobExecutionStatus;
 
@@ -41,6 +42,7 @@
   private static final Log LOG = LogFactory.getLog(CLASS_NAME);
 
   private transient LogHelper console;
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   private final int checkInterval = 1000;
   private final int printInterval = 3000;
   private long lastPrintTime;
@@ -63,6 +65,9 @@ public int startMonitor() {
     Map<String, SparkStageProgress> lastProgressMap = null;
     long startTime = -1;
 
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+
     while (true) {
       JobExecutionStatus state = sparkJobStatus.getState();
       try {
@@ -77,6 +82,7 @@ public int startMonitor() {
       switch (state) {
       case RUNNING:
         if (!running) {
+          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
          // print job stages.
          console.printInfo("\nQuery Hive on Spark job[" + sparkJobStatus.getJobId()
            + "] stages:");
@@ -140,10 +146,13 @@ public int startMonitor() {
 
         }
       }
     }
+
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
     return rc;
   }
 
-  private void printStatus(Map<String, SparkStageProgress> progressMap, Map<String, SparkStageProgress> lastProgressMap) {
+  private void printStatus(Map<String, SparkStageProgress> progressMap,
+      Map<String, SparkStageProgress> lastProgressMap) {
     // do not print duplicate status while still in middle of print interval.
     boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap);
@@ -170,9 +179,17 @@ private void printStatus(Map<String, SparkStageProgress> progressMap, Map
 0 || running > 0 || failed > 0)) {
           /* stage is started, but not complete */
+          if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
+            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
+          }
           if (failed > 0) {
             reportBuffer.append(
                 String.format(
diff --git ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index 4e2b130..eedd652 100644
--- ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -65,6 +65,17 @@
   public static final String LOAD_HASHTABLE = "LoadHashtable";
   public static final String ORC_GET_SPLITS = "OrcGetSplits";
 
+  public static final String SPARK_SUBMIT_TO_RUNNING = "SparkSubmitToRunning";
+  public static final String SPARK_BUILD_PLAN = "SparkBuildPlan";
+  public static final String SPARK_BUILD_RDD_GRAPH = "SparkBuildRDDGraph";
+  public static final String SPARK_SUBMIT_JOB = "SparkSubmitJob";
+  public static final String SPARK_RUN_JOB = "SparkRunJob";
+  public static final String SPARK_CREATE_TRAN = "SparkCreateTran.";
+  public static final String SPARK_RUN_STAGE = "SparkRunStage.";
+  public static final String SPARK_INIT_OPERATORS = "SparkInitializeOperators";
+  public static final String SPARK_GENERATE_TASK_TREE = "SparkGenerateTaskTree";
+  public static final String SPARK_FLUSH_HASHTABLE = "SparkFlushHashTable.";
+
   protected static final ThreadLocal<PerfLogger> perfLogger = new ThreadLocal<PerfLogger>();
 
   protected final Map<String, Long> startTimes = new HashMap<String, Long>();
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index b6a7ac2..50ea14c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -56,6 +56,7 @@
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.lib.TypeRule;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
@@ -89,6 +90,8 @@
  * TODO: need to complete and make it fit to Spark.
  */
 public class SparkCompiler extends TaskCompiler {
+  private static final String CLASS_NAME = SparkCompiler.class.getName();
+  private static final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   private static final Log logger = LogFactory.getLog(SparkCompiler.class);
 
   public SparkCompiler() {
@@ -141,6 +144,7 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
   protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
       List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
       throws SemanticException {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
    GenSparkUtils.getUtils().resetSequenceNumber();
 
    ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
@@ -212,6 +216,8 @@ public Object process(Node n, Stack<Node> s,
     for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
       GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);
     }
+
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
   }
 
   @Override
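Note (not part of the patch): the sketch below illustrates the single PerfLogger bracketing pattern this change applies around each Hive-on-Spark phase (plan generation, RDD graph build, job submit, job run, operator init, hash table flush). The class name SparkPhaseTimingExample and the try/finally placement are hypothetical; the patch itself places the PerfLogEnd calls inline at the end of each instrumented block. Keys that end with a dot (SPARK_CREATE_TRAN, SPARK_RUN_STAGE, SPARK_FLUSH_HASHTABLE) are suffixed with a per-work or per-stage name so each unit gets its own timer, as seen in SparkPlanGenerator and SparkJobMonitor above.

    import org.apache.hadoop.hive.ql.log.PerfLogger;

    public class SparkPhaseTimingExample {
      // Same convention as the instrumented classes: caller name plus the thread-local logger.
      private final String CLASS_NAME = SparkPhaseTimingExample.class.getName();
      private final PerfLogger perfLogger = PerfLogger.getPerfLogger();

      public void buildPlan() {
        // PerfLogBegin/PerfLogEnd calls with the same key delimit one timed phase;
        // the start and end timestamps are recorded under that key.
        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
        try {
          // ... phase work, e.g. translating each BaseWork into a SparkTran ...
        } finally {
          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
        }
      }
    }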