diff --git a/data/conf/spark/standalone/hive-site.xml b/data/conf/spark/standalone/hive-site.xml
index ca6ec0f..bc8c0e5 100644
--- a/data/conf/spark/standalone/hive-site.xml
+++ b/data/conf/spark/standalone/hive-site.xml
@@ -216,7 +216,7 @@
<name>spark.driver.extraClassPath</name>
- <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar</value>
+ <value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar</value>
diff --git a/pom.xml b/pom.xml
index 77cfaeb..361219d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1033,6 +1033,8 @@
<test.src.tables>src,src1,srcbucket,srcbucket2,src_json,src_thrift,src_sequencefile,srcpart,alltypesorc,src_hbase,cbo_t1,cbo_t2,cbo_t3,src_cbo,part,lineitem</test.src.tables>
<java.security.krb5.conf>${test.tmp.dir}/conf/krb5.conf</java.security.krb5.conf>
+
+ <antlr.version>${antlr.version}</antlr.version>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
index 7a36b53..eb48dd7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
@@ -108,12 +108,16 @@ public MapJoinPersistableTableContainer load(ObjectInputStream in)
public MapJoinTableContainer load(
FileSystem fs, Path folder, Configuration hconf) throws HiveException {
try {
+
+ if (!fs.exists(folder)) {
+ return getDefaultEmptyContainer(keyContext, valueContext);
+ }
if (!fs.isDirectory(folder)) {
throw new HiveException("Error, not a directory: " + folder);
}
FileStatus[] fileStatuses = fs.listStatus(folder);
if (fileStatuses == null || fileStatuses.length == 0) {
- return null;
+ return getDefaultEmptyContainer(keyContext, valueContext);
}
SerDe keySerDe = keyContext.getSerDe();
@@ -210,50 +214,51 @@ private void loadOptimized(MapJoinBytesTableContainer container, ObjectInputStre
public MapJoinTableContainer loadFastContainer(MapJoinDesc mapJoinDesc,
FileSystem fs, Path folder, Configuration hconf) throws HiveException {
try {
- if (!fs.isDirectory(folder)) {
- throw new HiveException("Error, not a directory: " + folder);
- }
- FileStatus[] fileStatuses = fs.listStatus(folder);
- if (fileStatuses == null || fileStatuses.length == 0) {
- return null;
- }
-
- SerDe keySerDe = keyContext.getSerDe();
- SerDe valueSerDe = valueContext.getSerDe();
- Writable key = keySerDe.getSerializedClass().newInstance();
- Writable value = valueSerDe.getSerializedClass().newInstance();
-
VectorMapJoinFastTableContainer tableContainer =
new VectorMapJoinFastTableContainer(mapJoinDesc, hconf, -1);
tableContainer.setSerde(keyContext, valueContext);
- for (FileStatus fileStatus : fileStatuses) {
- Path filePath = fileStatus.getPath();
- if (ShimLoader.getHadoopShims().isDirectory(fileStatus)) {
- throw new HiveException("Error, not a file: " + filePath);
+ if (fs.exists(folder)) {
+ if (!fs.isDirectory(folder)) {
+ throw new HiveException("Error, not a directory: " + folder);
}
- InputStream is = null;
- ObjectInputStream in = null;
- try {
- is = fs.open(filePath, 4096);
- in = new ObjectInputStream(is);
- // skip the name and metadata
- in.readUTF();
- in.readObject();
- int numKeys = in.readInt();
- for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
- key.readFields(in);
- long numRows = in.readLong();
- for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
- value.readFields(in);
- tableContainer.putRow(key, value);
+
+ FileStatus[] fileStatuses = fs.listStatus(folder);
+ if (fileStatuses != null && fileStatuses.length > 0) {
+ SerDe keySerDe = keyContext.getSerDe();
+ SerDe valueSerDe = valueContext.getSerDe();
+ Writable key = keySerDe.getSerializedClass().newInstance();
+ Writable value = valueSerDe.getSerializedClass().newInstance();
+
+ for (FileStatus fileStatus : fileStatuses) {
+ Path filePath = fileStatus.getPath();
+ if (ShimLoader.getHadoopShims().isDirectory(fileStatus)) {
+ throw new HiveException("Error, not a file: " + filePath);
+ }
+ InputStream is = null;
+ ObjectInputStream in = null;
+ try {
+ is = fs.open(filePath, 4096);
+ in = new ObjectInputStream(is);
+ // skip the name and metadata
+ in.readUTF();
+ in.readObject();
+ int numKeys = in.readInt();
+ for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+ key.readFields(in);
+ long numRows = in.readLong();
+ for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
+ value.readFields(in);
+ tableContainer.putRow(key, value);
+ }
+ }
+ } finally {
+ if (in != null) {
+ in.close();
+ } else if (is != null) {
+ is.close();
+ }
}
- }
- } finally {
- if (in != null) {
- in.close();
- } else if (is != null) {
- is.close();
}
}
}
@@ -312,4 +317,13 @@ private MapJoinPersistableTableContainer create(
throw new HiveException(msg, e);
}
}
+
+ // Get an empty container when the small table is empty.
+ private static MapJoinTableContainer getDefaultEmptyContainer(MapJoinObjectSerDeContext keyCtx,
+ MapJoinObjectSerDeContext valCtx) throws SerDeException {
+ MapJoinTableContainer container = new HashMapWrapper();
+ container.setSerde(keyCtx, valCtx);
+ container.seal();
+ return container;
+ }
}
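
Note on the MapJoinTableContainerSerDe change: it replaces the old "return null" with a sealed, empty HashMapWrapper so that downstream probe code can treat a missing or empty small-table dump directory like a small table with zero rows. A minimal, self-contained sketch of that fallback pattern follows; the names used here (loadSmallTable, dumpDirExists, EmptyContainerSketch) are hypothetical stand-ins, not Hive APIs.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the small-table loader: returning an empty map
// instead of null lets callers probe for join keys without null checks,
// which mirrors what getDefaultEmptyContainer() provides in the patch above.
public class EmptyContainerSketch {

  static Map<String, String> loadSmallTable(boolean dumpDirExists) {
    if (!dumpDirExists) {
      // Before the change the loader returned null here; an empty container
      // behaves like a small table with no rows and needs no special casing.
      return Collections.emptyMap();
    }
    Map<String, String> table = new HashMap<>();
    table.put("k1", "v1");
    return table;
  }

  public static void main(String[] args) {
    Map<String, String> smallTable = loadSmallTable(false);
    // Probing the empty container simply finds no match; no NullPointerException.
    System.out.println("match for k1: " + smallTable.get("k1"));
  }
}
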
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index f5b1e48..e3b88d1 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -45,6 +45,7 @@
import org.apache.hive.spark.counter.SparkCounters;
import org.apache.spark.JavaSparkListener;
import org.apache.spark.SparkConf;
+import org.apache.spark.SparkJobInfo;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.SparkListenerJobEnd;
@@ -323,18 +324,22 @@ public void call(JavaFutureAction<?> future,
private final BaseProtocol.JobRequest<T> req;
private final List<JavaFutureAction<?>> jobs;
- private final AtomicInteger completed;
+ private final AtomicInteger jobEndReceived;
+ private int completed;
private SparkCounters sparkCounters;
private Set<Integer> cachedRDDIds;
+ private Integer sparkJobId;
private Future<?> future;
JobWrapper(BaseProtocol.JobRequest<T> req) {
this.req = req;
this.jobs = Lists.newArrayList();
- this.completed = new AtomicInteger();
+ completed = 0;
+ jobEndReceived = new AtomicInteger(0);
this.sparkCounters = null;
this.cachedRDDIds = null;
+ this.sparkJobId = null;
}
@Override
@@ -351,11 +356,26 @@ public void call(JavaFutureAction<?> future,
});
T result = req.job.call(jc);
- synchronized (completed) {
- while (completed.get() < jobs.size()) {
- LOG.debug("Client job {} finished, {} of {} Spark jobs finished.",
- req.id, completed.get(), jobs.size());
- completed.wait();
+ // In case the job is empty, there won't be JobStart/JobEnd events. The only way
+ // to know if the job has finished is to check the futures here ourselves.
+ for (JavaFutureAction<?> future : jobs) {
+ future.get();
+ completed++;
+ LOG.debug("Client job {}: {} of {} Spark jobs finished.",
+ req.id, completed, jobs.size());
+ }
+
+ // If the job is not empty (but runs fast), we have to wait until all the TaskEnd/JobEnd
+ // events are processed. Otherwise, task metrics may get lost. See HIVE-13525.
+ if (sparkJobId != null) {
+ SparkJobInfo sparkJobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId);
+ if (sparkJobInfo != null && sparkJobInfo.stageIds() != null &&
+ sparkJobInfo.stageIds().length > 0) {
+ synchronized (jobEndReceived) {
+ while (jobEndReceived.get() < jobs.size()) {
+ jobEndReceived.wait();
+ }
+ }
}
}
@@ -363,11 +383,6 @@ public void call(JavaFutureAction<?> future,
if (sparkCounters != null) {
counters = sparkCounters.snapshot();
}
- // make sure job has really succeeded
- // at this point, future.get shall not block us
- for (JavaFutureAction<?> future : jobs) {
- future.get();
- }
protocol.jobFinished(req.id, result, null, counters);
} catch (Throwable t) {
// Catch throwables in a best-effort to report job status back to the client. It's
@@ -390,9 +405,9 @@ void submit() {
}
void jobDone() {
- synchronized (completed) {
- completed.incrementAndGet();
- completed.notifyAll();
+ synchronized (jobEndReceived) {
+ jobEndReceived.incrementAndGet();
+ jobEndReceived.notifyAll();
}
}
@@ -420,7 +435,8 @@ private void monitorJob(JavaFutureAction<?> job,
jc.getMonitoredJobs().get(req.id).add(job);
this.sparkCounters = sparkCounters;
this.cachedRDDIds = cachedRDDIds;
- protocol.jobSubmitted(req.id, job.jobIds().get(0));
+ sparkJobId = job.jobIds().get(0);
+ protocol.jobSubmitted(req.id, sparkJobId);
}
}
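
Note on the RemoteDriver change: it splits completion tracking in two. The job thread first waits on the futures themselves (which also covers empty jobs that never emit JobStart/JobEnd events), and then, if the Spark job produced real stages, blocks on a monitor until the listener thread has delivered one jobDone() per submitted job, so task metrics carried by the JobEnd events are not lost (HIVE-13525). A minimal sketch of that wait/notify pattern, using illustrative names rather than the actual Hive classes:

import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of JobWrapper's JobEnd wait: the submitting thread
// blocks on a shared monitor until the listener thread has signalled one
// jobDone() per submitted Spark job.
public class JobEndWaitSketch {
  private final AtomicInteger jobEndReceived = new AtomicInteger(0);

  // Invoked from the listener thread for every JobEnd event.
  void jobDone() {
    synchronized (jobEndReceived) {
      jobEndReceived.incrementAndGet();
      jobEndReceived.notifyAll();
    }
  }

  // Invoked from the job thread once all job futures have completed.
  void awaitJobEnds(int submittedJobs) throws InterruptedException {
    synchronized (jobEndReceived) {
      while (jobEndReceived.get() < submittedJobs) {
        jobEndReceived.wait();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    JobEndWaitSketch sketch = new JobEndWaitSketch();
    new Thread(sketch::jobDone).start();   // simulates the Spark listener thread
    sketch.awaitJobEnds(1);
    System.out.println("all JobEnd events received");
  }
}
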