diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java index 6568a76..e551bea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java @@ -27,8 +27,10 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; +import java.util.Queue; /** * Base class for @@ -43,12 +45,12 @@ private final Iterator inputIterator; // Contains results from last processed input record. - private final HiveKVResultCache lastRecordOutput; + private final Queue> lastRecordOutput; private boolean iteratorAlreadyCreated = false; public HiveBaseFunctionResultList(Configuration conf, Iterator inputIterator) { this.inputIterator = inputIterator; - this.lastRecordOutput = new HiveKVResultCache(conf); + this.lastRecordOutput = new LinkedList>(); } @Override @@ -60,7 +62,7 @@ public Iterator iterator() { @Override public void collect(BytesWritable key, BytesWritable value) throws IOException { - lastRecordOutput.add(copyBytesWritable(key), copyBytesWritable(value)); + lastRecordOutput.add(new Tuple2(copyBytesWritable(key), copyBytesWritable(value))); } private static BytesWritable copyBytesWritable(BytesWritable bw) { @@ -83,7 +85,7 @@ private static BytesWritable copyBytesWritable(BytesWritable bw) { @Override public boolean hasNext(){ // Return remaining records (if any) from last processed input record. - if (lastRecordOutput.hasNext()) { + if (!lastRecordOutput.isEmpty()) { return true; } @@ -96,7 +98,7 @@ public boolean hasNext(){ while (inputIterator.hasNext() && !processingDone()) { try { processNextRecord(inputIterator.next()); - if (lastRecordOutput.hasNext()) { + if (!lastRecordOutput.isEmpty()) { return true; } } catch (IOException ex) { @@ -110,7 +112,7 @@ public boolean hasNext(){ // It is possible that some operators add records after closing the processor, so make sure // to check the lastRecordOutput - if (lastRecordOutput.hasNext()) { + if (!lastRecordOutput.isEmpty()) { return true; } @@ -121,7 +123,7 @@ public boolean hasNext(){ @Override public Tuple2 next() { if (hasNext()) { - return lastRecordOutput.next(); + return lastRecordOutput.remove(); } throw new NoSuchElementException("There are no more elements"); }