diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java index 997ab7e..65e348c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java @@ -77,41 +77,53 @@ public String getName() { @Override public Iterator>> call( final Iterator> it) throws Exception { - // Use input iterator to back returned iterable object. + // Use input iterator to back returned iterable object as well as the iterable value object. + // Since two iterables are both backed by a single iterator, it's expected that for each + // tuple in the outer iterator, the value iterator must be consumed before moving on to the + // next tupe in the outer iterator. + final PeekableIterator peekableIt = new PeekableIterator(it); return new Iterator>>() { - HiveKey curKey = null; - List curValues = new ArrayList(); @Override public boolean hasNext() { - return it.hasNext() || curKey != null; + return peekableIt.hasNext(); + // This should be fine since it's expected that when this is called, peekableIt should have + // a new key at the beginning or nothing left at all. } @Override public Tuple2> next() { - // TODO: implement this by accumulating rows with the same key into a list. - // Note that this list needs to improved to prevent excessive memory usage, but this - // can be done in later phase. - while (it.hasNext()) { - Tuple2 pair = it.next(); - if (curKey != null && !curKey.equals(pair._1())) { - HiveKey key = curKey; - List values = curValues; - curKey = pair._1(); - curValues = new ArrayList(); - curValues.add(pair._2()); - return new Tuple2>(key, values); + Tuple2 pair = peekableIt.next(); + final HiveKey key = pair._1(); + + final Iterator values = new Iterator() { + + @Override + public boolean hasNext() { + Tuple2 nextPair = peekableIt.peek(); + return nextPair != null && key.equals(nextPair._1()); + } + + @Override + public BytesWritable next() { + return peekableIt.next()._2(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); } - curKey = pair._1(); - curValues.add(pair._2()); - } - if (curKey == null) { - throw new NoSuchElementException(); - } - // if we get here, this should be the last element we have - HiveKey key = curKey; - curKey = null; - return new Tuple2>(key, curValues); + + }; + + return new Tuple2>(key, new Iterable() { + + @Override + public Iterator iterator() { + return values; + } + + }); } @Override @@ -126,3 +138,31 @@ public void remove() { } } + +class PeekableIterator implements Iterator> { + private Iterator> it; + Tuple2 next = null; + + public PeekableIterator(Iterator> it) { + this.it = it; + next = it.hasNext() ? it.next() : null; + } + + @Override + public boolean hasNext() { + return (next != null); + + } + + @Override + public Tuple2 next() { + Tuple2 current = next; + next = (it.hasNext() ? it.next() : null); + return current; + } + + public Tuple2 peek() { + return next; + } + +}