diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 2d06545..3fb9fb1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -300,7 +300,7 @@ private void initializeSourceForTag(ReduceWork redWork, int tag, ObjectInspector
     boolean vectorizedRecordSource = (tag == bigTablePosition) && redWork.getVectorMode();
     sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc,
         valueTableDesc, reader, tag == bigTablePosition, (byte) tag,
-        redWork.getVectorizedRowBatchCtx());
+        redWork.getVectorizedRowBatchCtx(), redWork.getVectorizedVertexNum());
     ois[tag] = sources[tag].getObjectInspector();
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index 8cd49c5..ad8b9e0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -121,11 +121,14 @@
   private final GroupIterator groupIterator = new GroupIterator();
 
+  private long vectorizedVertexNum;
+
   void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc,
       TableDesc valueTableDesc, Reader reader, boolean handleGroupKey, byte tag,
-      VectorizedRowBatchCtx batchContext)
+      VectorizedRowBatchCtx batchContext, long vectorizedVertexNum)
           throws Exception {
+    this.vectorizedVertexNum = vectorizedVertexNum;
 
     ObjectInspector keyObjectInspector;
 
     this.reducer = reducer;
@@ -476,7 +479,8 @@ private void processVectorGroup(BytesWritable keyWritable,
             + StringUtils.stringifyException(e2) + " ]";
       }
       throw new HiveException("Hive Runtime Error while processing vector batch (tag="
-          + tag + ") " + rowString, e);
+          + tag + ") (vectorizedVertexNum " + vectorizedVertexNum + ") "
+          + rowString, e);
     }
   }
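The two ReduceRecord* hunks above thread the vertex number into the reduce-side error path. For reference, a minimal standalone sketch (not Hive code; the tag, vertex number, and row string are made-up values) of the message shape the patched processVectorGroup() now produces:

    // Sketch only: shows the enriched error-message format from the hunk above.
    public class VertexNumMessageSketch {
      public static void main(String[] args) {
        byte tag = 0;                  // hypothetical reduce-side tag
        long vectorizedVertexNum = 3;  // hypothetical vertex number
        String rowString = "[Error getting row data with exception ...]";
        System.out.println("Hive Runtime Error while processing vector batch (tag="
            + tag + ") (vectorizedVertexNum " + vectorizedVertexNum + ") " + rowString);
      }
    }

This lets a vector-batch failure be correlated with the per-vertex log lines the Vectorizer emits below.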
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index f09bfa4..50eda15 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -201,6 +201,7 @@
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.NullStructSerDe;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -220,6 +221,7 @@
 import org.apache.hive.common.util.AnnotationUtils;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.ReflectionUtil;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.base.Preconditions;
@@ -307,6 +309,8 @@ private void clearNotVectorizedReason() {
     currentBaseWork.setNotVectorizedReason(null);
   }
 
+  private long vectorizedVertexNum = -1;
+
   public Vectorizer() {
 
     /*
@@ -604,6 +608,8 @@ private void convertMapWork(MapWork mapWork, boolean isTezOrSpark) throws Semant
     VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
     vectorTaskColumnInfo.assume();
 
+    mapWork.setVectorizedVertexNum(++vectorizedVertexNum);
+
     boolean ret;
     try {
       ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTezOrSpark);
@@ -1137,6 +1143,8 @@ private void convertReduceWork(ReduceWork reduceWork) throws SemanticException {
     VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
     vectorTaskColumnInfo.assume();
 
+    reduceWork.setVectorizedVertexNum(++vectorizedVertexNum);
+
     boolean ret;
     try {
       ret = validateReduceWork(reduceWork, vectorTaskColumnInfo);
@@ -1164,38 +1172,56 @@ private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork,
     ArrayList<String> reduceColumnNames = new ArrayList<String>();
     ArrayList<TypeInfo> reduceTypeInfos = new ArrayList<TypeInfo>();
 
+    if (reduceWork.getNeedsTagging()) {
+      setNodeIssue("Tagging not supported");
+      return false;
+    }
+
     try {
-      // Check key ObjectInspector.
-      ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector();
-      if (keyObjectInspector == null || !(keyObjectInspector instanceof StructObjectInspector)) {
-        setNodeIssue("Key object inspector missing or not StructObjectInspector");
-        return false;
+      TableDesc keyTableDesc = reduceWork.getKeyDesc();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Using reduce tag " + reduceWork.getTag());
       }
-      StructObjectInspector keyStructObjectInspector = (StructObjectInspector)keyObjectInspector;
-      List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs();
+      TableDesc valueTableDesc = reduceWork.getTagToValueDesc().get(reduceWork.getTag());
 
-      if (reduceWork.getNeedsTagging()) {
-        setNodeIssue("Tez doesn't use tagging");
+      Deserializer keyDeserializer =
+          ReflectionUtils.newInstance(
+              keyTableDesc.getDeserializerClass(), null);
+      SerDeUtils.initializeSerDe(keyDeserializer, null, keyTableDesc.getProperties(), null);
+      ObjectInspector keyObjectInspector = keyDeserializer.getObjectInspector();
+      if (keyObjectInspector == null) {
+        setNodeIssue("Key object inspector null");
         return false;
       }
-
-      // Check value ObjectInspector.
-      ObjectInspector valueObjectInspector = reduceWork.getValueObjectInspector();
-      if (valueObjectInspector == null ||
-          !(valueObjectInspector instanceof StructObjectInspector)) {
-        setNodeIssue("Value object inspector missing or not StructObjectInspector");
+      if (!(keyObjectInspector instanceof StructObjectInspector)) {
+        setNodeIssue("Key object inspector not StructObjectInspector");
         return false;
       }
-      StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
-      List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs();
+      StructObjectInspector keyStructObjectInspector = (StructObjectInspector) keyObjectInspector;
+      List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs();
 
       for (StructField field: keyFields) {
         reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
         reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
       }
-      for (StructField field: valueFields) {
-        reduceColumnNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
-        reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
+
+      Deserializer valueDeserializer =
+          ReflectionUtils.newInstance(
+              valueTableDesc.getDeserializerClass(), null);
+      SerDeUtils.initializeSerDe(valueDeserializer, null, valueTableDesc.getProperties(), null);
+      ObjectInspector valueObjectInspector = valueDeserializer.getObjectInspector();
+      if (valueObjectInspector != null) {
+        if (!(valueObjectInspector instanceof StructObjectInspector)) {
+          setNodeIssue("Value object inspector not StructObjectInspector");
+          return false;
+        }
+        StructObjectInspector valueStructObjectInspector = (StructObjectInspector) valueObjectInspector;
+        List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs();
+
+        for (StructField field: valueFields) {
+          reduceColumnNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+          reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
+        }
       }
     } catch (Exception e) {
       throw new SemanticException(e);
@@ -1520,6 +1546,10 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       if (op instanceof TableScanOperator) {
         if (taskVectorizationContext == null) {
           taskVectorizationContext = getVectorizationContext(op.getName(), vectorTaskColumnInfo);
+          if (LOG.isInfoEnabled()) {
+            LOG.info("MapWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " mapColumnNames " + vectorTaskColumnInfo.allColumnNames.toString());
+            LOG.info("MapWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " mapTypeInfos " + vectorTaskColumnInfo.allTypeInfos.toString());
+          }
         }
         vContext = taskVectorizationContext;
       } else {
@@ -1584,8 +1614,10 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       currentOperator = op;
       if (op.getParentOperators().size() == 0) {
-        LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + vectorTaskColumnInfo.allColumnNames.toString());
-
+        if (LOG.isInfoEnabled()) {
+          LOG.info("ReduceWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " reduceColumnNames " + vectorTaskColumnInfo.allColumnNames.toString());
+          LOG.info("ReduceWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " reduceTypeInfos " + vectorTaskColumnInfo.allTypeInfos.toString());
+        }
         vContext = new VectorizationContext("__Reduce_Shuffle__", vectorTaskColumnInfo.allColumnNames, hiveConf);
         taskVectorizationContext = vContext;
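In the Vectorizer changes above, getOnlyStructObjectInspectors() now rejects tagging up front, derives the key and value inspectors directly from the plan's TableDescs, and treats a null value inspector as a value-less vertex rather than a validation failure. The two inline field-collection loops follow one pattern, which could be factored as the helper below (a condensed sketch assuming the Hive classpath; collect() is not a method this patch adds):

    // Prefix each struct field with KEY/VALUE and record its TypeInfo,
    // exactly as the loops in the hunk above do inline.
    static void collect(StructObjectInspector soi, Utilities.ReduceField prefix,
        List<String> reduceColumnNames, List<TypeInfo> reduceTypeInfos) {
      for (StructField field : soi.getAllStructFieldRefs()) {
        reduceColumnNames.add(prefix.toString() + "." + field.getFieldName());
        reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(
            field.getFieldObjectInspector().getTypeName()));
      }
    }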
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index 286ee3b..0984df7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -89,6 +89,8 @@ public BaseWork(String name) {
   private boolean allNative;
   private boolean usesVectorUDFAdaptor;
 
+  protected long vectorizedVertexNum;
+
   protected boolean llapMode = false;
   protected boolean uberMode = false;
@@ -183,6 +185,14 @@ public void addDummyOp(HashTableDummyOperator dummyOp) {
     return returnSet;
   }
 
+  public void setVectorizedVertexNum(long vectorizedVertexNum) {
+    this.vectorizedVertexNum = vectorizedVertexNum;
+  }
+
+  public long getVectorizedVertexNum() {
+    return vectorizedVertexNum;
+  }
+
   // -----------------------------------------------------------------------------------------------
 
   public void setVectorizationExamined(boolean vectorizationExamined) {
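BaseWork carries the number from compile time (Vectorizer) to run time (ReduceRecordProcessor) for both MapWork and ReduceWork. The scheme itself is a pre-incremented counter starting at -1, so vertices are numbered 0, 1, 2, ... in the order the Vectorizer examines them; note the counter advances even when validation later rejects a vertex, since the setter runs before validateMapWork()/validateReduceWork(). A self-contained toy model (plain Java, not Hive code) of that contract:

    import java.util.ArrayList;
    import java.util.List;

    public class VertexNumberingSketch {
      // Stand-in for BaseWork's new field and accessors.
      static class Work {
        private long vectorizedVertexNum;
        void setVectorizedVertexNum(long n) { vectorizedVertexNum = n; }
        long getVectorizedVertexNum() { return vectorizedVertexNum; }
      }

      private long vectorizedVertexNum = -1;  // same initial value as Vectorizer

      // Stand-in for convertMapWork()/convertReduceWork().
      void convert(Work work) {
        work.setVectorizedVertexNum(++vectorizedVertexNum);
      }

      public static void main(String[] args) {
        VertexNumberingSketch vectorizer = new VertexNumberingSketch();
        List<Work> vertices = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
          Work w = new Work();
          vectorizer.convert(w);
          vertices.add(w);
        }
        vertices.forEach(w -> System.out.println(w.getVectorizedVertexNum()));  // 0, 1, 2
      }
    }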
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index f4ab2a0..ee784dc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -112,37 +112,6 @@ public TableDesc getKeyDesc() {
     return keyDesc;
   }
 
-  private ObjectInspector getObjectInspector(TableDesc desc) {
-    ObjectInspector objectInspector;
-    try {
-      Deserializer deserializer = ReflectionUtil.newInstance(desc
-          .getDeserializerClass(), null);
-      SerDeUtils.initializeSerDe(deserializer, null, desc.getProperties(), null);
-      objectInspector = deserializer.getObjectInspector();
-    } catch (Exception e) {
-      return null;
-    }
-    return objectInspector;
-  }
-
-  public ObjectInspector getKeyObjectInspector() {
-    if (keyObjectInspector == null) {
-      keyObjectInspector = getObjectInspector(keyDesc);
-    }
-    return keyObjectInspector;
-  }
-
-  // Only works when not tagging.
-  public ObjectInspector getValueObjectInspector() {
-    if (needsTagging) {
-      return null;
-    }
-    if (valueObjectInspector == null) {
-      valueObjectInspector = getObjectInspector(tagToValueDesc.get(tag));
-    }
-    return valueObjectInspector;
-  }
-
   public List<TableDesc> getTagToValueDesc() {
     return tagToValueDesc;
   }
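The lazy, caching accessors deleted here were the old source of these inspectors; the lines removed from the Vectorizer hunk above show the call sites this patch migrates, and the deserializer-based code added there is the replacement. One behavioral difference: the removed helper swallowed serde exceptions and returned null, while the new path in getOnlyStructObjectInspectors() wraps them in a SemanticException. A hedged sketch of how a caller obtains the value inspector now (non-tagging case, as the old accessor assumed; Hive classpath assumed):

    // Mirrors the value-side code added to Vectorizer.getOnlyStructObjectInspectors().
    TableDesc valueTableDesc = reduceWork.getTagToValueDesc().get(reduceWork.getTag());
    Deserializer valueDeserializer =
        ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
    SerDeUtils.initializeSerDe(valueDeserializer, null, valueTableDesc.getProperties(), null);
    ObjectInspector valueObjectInspector = valueDeserializer.getObjectInspector();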