diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java index 997ab7e..22fee5e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java @@ -80,7 +80,7 @@ public String getName() { // Use input iterator to back returned iterable object. return new Iterator>>() { HiveKey curKey = null; - List curValues = new ArrayList(); + HiveKVResultCache curValues = new HiveKVResultCache(); @Override public boolean hasNext() { @@ -89,21 +89,18 @@ public boolean hasNext() { @Override public Tuple2> next() { - // TODO: implement this by accumulating rows with the same key into a list. - // Note that this list needs to improved to prevent excessive memory usage, but this - // can be done in later phase. while (it.hasNext()) { Tuple2 pair = it.next(); if (curKey != null && !curKey.equals(pair._1())) { HiveKey key = curKey; - List values = curValues; + HiveKVResultCache values = curValues; curKey = pair._1(); - curValues = new ArrayList(); - curValues.add(pair._2()); - return new Tuple2>(key, values); + curValues = new HiveKVResultCache(); + curValues.add(pair._1(), pair._2()); + return new Tuple2>(key, new KeyGroupValueIterable(values)); } curKey = pair._1(); - curValues.add(pair._2()); + curValues.add(pair._1(), pair._2()); } if (curKey == null) { throw new NoSuchElementException(); @@ -111,7 +108,7 @@ public boolean hasNext() { // if we get here, this should be the last element we have HiveKey key = curKey; curKey = null; - return new Tuple2>(key, curValues); + return new Tuple2>(key, new KeyGroupValueIterable(curValues)); } @Override @@ -126,3 +123,34 @@ public void remove() { } } + +class KeyGroupValueIterable implements Iterable { + private final HiveKVResultCache cache; + + public KeyGroupValueIterable(HiveKVResultCache cache) { + this.cache = cache; + } + + @Override + public Iterator iterator() { + return new Iterator() { + + @Override + public boolean hasNext() { + return cache.hasNext(); + } + + @Override + public BytesWritable next() { + return cache.next()._2(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + } + +}