Index: shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
===================================================================
--- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (revision 6444)
+++ shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (working copy)
@@ -31,8 +31,10 @@
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
@@ -57,6 +59,9 @@
// gone in 0.18+
}
+ public boolean isJobPreparing(RunningJob job) throws IOException {
+ return job.getJobState() == JobStatus.PREP;
+ }
/**
* Workaround for hadoop-17 - jobclient only looks at commandlineconfig.
*/
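
For reference, a minimal standalone sketch of the check the 0.20 shim performs (the job id string is hypothetical; the classes are the 0.20 mapred imports shown in the hunk above, plus JobClient/JobConf):

  JobClient jc = new JobClient(new JobConf());
  RunningJob rj = jc.getJob("job_201001011200_0001"); // hypothetical job id
  // getJobState() returns an int matching one of the JobStatus constants
  boolean preparing = (rj.getJobState() == JobStatus.PREP);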
Index: shims/src/0.17/java/org/apache/hadoop/hive/shims/Hadoop17Shims.java
===================================================================
--- shims/src/0.17/java/org/apache/hadoop/hive/shims/Hadoop17Shims.java (revision 6444)
+++ shims/src/0.17/java/org/apache/hadoop/hive/shims/Hadoop17Shims.java (working copy)
@@ -26,6 +26,7 @@
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import java.io.IOException;
@@ -43,6 +44,13 @@
return false;
}
+ /**
+ * There is no way to get this information in Hadoop 0.17, so always return false.
+ */
+ public boolean isJobPreparing(RunningJob job) throws IOException {
+ return false;
+ }
+
public void inputFormatValidateInput(InputFormat fmt, JobConf conf)
throws IOException {
fmt.validateInput(conf);
Index: shims/src/0.18/java/org/apache/hadoop/hive/shims/Hadoop18Shims.java
===================================================================
--- shims/src/0.18/java/org/apache/hadoop/hive/shims/Hadoop18Shims.java (revision 6444)
+++ shims/src/0.18/java/org/apache/hadoop/hive/shims/Hadoop18Shims.java (working copy)
@@ -26,6 +26,7 @@
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskCompletionEvent;
@@ -51,6 +52,10 @@
// gone in 0.18+
}
+ /**
+  * As in Hadoop 0.17, this information is not available here,
+  * so always return false.
+  */
+ public boolean isJobPreparing(RunningJob job) throws IOException {
+   return false;
+ }
+
/**
* workaround for hadoop-17 - jobclient only looks at commandlineconfig.
*/
Index: shims/src/0.19/java/org/apache/hadoop/hive/shims/Hadoop19Shims.java
===================================================================
--- shims/src/0.19/java/org/apache/hadoop/hive/shims/Hadoop19Shims.java (revision 6444)
+++ shims/src/0.19/java/org/apache/hadoop/hive/shims/Hadoop19Shims.java (working copy)
@@ -30,6 +30,7 @@
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskID;
@@ -40,6 +41,7 @@
import org.apache.hadoop.mapred.MultiFileSplit;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RunningJob;
/**
* Implementation of shims against Hadoop 0.19.0.
@@ -59,6 +61,10 @@
throws IOException {
// gone in 0.18+
}
+
+ public boolean isJobPreparing(RunningJob job) throws IOException {
+ return job.getJobState() == JobStatus.PREP;
+ }
/**
* workaround for hadoop-17 - jobclient only looks at commandlineconfig.
Index: shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
===================================================================
--- shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java (revision 6444)
+++ shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java (working copy)
@@ -32,6 +32,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
/**
@@ -49,6 +50,12 @@
* command line interpretation.
*/
boolean usesJobShell();
+
+ /**
+  * Return true if the job has not yet switched to the RUNNING state
+  * and is still in the PREP state. Shims for Hadoop versions that
+  * cannot determine the job state (0.17, 0.18) always return false.
+  */
+ boolean isJobPreparing(RunningJob job) throws IOException;
/**
* Calls fs.deleteOnExit(path) if such a function exists.
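
Callers reach the version-specific implementation through Hive's existing ShimLoader; a minimal usage sketch, assuming rj is a RunningJob already obtained from a JobClient:

  HadoopShims shims = ShimLoader.getHadoopShims();
  if (shims.isJobPreparing(rj)) {
    // still in PREP on 0.19/0.20; the 0.17/0.18 shims always report false
  }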
Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 6444)
+++ conf/hive-default.xml (working copy)
@@ -474,6 +474,12 @@
+ <property>
+   <name>hive.exec.counters.pull.interval</name>
+   <value>1000</value>
+   <description>The interval (in milliseconds) at which to poll the JobTracker for the counters of the running job. The smaller the interval, the heavier the load on the JobTracker; the larger the interval, the less granular the counter updates will be.</description>
+ </property>
+
<property>
  <name>hive.enforce.bucketing</name>
  <value>false</value>
  <description>Whether bucketing is enforced. If true, while inserting into the table, bucketing is enforced.</description>
</property>
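
The default can be overridden in hive-site.xml, or per session with "set hive.exec.counters.pull.interval=..." in the CLI; for example (the 5000 value is purely illustrative):

  <property>
    <name>hive.exec.counters.pull.interval</name>
    <value>5000</value>
    <description>Poll the JobTracker every five seconds instead of every second.</description>
  </property>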
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 6444)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -84,6 +84,7 @@
EXECPARALLEL("hive.exec.parallel", false), // parallel query launching
EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8),
HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", true),
+ HIVECOUNTERSPULLINTERVAL("hive.exec.counters.pull.interval", 1000L),
DYNAMICPARTITIONING("hive.exec.dynamic.partition", false),
DYNAMICPARTITIONINGMODE("hive.exec.dynamic.partition.mode", "strict"),
DYNAMICPARTITIONMAXPARTS("hive.exec.max.dynamic.partitions", 1000),
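
The new ConfVars entry is read back with HiveConf.getLongVar, with 1000L acting as the default when the property is unset; the ExecDriver hunk below does exactly this:

  long pullInterval = HiveConf.getLongVar(job,
      HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL);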
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 6444)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy)
@@ -73,6 +73,7 @@
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
@@ -315,11 +316,24 @@
long maxReportInterval = 60 * 1000; // One minute
boolean fatal = false;
StringBuilder errMsg = new StringBuilder();
+ long pullInterval = HiveConf.getLongVar(job,
+ HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL);
+ boolean initializing = true;
while (!rj.isComplete()) {
try {
- Thread.sleep(1000);
+ Thread.sleep(pullInterval);
} catch (InterruptedException e) {
}
+
+ if (initializing &&
+     ShimLoader.getHadoopShims().isJobPreparing(rj)) {
+   // No reason to poll until the job is initialized
+   continue;
+ } else {
+   // By now the job is initialized, so there is no need to check
+   // rj.getJobState() again and incur an extra RPC call
+   initializing = false;
+ }
th.setRunningJob(jc.getJob(rj.getJobID()));
// If fatal errors happen we should kill the job immediately rather than