diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
index af25dc8..4d314b7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
@@ -21,7 +21,9 @@
 import java.io.Serializable;
 import java.util.ArrayDeque;
 import java.util.Deque;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Stack;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -179,8 +181,8 @@ protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException {
   }
 
   protected void processInputPartition() throws HiveException {
-    PTFPartition outPart = executeChain(inputPart);
-    PTFPartitionIterator<Object> pItr = outPart.iterator();
+    Iterator<Object> pItr = executeChain(inputPart);
+
     while (pItr.hasNext()) {
       Object oRow = pItr.next();
       forward(oRow, outputObjInspector);
@@ -189,8 +191,11 @@ protected void processMapFunction() throws HiveException {
     PartitionedTableFunctionDef tDef = conf.getStartOfChain();
-    PTFPartition outPart = tDef.getTFunction().transformRawInput(inputPart);
-    PTFPartitionIterator<Object> pItr = outPart.iterator();
+
+    Iterator<Object> pItr = tDef.getTFunction().canIterateOutput() ?
+        tDef.getTFunction().transformRawInputIterator(inputPart.iterator()) :
+        tDef.getTFunction().transformRawInput(inputPart).iterator();
+
     while (pItr.hasNext()) {
       Object oRow = pItr.next();
       forward(oRow, outputObjInspector);
@@ -225,9 +230,9 @@ public OperatorType getType() {
    * @return
    * @throws HiveException
    */
-  private PTFPartition executeChain(PTFPartition part)
+  private Iterator<Object> executeChain(PTFPartition part)
       throws HiveException {
-    Deque<PartitionedTableFunctionDef> fnDefs = new ArrayDeque<PartitionedTableFunctionDef>();
+    Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
     PTFInputDef iDef = conf.getFuncDef();
 
     while (iDef instanceof PartitionedTableFunctionDef) {
@@ -236,11 +241,21 @@ private PTFPartition executeChain(PTFPartition part)
     }
 
     PartitionedTableFunctionDef currFnDef;
-    while (!fnDefs.isEmpty()) {
+    int i = fnDefs.size();
+    while (i > 1) {
       currFnDef = fnDefs.pop();
       part = currFnDef.getTFunction().execute(part);
+      i--;
     }
-    return part;
+
+    currFnDef = fnDefs.pop();
+    if (!currFnDef.getTFunction().canIterateOutput()) {
+      part = currFnDef.getTFunction().execute(part);
+      return part.iterator();
+    } else {
+      return currFnDef.getTFunction().iterator(part.iterator());
+    }
+  }
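The change to executeChain above special-cases only the terminal function in the PTF chain: every earlier function still materializes its output into a PTFPartition, and only the last one may hand back an Iterator over its rows. A minimal, self-contained sketch of that pattern (the Fn interface and the List-based stand-in for PTFPartition are hypothetical, not part of the patch):

    import java.util.Iterator;
    import java.util.List;
    import java.util.Stack;

    public class ChainSketch {

      // Stand-in for TableFunctionEvaluator: a function either materializes its
      // output "partition" (a List here) or can stream it as an Iterator.
      interface Fn {
        boolean canIterateOutput();
        List<Object> execute(List<Object> part);
        Iterator<Object> iterator(Iterator<Object> pItr);
      }

      // Mirrors the new executeChain: pop and run every function but the last
      // eagerly, then either materialize the last one too, or return its
      // streaming iterator so the operator can forward rows one at a time.
      static Iterator<Object> executeChain(Stack<Fn> fnDefs, List<Object> part) {
        while (fnDefs.size() > 1) {
          part = fnDefs.pop().execute(part);
        }
        Fn last = fnDefs.pop();
        return last.canIterateOutput()
            ? last.iterator(part.iterator())
            : last.execute(part).iterator();
      }
    }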
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
index ac052cd..34aebf0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
@@ -137,6 +137,7 @@ public PTFDesc translate(PTFInvocationSpec qSpec,
     ptfDesc.setCfg(hCfg);
     ptfDesc.setLlInfo(llInfo);
     translatePTFChain();
+    PTFDeserializer.alterOutputOIForStreaming(ptfDesc);
     return ptfDesc;
   }
 
@@ -222,6 +223,8 @@ public PTFDesc translate(WindowingSpec wdwSpec, SemanticAnalyzer semAly, HiveCon
 
     tFn.setupOutputOI();
 
+    PTFDeserializer.alterOutputOIForStreaming(ptfDesc);
+
     return ptfDesc;
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java
index 154f29a..217db4e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java
@@ -95,6 +95,8 @@ public void initializePTFChain(PartitionedTableFunctionDef tblFnDef) throws Hive
         initialize((PartitionedTableFunctionDef) currentDef);
       }
     }
+
+    PTFDeserializer.alterOutputOIForStreaming(ptfDesc);
   }
 
   public void initializeWindowing(WindowTableFunctionDef def) throws HiveException {
@@ -331,4 +333,17 @@ public static void addOIPropertiestoSerDePropsMap(StructObjectInspector OI,
         {fnames, fields};
   }
 
+  /*
+   * If the final PTF in a PTF chain can stream its output, then set the OI of its
+   * OutputShape to the OI returned by the TableFunctionEvaluator.
+   */
+  public static void alterOutputOIForStreaming(PTFDesc ptfDesc) {
+    PartitionedTableFunctionDef tDef = ptfDesc.getFuncDef();
+    TableFunctionEvaluator tEval = tDef.getTFunction();
+
+    if (tEval.canIterateOutput()) {
+      tDef.getOutputShape().setOI(tEval.getOutputOI());
+    }
+  }
+
 }
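The three call sites above (both translate paths and initializePTFChain) apply the swap whether the plan is built at compile time or rehydrated at task time. The reason for the swap: when the terminal PTF streams, the rows seen downstream come directly from the evaluator rather than from a materialized output partition, so they must be decoded with the evaluator's own ObjectInspector. A small, self-contained illustration with Hive's standard inspectors (the class name and toy column names are hypothetical; this is not plan code from the patch):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class OISketch {
      public static void main(String[] args) {
        // A row as a streaming evaluator might produce it: a plain Java list.
        List<Object> row = Arrays.<Object>asList("a", 1);

        // The evaluator's OI describes exactly that representation ...
        StructObjectInspector evalOI = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("name", "count"),
            Arrays.<ObjectInspector>asList(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                PrimitiveObjectInspectorFactory.javaIntObjectInspector));

        // ... so a consumer must use it, not an OI derived from a serialized
        // output partition that is never built on the streaming path.
        Object name = evalOI.getStructFieldData(row, evalOI.getStructFieldRef("name"));
        System.out.println(name); // prints "a"
      }
    }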
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
index 080fd44..1087bbf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.udf.ptf;
 
+import java.util.Iterator;
+
 import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.PTFPartition;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
@@ -138,6 +140,37 @@ protected PTFPartition _transformRawInput(PTFPartition iPart) throws HiveExcepti
     return null;
   }
 
+  /*
+   * A TableFunction may be able to provide its output as an Iterator. If it can, then for
+   * Map-side processing, and for the last PTF in a Reduce-side chain, rows can be forwarded
+   * one at a time. This saves the time and space needed to populate and read an output
+   * Partition.
+   */
+  public boolean canIterateOutput() {
+    return false;
+  }
+
+  public Iterator<Object> iterator(PTFPartitionIterator<Object> pItr) throws HiveException {
+    if (!canIterateOutput()) {
+      throw new HiveException(
+          "Internal error: iterator called on a PTF that cannot provide its output as an Iterator");
+    }
+    throw new HiveException(String.format(
+        "Internal error: PTF %s provides no iterator method",
+        getClass().getName()));
+  }
+
+  public Iterator<Object> transformRawInputIterator(PTFPartitionIterator<Object> pItr)
+      throws HiveException {
+    if (!canIterateOutput()) {
+      throw new HiveException(
+          "Internal error: transformRawInputIterator called on a PTF that cannot provide"
+              + " its output as an Iterator");
+    }
+    throw new HiveException(String.format(
+        "Internal error: PTF %s provides no transformRawInputIterator method",
+        getClass().getName()));
+  }
+
   public void close() {
     if (outputPartition != null) {
       outputPartition.close();
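Under the API added above, a function opts into streaming by overriding canIterateOutput() to return true and supplying iterator(). The hypothetical pass-through evaluator below (not part of the patch) streams every input row unchanged; it assumes the execute(PTFPartitionIterator, PTFPartition) hook visible in the WindowingTableFunction hunks further down, and PTFPartition.append as the partition write method:

    package org.apache.hadoop.hive.ql.udf.ptf;

    import java.util.Iterator;

    import org.apache.hadoop.hive.ql.exec.PTFPartition;
    import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class PassThroughStreamingFn extends TableFunctionEvaluator {

      // Opting in: PTFOperator and executeChain will now ask for iterator()
      // instead of materializing an output partition for this function.
      @Override
      public boolean canIterateOutput() {
        return true;
      }

      // Stream each input row through unchanged; no output PTFPartition is built.
      @Override
      public Iterator<Object> iterator(PTFPartitionIterator<Object> pItr)
          throws HiveException {
        return pItr;
      }

      // Still required for the non-streaming paths, e.g. when this function is
      // not the last one in a Reduce-side chain.
      @Override
      public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP)
          throws HiveException {
        while (pItr.hasNext()) {
          outP.append(pItr.next()); // assumed partition write method
        }
      }
    }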
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
index 110ef27..de511f4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
@@ -20,8 +20,10 @@
 
 import java.util.AbstractList;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.PTFPartition;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
@@ -64,20 +66,7 @@ public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws
       boolean processWindow = processWindow(wFn);
       pItr.reset();
       if ( !processWindow ) {
-        GenericUDAFEvaluator fEval = wFn.getWFnEval();
-        Object[] args = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
-        AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
-        while(pItr.hasNext()) {
-          Object row = pItr.next();
-          int i =0;
-          if ( wFn.getArgs() != null ) {
-            for(PTFExpressionDef arg : wFn.getArgs()) {
-              args[i++] = arg.getExprEvaluator().evaluate(row);
-            }
-          }
-          fEval.aggregate(aggBuffer, args);
-        }
-        Object out = fEval.evaluate(aggBuffer);
+        Object out = evaluateWindowFunction(wFn, pItr);
         if ( !wFn.isPivotResult()) {
           out = new SameList(iPart.size(), out);
         }
@@ -109,6 +98,28 @@ public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws
     }
   }
 
+  Object evaluateWindowFunction(WindowFunctionDef wFn,
+      PTFPartitionIterator<Object> pItr) throws HiveException {
+    GenericUDAFEvaluator fEval = wFn.getWFnEval();
+    Object[] args = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
+    AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
+    while (pItr.hasNext()) {
+      Object row = pItr.next();
+      int i = 0;
+      if (wFn.getArgs() != null) {
+        for (PTFExpressionDef arg : wFn.getArgs()) {
+          args[i++] = arg.getExprEvaluator().evaluate(row);
+        }
+      }
+      fEval.aggregate(aggBuffer, args);
+    }
+    Object out = fEval.evaluate(aggBuffer);
+    out = ObjectInspectorUtils.copyToStandardObject(out, wFn.getOI());
+    return out;
+  }
+
   private boolean processWindow(WindowFunctionDef wFn) {
     WindowFrameDef frame = wFn.getWindowFrame();
     if ( frame == null ) {
@@ -121,6 +132,54 @@ private boolean processWindow(WindowFunctionDef wFn) {
     return true;
   }
 
+  @Override
+  public boolean canIterateOutput() {
+    return true;
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Iterator<Object> iterator(PTFPartitionIterator<Object> pItr) throws HiveException {
+    WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef();
+    ArrayList<Object> output = new ArrayList<Object>();
+    List[] outputFromPivotFunctions = new List[wTFnDef.getWindowFunctions().size()];
+    ArrayList<Integer> wFnsWithWindows = new ArrayList<Integer>();
+    PTFPartition iPart = pItr.getPartition();
+
+    int i = 0;
+    for (WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) {
+      boolean processWindow = processWindow(wFn);
+      pItr.reset();
+      if (!processWindow && !wFn.isPivotResult()) {
+        Object out = evaluateWindowFunction(wFn, pItr);
+        output.add(out);
+      } else if (wFn.isPivotResult()) {
+        /*
+         * Functions that currently return their output as a List, e.g. the ranking
+         * functions, lead/lag, ntile and cume_dist, continue to be evaluated eagerly
+         * here for now. To stream them, each function would need to provide its output
+         * row by row as we iterate through the input. This is relatively easy to do for
+         * the ranking functions; it is not possible for lead, ntile and cume_dist.
+         */
+        outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, pItr);
+        output.add(null);
+      } else {
+        output.add(null);
+        wFnsWithWindows.add(i);
+      }
+      i++;
+    }
+
+    for (i = 0; i < iPart.getOutputOI().getAllStructFieldRefs().size(); i++) {
+      output.add(null);
+    }
+
+    return new WindowingIterator(iPart, output, outputFromPivotFunctions,
+        ArrayUtils.toPrimitive(wFnsWithWindows.toArray(new Integer[wFnsWithWindows.size()])));
+  }
+
   public static class WindowingTableFunctionResolver extends TableFunctionResolver {
 
     /*
@@ -193,27 +252,11 @@ public boolean carryForwardNames() {
       Order order) throws HiveException {
     ArrayList<Object> vals = new ArrayList<Object>();
-
-    GenericUDAFEvaluator fEval = wFnDef.getWFnEval();
-
-    Object[] args = new Object[wFnDef.getArgs() == null ? 0 : wFnDef.getArgs().size()];
     for(int i=0; i < iPart.size(); i++) {
-      AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
       Range rng = getRange(wFnDef, i, iPart, order);
       PTFPartitionIterator<Object> rItr = rng.iterator();
       PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
-      while(rItr.hasNext()) {
-        Object row = rItr.next();
-        int j = 0;
-        if ( wFnDef.getArgs() != null ) {
-          for(PTFExpressionDef arg : wFnDef.getArgs()) {
-            args[j++] = arg.getExprEvaluator().evaluate(row);
-          }
-        }
-        fEval.aggregate(aggBuffer, args);
-      }
-      Object out = fEval.evaluate(aggBuffer);
-      out = ObjectInspectorUtils.copyToStandardObject(out, wFnDef.getOI());
+      Object out = evaluateWindowFunction(wFnDef, rItr);
       vals.add(out);
     }
     return vals;
@@ -792,4 +835,77 @@ public int size() {
   }
 
+  public class WindowingIterator implements Iterator<Object> {
+
+    ArrayList<Object> output;
+    List[] outputFromPivotFunctions;
+    int currIdx;
+    PTFPartition iPart;
+    /*
+     * These are the functions that have a Window; functions without a Window
+     * have already been processed.
+     */
+    int[] wFnsToProcess;
+    WindowTableFunctionDef wTFnDef;
+    Order order;
+    PTFDesc ptfDesc;
+    StructObjectInspector inputOI;
+
+    WindowingIterator(PTFPartition iPart, ArrayList<Object> output,
+        List[] outputFromPivotFunctions, int[] wFnsToProcess) {
+      this.iPart = iPart;
+      this.output = output;
+      this.outputFromPivotFunctions = outputFromPivotFunctions;
+      this.wFnsToProcess = wFnsToProcess;
+      this.currIdx = 0;
+      wTFnDef = (WindowTableFunctionDef) getTableDef();
+      order = wTFnDef.getOrder().getExpressions().get(0).getOrder();
+      ptfDesc = getQueryDef();
+      inputOI = iPart.getOutputOI();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return currIdx < iPart.size();
+    }
+
+    @Override
+    public Object next() {
+      int i;
+      for (i = 0; i < outputFromPivotFunctions.length; i++) {
+        if (outputFromPivotFunctions[i] != null) {
+          output.set(i, outputFromPivotFunctions[i].get(currIdx));
+        }
+      }
+
+      try {
+        for (int j : wFnsToProcess) {
+          WindowFunctionDef wFn = wTFnDef.getWindowFunctions().get(j);
+          Range rng = getRange(wFn, currIdx, iPart, order);
+          PTFPartitionIterator<Object> rItr = rng.iterator();
+          PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
+          output.set(j, evaluateWindowFunction(wFn, rItr));
+        }
+
+        Object iRow = iPart.getAt(currIdx);
+        i = wTFnDef.getWindowFunctions().size();
+        for (StructField f : inputOI.getAllStructFieldRefs()) {
+          output.set(i++, inputOI.getStructFieldData(iRow, f));
+        }
+
+      } catch (HiveException he) {
+        throw new RuntimeException(he);
+      }
+
+      currIdx++;
+      return output;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+  }
+
 }
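One behavioral detail of WindowingIterator worth noting: next() refills and returns the same ArrayList on every call, laid out as [window function values..., input columns...]. That is safe for PTFOperator, which forwards each row before requesting the next, but any consumer that buffered the returned object would see it mutate. A toy, self-contained illustration of that reuse contract (all names and data are hypothetical):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class RowReuseSketch {
      public static void main(String[] args) {
        // A two-row "partition" with columns (name, count).
        final List<Object[]> part = Arrays.asList(
            new Object[]{"a", 1}, new Object[]{"b", 2});
        // One output row, reused: [fn value, inputCol0, inputCol1].
        final ArrayList<Object> output = new ArrayList<>(Arrays.asList(null, null, null));

        Iterator<Object> it = new Iterator<Object>() {
          int currIdx = 0;

          @Override public boolean hasNext() {
            return currIdx < part.size();
          }

          @Override public Object next() {
            Object[] iRow = part.get(currIdx);
            currIdx++;
            output.set(0, currIdx);  // stand-in window function value, e.g. row_number
            output.set(1, iRow[0]);  // input columns follow the function values
            output.set(2, iRow[1]);
            return output;           // the SAME list instance every time
          }
        };

        while (it.hasNext()) {
          System.out.println(it.next()); // consume each row before the next() call
        }
      }
    }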