diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 8a6499b..bdc289d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -125,7 +125,8 @@
   public ExecDriver() {
     super();
     console = new LogHelper(LOG);
-    this.jobExecHelper = new HadoopJobExecHelper(queryState, job, console, this, this);
+    job = new JobConf(ExecDriver.class);
+    this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
   }
 
   @Override
@@ -169,7 +170,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext
       HiveConf.setVar(job, ConfVars.HIVEADDEDARCHIVES, addedArchives);
     }
     conf.stripHiddenConfigurations(job);
-    this.jobExecHelper = new HadoopJobExecHelper(queryState, job, console, this, this);
+    this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
   }
 
   /**
@@ -179,7 +180,7 @@ public ExecDriver(MapredWork plan, JobConf job, boolean isSilent) throws HiveExc
     setWork(plan);
     this.job = job;
    console = new LogHelper(LOG, isSilent);
-    this.jobExecHelper = new HadoopJobExecHelper(queryState, job, console, this, this);
+    this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
   }
 
   /**
@@ -681,6 +682,7 @@ public static void main(String[] args) throws IOException, HiveException {
     String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID, "").trim();
     if(queryId.isEmpty()) {
       queryId = "unknown-" + System.currentTimeMillis();
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYID, queryId);
     }
 
     System.setProperty(HiveConf.ConfVars.HIVEQUERYID.toString(), queryId);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 5656f9a..cfb4a28 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -34,7 +34,6 @@
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.MapRedStats;
-import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskHandle;
@@ -78,7 +77,7 @@
   public transient JobID jobId;
   private final LogHelper console;
   private final HadoopJobExecHook callBackObj;
-  private final QueryState queryState;
+  private final String queryId;
 
   /**
    * Update counters relevant to this task.
@@ -139,9 +138,9 @@ public void setJobId(JobID jobId) {
     this.jobId = jobId;
   }
 
-  public HadoopJobExecHelper(QueryState queryState, JobConf job, LogHelper console,
+  public HadoopJobExecHelper(JobConf job, LogHelper console,
       Task task, HadoopJobExecHook hookCallBack) {
-    this.queryState = queryState;
+    this.queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID, "unknown-" + System.currentTimeMillis());
     this.job = job;
     this.console = console;
     this.task = task;
@@ -259,7 +258,6 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException, LockEx
     String logMapper;
     String logReducer;
 
-    String queryId = queryState.getQueryId();
     TaskReport[] mappers = jc.getMapTaskReports(rj.getID());
     if (mappers == null) {
       logMapper = "no information for number of mappers; ";
@@ -364,11 +362,11 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException, LockEx
         String output = report.toString();
         SessionState ss = SessionState.get();
         if (ss != null) {
-          ss.getHiveHistory().setTaskCounters(queryState.getQueryId(), getId(), ctrs);
-          ss.getHiveHistory().setTaskProperty(queryState.getQueryId(), getId(),
+          ss.getHiveHistory().setTaskCounters(queryId, getId(), ctrs);
+          ss.getHiveHistory().setTaskProperty(queryId, getId(),
               Keys.TASK_HADOOP_PROGRESS, output);
           if (ss.getConf().getBoolVar(HiveConf.ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS)) {
-            ss.getHiveHistory().progressTask(queryState.getQueryId(), this.task);
+            ss.getHiveHistory().progressTask(queryId, this.task);
             this.callBackObj.logPlanProgress(ss);
           }
         }
@@ -397,7 +395,7 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException, LockEx
       } else {
         SessionState ss = SessionState.get();
         if (ss != null) {
-          ss.getHiveHistory().setTaskCounters(queryState.getQueryId(), getId(), ctrs);
+          ss.getHiveHistory().setTaskCounters(queryId, getId(), ctrs);
         }
         success = rj.isSuccessful();
       }
@@ -441,7 +439,7 @@ public void jobInfo(RunningJob rj) {
       console.printInfo("Job running in-process (local Hadoop)");
     } else {
       if (SessionState.get() != null) {
-        SessionState.get().getHiveHistory().setTaskProperty(queryState.getQueryId(),
+        SessionState.get().getHiveHistory().setTaskProperty(queryId,
             getId(), Keys.TASK_HADOOP_ID, rj.getID().toString());
       }
       console.printInfo(getJobStartMsg(rj.getID()) + ", Tracking URL = "
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 24bf506..38d48c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -128,7 +128,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext
     job = new JobConf(conf, ExecDriver.class);
     execContext = new ExecMapperContext(job);
     //we don't use the HadoopJobExecHooks for local tasks
-    this.jobExecHelper = new HadoopJobExecHelper(queryState, job, console, this, null);
+    this.jobExecHelper = new HadoopJobExecHelper(job, console, this, null);
   }
 
   public static String now() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index 0fedd48..376bab2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -65,7 +65,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext
       driverContext, CompilationOpContext opContext) {
     super.initialize(queryState, queryPlan, driverContext, opContext);
     job = new JobConf(conf, MergeFileTask.class);
-    jobExecHelper = new HadoopJobExecHelper(queryState, job, this.console, this, this);
+    jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this);
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index 6771b3e..6131581 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -89,7 +89,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext
       driverContext, CompilationOpContext opContext) {
     super.initialize(queryState, queryPlan, driverContext, opContext);
     job = new JobConf(conf, PartialScanTask.class);
-    jobExecHelper = new HadoopJobExecHelper(queryState, job, this.console, this, this);
+    jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this);
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
index ffc6311..2d29afc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
@@ -64,7 +64,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext
       driverContext, CompilationOpContext opContext) {
     super.initialize(queryState, queryPlan, driverContext, opContext);
     job = new JobConf(conf, ColumnTruncateTask.class);
-    jobExecHelper = new HadoopJobExecHelper(queryState, job, this.console, this, this);
+    jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this);
   }
 
   @Override