diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 47db0c0..88146e4 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3200,6 +3200,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE(
         "hive.spark.dynamic.partition.pruning.max.data.size", 100*1024*1024L,
         "Maximum total data size in dynamic pruning."),
+    SPARK_SHUFFLE_BUFFER_SIZE("hive.spark.shuffle.buffer.size", 128 * 1024 * 1024L,
+        "Maximum size for the in-memory buffer used in group by or sort by operations for Hive on Spark. " +
+        "If the input size exceeds this limit, the extra data will be spilled to disk."),
+
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
       "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
     HIVE_LOG_N_RECORDS("hive.log.every.n.records", 0L, new RangeValidator(0L, null),
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBytesWritableCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBytesWritableCache.java
new file mode 100644
index 0000000..0cb649a
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBytesWritableCache.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import com.clearspring.analytics.util.Preconditions;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.BytesWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * A cache with a fixed buffer size for {@link BytesWritable}s. If the buffer is full,
+ * new entries will be spilled to disk. NOTE: this class is NOT thread-safe.
+ *
+ * Use this class in the following pattern:
+ *
+ * <pre>
+ * HiveBytesWritableCache cache = new ...
+ *
+ * // Write entries to cache. May persist to disk.
+ * while (...) {
+ *   cache.add(..);
+ * }
+ *
+ * // Done with writing. Start reading from cache.
+ * cache.startRead();
+ * for (BytesWritable bw : cache) {
+ *   ...
+ * }
+ *
+ * // Done with reading. Close and clear the cache.
+ * cache.close();
+ * </pre>
+ */
+public class HiveBytesWritableCache implements Iterable<BytesWritable> {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveBytesWritableCache.class.getName());
+
+  private final List<BytesWritable> buffer;
+
+  // Indicates whether we have flushed any data to disk.
+  // If this is not set, then all data can be held in a single buffer, and thus
+  // there is no need to flush to disk or initialize the input & output.
+  private boolean flushed;
+
+  private File parentFile;
+  private File tmpFile;
+
+  // The maximum buffer size, in bytes.
+  // If this value is exceeded, the extra data will be spilled to disk.
+  private final long maxBufferSize;
+
+  // The current buffer size. If this exceeds `maxBufferSize`, spill to disk.
+  private long bufferSize;
+
+  private Output output;
+
+  HiveBytesWritableCache(long maxBufferSize) {
+    this.flushed = false;
+    this.buffer = new ArrayList<>();
+    this.maxBufferSize = maxBufferSize;
+    this.bufferSize = 0;
+  }
+
+  public void add(BytesWritable value) {
+    if (bufferSize > maxBufferSize) {
+      flushBuffer();
+    }
+    buffer.add(value);
+    bufferSize += value.getCapacity();
+  }
+
+  /**
+   * Start reading from the cache. MUST be called before calling any
+   * of the iterator methods.
+   */
+  public void startRead() {
+    if (flushed && !buffer.isEmpty()) {
+      flushBuffer();
+    }
+    if (output != null) {
+      output.close();
+      output = null;
+    }
+  }
+
+  @Override
+  public Iterator<BytesWritable> iterator() {
+    return new BytesWritableIterator(buffer);
+  }
+
+  /**
+   * Close this cache. Idempotent.
+   */
+  public void close() {
+    flushed = false;
+    buffer.clear();
+    bufferSize = 0;
+
+    if (parentFile != null) {
+      if (output != null) {
+        try {
+          output.close();
+        } catch (Throwable e) {
+          LOG.warn("Error when closing cache output.", e);
+        }
+        output = null;
+      }
+      FileUtil.fullyDelete(parentFile);
+      parentFile = null;
+      tmpFile = null;
+    }
+  }
+
+  private void flushBuffer() {
+    // Initialize the output temporary file if not already set.
+    if (output == null) {
+      try {
+        setupOutput();
+      } catch (IOException e) {
+        close();
+        throw new RuntimeException("Error when setting up output stream.", e);
+      }
+    }
+
+    try {
+      for (BytesWritable writable : buffer) {
+        writeValue(output, writable);
+      }
+      buffer.clear();
+      flushed = true;
+      bufferSize = 0;
+    } catch (Exception e) {
+      close();
+      throw new RuntimeException("Error when spilling to disk", e);
+    }
+  }
+
+  private void setupOutput() throws IOException {
+    Preconditions.checkState(parentFile == null && tmpFile == null);
+    while (true) {
+      parentFile = File.createTempFile("hive-resultcache", "");
+      if (parentFile.delete() && parentFile.mkdir()) {
+        parentFile.deleteOnExit();
+        break;
+      }
+      LOG.debug("Retry creating tmp result-cache directory...");
+    }
+
+    tmpFile = File.createTempFile("ResultCache", ".tmp", parentFile);
+    LOG.info("ResultCache created temp file " + tmpFile.getAbsolutePath());
+    tmpFile.deleteOnExit();
+
+    FileOutputStream fos = new FileOutputStream(tmpFile);
+    output = new Output(fos);
+  }
+
+  private BytesWritable readValue(Input input) {
+    return new BytesWritable(input.readBytes(input.readInt()));
+  }
+
+  private void writeValue(Output output, BytesWritable bytesWritable) {
+    int size = bytesWritable.getLength();
+    output.writeInt(size);
+    output.writeBytes(bytesWritable.getBytes(), 0, size);
+  }
+
+  private class BytesWritableIterator implements Iterator<BytesWritable> {
+    private Iterator<BytesWritable> it;
+    private Input input = null;
+
+    BytesWritableIterator(List<BytesWritable> buffer) {
+      this.it = buffer.iterator();
+      if (tmpFile != null) {
+        try {
+          FileInputStream fis = new FileInputStream(tmpFile);
+          input = new Input(fis);
+        } catch (IOException e) {
+          close();
+          throw new RuntimeException("Error when setting up input stream for tmp file: " + tmpFile, e);
+        }
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return it.hasNext() || hasMoreInput();
+    }
+
+    @Override
+    public BytesWritable next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more next");
+      }
+      if (!it.hasNext()) {
+        loadBuffer();
+      }
+      return it.next();
+    }
+
+    /**
+     * Whether there's more data to load from disk.
+     */
+    private boolean hasMoreInput() {
+      return input != null && !input.eof();
+    }
+
+    private void loadBuffer() {
+      long bufferSize = 0;
+      List<BytesWritable> buffer = new ArrayList<>();
+      while (bufferSize < maxBufferSize) {
+        if (input.eof()) {
+          input.close();
+          input = null;
+          break;
+        }
+        BytesWritable value = readValue(input);
+        buffer.add(value);
+        bufferSize += value.getCapacity();
+      }
+      it = buffer.iterator();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Remove is not supported");
+    }
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index 997ab7e..8f96666 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -27,19 +27,22 @@ import org.apache.spark.storage.StorageLevel;
 import scala.Tuple2;
 
-import java.util.*;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
-public class SortByShuffler implements SparkShuffler {
+public class SortByShuffler implements SparkShuffler<Iterable<BytesWritable>> {
 
   private final boolean totalOrder;
   private final SparkPlan sparkPlan;
+  private final long maxBufferSize;
 
   /**
    * @param totalOrder whether this shuffler provides total order shuffle.
    */
-  public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
+  public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan, long maxBufferSize) {
     this.totalOrder = totalOrder;
     this.sparkPlan = sparkPlan;
+    this.maxBufferSize = maxBufferSize;
   }
 
   @Override
@@ -60,7 +63,7 @@ public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
       Partitioner partitioner = new HashPartitioner(numPartitions);
       rdd = input.repartitionAndSortWithinPartitions(partitioner);
     }
-    return rdd.mapPartitionsToPair(new ShuffleFunction());
+    return rdd.mapPartitionsToPair(new ShuffleFunction(maxBufferSize));
   }
 
   @Override
@@ -73,14 +76,19 @@ public String getName() {
       HiveKey, Iterable<BytesWritable>> {
     // make eclipse happy
     private static final long serialVersionUID = 1L;
+    private final long maxBufferSize;
+
+    ShuffleFunction(long maxBufferSize) {
+      this.maxBufferSize = maxBufferSize;
+    }
 
     @Override
     public Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> call(
         final Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
-      // Use input iterator to back returned iterable object.
+
       return new Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
         HiveKey curKey = null;
-        List<BytesWritable> curValues = new ArrayList<BytesWritable>();
+        HiveBytesWritableCache curValues = new HiveBytesWritableCache(maxBufferSize);
 
         @Override
         public boolean hasNext() {
@@ -89,16 +97,17 @@ public boolean hasNext() {
 
         @Override
         public Tuple2<HiveKey, Iterable<BytesWritable>> next() {
-          // TODO: implement this by accumulating rows with the same key into a list.
-          // Note that this list needs to improved to prevent excessive memory usage, but this
-          // can be done in later phase.
+          if (!hasNext()) {
+            throw new NoSuchElementException("No more next");
+          }
           while (it.hasNext()) {
             Tuple2<HiveKey, BytesWritable> pair = it.next();
             if (curKey != null && !curKey.equals(pair._1())) {
               HiveKey key = curKey;
-              List<BytesWritable> values = curValues;
+              HiveBytesWritableCache values = curValues;
+              values.startRead();
               curKey = pair._1();
-              curValues = new ArrayList<BytesWritable>();
+              curValues = new HiveBytesWritableCache(maxBufferSize);
               curValues.add(pair._2());
               return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, values);
             }
@@ -111,13 +120,12 @@ public boolean hasNext() {
           // if we get here, this should be the last element we have
           HiveKey key = curKey;
           curKey = null;
+          curValues.startRead();
           return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, curValues);
         }
 
         @Override
         public void remove() {
-          // Not implemented.
-          // throw Unsupported Method Invocation Exception.
           throw new UnsupportedOperationException();
         }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 66ffe5d..1512b74 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -213,11 +213,12 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork)
   private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache) {
     Preconditions.checkArgument(!edge.isShuffleNone(),
         "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
+    long maxBufferSize = Long.parseLong(this.jobConf.get(HiveConf.ConfVars.SPARK_SHUFFLE_BUFFER_SIZE.varname));
     SparkShuffler shuffler;
     if (edge.isMRShuffle()) {
-      shuffler = new SortByShuffler(false, sparkPlan);
+      shuffler = new SortByShuffler(false, sparkPlan, maxBufferSize);
     } else if (edge.isShuffleSort()) {
-      shuffler = new SortByShuffler(true, sparkPlan);
+      shuffler = new SortByShuffler(true, sparkPlan, maxBufferSize);
     } else {
       shuffler = new GroupByShuffler();
     }
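
Reviewer note: the sketch below shows the intended HiveBytesWritableCache lifecycle (add, startRead, iterate, close) in isolation, which may help when exercising the spill path outside of Spark. It is not part of the patch: the class name HiveBytesWritableCacheDemo and the 4 KB buffer size are made up for illustration (the tiny buffer merely forces a spill), and the class has to live in org.apache.hadoop.hive.ql.exec.spark because the cache constructor is package-private.

package org.apache.hadoop.hive.ql.exec.spark;

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.io.BytesWritable;

public class HiveBytesWritableCacheDemo {
  public static void main(String[] args) {
    // A 4 KB buffer guarantees that the 1000 small records below spill to disk.
    HiveBytesWritableCache cache = new HiveBytesWritableCache(4 * 1024);
    try {
      // Write phase: add() buffers values in memory and flushes them to a
      // temp file once the buffer exceeds maxBufferSize.
      for (int i = 0; i < 1000; i++) {
        cache.add(new BytesWritable(("row-" + i).getBytes(StandardCharsets.UTF_8)));
      }
      // Switch to read phase: startRead() flushes any remaining in-memory
      // values so the on-disk data is complete and in insertion order.
      cache.startRead();
      int count = 0;
      for (BytesWritable bw : cache) {
        count++;
      }
      System.out.println("Read back " + count + " records");
    } finally {
      // close() deletes the temp spill directory; it is safe to call twice.
      cache.close();
    }
  }
}

With the default hive.spark.shuffle.buffer.size of 128 MB a spill should only occur for genuinely large key groups; the demo shrinks the buffer to make the disk path observable.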