diff --git itests/pom.xml itests/pom.xml
index a452db3..bf3febe 100644
--- itests/pom.xml
+++ itests/pom.xml
@@ -98,7 +98,7 @@
           mv $BASE_DIR/spark-${spark.version}-bin-hadoop2-without-hive $BASE_DIR/$finalName
         }
         mkdir -p $DOWNLOAD_DIR
-        download "http://d3jw87u4immizc.cloudfront.net/spark-tarball/spark-${spark.version}-bin-hadoop2-without-hive.tgz" "spark"
+        download "http://blog.sundp.me/spark/spark-2.0.0-bin-hadoop2-without-hive.tgz" "spark"
         cp -f $HIVE_ROOT/data/conf/spark/log4j2.properties $BASE_DIR/spark/conf/
diff --git pom.xml pom.xml
index 2fb78cd..a3be94e 100644
--- pom.xml
+++ pom.xml
@@ -145,7 +145,7 @@
     2.4.0
     1.9.13
-    2.4.2
+    2.6.5
     5.5.23
     2.3.4
     2.3.1
@@ -168,7 +168,7 @@
     2.3
     1.9.5
     2.0.0-M5
-    4.0.23.Final
+    4.0.29.Final
     1.8.1
     0.16.0
     2.5.0
@@ -178,9 +178,9 @@
     0.8.4
     0.90.2-incubating
     2.2.0
-    1.6.0
-    2.10
-    2.10.4
+    2.0.0
+    2.11
+    2.11.8
     1.1
     0.2
     1.4
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
index 5b65036..0fc79f4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
@@ -38,15 +38,14 @@
  * through Iterator interface.
  */
 @SuppressWarnings("rawtypes")
-public abstract class HiveBaseFunctionResultList implements
-    Iterable, OutputCollector, Serializable {
+public abstract class HiveBaseFunctionResultList
+    implements Iterator, OutputCollector, Serializable {
   private static final long serialVersionUID = -1L;
   private final Iterator inputIterator;
   private boolean isClosed = false;
 
   // Contains results from last processed input record.
   private final HiveKVResultCache lastRecordOutput;
-  private boolean iteratorAlreadyCreated = false;
 
   public HiveBaseFunctionResultList(Iterator inputIterator) {
     this.inputIterator = inputIterator;
@@ -54,13 +53,6 @@ public HiveBaseFunctionResultList(Iterator inputIterator) {
   }
 
   @Override
-  public Iterator iterator() {
-    Preconditions.checkState(!iteratorAlreadyCreated, "Iterator can only be created once.");
-    iteratorAlreadyCreated = true;
-    return new ResultIterator();
-  }
-
-  @Override
   public void collect(HiveKey key, BytesWritable value) throws IOException {
     lastRecordOutput.add(SparkUtilities.copyHiveKey(key),
         SparkUtilities.copyBytesWritable(value));
@@ -77,57 +69,55 @@ public void collect(HiveKey key, BytesWritable value) throws IOException {
   /** Close the record processor. */
   protected abstract void closeRecordProcessor();
 
-  /** Implement Iterator interface. */
-  public class ResultIterator implements Iterator {
-    @Override
-    public boolean hasNext(){
-      // Return remaining records (if any) from last processed input record.
-      if (lastRecordOutput.hasNext()) {
-        return true;
-      }
+  @Override
+  public boolean hasNext() {
+    // Return remaining records (if any) from last processed input record.
+    if (lastRecordOutput.hasNext()) {
+      return true;
+    }
 
-      // Process the records in the input iterator until
-      // - new output records are available for serving downstream operator,
-      // - input records are exhausted or
-      // - processing is completed.
-      while (inputIterator.hasNext() && !processingDone()) {
-        try {
-          processNextRecord(inputIterator.next());
-          if (lastRecordOutput.hasNext()) {
-            return true;
-          }
-        } catch (IOException ex) {
-          throw new IllegalStateException("Error while processing input.", ex);
+    // Process the records in the input iterator until
+    // - new output records are available for serving downstream operator,
+    // - input records are exhausted or
+    // - processing is completed.
+    while (inputIterator.hasNext() && !processingDone()) {
+      try {
+        processNextRecord(inputIterator.next());
+        if (lastRecordOutput.hasNext()) {
+          return true;
         }
+      } catch (IOException ex) {
+        throw new IllegalStateException("Error while processing input.", ex);
       }
+    }
 
-      // At this point we are done processing the input. Close the record processor
-      if (!isClosed) {
-        closeRecordProcessor();
-        isClosed = true;
-      }
-
-      // It is possible that some operators add records after closing the processor, so make sure
-      // to check the lastRecordOutput
-      if (lastRecordOutput.hasNext()) {
-        return true;
-      }
-
-      lastRecordOutput.clear();
-      return false;
+    // At this point we are done processing the input. Close the record processor
+    if (!isClosed) {
+      closeRecordProcessor();
+      isClosed = true;
     }
 
-    @Override
-    public Tuple2 next() {
-      if (hasNext()) {
-        return lastRecordOutput.next();
-      }
-      throw new NoSuchElementException("There are no more elements");
+    // It is possible that some operators add records after closing the processor, so make sure
+    // to check the lastRecordOutput
+    if (lastRecordOutput.hasNext()) {
+      return true;
     }
 
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("Iterator.remove() is not supported");
+    lastRecordOutput.clear();
+    return false;
+  }
+
+  @Override
+  public Tuple2 next() {
+    if (hasNext()) {
+      return lastRecordOutput.next();
     }
+    throw new NoSuchElementException("There are no more elements");
   }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("Iterator.remove() is not supported");
+  }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
index 53c5c0e..ff21a52 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
@@ -38,7 +38,7 @@ public HiveMapFunction(byte[] jobConfBuffer, SparkReporter sparkReporter) {
 
   @SuppressWarnings("unchecked")
   @Override
-  public Iterable>
+  public Iterator>
       call(Iterator> it) throws Exception {
     initJobConf();
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
index f6595f1..eeb4443 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
@@ -36,7 +36,7 @@ public HiveReduceFunction(byte[] buffer, SparkReporter sparkReporter) {
 
   @SuppressWarnings("unchecked")
   @Override
-  public Iterable>
+  public Iterator>
       call(Iterator>> it) throws Exception {
     initJobConf();
 
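For reference on the Iterable-to-Iterator changes above: in the Spark 2.x Java API, FlatMapFunction, PairFlatMapFunction and the mapPartitions-style functions return an Iterator instead of an Iterable, which is why the result list can now be handed back directly as an Iterator rather than wrapped behind an iterator() factory. A minimal sketch of the new contract with placeholder types (WordSplitter and the String/Integer types are illustrative, not part of this patch):

  import java.util.Arrays;
  import java.util.Iterator;

  import org.apache.spark.api.java.function.PairFlatMapFunction;

  import scala.Tuple2;

  // Illustrative only: in Spark 2.x, call() returns Iterator<Tuple2<K, V>>,
  // where the 1.6 API returned Iterable<Tuple2<K, V>>.
  public class WordSplitter implements PairFlatMapFunction<String, String, Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    public Iterator<Tuple2<String, Integer>> call(String line) throws Exception {
      // Emit a (word, 1) pair per word; the returned iterator is consumed lazily by Spark.
      return Arrays.stream(line.split("\\s+"))
          .map(word -> new Tuple2<>(word, 1))
          .iterator();
    }
  }

The same shape applies to HiveMapFunction, HiveReduceFunction and the shuffler below: only the declared return type of call() changes, while the iteration logic stays where it was.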
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index a6350d3..997ab7e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -75,60 +75,52 @@ public String getName() {
   private static final long serialVersionUID = 1L;
 
   @Override
-  public Iterable>> call(
-      final Iterator> it) throws Exception {
+  public Iterator>> call(
+      final Iterator> it) throws Exception {
     // Use input iterator to back returned iterable object.
-    final Iterator>> resultIt =
-        new Iterator>>() {
-          HiveKey curKey = null;
-          List curValues = new ArrayList();
+    return new Iterator>>() {
+      HiveKey curKey = null;
+      List curValues = new ArrayList();
 
-          @Override
-          public boolean hasNext() {
-            return it.hasNext() || curKey != null;
-          }
+      @Override
+      public boolean hasNext() {
+        return it.hasNext() || curKey != null;
+      }
 
-          @Override
-          public Tuple2> next() {
-            // TODO: implement this by accumulating rows with the same key into a list.
-            // Note that this list needs to improved to prevent excessive memory usage, but this
-            // can be done in later phase.
-            while (it.hasNext()) {
-              Tuple2 pair = it.next();
-              if (curKey != null && !curKey.equals(pair._1())) {
-                HiveKey key = curKey;
-                List values = curValues;
-                curKey = pair._1();
-                curValues = new ArrayList();
-                curValues.add(pair._2());
-                return new Tuple2>(key, values);
-              }
-              curKey = pair._1();
-              curValues.add(pair._2());
-            }
-            if (curKey == null) {
-              throw new NoSuchElementException();
-            }
-            // if we get here, this should be the last element we have
+      @Override
+      public Tuple2> next() {
+        // TODO: implement this by accumulating rows with the same key into a list.
+        // Note that this list needs to improved to prevent excessive memory usage, but this
+        // can be done in later phase.
+        while (it.hasNext()) {
+          Tuple2 pair = it.next();
+          if (curKey != null && !curKey.equals(pair._1())) {
             HiveKey key = curKey;
-            curKey = null;
-            return new Tuple2>(key, curValues);
+            List values = curValues;
+            curKey = pair._1();
+            curValues = new ArrayList();
+            curValues.add(pair._2());
+            return new Tuple2>(key, values);
           }
+          curKey = pair._1();
+          curValues.add(pair._2());
+        }
+        if (curKey == null) {
+          throw new NoSuchElementException();
+        }
+        // if we get here, this should be the last element we have
+        HiveKey key = curKey;
+        curKey = null;
+        return new Tuple2>(key, curValues);
+      }
 
-          @Override
-          public void remove() {
-            // Not implemented.
-            // throw Unsupported Method Invocation Exception.
-            throw new UnsupportedOperationException();
-          }
-
-        };
-
-    return new Iterable>>() {
       @Override
-      public Iterator>> iterator() {
-        return resultIt;
+      public void remove() {
+        // Not implemented.
+        // throw Unsupported Method Invocation Exception.
+        throw new UnsupportedOperationException();
       }
+    };
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
index 09c54c1..b48de3e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
@@ -24,15 +24,15 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.spark.JavaSparkListener;
 import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.scheduler.SparkListenerTaskEnd;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-public class JobMetricsListener extends JavaSparkListener {
+public class JobMetricsListener extends SparkListener {
 
   private static final Logger LOG = LoggerFactory.getLogger(JobMetricsListener.class);
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
index ee9f9b7..7bb9c62 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
@@ -282,9 +282,8 @@ public void remove() {
     resultList.init(rows, threshold, separate, prefix1, prefix2);
     long startTime = System.currentTimeMillis();
 
-    Iterator it = resultList.iterator();
-    while (it.hasNext()) {
-      Object item = it.next();
+    while (resultList.hasNext()) {
+      Object item = resultList.next();
       if (output != null) {
         output.add((Tuple2)item);
       }
diff --git ql/src/test/results/clientpositive/spark/bucket4.q.out ql/src/test/results/clientpositive/spark/bucket4.q.out
index b1ef928..68f8143 100644
--- ql/src/test/results/clientpositive/spark/bucket4.q.out
+++ ql/src/test/results/clientpositive/spark/bucket4.q.out
@@ -54,7 +54,7 @@ STAGE PLANS:
             input format: org.apache.hadoop.mapred.TextInputFormat
             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
             properties:
-              COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+              COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
               bucket_count -1
               columns key,value
               columns.comments 'default','default'
@@ -74,7 +74,7 @@ STAGE PLANS:
             input format: org.apache.hadoop.mapred.TextInputFormat
             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
             properties:
-              COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+              COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
               bucket_count -1
               columns key,value
               columns.comments 'default','default'
diff --git ql/src/test/results/clientpositive/spark/bucket5.q.out ql/src/test/results/clientpositive/spark/bucket5.q.out
index b5d8890..a78fae0 100644
--- ql/src/test/results/clientpositive/spark/bucket5.q.out
+++ ql/src/test/results/clientpositive/spark/bucket5.q.out
@@ -73,7 +73,7 @@ STAGE PLANS:
             input format: org.apache.hadoop.mapred.TextInputFormat
             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
             properties:
-              COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+              COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
               bucket_count -1
               columns key,value
               columns.comments 'default','default'
@@ -93,7 +93,7 @@ STAGE PLANS:
             input format: org.apache.hadoop.mapred.TextInputFormat
             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
             properties:
-              COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+              COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
               bucket_count -1
               columns key,value
               columns.comments 'default','default'
@@ -141,7 +141,7 @@ STAGE PLANS:
             input format: org.apache.hadoop.mapred.TextInputFormat
             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
             properties:
-              COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+              COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
               bucket_count -1
               columns key,value
               columns.comments 'default','default'
@@ -161,7 +161,7 @@ STAGE PLANS:
             input format: org.apache.hadoop.mapred.TextInputFormat
             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
             properties:
-              COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+              COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
               bucket_count -1
               columns key,value
               columns.comments 'default','default'
diff --git ql/src/test/results/clientpositive/spark/disable_merge_for_bucketing.q.out ql/src/test/results/clientpositive/spark/disable_merge_for_bucketing.q.out
index 8cefe46..c8503cd 100644
--- ql/src/test/results/clientpositive/spark/disable_merge_for_bucketing.q.out
+++ ql/src/test/results/clientpositive/spark/disable_merge_for_bucketing.q.out
@@ -53,7 +53,7 @@ STAGE PLANS:
             input format: org.apache.hadoop.mapred.TextInputFormat
             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
             properties:
-              COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+              COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
               bucket_count -1
               columns key,value
               columns.comments 'default','default'
@@ -73,7 +73,7 @@ STAGE PLANS:
             input format: org.apache.hadoop.mapred.TextInputFormat
             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
             properties:
-              COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+              COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
               bucket_count -1
               columns key,value
               columns.comments 'default','default'
diff --git ql/src/test/results/clientpositive/spark/list_bucket_dml_10.q.out ql/src/test/results/clientpositive/spark/list_bucket_dml_10.q.out
index 2e5c8b4..f1ca6da 100644
--- ql/src/test/results/clientpositive/spark/list_bucket_dml_10.q.out
+++ ql/src/test/results/clientpositive/spark/list_bucket_dml_10.q.out
@@ -110,7 +110,7 @@ STAGE PLANS:
             input format: org.apache.hadoop.mapred.TextInputFormat
             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
             properties:
-              COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+              COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
               bucket_count -1
               columns key,value
               columns.comments 'default','default'
@@ -130,7 +130,7 @@ STAGE PLANS:
             input format: org.apache.hadoop.mapred.TextInputFormat
             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
             properties:
-              COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+              COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
               bucket_count -1
               columns key,value
               columns.comments 'default','default'
diff --git ql/src/test/results/clientpositive/spark/reduce_deduplicate.q.out ql/src/test/results/clientpositive/spark/reduce_deduplicate.q.out
index 1fc9d28..b20e8fe 100644
--- ql/src/test/results/clientpositive/spark/reduce_deduplicate.q.out
+++ ql/src/test/results/clientpositive/spark/reduce_deduplicate.q.out
@@ -54,7 +54,7 @@ STAGE PLANS:
             input format: org.apache.hadoop.mapred.TextInputFormat
             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
             properties:
-              COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+              COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
               bucket_count -1
               columns key,value
               columns.comments 'default','default'
@@ -74,7 +74,7 @@ STAGE PLANS:
             input format: org.apache.hadoop.mapred.TextInputFormat
             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
             properties:
-              COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+              COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
               bucket_count -1
               columns key,value
               columns.comments 'default','default'
diff --git spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index e77aa78..de81a66 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -177,11 +177,6 @@ private Metrics aggregate(Predicate filter) {
 
       if (m.inputMetrics != null) {
         hasInputMetrics = true;
-        if (readMethod == null) {
-          readMethod = m.inputMetrics.readMethod;
-        } else if (readMethod != m.inputMetrics.readMethod) {
-          readMethod = DataReadMethod.Multiple;
-        }
         bytesRead += m.inputMetrics.bytesRead;
       }
 
@@ -201,7 +196,7 @@ private Metrics aggregate(Predicate filter) {
 
     InputMetrics inputMetrics = null;
     if (hasInputMetrics) {
-      inputMetrics = new InputMetrics(readMethod, bytesRead);
+      inputMetrics = new InputMetrics(bytesRead);
     }
 
     ShuffleReadMetrics shuffleReadMetrics = null;
diff --git spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index e3b88d1..ede8ce9 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -43,11 +43,11 @@
 import org.apache.hive.spark.client.rpc.Rpc;
 import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.apache.hive.spark.counter.SparkCounters;
-import org.apache.spark.JavaSparkListener;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkJobInfo;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.scheduler.SparkListenerJobEnd;
 import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.scheduler.SparkListenerTaskEnd;
@@ -441,7 +441,7 @@ private void monitorJob(JavaFutureAction job,
 
   }
 
-  private class ClientListener extends JavaSparkListener {
+  private class ClientListener extends SparkListener {
 
     private final Map stageToJobId = Maps.newHashMap();
diff --git spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
index e46b67d..f137007 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
@@ -28,25 +28,20 @@
  */
 @InterfaceAudience.Private
 public class InputMetrics implements Serializable {
-
-  public final DataReadMethod readMethod;
   public final long bytesRead;
 
   private InputMetrics() {
     // For Serialization only.
-    this(null, 0L);
+    this(0L);
   }
 
   public InputMetrics(
-      DataReadMethod readMethod,
       long bytesRead) {
-    this.readMethod = readMethod;
     this.bytesRead = bytesRead;
   }
 
   public InputMetrics(TaskMetrics metrics) {
-    this(DataReadMethod.valueOf(metrics.inputMetrics().get().readMethod().toString()),
-        metrics.inputMetrics().get().bytesRead());
+    this(metrics.inputMetrics().bytesRead());
   }
 }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index a7305cf..418d534 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -99,15 +99,15 @@ public Metrics(TaskMetrics metrics) {
   }
 
   private static InputMetrics optionalInputMetric(TaskMetrics metrics) {
-    return metrics.inputMetrics().isDefined() ? new InputMetrics(metrics) : null;
+    return (metrics.inputMetrics() != null) ? new InputMetrics(metrics) : null;
   }
 
   private static ShuffleReadMetrics optionalShuffleReadMetric(TaskMetrics metrics) {
-    return metrics.shuffleReadMetrics().isDefined() ? new ShuffleReadMetrics(metrics) : null;
+    return (metrics.shuffleReadMetrics() != null) ? new ShuffleReadMetrics(metrics) : null;
  }
 
   private static ShuffleWriteMetrics optionalShuffleWriteMetrics(TaskMetrics metrics) {
-    return metrics.shuffleWriteMetrics().isDefined() ? new ShuffleWriteMetrics(metrics) : null;
+    return (metrics.shuffleWriteMetrics() != null) ? new ShuffleWriteMetrics(metrics) : null;
   }
 }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
index be14c06..9ff4d0f 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
@@ -30,9 +30,9 @@
 public class ShuffleReadMetrics implements Serializable {
 
   /** Number of remote blocks fetched in shuffles by tasks. */
-  public final int remoteBlocksFetched;
+  public final long remoteBlocksFetched;
   /** Number of local blocks fetched in shuffles by tasks. */
-  public final int localBlocksFetched;
+  public final long localBlocksFetched;
   /**
    * Time tasks spent waiting for remote shuffle blocks. This only includes the
    * time blocking on shuffle input data. For instance if block B is being
@@ -49,8 +49,8 @@ private ShuffleReadMetrics() {
   }
 
   public ShuffleReadMetrics(
-      int remoteBlocksFetched,
-      int localBlocksFetched,
+      long remoteBlocksFetched,
+      long localBlocksFetched,
       long fetchWaitTime,
       long remoteBytesRead) {
     this.remoteBlocksFetched = remoteBlocksFetched;
@@ -60,16 +60,16 @@ public ShuffleReadMetrics(
   }
 
   public ShuffleReadMetrics(TaskMetrics metrics) {
-    this(metrics.shuffleReadMetrics().get().remoteBlocksFetched(),
-        metrics.shuffleReadMetrics().get().localBlocksFetched(),
-        metrics.shuffleReadMetrics().get().fetchWaitTime(),
-        metrics.shuffleReadMetrics().get().remoteBytesRead());
+    this(metrics.shuffleReadMetrics().remoteBlocksFetched(),
+        metrics.shuffleReadMetrics().localBlocksFetched(),
+        metrics.shuffleReadMetrics().fetchWaitTime(),
+        metrics.shuffleReadMetrics().remoteBytesRead());
   }
 
   /**
    * Number of blocks fetched in shuffle by tasks (remote or local).
    */
-  public int getTotalBlocksFetched() {
+  public long getTotalBlocksFetched() {
     return remoteBlocksFetched + localBlocksFetched;
   }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
index 4420e4d..64a4b86 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
@@ -47,8 +47,8 @@ public ShuffleWriteMetrics(
   }
 
   public ShuffleWriteMetrics(TaskMetrics metrics) {
-    this(metrics.shuffleWriteMetrics().get().shuffleBytesWritten(),
-        metrics.shuffleWriteMetrics().get().shuffleWriteTime());
+    this(metrics.shuffleWriteMetrics().shuffleBytesWritten(),
+        metrics.shuffleWriteMetrics().shuffleWriteTime());
   }
 }
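The metrics classes above all track the same Spark 2.0 TaskMetrics change: the inputMetrics(), shuffleReadMetrics() and shuffleWriteMetrics() accessors return the metric objects directly instead of scala.Option (so the isDefined()/get() calls disappear), the read-method field is dropped, and the shuffle block counters are long rather than int. A hedged sketch of reading a TaskMetrics instance under that API (the TaskMetricsSummary helper and its output format are illustrative, not part of this patch):

  import org.apache.spark.executor.TaskMetrics;

  // Illustrative only: Spark 2.x sub-metrics are plain objects, so no Option unwrapping.
  public final class TaskMetricsSummary {

    private TaskMetricsSummary() {
    }

    public static String summarize(TaskMetrics metrics) {
      long bytesRead = metrics.inputMetrics().bytesRead();
      // Block counts are long in Spark 2.x, matching the widened fields above.
      long blocksFetched = metrics.shuffleReadMetrics().remoteBlocksFetched()
          + metrics.shuffleReadMetrics().localBlocksFetched();
      long fetchWaitMs = metrics.shuffleReadMetrics().fetchWaitTime();
      return "input=" + bytesRead + "B, shuffleBlocks=" + blocksFetched
          + ", fetchWait=" + fetchWaitMs + "ms";
    }
  }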
diff --git spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index 5146e91..8fef66b 100644
--- spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -95,22 +95,21 @@ public void testInputReadMethodAggregation() {
     long value = taskValue(1, 1, 1);
 
     Metrics metrics1 = new Metrics(value, value, value, value, value, value, value,
-        new InputMetrics(DataReadMethod.Memory, value), null, null);
+        new InputMetrics(value), null, null);
     Metrics metrics2 = new Metrics(value, value, value, value, value, value, value,
-        new InputMetrics(DataReadMethod.Disk, value), null, null);
+        new InputMetrics(value), null, null);
 
     collection.addMetrics(1, 1, 1, metrics1);
     collection.addMetrics(1, 1, 2, metrics2);
 
     Metrics global = collection.getAllMetrics();
     assertNotNull(global.inputMetrics);
-    assertEquals(DataReadMethod.Multiple, global.inputMetrics.readMethod);
   }
 
   private Metrics makeMetrics(int jobId, int stageId, long taskId) {
     long value = 1000000 * jobId + 1000 * stageId + taskId;
     return new Metrics(value, value, value, value, value, value, value,
-        new InputMetrics(DataReadMethod.Memory, value),
+        new InputMetrics(value),
         new ShuffleReadMetrics((int) value, (int) value, value, value),
         new ShuffleWriteMetrics(value, value));
   }
@@ -156,7 +155,6 @@ private void checkMetrics(Metrics metrics, long expected) {
     assertEquals(expected, metrics.memoryBytesSpilled);
     assertEquals(expected, metrics.diskBytesSpilled);
 
-    assertEquals(DataReadMethod.Memory, metrics.inputMetrics.readMethod);
     assertEquals(expected, metrics.inputMetrics.bytesRead);
     assertEquals(expected, metrics.shuffleReadMetrics.remoteBlocksFetched);