diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
index 989cc2d..ddaaa21 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hive.ql.exec.Stat;
 import org.apache.hadoop.hive.ql.exec.TerminalOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -331,7 +332,6 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     statsCollectRawDataSize = conf.isStatsCollectRawDataSize();

     serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
-    System.out.println("Deserializer class = "+serializer.getClass().toString());
     serializer.initialize(null, conf.getTableInfo().getProperties());
     outputClass = serializer.getSerializedClass();
@@ -589,6 +589,10 @@ public void processOp(Object data, int tag) throws HiveException {
       if (vectorizedSerde) {
         row = records[i];
       } else {
+        if (vrg.valueWriters == null) {
+          vrg.setValueWriters(VectorExpressionWriterFactory.getExpressionWriters(
+              (StructObjectInspector)inputObjInspectors[0]));
+        }
         row = new Text(vrg.toString());
       }
       /* Create list bucketing sub-directory only if stored-as-directories is on. */
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
index e4368e6..e0c0be2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
@@ -19,16 +19,21 @@
 package org.apache.hadoop.hive.ql.exec.vector;

 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

 /**
  * Select operator implementation.
@@ -44,6 +49,8 @@
   private int [] projectedColumns = null;

+  private VectorExpressionWriter [] valueWriters = null;
+
   public VectorSelectOperator(VectorizationContext ctxt, OperatorDesc conf) {
     this.vContext = ctxt;
     this.conf = (SelectDesc) conf;
@@ -57,6 +64,8 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       return;
     }

+    List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
+
     List<ExprNodeDesc> colList = conf.getColList();
     vContext.setOperatorType(OperatorType.SELECT);
     vExpressions = new VectorExpression[colList.size()];
@@ -66,6 +75,15 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       // Update column map with output column names
       vContext.addToColumnMap(columnName, vExpressions[i].getOutputColumn());
     }
+    valueWriters = VectorExpressionWriterFactory.getExpressionWriters(colList);
+    for (VectorExpressionWriter vew : valueWriters) {
+      objectInspectors.add(vew.getObjectInspector());
+    }
+
+    List<String> outputFieldNames = conf.getOutputColumnNames();
+    outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+        outputFieldNames, objectInspectors);
+
     initializeChildren(hconf);
     projectedColumns = new int [vExpressions.length];
     for (int i = 0; i < projectedColumns.length; i++) {
@@ -97,6 +115,8 @@ public void processOp(Object row, int tag) throws HiveException {
     }

     // Prepare output, set the projections
+    VectorExpressionWriter [] originalValueWriters = vrg.valueWriters;
+    vrg.setValueWriters(valueWriters);
     int[] originalProjections = vrg.projectedColumns;
     int originalProjectionSize = vrg.projectionSize;
     vrg.projectionSize = vExpressions.length;
@@ -106,6 +126,7 @@ public void processOp(Object row, int tag) throws HiveException {
     // Revert the projected columns back, because vrg will be re-used.
     vrg.projectionSize = originalProjectionSize;
     vrg.projectedColumns = originalProjections;
+    vrg.valueWriters = originalValueWriters;
   }

   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
index a22a2de..a0b7fc3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
@@ -21,6 +21,8 @@
 import java.io.DataOutput;
 import java.io.IOException;

+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.io.Writable;

 /**
@@ -54,8 +56,7 @@
    */
   public static final int DEFAULT_SIZE = 1024;

-  private final Writable[] writableRow;
-  private int rowIteratorIndex = 0;
+  public VectorExpressionWriter[] valueWriters = null;

   /**
    * Return a batch with the specified number of columns.
@@ -81,7 +82,6 @@ public VectorizedRowBatch(int numCols, int size) {
     selected = new int[size];
     selectedInUse = false;
     this.cols = new ColumnVector[numCols];
-    writableRow = new Writable[numCols];
     projectedColumns = new int[numCols];

     // Initially all columns are projected and in the same order
@@ -91,30 +91,6 @@ public VectorizedRowBatch(int numCols, int size) {
     }
   }

-  public void initRowIterator(){
-    this.rowIteratorIndex = 0;
-  }
-
-  public Writable [] getNextRow() {
-    if (rowIteratorIndex >= size) {
-      return null;
-    }
-    if (selectedInUse) {
-      int i = selected[rowIteratorIndex];
-      for (int k = 0; k < projectionSize; k++) {
-        int c = this.projectedColumns[k];
-        writableRow[c] = cols[c].getWritableObject(i);
-      }
-    } else {
-      int i = rowIteratorIndex;
-      for (int k = 0; k < projectionSize; k++) {
-        int c = this.projectedColumns[k];
-        writableRow[c] = cols[c].getWritableObject(i);
-      }
-    }
-    return writableRow;
-  }
-
   /**
    * Return count of qualifying rows.
    *
@@ -130,45 +106,51 @@ public String toString() {
       return "";
     }
     StringBuilder b = new StringBuilder();
-    if (this.selectedInUse) {
-      for (int j = 0; j < size; j++) {
-        int i = selected[j];
-        int colIndex = 0;
-        for (int k = 0; k < projectionSize; k++) {
-          ColumnVector cv = cols[this.projectedColumns[k]];
-          if (cv.isRepeating) {
-            b.append(cv.getWritableObject(0).toString());
-          } else {
-            b.append(cv.getWritableObject(i).toString());
+    try {
+      if (this.selectedInUse) {
+        for (int j = 0; j < size; j++) {
+          int i = selected[j];
+          int colIndex = 0;
+          for (int k = 0; k < projectionSize; k++) {
+            int projIndex = projectedColumns[k];
+            ColumnVector cv = cols[projIndex];
+            if (cv.isRepeating) {
+              b.append(valueWriters[k].writeValue(cv, 0).toString());
+            } else {
+              b.append(valueWriters[k].writeValue(cv, i).toString());
+            }
+            colIndex++;
+            if (colIndex < cols.length) {
+              b.append('\u0001');
+            }
           }
-          colIndex++;
-          if (colIndex < cols.length) {
-            b.append('\u0001');
+          if (j < size - 1) {
+            b.append('\n');
          }
        }
-        if (j < size-1) {
-          b.append('\n');
-        }
-      }
-    } else {
-      for (int i = 0; i < size; i++) {
-        int colIndex = 0;
-        for (int k = 0; k < projectionSize; k++) {
-          ColumnVector cv = cols[this.projectedColumns[k]];
-          if (cv.isRepeating) {
-            b.append(cv.getWritableObject(0).toString());
-          } else {
-            b.append(cv.getWritableObject(i).toString());
+      } else {
+        for (int i = 0; i < size; i++) {
+          int colIndex = 0;
+          for (int k = 0; k < projectionSize; k++) {
+            int projIndex = projectedColumns[k];
+            ColumnVector cv = cols[projIndex];
+            if (cv.isRepeating) {
+              b.append(valueWriters[k].writeValue(cv, 0).toString());
+            } else {
+              b.append(valueWriters[k].writeValue(cv, i).toString());
+            }
+            colIndex++;
+            if (colIndex < cols.length) {
+              b.append('\u0001');
+            }
           }
-          colIndex++;
-          if (colIndex < cols.length) {
-            b.append('\u0001');
+          if (i < size - 1) {
+            b.append('\n');
          }
        }
-        if (i < size-1) {
-          b.append('\n');
-        }
       }
+    } catch (HiveException ex) {
+      throw new RuntimeException(ex);
     }
     return b.toString();
   }
@@ -182,5 +164,9 @@ public void readFields(DataInput arg0) throws IOException {
   public void write(DataOutput arg0) throws IOException {
     throw new UnsupportedOperationException("Don't call me");
   }
+
+  public void setValueWriters(VectorExpressionWriter[] valueWriters) {
+    this.valueWriters = valueWriters;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
index ac2b6df..5e8f61e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
@@ -139,7 +139,7 @@ public SerDeStats getSerDeStats() {
   public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
       throws SerDeException {
     if (vos == null) {
-      vos = new VectorizedOrcSerde();
+      vos = new VectorizedOrcSerde(objInspector);
     }
     return vos.serialize(vrg, objInspector);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 790c8c7..da93d5d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -552,7 +552,7 @@ Object nextVector(Object previousVector, long batchSize) throws IOException {
       // Read value entries based on isNull entries
       for (int i = 0; i < batchSize; i++) {
         if (!result.isNull[i]) {
-          result.vector[i] = SerializationUtils.readDouble(stream);
+          result.vector[i] = SerializationUtils.readFloat(stream);
         } else {

           // If the value is not present then set NaN
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
index 0bb10d3..3c28efc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
@@ -18,7 +18,11 @@
 package org.apache.hadoop.hive.ql.io.orc;

 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
@@ -31,43 +35,54 @@
   private final Writable [] orcRowArray = new Writable [VectorizedRowBatch.DEFAULT_SIZE];
   private final ObjectWritable ow = new ObjectWritable();
   private final ObjectInspector inspector = null;
+  private final VectorExpressionWriter [] valueWriters;

-  public VectorizedOrcSerde() {
+  public VectorizedOrcSerde(ObjectInspector objInspector) {
     super();
     for (int i = 0; i < orcStructArray.length; i++) {
       orcRowArray[i] = new OrcSerdeRow();
     }
+    try {
+      valueWriters = VectorExpressionWriterFactory
+          .getExpressionWriters((StructObjectInspector) objInspector);
+    } catch (HiveException e) {
+      throw new RuntimeException(e);
+    }
   }

   @Override
   public Writable serialize(Object obj, ObjectInspector inspector) {
-    VectorizedRowBatch batch = (VectorizedRowBatch)obj;
-    for (int i = 0; i < batch.size; i++) {
-      OrcStruct ost = orcStructArray[i];
-      if (ost == null) {
-        ost = new OrcStruct(batch.numCols);
-        orcStructArray[i] = ost;
-      }
-      int index = 0;
-      if (batch.selectedInUse) {
-        index = batch.selected[i];
-      } else {
-        index = i;
-      }
-      for (int p = 0; p < batch.projectionSize; p++) {
-        int k = batch.projectedColumns[p];
-        Writable w;
-        if (batch.cols[k].isRepeating) {
-          w = batch.cols[k].getWritableObject(0);
+    VectorizedRowBatch batch = (VectorizedRowBatch) obj;
+    try {
+      for (int i = 0; i < batch.size; i++) {
+        OrcStruct ost = orcStructArray[i];
+        if (ost == null) {
+          ost = new OrcStruct(batch.numCols);
+          orcStructArray[i] = ost;
+        }
+        int index = 0;
+        if (batch.selectedInUse) {
+          index = batch.selected[i];
         } else {
-          w = batch.cols[k].getWritableObject(index);
+          index = i;
+        }
+        for (int p = 0; p < batch.projectionSize; p++) {
+          int k = batch.projectedColumns[p];
+          Writable w;
+          if (batch.cols[k].isRepeating) {
+            w = (Writable) valueWriters[p].writeValue(batch.cols[k], 0);
+          } else {
+            w = (Writable) valueWriters[p].writeValue(batch.cols[k], index);
+          }
+          ost.setFieldValue(k, w);
         }
-        ost.setFieldValue(k, w);
+        OrcSerdeRow row = (OrcSerdeRow) orcRowArray[i];
+        row.realRow = ost;
+        row.inspector = inspector;
       }
-      OrcSerdeRow row = (OrcSerdeRow) orcRowArray[i];
-      row.realRow = ost;
-      row.inspector = inspector;
+    } catch (HiveException ex) {
+      throw new RuntimeException(ex);
     }
     ow.set(orcRowArray);
     return ow;