diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 23d6a5d..0d603c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -26,6 +26,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
@@ -47,6 +48,7 @@
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
@@ -153,8 +155,10 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) throws
 
         /* vectorization only works with struct object inspectors */
         valueStructInspectors[tag] = (StructObjectInspector) valueObjectInspector[tag];
-        batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
-            valueStructInspectors[tag]);
+        ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair = VectorizedBatchUtil.
+            constructVectorizedRowBatch(keyStructInspector,
+            valueStructInspectors[tag], gWork.getVectorScratchColumnTypeMap());
+        batches[tag] = pair.getFirst();
         final int totalColumns = keysColumnOffset
             + valueStructInspectors[tag].getAllStructFieldRefs().size();
         valueStringWriters[tag] = new ArrayList(totalColumns);
@@ -163,24 +167,7 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) throws
         valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
             .genVectorStructExpressionWritables(valueStructInspectors[tag])));
 
-        /*
-         * The row object inspector used by ReduceWork needs to be a
-         * **standard** struct object inspector, not just any struct object
-         * inspector.
-         */
-        ArrayList<String> colNames = new ArrayList<String>();
-        List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
-        for (StructField field : fields) {
-          colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
-          ois.add(field.getFieldObjectInspector());
-        }
-        fields = valueStructInspectors[tag].getAllStructFieldRefs();
-        for (StructField field : fields) {
-          colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
-          ois.add(field.getFieldObjectInspector());
-        }
-        rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
-            colNames, ois);
+        rowObjectInspector[tag] = pair.getSecond();
       } else {
         ois.add(keyObjectInspector);
         ois.add(valueObjectInspector[tag]);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index cdabe3a..26b90f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -26,6 +26,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -51,6 +52,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
@@ -95,7 +97,6 @@
   private VectorDeserializeRow valueLazyBinaryDeserializeToRow;
 
-  private VectorizedRowBatchCtx batchContext;
   private VectorizedRowBatch batch;
 
   // number of columns pertaining to keys in a vectorized row batch
@@ -117,7 +118,7 @@
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
   private Iterable valueWritables;
-
+
   private final GroupIterator groupIterator = new GroupIterator();
 
   void init(JobConf jconf, Operator reducer, boolean vectorized, TableDesc keyTableDesc,
@@ -173,22 +174,13 @@ void init(JobConf jconf, Operator reducer, boolean vectorized, TableDesc keyT
        * The row object inspector used by ReduceWork needs to be a **standard**
        * struct object inspector, not just any struct object inspector.
        */
-      ArrayList<String> colNames = new ArrayList<String>();
-      List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
-      for (StructField field: fields) {
-        colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
-        ois.add(field.getFieldObjectInspector());
-      }
-      fields = valueStructInspectors.getAllStructFieldRefs();
-      for (StructField field: fields) {
-        colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
-        ois.add(field.getFieldObjectInspector());
-      }
-      rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
+      ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair =
+          VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector, valueStructInspectors, vectorScratchColumnTypeMap);
+      rowObjectInspector = pair.getSecond();
+      batch = pair.getFirst();
 
-      batchContext = new VectorizedRowBatchCtx();
+      VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
       batchContext.init(vectorScratchColumnTypeMap, (StructObjectInspector) rowObjectInspector);
-      batch = batchContext.createVectorizedRowBatch();
 
       // Setup vectorized deserialization for the key and value.
       BinarySortableSerDe binarySortableSerDe = (BinarySortableSerDe) inputKeyDeserializer;
@@ -237,7 +229,7 @@ void init(JobConf jconf, Operator reducer, boolean vectorized, TableDesc keyT
       }
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     }
-
+
   @Override
   public final boolean isGrouped() {
     return vectorized;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 99cb620..119d3da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -24,13 +24,16 @@
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
 import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
@@ -195,18 +198,31 @@ public static VectorizedRowBatch constructVectorizedRowBatch(
    *
    * @param keyInspector
    * @param valueInspector
-   * @return VectorizedRowBatch
+   * @param vectorScratchColumnTypeMap
+   * @return VectorizedRowBatch, OI
    * @throws HiveException
    */
-  public static VectorizedRowBatch constructVectorizedRowBatch(
-      StructObjectInspector keyInspector, StructObjectInspector valueInspector)
+  public static ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> constructVectorizedRowBatch(
+      StructObjectInspector keyInspector, StructObjectInspector valueInspector, Map<Integer, String> vectorScratchColumnTypeMap)
       throws HiveException {
-    final List<ColumnVector> cvList = new LinkedList<ColumnVector>();
-    allocateColumnVector(keyInspector, cvList);
-    allocateColumnVector(valueInspector, cvList);
-    final VectorizedRowBatch result = new VectorizedRowBatch(cvList.size());
-    result.cols = cvList.toArray(result.cols);
-    return result;
+
+    ArrayList<String> colNames = new ArrayList<String>();
+    ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+    List<? extends StructField> fields = keyInspector.getAllStructFieldRefs();
+    for (StructField field: fields) {
+      colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+      ois.add(field.getFieldObjectInspector());
+    }
+    fields = valueInspector.getAllStructFieldRefs();
+    for (StructField field: fields) {
+      colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+      ois.add(field.getFieldObjectInspector());
+    }
+    StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
+
+    VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
+    batchContext.init(vectorScratchColumnTypeMap, rowObjectInspector);
+    return new ObjectPair<>(batchContext.createVectorizedRowBatch(), rowObjectInspector);
   }
 
   /**
@@ -559,7 +575,7 @@ public static StandardStructObjectInspector convertToStandardStructObjectInspect
     for(StructField field : fields) {
       TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(
           field.getFieldObjectInspector().getTypeName());
-      ObjectInspector standardWritableObjectInspector = 
+      ObjectInspector standardWritableObjectInspector =
           TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(typeInfo);
       oids.add(standardWritableObjectInspector);
       columnNames.add(field.getFieldName());
@@ -634,7 +650,7 @@ public static String displayBytes(byte[] bytes, int start, int length) {
     for (int i = start; i < start + length; i++) {
       char ch = (char) bytes[i];
       if (ch < ' ' || ch > '~') {
-        sb.append(String.format("\\%03d", (int) (bytes[i] & 0xff)));
+        sb.append(String.format("\\%03d", bytes[i] & 0xff));
      } else {
        sb.append(ch);
      }
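
Note on the refactor: constructVectorizedRowBatch now builds the standard KEY./VALUE. struct object inspector itself and hands it back together with the scratch-column-aware batch, so the Spark and Tez reduce-side callers no longer assemble the inspector by hand. Below is a minimal caller-side sketch of the new usage, assuming the signature shown in the patch; the class, method, and variable names are illustrative only and the generic parameter types are inferred from how the callers consume the pair.

import java.util.Map;

import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

// Illustrative helper, not part of the patch.
public class ReduceBatchSetupSketch {

  /**
   * keyStructInspector/valueStructInspector come from the key and value deserializers;
   * scratchColumnTypeMap would come from the reduce work (e.g. getVectorScratchColumnTypeMap()).
   */
  static VectorizedRowBatch setUpBatch(StructObjectInspector keyStructInspector,
      StructObjectInspector valueStructInspector,
      Map<Integer, String> scratchColumnTypeMap) throws HiveException {
    // One call now yields both the reusable batch and the **standard** struct OI
    // (KEY.* columns followed by VALUE.* columns, with scratch columns in the batch).
    ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair =
        VectorizedBatchUtil.constructVectorizedRowBatch(
            keyStructInspector, valueStructInspector, scratchColumnTypeMap);
    StandardStructObjectInspector rowObjectInspector = pair.getSecond(); // passed to operator init
    VectorizedRowBatch batch = pair.getFirst();                          // reused per key group
    return batch;
  }
}

This mirrors what SparkReduceRecordHandler and ReduceRecordSource now do with pair.getFirst() and pair.getSecond() in the hunks above.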