diff --git a/pom.xml b/pom.xml
index 2fb78cd..a3be94e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,7 +145,7 @@
2.4.0
1.9.13
- <jackson.new.version>2.4.2</jackson.new.version>
+ <jackson.new.version>2.6.5</jackson.new.version>
5.5.23
2.3.4
2.3.1
@@ -168,7 +168,7 @@
2.3
1.9.5
2.0.0-M5
- <netty.version>4.0.23.Final</netty.version>
+ <netty.version>4.0.29.Final</netty.version>
1.8.1
0.16.0
2.5.0
@@ -178,9 +178,9 @@
0.8.4
0.90.2-incubating
2.2.0
- <spark.version>1.6.0</spark.version>
- <scala.binary.version>2.10</scala.binary.version>
- <scala.version>2.10.4</scala.version>
+ <spark.version>2.0.0</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
1.1
0.2
1.4
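The version bumps above (Spark 1.6.0 to 2.0.0, Scala 2.10 to 2.11, plus matching Jackson and Netty upgrades) drive every Java change in the rest of this patch: in Spark 2.0 the Java flatMap-style function interfaces return java.util.Iterator instead of java.lang.Iterable. A minimal sketch of the new contract, assuming Spark 2.0.0 on the classpath (class and field names here are illustrative, not patch code):

import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.api.java.function.FlatMapFunction;

public class IteratorContractSketch {
  // Spark 1.6.x declared: Iterable<U> call(T t)
  // Spark 2.0.x declares: Iterator<U> call(T t)
  static final FlatMapFunction<String, String> SPLIT_WORDS =
      new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String line) {
          return Arrays.asList(line.split(" ")).iterator();
        }
      };
}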
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
index 5b65036..0fc79f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
@@ -38,15 +38,14 @@
* through Iterator interface.
*/
@SuppressWarnings("rawtypes")
-public abstract class HiveBaseFunctionResultList<T> implements
- Iterable, OutputCollector<HiveKey, BytesWritable>, Serializable {
+public abstract class HiveBaseFunctionResultList<T>
+ implements Iterator, OutputCollector<HiveKey, BytesWritable>, Serializable {
private static final long serialVersionUID = -1L;
private final Iterator<T> inputIterator;
private boolean isClosed = false;
// Contains results from last processed input record.
private final HiveKVResultCache lastRecordOutput;
- private boolean iteratorAlreadyCreated = false;
public HiveBaseFunctionResultList(Iterator<T> inputIterator) {
this.inputIterator = inputIterator;
@@ -54,13 +53,6 @@ public HiveBaseFunctionResultList(Iterator<T> inputIterator) {
}
@Override
- public Iterator iterator() {
- Preconditions.checkState(!iteratorAlreadyCreated, "Iterator can only be created once.");
- iteratorAlreadyCreated = true;
- return new ResultIterator();
- }
-
- @Override
public void collect(HiveKey key, BytesWritable value) throws IOException {
lastRecordOutput.add(SparkUtilities.copyHiveKey(key),
SparkUtilities.copyBytesWritable(value));
@@ -77,57 +69,55 @@ public void collect(HiveKey key, BytesWritable value) throws IOException {
/** Close the record processor. */
protected abstract void closeRecordProcessor();
- /** Implement Iterator interface. */
- public class ResultIterator implements Iterator {
- @Override
- public boolean hasNext(){
- // Return remaining records (if any) from last processed input record.
- if (lastRecordOutput.hasNext()) {
- return true;
- }
+ @Override
+ public boolean hasNext() {
+ // Return remaining records (if any) from last processed input record.
+ if (lastRecordOutput.hasNext()) {
+ return true;
+ }
- // Process the records in the input iterator until
- // - new output records are available for serving downstream operator,
- // - input records are exhausted or
- // - processing is completed.
- while (inputIterator.hasNext() && !processingDone()) {
- try {
- processNextRecord(inputIterator.next());
- if (lastRecordOutput.hasNext()) {
- return true;
- }
- } catch (IOException ex) {
- throw new IllegalStateException("Error while processing input.", ex);
+ // Process the records in the input iterator until
+ // - new output records are available for serving downstream operator,
+ // - input records are exhausted or
+ // - processing is completed.
+ while (inputIterator.hasNext() && !processingDone()) {
+ try {
+ processNextRecord(inputIterator.next());
+ if (lastRecordOutput.hasNext()) {
+ return true;
}
+ } catch (IOException ex) {
+ throw new IllegalStateException("Error while processing input.", ex);
}
+ }
- // At this point we are done processing the input. Close the record processor
- if (!isClosed) {
- closeRecordProcessor();
- isClosed = true;
- }
-
- // It is possible that some operators add records after closing the processor, so make sure
- // to check the lastRecordOutput
- if (lastRecordOutput.hasNext()) {
- return true;
- }
-
- lastRecordOutput.clear();
- return false;
+ // At this point we are done processing the input. Close the record processor
+ if (!isClosed) {
+ closeRecordProcessor();
+ isClosed = true;
}
- @Override
- public Tuple2<HiveKey, BytesWritable> next() {
- if (hasNext()) {
- return lastRecordOutput.next();
- }
- throw new NoSuchElementException("There are no more elements");
+ // It is possible that some operators add records after closing the processor, so make sure
+ // to check the lastRecordOutput
+ if (lastRecordOutput.hasNext()) {
+ return true;
}
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Iterator.remove() is not supported");
+ lastRecordOutput.clear();
+ return false;
+ }
+
+ @Override
+ public Tuple2<HiveKey, BytesWritable> next() {
+ if (hasNext()) {
+ return lastRecordOutput.next();
}
+ throw new NoSuchElementException("There are no more elements");
}
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Iterator.remove() is not supported");
+ }
+
}
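With Spark 2.0 consuming the returned Iterator exactly once, the Iterable contract, the inner ResultIterator class, and the iteratorAlreadyCreated guard all become dead weight; the result list now is the iterator. A hedged consumption sketch (ResultDrainSketch is a hypothetical helper, but it mirrors the test change near the end of this patch):

import java.util.Iterator;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import scala.Tuple2;

final class ResultDrainSketch {
  // Any concrete HiveBaseFunctionResultList can now be drained directly,
  // with no intermediate iterator() call.
  static void drain(Iterator<Tuple2<HiveKey, BytesWritable>> resultList) {
    while (resultList.hasNext()) {
      Tuple2<HiveKey, BytesWritable> kv = resultList.next();
      // hand kv to the downstream operator here
    }
  }
}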
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
index 53c5c0e..ff21a52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
@@ -38,7 +38,7 @@ public HiveMapFunction(byte[] jobConfBuffer, SparkReporter sparkReporter) {
@SuppressWarnings("unchecked")
@Override
- public Iterable<Tuple2<HiveKey, BytesWritable>>
+ public Iterator<Tuple2<HiveKey, BytesWritable>>
call(Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
initJobConf();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
index f6595f1..eeb4443 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
@@ -36,7 +36,7 @@ public HiveReduceFunction(byte[] buffer, SparkReporter sparkReporter) {
@SuppressWarnings("unchecked")
@Override
- public Iterable<Tuple2<HiveKey, BytesWritable>>
+ public Iterator<Tuple2<HiveKey, BytesWritable>>
call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception {
initJobConf();
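These two signature changes track Spark 2.0's PairFlatMapFunction interface, whose Java contract is now Iterator<Tuple2<K, V>> call(T t). A self-contained sketch of that contract with toy types (PairShapeSketch is illustrative only, not patch code):

import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

class PairShapeSketch implements PairFlatMapFunction<String, String, Integer> {
  @Override
  public Iterator<Tuple2<String, Integer>> call(String s) {
    // Emit one (word, length) pair per input; Spark 2.0 pulls results
    // through the returned Iterator instead of asking for an Iterable.
    return Arrays.asList(new Tuple2<>(s, s.length())).iterator();
  }
}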
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index a6350d3..997ab7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -75,60 +75,52 @@ public String getName() {
private static final long serialVersionUID = 1L;
@Override
- public Iterable<Tuple2<HiveKey, Iterable<BytesWritable>>> call(
- final Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
+ public Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> call(
+ final Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
// Use input iterator to back returned iterable object.
- final Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> resultIt =
- new Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
- HiveKey curKey = null;
- List<BytesWritable> curValues = new ArrayList<BytesWritable>();
+ return new Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
+ HiveKey curKey = null;
+ List<BytesWritable> curValues = new ArrayList<BytesWritable>();
- @Override
- public boolean hasNext() {
- return it.hasNext() || curKey != null;
- }
+ @Override
+ public boolean hasNext() {
+ return it.hasNext() || curKey != null;
+ }
- @Override
- public Tuple2<HiveKey, Iterable<BytesWritable>> next() {
- // TODO: implement this by accumulating rows with the same key into a list.
- // Note that this list needs to improved to prevent excessive memory usage, but this
- // can be done in later phase.
- while (it.hasNext()) {
- Tuple2<HiveKey, BytesWritable> pair = it.next();
- if (curKey != null && !curKey.equals(pair._1())) {
- HiveKey key = curKey;
- List<BytesWritable> values = curValues;
- curKey = pair._1();
- curValues = new ArrayList<BytesWritable>();
- curValues.add(pair._2());
- return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, values);
- }
- curKey = pair._1();
- curValues.add(pair._2());
- }
- if (curKey == null) {
- throw new NoSuchElementException();
- }
- // if we get here, this should be the last element we have
+ @Override
+ public Tuple2<HiveKey, Iterable<BytesWritable>> next() {
+ // TODO: implement this by accumulating rows with the same key into a list.
+ // Note that this list needs to be improved to prevent excessive memory usage, but this
+ // can be done in a later phase.
+ while (it.hasNext()) {
+ Tuple2<HiveKey, BytesWritable> pair = it.next();
+ if (curKey != null && !curKey.equals(pair._1())) {
HiveKey key = curKey;
- curKey = null;
- return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, curValues);
+ List<BytesWritable> values = curValues;
+ curKey = pair._1();
+ curValues = new ArrayList<BytesWritable>();
+ curValues.add(pair._2());
+ return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, values);
}
+ curKey = pair._1();
+ curValues.add(pair._2());
+ }
+ if (curKey == null) {
+ throw new NoSuchElementException();
+ }
+ // if we get here, this should be the last element we have
+ HiveKey key = curKey;
+ curKey = null;
+ return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, curValues);
+ }
- @Override
- public void remove() {
- // Not implemented.
- // throw Unsupported Method Invocation Exception.
- throw new UnsupportedOperationException();
- }
-
- };
-
- return new Iterable<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
@Override
- public Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> iterator() {
- return resultIt;
+ public void remove() {
+ // Not implemented; throw UnsupportedOperationException.
+ throw new UnsupportedOperationException();
}
+
};
}
}
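The restructuring above is mostly mechanical: the anonymous Iterator is now returned directly instead of being wrapped in a one-shot Iterable. The grouping contract itself is unchanged: the input iterator is assumed to be sorted by key, and consecutive pairs with equal keys are folded into a single (key, values) tuple. A toy, self-contained restatement of that logic, with generic types standing in for HiveKey/BytesWritable:

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class GroupBySortedKeySketch {
  // Folds a key-sorted stream, e.g. (a,1),(a,2),(b,3), into grouped
  // entries (a,[1,2]),(b,[3]) -- the same contract as the shuffler.
  static <K, V> List<Map.Entry<K, List<V>>> group(Iterator<Map.Entry<K, V>> it) {
    List<Map.Entry<K, List<V>>> out = new ArrayList<>();
    K curKey = null;
    List<V> curValues = new ArrayList<>();
    while (it.hasNext()) {
      Map.Entry<K, V> e = it.next();
      if (curKey != null && !curKey.equals(e.getKey())) {
        out.add(new AbstractMap.SimpleEntry<>(curKey, curValues));
        curValues = new ArrayList<>();
      }
      curKey = e.getKey();
      curValues.add(e.getValue());
    }
    if (curKey != null) {
      out.add(new AbstractMap.SimpleEntry<>(curKey, curValues));
    }
    return out;
  }
}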
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
index 09c54c1..b48de3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
@@ -24,15 +24,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.spark.JavaSparkListener;
import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-public class JobMetricsListener extends JavaSparkListener {
+public class JobMetricsListener extends SparkListener {
private static final Logger LOG = LoggerFactory.getLogger(JobMetricsListener.class);
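Spark 2.0 removed JavaSparkListener (the Java-friendly adapter); org.apache.spark.scheduler.SparkListener is now a class whose callbacks are all no-ops by default, so Java code extends it directly and overrides only the events it needs. A minimal sketch (the listener name and body are illustrative):

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerTaskEnd;

class TaskEndLoggingListener extends SparkListener {
  @Override
  public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
    // Only this callback is overridden; every other event keeps
    // SparkListener's default no-op behavior.
    System.out.println("Task ended in stage " + taskEnd.stageId());
  }
}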
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
index ee9f9b7..7bb9c62 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
@@ -282,9 +282,8 @@ public void remove() {
resultList.init(rows, threshold, separate, prefix1, prefix2);
long startTime = System.currentTimeMillis();
- Iterator it = resultList.iterator();
- while (it.hasNext()) {
- Object item = it.next();
+ while (resultList.hasNext()) {
+ Object item = resultList.next();
if (output != null) {
output.add((Tuple2)item);
}
diff --git a/spark-client/pom.xml b/spark-client/pom.xml
index 6cf3b17..543c3e2 100644
--- a/spark-client/pom.xml
+++ b/spark-client/pom.xml
@@ -33,7 +33,6 @@
..
- <scala.binary.version>2.10</scala.binary.version>
true
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index e77aa78..0f03a64 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -151,7 +151,6 @@ private Metrics aggregate(Predicate filter) {
// Input metrics.
boolean hasInputMetrics = false;
- DataReadMethod readMethod = null;
long bytesRead = 0L;
// Shuffle read metrics.
@@ -177,11 +176,6 @@ private Metrics aggregate(Predicate filter) {
if (m.inputMetrics != null) {
hasInputMetrics = true;
- if (readMethod == null) {
- readMethod = m.inputMetrics.readMethod;
- } else if (readMethod != m.inputMetrics.readMethod) {
- readMethod = DataReadMethod.Multiple;
- }
bytesRead += m.inputMetrics.bytesRead;
}
@@ -201,7 +195,7 @@ private Metrics aggregate(Predicate filter) {
InputMetrics inputMetrics = null;
if (hasInputMetrics) {
- inputMetrics = new InputMetrics(readMethod, bytesRead);
+ inputMetrics = new InputMetrics(bytesRead);
}
ShuffleReadMetrics shuffleReadMetrics = null;
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index e3b88d1..ede8ce9 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -43,11 +43,11 @@
import org.apache.hive.spark.client.rpc.Rpc;
import org.apache.hive.spark.client.rpc.RpcConfiguration;
import org.apache.hive.spark.counter.SparkCounters;
-import org.apache.spark.JavaSparkListener;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
@@ -441,7 +441,7 @@ private void monitorJob(JavaFutureAction> job,
}
- private class ClientListener extends JavaSparkListener {
+ private class ClientListener extends SparkListener {
private final Map<Integer, Integer> stageToJobId = Maps.newHashMap();
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
index e46b67d..f137007 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
@@ -28,25 +28,20 @@
*/
@InterfaceAudience.Private
public class InputMetrics implements Serializable {
-
- public final DataReadMethod readMethod;
public final long bytesRead;
private InputMetrics() {
// For Serialization only.
- this(null, 0L);
+ this(0L);
}
public InputMetrics(
- DataReadMethod readMethod,
long bytesRead) {
- this.readMethod = readMethod;
this.bytesRead = bytesRead;
}
public InputMetrics(TaskMetrics metrics) {
- this(DataReadMethod.valueOf(metrics.inputMetrics().get().readMethod().toString()),
- metrics.inputMetrics().get().bytesRead());
+ this(metrics.inputMetrics().bytesRead());
}
}
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index a7305cf..418d534 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -99,15 +99,15 @@ public Metrics(TaskMetrics metrics) {
}
private static InputMetrics optionalInputMetric(TaskMetrics metrics) {
- return metrics.inputMetrics().isDefined() ? new InputMetrics(metrics) : null;
+ return (metrics.inputMetrics() != null) ? new InputMetrics(metrics) : null;
}
private static ShuffleReadMetrics optionalShuffleReadMetric(TaskMetrics metrics) {
- return metrics.shuffleReadMetrics().isDefined() ? new ShuffleReadMetrics(metrics) : null;
+ return (metrics.shuffleReadMetrics() != null) ? new ShuffleReadMetrics(metrics) : null;
}
private static ShuffleWriteMetrics optionalShuffleWriteMetrics(TaskMetrics metrics) {
- return metrics.shuffleWriteMetrics().isDefined() ? new ShuffleWriteMetrics(metrics) : null;
+ return (metrics.shuffleWriteMetrics() != null) ? new ShuffleWriteMetrics(metrics) : null;
}
}
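The underlying API change: Spark 1.6 exposed these TaskMetrics sub-metrics as scala.Option values (hence the isDefined()/get() pairs removed throughout this patch), while Spark 2.0 returns the metric objects directly, so presence checks become null checks. A hedged helper sketch assuming the Spark 2.0 accessors (BytesReadSketch is illustrative):

import org.apache.spark.executor.TaskMetrics;

final class BytesReadSketch {
  // Spark 1.6: metrics.inputMetrics().isDefined()
  //              ? metrics.inputMetrics().get().bytesRead() : 0L
  // Spark 2.0: direct access, guarded by a null check.
  static long bytesRead(TaskMetrics metrics) {
    return (metrics.inputMetrics() != null) ? metrics.inputMetrics().bytesRead() : 0L;
  }
}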
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
index be14c06..9ff4d0f 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
@@ -30,9 +30,9 @@
public class ShuffleReadMetrics implements Serializable {
/** Number of remote blocks fetched in shuffles by tasks. */
- public final int remoteBlocksFetched;
+ public final long remoteBlocksFetched;
/** Number of local blocks fetched in shuffles by tasks. */
- public final int localBlocksFetched;
+ public final long localBlocksFetched;
/**
* Time tasks spent waiting for remote shuffle blocks. This only includes the
* time blocking on shuffle input data. For instance if block B is being
@@ -49,8 +49,8 @@ private ShuffleReadMetrics() {
}
public ShuffleReadMetrics(
- int remoteBlocksFetched,
- int localBlocksFetched,
+ long remoteBlocksFetched,
+ long localBlocksFetched,
long fetchWaitTime,
long remoteBytesRead) {
this.remoteBlocksFetched = remoteBlocksFetched;
@@ -60,16 +60,16 @@ public ShuffleReadMetrics(
}
public ShuffleReadMetrics(TaskMetrics metrics) {
- this(metrics.shuffleReadMetrics().get().remoteBlocksFetched(),
- metrics.shuffleReadMetrics().get().localBlocksFetched(),
- metrics.shuffleReadMetrics().get().fetchWaitTime(),
- metrics.shuffleReadMetrics().get().remoteBytesRead());
+ this(metrics.shuffleReadMetrics().remoteBlocksFetched(),
+ metrics.shuffleReadMetrics().localBlocksFetched(),
+ metrics.shuffleReadMetrics().fetchWaitTime(),
+ metrics.shuffleReadMetrics().remoteBytesRead());
}
/**
* Number of blocks fetched in shuffle by tasks (remote or local).
*/
- public int getTotalBlocksFetched() {
+ public long getTotalBlocksFetched() {
return remoteBlocksFetched + localBlocksFetched;
}
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
index 4420e4d..64a4b86 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
@@ -47,8 +47,8 @@ public ShuffleWriteMetrics(
}
public ShuffleWriteMetrics(TaskMetrics metrics) {
- this(metrics.shuffleWriteMetrics().get().shuffleBytesWritten(),
- metrics.shuffleWriteMetrics().get().shuffleWriteTime());
+ this(metrics.shuffleWriteMetrics().shuffleBytesWritten(),
+ metrics.shuffleWriteMetrics().shuffleWriteTime());
}
}
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index 5146e91..8fef66b 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -95,22 +95,21 @@ public void testInputReadMethodAggregation() {
long value = taskValue(1, 1, 1);
Metrics metrics1 = new Metrics(value, value, value, value, value, value, value,
- new InputMetrics(DataReadMethod.Memory, value), null, null);
+ new InputMetrics(value), null, null);
Metrics metrics2 = new Metrics(value, value, value, value, value, value, value,
- new InputMetrics(DataReadMethod.Disk, value), null, null);
+ new InputMetrics(value), null, null);
collection.addMetrics(1, 1, 1, metrics1);
collection.addMetrics(1, 1, 2, metrics2);
Metrics global = collection.getAllMetrics();
assertNotNull(global.inputMetrics);
- assertEquals(DataReadMethod.Multiple, global.inputMetrics.readMethod);
}
private Metrics makeMetrics(int jobId, int stageId, long taskId) {
long value = 1000000 * jobId + 1000 * stageId + taskId;
return new Metrics(value, value, value, value, value, value, value,
- new InputMetrics(DataReadMethod.Memory, value),
+ new InputMetrics(value),
new ShuffleReadMetrics((int) value, (int) value, value, value),
new ShuffleWriteMetrics(value, value));
}
@@ -156,7 +155,6 @@ private void checkMetrics(Metrics metrics, long expected) {
assertEquals(expected, metrics.memoryBytesSpilled);
assertEquals(expected, metrics.diskBytesSpilled);
- assertEquals(DataReadMethod.Memory, metrics.inputMetrics.readMethod);
assertEquals(expected, metrics.inputMetrics.bytesRead);
assertEquals(expected, metrics.shuffleReadMetrics.remoteBlocksFetched);