diff --git a/data/conf/spark/standalone/hive-site.xml b/data/conf/spark/standalone/hive-site.xml
index ca6ec0f..bc8c0e5 100644
--- a/data/conf/spark/standalone/hive-site.xml
+++ b/data/conf/spark/standalone/hive-site.xml
@@ -216,7 +216,7 @@
   spark.driver.extraClassPath
-  ${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar
+  ${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar
diff --git a/pom.xml b/pom.xml
index 77cfaeb..361219d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1033,6 +1033,8 @@
   src,src1,srcbucket,srcbucket2,src_json,src_thrift,src_sequencefile,srcpart,alltypesorc,src_hbase,cbo_t1,cbo_t2,cbo_t3,src_cbo,part,lineitem
   ${test.tmp.dir}/conf/krb5.conf
+
+  ${antlr.version}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
index 7a36b53..eb48dd7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
@@ -108,12 +108,16 @@ public MapJoinPersistableTableContainer load(ObjectInputStream in)
   public MapJoinTableContainer load(
       FileSystem fs, Path folder, Configuration hconf) throws HiveException {
     try {
+
+      if (!fs.exists(folder)) {
+        return getDefaultEmptyContainer(keyContext, valueContext);
+      }
       if (!fs.isDirectory(folder)) {
         throw new HiveException("Error, not a directory: " + folder);
       }
       FileStatus[] fileStatuses = fs.listStatus(folder);
       if (fileStatuses == null || fileStatuses.length == 0) {
-        return null;
+        return getDefaultEmptyContainer(keyContext, valueContext);
       }
 
       SerDe keySerDe = keyContext.getSerDe();
@@ -210,50 +214,51 @@ private void loadOptimized(MapJoinBytesTableContainer container, ObjectInputStre
   public MapJoinTableContainer loadFastContainer(MapJoinDesc mapJoinDesc,
       FileSystem fs, Path folder, Configuration hconf) throws HiveException {
     try {
-      if (!fs.isDirectory(folder)) {
-        throw new HiveException("Error, not a directory: " + folder);
-      }
-      FileStatus[] fileStatuses = fs.listStatus(folder);
-      if (fileStatuses == null || fileStatuses.length == 0) {
-        return null;
-      }
-
-      SerDe keySerDe = keyContext.getSerDe();
-      SerDe valueSerDe = valueContext.getSerDe();
-      Writable key = keySerDe.getSerializedClass().newInstance();
-      Writable value = valueSerDe.getSerializedClass().newInstance();
-
       VectorMapJoinFastTableContainer tableContainer = new VectorMapJoinFastTableContainer(mapJoinDesc, hconf, -1);
       tableContainer.setSerde(keyContext, valueContext);
 
-      for (FileStatus fileStatus : fileStatuses) {
-        Path filePath = fileStatus.getPath();
-        if (ShimLoader.getHadoopShims().isDirectory(fileStatus)) {
-          throw new HiveException("Error, not a file: " + filePath);
+      if (fs.exists(folder)) {
+        if (!fs.isDirectory(folder)) {
+          throw new HiveException("Error, not a directory: " + folder);
         }
-        InputStream is = null;
-        ObjectInputStream in = null;
-        try {
-          is = fs.open(filePath, 4096);
-          in = new ObjectInputStream(is);
-          // skip the name and metadata
-          in.readUTF();
-          in.readObject();
-          int numKeys = in.readInt();
-          for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
-            key.readFields(in);
-            long numRows = in.readLong();
-            for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
-              value.readFields(in);
-              tableContainer.putRow(key, value);
+
+        FileStatus[] fileStatuses = fs.listStatus(folder);
+        if (fileStatuses != null && fileStatuses.length > 0) {
+          SerDe keySerDe = keyContext.getSerDe();
+          SerDe valueSerDe = valueContext.getSerDe();
+          Writable key = keySerDe.getSerializedClass().newInstance();
+          Writable value = valueSerDe.getSerializedClass().newInstance();
+
+          for (FileStatus fileStatus : fileStatuses) {
+            Path filePath = fileStatus.getPath();
+            if (ShimLoader.getHadoopShims().isDirectory(fileStatus)) {
+              throw new HiveException("Error, not a file: " + filePath);
+            }
+            InputStream is = null;
+            ObjectInputStream in = null;
+            try {
+              is = fs.open(filePath, 4096);
+              in = new ObjectInputStream(is);
+              // skip the name and metadata
+              in.readUTF();
+              in.readObject();
+              int numKeys = in.readInt();
+              for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+                key.readFields(in);
+                long numRows = in.readLong();
+                for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
+                  value.readFields(in);
+                  tableContainer.putRow(key, value);
+                }
+              }
+            } finally {
+              if (in != null) {
+                in.close();
+              } else if (is != null) {
+                is.close();
+              }
             }
           }
-        } finally {
-          if (in != null) {
-            in.close();
-          } else if (is != null) {
-            is.close();
-          }
+          }
         }
       }
 
@@ -312,4 +317,13 @@ private MapJoinPersistableTableContainer create(
       throw new HiveException(msg, e);
     }
   }
+
+  // Get an empty container when the small table is empty.
+  private static MapJoinTableContainer getDefaultEmptyContainer(MapJoinObjectSerDeContext keyCtx,
+      MapJoinObjectSerDeContext valCtx) throws SerDeException {
+    MapJoinTableContainer container = new HashMapWrapper();
+    container.setSerde(keyCtx, valCtx);
+    container.seal();
+    return container;
+  }
 }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index f5b1e48..e3b88d1 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -45,6 +45,7 @@
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.JavaSparkListener;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkJobInfo;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.scheduler.SparkListenerJobEnd;
@@ -323,18 +324,22 @@ public void call(JavaFutureAction<?> future,
 
     private final BaseProtocol.JobRequest<T> req;
     private final List<JavaFutureAction<?>> jobs;
-    private final AtomicInteger completed;
+    private final AtomicInteger jobEndReceived;
+    private int completed;
     private SparkCounters sparkCounters;
     private Set<Integer> cachedRDDIds;
+    private Integer sparkJobId;
 
     private Future<?> future;
 
     JobWrapper(BaseProtocol.JobRequest<T> req) {
       this.req = req;
       this.jobs = Lists.newArrayList();
-      this.completed = new AtomicInteger();
+      completed = 0;
+      jobEndReceived = new AtomicInteger(0);
       this.sparkCounters = null;
       this.cachedRDDIds = null;
+      this.sparkJobId = null;
     }
 
     @Override
@@ -351,11 +356,26 @@ public void call(JavaFutureAction<?> future,
         });
 
         T result = req.job.call(jc);
-        synchronized (completed) {
-          while (completed.get() < jobs.size()) {
-            LOG.debug("Client job {} finished, {} of {} Spark jobs finished.",
-                req.id, completed.get(), jobs.size());
-            completed.wait();
+        // In case the job is empty, there won't be JobStart/JobEnd events. The only way
+        // to know if the job has finished is to check the futures here ourselves.
+        for (JavaFutureAction<?> future : jobs) {
+          future.get();
+          completed++;
+          LOG.debug("Client job {}: {} of {} Spark jobs finished.",
+              req.id, completed, jobs.size());
+        }
+
+        // If the job is not empty (but runs fast), we have to wait until all the TaskEnd/JobEnd
+        // events are processed. Otherwise, task metrics may get lost. See HIVE-13525.
+        if (sparkJobId != null) {
+          SparkJobInfo sparkJobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId);
+          if (sparkJobInfo != null && sparkJobInfo.stageIds() != null &&
+              sparkJobInfo.stageIds().length > 0) {
+            synchronized (jobEndReceived) {
+              while (jobEndReceived.get() < jobs.size()) {
+                jobEndReceived.wait();
+              }
+            }
           }
         }
 
@@ -363,11 +383,6 @@ public void call(JavaFutureAction<?> future,
         if (sparkCounters != null) {
           counters = sparkCounters.snapshot();
         }
-        // make sure job has really succeeded
-        // at this point, future.get shall not block us
-        for (JavaFutureAction<?> future : jobs) {
-          future.get();
-        }
         protocol.jobFinished(req.id, result, null, counters);
       } catch (Throwable t) {
         // Catch throwables in a best-effort to report job status back to the client. It's
@@ -390,9 +405,9 @@ void submit() {
     }
 
     void jobDone() {
-      synchronized (completed) {
-        completed.incrementAndGet();
-        completed.notifyAll();
+      synchronized (jobEndReceived) {
+        jobEndReceived.incrementAndGet();
+        jobEndReceived.notifyAll();
       }
     }
 
@@ -420,7 +435,8 @@ private void monitorJob(JavaFutureAction<?> job,
       jc.getMonitoredJobs().get(req.id).add(job);
      this.sparkCounters = sparkCounters;
       this.cachedRDDIds = cachedRDDIds;
-      protocol.jobSubmitted(req.id, job.jobIds().get(0));
+      sparkJobId = job.jobIds().get(0);
+      protocol.jobSubmitted(req.id, sparkJobId);
     }
 
   }
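Note (reviewer aid, not part of the patch): the RemoteDriver change above first blocks on every JavaFutureAction so that jobs which never fire JobStart/JobEnd events (empty jobs) still complete, and only afterwards waits on jobEndReceived so that task metrics from fast-running jobs are not lost. The wait itself is the usual Java monitor handshake: the listener thread increments a counter and calls notifyAll() under the same lock that the job thread holds while re-checking its condition in a loop. The following standalone sketch shows that handshake in isolation; the class and method names are made up for illustration and it has no Hive or Spark dependencies.

import java.util.concurrent.atomic.AtomicInteger;

// Standalone illustration of the counter plus wait/notifyAll handshake used by
// jobDone() and the jobEndReceived loop in RemoteDriver. Names are hypothetical.
public class JobEndWaiter {

  private final AtomicInteger jobEndReceived = new AtomicInteger(0);

  // Listener side: called once per JobEnd-style event.
  void jobDone() {
    synchronized (jobEndReceived) {
      jobEndReceived.incrementAndGet();
      jobEndReceived.notifyAll();
    }
  }

  // Job thread side: blocks until 'expected' notifications have arrived.
  void awaitJobEnds(int expected) throws InterruptedException {
    synchronized (jobEndReceived) {
      while (jobEndReceived.get() < expected) {
        jobEndReceived.wait();
      }
    }
  }

  public static void main(String[] args) throws Exception {
    JobEndWaiter waiter = new JobEndWaiter();
    int jobs = 3;
    for (int i = 0; i < jobs; i++) {
      new Thread(waiter::jobDone).start(); // simulate three JobEnd events
    }
    waiter.awaitJobEnds(jobs);
    System.out.println("All " + jobs + " JobEnd events received.");
  }
}

Using notifyAll() rather than notify(), and re-checking the count inside the while loop, guards against missed signals and spurious wakeups, which mirrors what jobDone() and the jobEndReceived loop do in the patch.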