diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java index 4d314b7..6103340 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java @@ -55,7 +55,7 @@ transient KeyWrapperFactory keyWrapperFactory; protected transient KeyWrapper currentKeys; protected transient KeyWrapper newKeys; - transient HiveConf hiveConf; + transient Configuration hiveConf; /* @@ -66,14 +66,13 @@ */ @Override protected void initializeOp(Configuration jobConf) throws HiveException { - hiveConf = new HiveConf(jobConf, PTFOperator.class); - // if the parent is ExtractOperator, this invocation is from reduce-side + hiveConf = jobConf; + // if the parent is ExtractOperator, this invocation is from reduce-side Operator parentOp = getParentOperators().get(0); isMapOperator = conf.isMapSide(); reconstructQueryDef(hiveConf); - inputPart = createFirstPartitionForChain( - inputObjInspectors[0], hiveConf, isMapOperator); + inputPart = createFirstPartitionForChain(inputObjInspectors[0], isMapOperator); if (isMapOperator) { PartitionedTableFunctionDef tDef = conf.getStartOfChain(); @@ -147,7 +146,7 @@ public void processOp(Object row, int tag) throws HiveException * @param hiveConf * @throws HiveException */ - protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException { + protected void reconstructQueryDef(Configuration hiveConf) throws HiveException { PTFDeserializer dS = new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf); @@ -191,11 +190,11 @@ protected void processInputPartition() throws HiveException { protected void processMapFunction() throws HiveException { PartitionedTableFunctionDef tDef = conf.getStartOfChain(); - - Iterator pItr = tDef.getTFunction().canIterateOutput() ? + + Iterator pItr = tDef.getTFunction().canIterateOutput() ? tDef.getTFunction().transformRawInputIterator(inputPart.iterator()) : tDef.getTFunction().transformRawInput(inputPart).iterator(); - + while (pItr.hasNext()) { Object oRow = pItr.next(); forward(oRow, outputObjInspector); @@ -280,9 +279,8 @@ public OperatorType getType() { * @throws HiveException */ public PTFPartition createFirstPartitionForChain(ObjectInspector oi, - HiveConf hiveConf, boolean isMapSide) throws HiveException { + boolean isMapSide) throws HiveException { PartitionedTableFunctionDef tabDef = conf.getStartOfChain(); - TableFunctionEvaluator tEval = tabDef.getTFunction(); PTFPartition part = null; SerDe serde = isMapSide ? tabDef.getInput().getOutputShape().getSerde() : diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java index 3c717d6..b5adb11 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.persistence.PTFRowContainer; @@ -46,7 +47,7 @@ StructObjectInspector outputOI; private final PTFRowContainer> elems; - protected PTFPartition(HiveConf cfg, + protected PTFPartition(Configuration cfg, SerDe serDe, StructObjectInspector inputOI, StructObjectInspector outputOI) throws HiveException { @@ -130,11 +131,13 @@ public void close() { createTimeSz = PTFPartition.this.size(); } + @Override public boolean hasNext() { checkForComodification(); return idx < end; } + @Override public Object next() { checkForComodification(); try { @@ -144,6 +147,7 @@ public Object next() { } } + @Override public void remove() { throw new UnsupportedOperationException(); } @@ -222,7 +226,7 @@ public void reset() { void reset() throws HiveException; } - public static PTFPartition create(HiveConf cfg, + public static PTFPartition create(Configuration cfg, SerDe serDe, StructObjectInspector inputOI, StructObjectInspector outputOI) diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java index 95f38b6..3ac3245 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.PTFUtils; import org.apache.hadoop.hive.ql.parse.LeadLagInfo; @@ -42,7 +43,7 @@ */ boolean isMapSide = false; - transient HiveConf cfg; + transient Configuration cfg; static{ PTFUtils.makeTransient(PTFDesc.class, "llInfo"); @@ -81,11 +82,11 @@ public void setMapSide(boolean isMapSide) { this.isMapSide = isMapSide; } - public HiveConf getCfg() { + public Configuration getCfg() { return cfg; } - public void setCfg(HiveConf cfg) { + public void setCfg(Configuration cfg) { this.cfg = cfg; } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java index 217db4e..f75bec5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Properties; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.PTFPartition; @@ -64,10 +65,10 @@ PTFDesc ptfDesc; StructObjectInspector inputOI; - HiveConf hConf; + Configuration hConf; LeadLagInfo llInfo; - public PTFDeserializer(PTFDesc ptfDesc, StructObjectInspector inputOI, HiveConf hConf) { + public PTFDeserializer(PTFDesc ptfDesc, StructObjectInspector inputOI, Configuration hConf) { super(); this.ptfDesc = ptfDesc; ptfDesc.setCfg(hConf); @@ -288,7 +289,7 @@ private TableFunctionResolver constructResolver(String className) throws HiveExc @SuppressWarnings("unchecked") Class rCls = (Class) Class.forName(className); - return (TableFunctionResolver) ReflectionUtils.newInstance(rCls, null); + return ReflectionUtils.newInstance(rCls, null); } catch (Exception e) { throw new HiveException(e); }