commit 07bcf37e98818fe3e61641676eab765f4310ea3e
Author: Sahil Takiar
Date:   Fri Feb 9 15:59:28 2018 -0800

    HIVE-18663: Logged Spark Job Id contains a UUID instead of the actual id

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 62daaaa610..76f6ecc2b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -77,7 +77,7 @@
   private static final LogHelper console = new LogHelper(LOG);
   private PerfLogger perfLogger;
   private static final long serialVersionUID = 1L;
-  private transient String sparkJobID;
+  private transient int sparkJobID;
   private transient SparkStatistics sparkStatistics;
   private transient long submitTime;
   private transient long startTime;
@@ -123,18 +123,19 @@ public int execute(DriverContext driverContext) {
       }
 
       addToHistory(jobRef);
-      sparkJobID = jobRef.getJobId();
       this.jobID = jobRef.getSparkJobStatus().getAppID();
       rc = jobRef.monitorJob();
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
+      sparkJobID = sparkJobStatus.getJobId();
       getSparkJobInfo(sparkJobStatus, rc);
       if (rc == 0) {
         sparkStatistics = sparkJobStatus.getSparkStatistics();
         if (LOG.isInfoEnabled() && sparkStatistics != null) {
-          LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
+          LOG.info(String.format("=====Spark Job[%s] statistics=====", sparkJobID));
           logSparkStatistic(sparkStatistics);
         }
-        LOG.info("Execution completed successfully");
+        LOG.info("Successfully completed Spark Job " + sparkJobID + " with application ID " +
+            jobID + " and task ID " + getId());
       } else if (rc == 2) { // Cancel job if the monitor found job submission timeout.
         // TODO: If the timeout is because of lack of resources in the cluster, we should
         // ideally also cancel the app request here. But w/o facilities from Spark or YARN,
@@ -192,7 +193,8 @@ private void addToHistory(SparkJobRef jobRef) {
     console.printInfo("Starting Spark Job = " + jobRef.getJobId());
     if (SessionState.get() != null) {
       SessionState.get().getHiveHistory()
-          .setQueryProperty(queryState.getQueryId(), Keys.SPARK_JOB_ID, jobRef.getJobId());
+          .setQueryProperty(queryState.getQueryId(), Keys.SPARK_JOB_ID,
+              Integer.toString(jobRef.getSparkJobStatus().getJobId()));
     }
   }
 
@@ -277,7 +279,7 @@ public String getName() {
     return ((ReduceWork) children.get(0)).getReducer();
   }
 
-  public String getSparkJobID() {
+  public int getSparkJobID() {
     return sparkJobID;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index ce07a9fadd..2b97bab481 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -126,7 +126,7 @@ public SparkStatistics getSparkStatistics() {
     // add Hive operator level statistics.
     sparkStatisticsBuilder.add(getCounter());
     // add spark job metrics.
-    String jobIdentifier = "Spark Job[" + jobHandle.getClientJobId() + "] Metrics";
+    String jobIdentifier = "Spark Job[" + getJobId() + "] Metrics";
     Map<String, Long> flatJobMetric = SparkMetricsUtils.collectMetrics(
         metricsCollection.getAllMetrics());
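
For context on the fix: jobRef.getJobId() returns the job handle ID that the Remote Spark Context client generates before submission (a UUID string), while SparkJobStatus.getJobId() returns the integer job ID assigned by the Spark scheduler, i.e. the ID visible in the Spark UI. The standalone sketch below reproduces that distinction; all class and method names in it are hypothetical stand-ins, not Hive source.

import java.util.UUID;

/** Minimal sketch: why logging the client-side handle ID printed a UUID. */
public class JobIdSketch {

  /** Stand-in for the client-side job handle (cf. jobHandle.getClientJobId()). */
  static class ClientJobHandle {
    // Generated on the client before the Spark scheduler ever sees the job.
    private final String clientJobId = UUID.randomUUID().toString();

    String getClientJobId() {
      return clientJobId;
    }
  }

  /** Stand-in for SparkJobStatus: holds the integer ID assigned by Spark. */
  static class JobStatus {
    private final int sparkJobId;

    JobStatus(int sparkJobId) {
      this.sparkJobId = sparkJobId;
    }

    int getJobId() {
      return sparkJobId;
    }
  }

  public static void main(String[] args) {
    ClientJobHandle handle = new ClientJobHandle();
    JobStatus status = new JobStatus(2);

    // Before the fix: the "job id" in the log line was really the client UUID,
    // e.g. "Spark Job[e2b7f223-...] statistics".
    System.out.println("Spark Job[" + handle.getClientJobId() + "] statistics");

    // After the fix: the log shows the small integer that users can correlate
    // with the Spark UI, e.g. "Spark Job[2] statistics".
    System.out.println("Spark Job[" + status.getJobId() + "] statistics");
  }
}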