diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 12cf5df..0a32fff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -136,65 +136,65 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) {
       valueTableDesc = new TableDesc[gWork.getTagToValueDesc().size()];
       if (vectorized) {
-          final int maxTags = gWork.getTagToValueDesc().size();
-          keyStructInspector = (StructObjectInspector) keyObjectInspector;
-          batches = new VectorizedRowBatch[maxTags];
-          valueStructInspectors = new StructObjectInspector[maxTags];
-          valueStringWriters = (List<VectorExpressionWriter>[]) new List[maxTags];
-          keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
-          buffer = new DataOutputBuffer();
+        final int maxTags = gWork.getTagToValueDesc().size();
+        keyStructInspector = (StructObjectInspector) keyObjectInspector;
+        batches = new VectorizedRowBatch[maxTags];
+        valueStructInspectors = new StructObjectInspector[maxTags];
+        valueStringWriters = (List<VectorExpressionWriter>[]) new List[maxTags];
+        keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
+        buffer = new DataOutputBuffer();
       }
       for (int tag = 0; tag < gWork.getTagToValueDesc().size(); tag++) {
-          // We should initialize the SerDe with the TypeInfo when available.
-          valueTableDesc[tag] = gWork.getTagToValueDesc().get(tag);
-          inputValueDeserializer[tag] = ReflectionUtils.newInstance(
-              valueTableDesc[tag].getDeserializerClass(), null);
-          SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null,
-              valueTableDesc[tag].getProperties(), null);
-          valueObjectInspector[tag] = inputValueDeserializer[tag].getObjectInspector();
-
-          ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
-
-          if (vectorized) {
-            /* vectorization only works with struct object inspectors */
-            valueStructInspectors[tag] = (StructObjectInspector) valueObjectInspector[tag];
-
-            batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
-                valueStructInspectors[tag]);
-            final int totalColumns = keysColumnOffset
-                + valueStructInspectors[tag].getAllStructFieldRefs().size();
-            valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
-            valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
-                .genVectorStructExpressionWritables(keyStructInspector)));
-            valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
-                .genVectorStructExpressionWritables(valueStructInspectors[tag])));
-
-            /*
-             * The row object inspector used by ReduceWork needs to be a
-             * **standard** struct object inspector, not just any struct object
-             * inspector.
-             */
-            ArrayList<String> colNames = new ArrayList<String>();
-            List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
-            for (StructField field : fields) {
-              colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
-              ois.add(field.getFieldObjectInspector());
-            }
-            fields = valueStructInspectors[tag].getAllStructFieldRefs();
-            for (StructField field : fields) {
-              colNames.add(Utilities.ReduceField.VALUE.toString() + "."
-                  + field.getFieldName());
-              ois.add(field.getFieldObjectInspector());
-            }
-            rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
-                colNames, ois);
-          } else {
-            ois.add(keyObjectInspector);
-            ois.add(valueObjectInspector[tag]);
-            //reducer.setGroupKeyObjectInspector(keyObjectInspector);
-            rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
-                Utilities.reduceFieldNameList, ois);
-          }
+        // We should initialize the SerDe with the TypeInfo when available.
+        valueTableDesc[tag] = gWork.getTagToValueDesc().get(tag);
+        inputValueDeserializer[tag] = ReflectionUtils.newInstance(
+            valueTableDesc[tag].getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null,
+            valueTableDesc[tag].getProperties(), null);
+        valueObjectInspector[tag] = inputValueDeserializer[tag].getObjectInspector();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+
+        if (vectorized) {
+          /* vectorization only works with struct object inspectors */
+          valueStructInspectors[tag] = (StructObjectInspector) valueObjectInspector[tag];
+
+          batches[tag] = VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector,
+              valueStructInspectors[tag]);
+          final int totalColumns = keysColumnOffset
+              + valueStructInspectors[tag].getAllStructFieldRefs().size();
+          valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
+          valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
+              .genVectorStructExpressionWritables(keyStructInspector)));
+          valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
+              .genVectorStructExpressionWritables(valueStructInspectors[tag])));
+
+          /*
+           * The row object inspector used by ReduceWork needs to be a
+           * **standard** struct object inspector, not just any struct object
+           * inspector.
+           */
+          ArrayList<String> colNames = new ArrayList<String>();
+          List<? extends StructField> fields = keyStructInspector.getAllStructFieldRefs();
+          for (StructField field : fields) {
+            colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+            ois.add(field.getFieldObjectInspector());
+          }
+          fields = valueStructInspectors[tag].getAllStructFieldRefs();
+          for (StructField field : fields) {
+            colNames.add(Utilities.ReduceField.VALUE.toString() + "."
+                + field.getFieldName());
+            ois.add(field.getFieldObjectInspector());
+          }
+          rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
+              colNames, ois);
+        } else {
+          ois.add(keyObjectInspector);
+          ois.add(valueObjectInspector[tag]);
+          //reducer.setGroupKeyObjectInspector(keyObjectInspector);
+          rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
+              Utilities.reduceFieldNameList, ois);
+        }
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -283,19 +283,20 @@ public void processRow(Object key, Iterator values) throws IOException {
       }
       /* this.keyObject passed via reference */
       if (vectorized) {
-          processVectors(values, tag);
+        processVectors(values, tag);
       } else {
-          processKeyValues(values, tag);
+        processKeyValues(values, tag);
       }
     } catch (Throwable e) {
       abort = true;
       if (e instanceof OutOfMemoryError) {
-          // Don't create a new object if we are already out of memory
-          throw (OutOfMemoryError) e;
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
       } else {
-          LOG.fatal(StringUtils.stringifyException(e));
-          throw new RuntimeException(e);
+        String msg = "Fatal error: " + e;
+        LOG.fatal(msg, e);
+        throw new RuntimeException(e);
       }
     }
   }
@@ -305,43 +306,39 @@ public void processRow(Object key, Iterator values) throws IOException {
    * @return true if it is not done and can take more inputs
    */
   private boolean processKeyValues(Iterator values, byte tag) throws HiveException {
-
-      // System.err.print(keyObject.toString());
-      while (values.hasNext()) {
-        BytesWritable valueWritable = (BytesWritable) values.next();
-        // System.err.print(who.getHo().toString());
-        try {
-          valueObject[tag] = inputValueDeserializer[tag].deserialize(valueWritable);
-        } catch (SerDeException e) {
-          throw new HiveException(
-              "Hive Runtime Error: Unable to deserialize reduce input value (tag="
-                  + tag
-                  + ") from "
-                  + Utilities.formatBinaryString(valueWritable.get(), 0,
-                      valueWritable.getSize()) + " with properties "
-                  + valueTableDesc[tag].getProperties(), e);
-        }
-        row.clear();
-        row.add(keyObject);
-        row.add(valueObject[tag]);
-        if (isLogInfoEnabled) {
-          logMemoryInfo();
-        }
+    while (values.hasNext()) {
+      BytesWritable valueWritable = (BytesWritable) values.next();
+      try {
+        valueObject[tag] = inputValueDeserializer[tag].deserialize(valueWritable);
+      } catch (SerDeException e) {
+        throw new HiveException(
+            "Hive Runtime Error: Unable to deserialize reduce input value (tag="
+                + tag
+                + ") from "
+                + Utilities.formatBinaryString(valueWritable.get(), 0,
+                    valueWritable.getSize()) + " with properties "
+                + valueTableDesc[tag].getProperties(), e);
+      }
+      row.clear();
+      row.add(keyObject);
+      row.add(valueObject[tag]);
+      if (isLogInfoEnabled) {
+        logMemoryInfo();
+      }
+      try {
+        reducer.processOp(row, tag);
+      } catch (Exception e) {
+        String rowString = null;
         try {
-          reducer.processOp(row, tag);
-        } catch (Exception e) {
-          String rowString = null;
-          try {
-            rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
-          } catch (Exception e2) {
-            rowString = "[Error getting row data with exception " +
-              StringUtils.stringifyException(e2) + " ]";
-          }
-          throw new HiveException("Hive Runtime Error while processing row (tag="
-            + tag + ") " + rowString, e);
+          rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
+        } catch (Exception e2) {
+          rowString = "[Error getting row data with exception " +
+              StringUtils.stringifyException(e2) + " ]";
         }
+        throw new HiveException("Hive Runtime Error while processing row (tag="
+            + tag + ") " + rowString, e);
       }
-
+    }
     return true; // give me more
   }
@@ -362,41 +359,41 @@ private boolean processVectors(Iterator values, byte tag) throws HiveException {
     int rowIdx = 0;
     try {
       while (values.hasNext()) {
-          /* deserialize value into columns */
-          BytesWritable valueWritable = (BytesWritable) values.next();
-          Object valueObj = deserializeValue(valueWritable, tag);
-
-          VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag], rowIdx,
-              keysColumnOffset, batch, buffer);
-          rowIdx++;
-          if (rowIdx >= BATCH_SIZE) {
-            VectorizedBatchUtil.setBatchSize(batch, rowIdx);
-            reducer.processOp(batch, tag);
-            rowIdx = 0;
-            if (isLogInfoEnabled) {
-              logMemoryInfo();
-            }
-          }
+        /* deserialize value into columns */
+        BytesWritable valueWritable = (BytesWritable) values.next();
+        Object valueObj = deserializeValue(valueWritable, tag);
+
+        VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag], rowIdx,
+            keysColumnOffset, batch, buffer);
+        rowIdx++;
+        if (rowIdx >= BATCH_SIZE) {
+          VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+          reducer.processOp(batch, tag);
+          rowIdx = 0;
+          if (isLogInfoEnabled) {
+            logMemoryInfo();
+          }
+        }
       }
       if (rowIdx > 0) {
-          VectorizedBatchUtil.setBatchSize(batch, rowIdx);
-          reducer.processOp(batch, tag);
+        VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+        reducer.processOp(batch, tag);
       }
       if (isLogInfoEnabled) {
-          logMemoryInfo();
+        logMemoryInfo();
       }
     } catch (Exception e) {
       String rowString = null;
       try {
-          /* batch.toString depends on this */
-          batch.setValueWriters(valueStringWriters[tag].toArray(new VectorExpressionWriter[0]));
-          rowString = batch.toString();
+        /* batch.toString depends on this */
+        batch.setValueWriters(valueStringWriters[tag].toArray(new VectorExpressionWriter[0]));
+        rowString = batch.toString();
       } catch (Exception e2) {
-          rowString = "[Error getting row data with exception " + StringUtils.stringifyException(e2)
-              + " ]";
+        rowString = "[Error getting row data with exception " + StringUtils.stringifyException(e2)
+            + " ]";
       }
       throw new HiveException("Hive Runtime Error while processing vector batch (tag=" + tag + ") "
-            + rowString, e);
+          + rowString, e);
     }
     return true; // give me more
   }
@@ -406,9 +403,9 @@ private Object deserializeValue(BytesWritable valueWritable, byte tag) throws Hi
       return inputValueDeserializer[tag].deserialize(valueWritable);
     } catch (SerDeException e) {
       throw new HiveException("Hive Runtime Error: Unable to deserialize reduce input value (tag="
-            + tag + ") from "
-            + Utilities.formatBinaryString(valueWritable.getBytes(), 0, valueWritable.getLength())
-            + " with properties " + valueTableDesc[tag].getProperties(), e);
+          + tag + ") from "
+          + Utilities.formatBinaryString(valueWritable.getBytes(), 0, valueWritable.getLength())
+          + " with properties " + valueTableDesc[tag].getProperties(), e);
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index f08691a..12cfd78 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -313,14 +313,14 @@ public Object dispatch(Node nd, Stack stack, Object... nodeOutputs)
           }
         }
       } else if (currTask instanceof SparkTask) {
-          SparkWork sparkWork = (SparkWork) currTask.getWork();
-          for (BaseWork baseWork : sparkWork.getAllWork()) {
-            if (baseWork instanceof MapWork) {
-              convertMapWork((MapWork) baseWork, false);
-            } else if (baseWork instanceof ReduceWork) {
-              convertReduceWork((ReduceWork) baseWork);
-            }
-          }
+        SparkWork sparkWork = (SparkWork) currTask.getWork();
+        for (BaseWork baseWork : sparkWork.getAllWork()) {
+          if (baseWork instanceof MapWork) {
+            convertMapWork((MapWork) baseWork, false);
+          } else if (baseWork instanceof ReduceWork) {
+            convertReduceWork((ReduceWork) baseWork);
+          }
+        }
       }
       return null;
     }
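
For background on the block being re-indented in init(): in the vectorized path, each tag's row object inspector is built by prefixing the key struct's fields with "KEY." and the value struct's fields with "VALUE.", then wrapping both lists in a standard struct object inspector. The following is a minimal, hypothetical sketch of that wiring using only the hive-serde ObjectInspector factories; the field names "userid" and "cnt" are made up for illustration, and the real handler obtains the key/value inspectors from the configured deserializers rather than building them by hand:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    /** Illustrative only: how the per-tag row inspector combines key and value columns. */
    public class RowInspectorSketch {

      public static void main(String[] args) {
        // Made-up key/value shapes; the real handler takes these from the deserializers.
        StructObjectInspector keyOI = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("userid"),
            Arrays.<ObjectInspector>asList(PrimitiveObjectInspectorFactory.javaLongObjectInspector));
        StructObjectInspector valueOI = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("cnt"),
            Arrays.<ObjectInspector>asList(PrimitiveObjectInspectorFactory.javaLongObjectInspector));

        List<String> colNames = new ArrayList<String>();
        List<ObjectInspector> ois = new ArrayList<ObjectInspector>();
        for (StructField field : keyOI.getAllStructFieldRefs()) {
          colNames.add("KEY." + field.getFieldName());     // Utilities.ReduceField.KEY in the patch
          ois.add(field.getFieldObjectInspector());
        }
        for (StructField field : valueOI.getAllStructFieldRefs()) {
          colNames.add("VALUE." + field.getFieldName());   // Utilities.ReduceField.VALUE in the patch
          ois.add(field.getFieldObjectInspector());
        }

        StructObjectInspector rowOI =
            ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
        System.out.println(rowOI.getTypeName());           // prints the combined struct type
      }
    }

The explicit ObjectInspectorFactory call matters because, as the comment kept by the patch notes, ReduceWork requires a standard struct object inspector rather than whatever inspector the value deserializer happens to return.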
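
On the one functional change in processRow(): the old catch block logged StringUtils.stringifyException(e) and discarded the Throwable, while the new code passes the exception object to the logger and still avoids any allocation when the error is an OutOfMemoryError. Below is a stand-alone sketch of the same pattern, assuming commons-logging (the API that provides the LOG.fatal(Object, Throwable) overload used in the patch); the class name is illustrative only:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    /** Stand-alone sketch of the catch-block pattern introduced in processRow(). */
    public class FatalErrorHandlingSketch {

      private static final Log LOG = LogFactory.getLog(FatalErrorHandlingSketch.class);

      static void handle(Throwable e) {
        if (e instanceof OutOfMemoryError) {
          // Don't allocate anything if the JVM is already out of memory; rethrow as-is.
          throw (OutOfMemoryError) e;
        } else {
          // Hand the Throwable to the logger so the stack trace and cause chain stay
          // attached to the log record, instead of pre-rendering them into the message
          // the way the removed StringUtils.stringifyException(e) call did.
          String msg = "Fatal error: " + e;
          LOG.fatal(msg, e);
          throw new RuntimeException(e);
        }
      }

      public static void main(String[] args) {
        try {
          handle(new IllegalStateException("simulated reduce-side failure"));
        } catch (RuntimeException wrapped) {
          System.out.println("rethrown as: " + wrapped);
        }
      }
    }

Keeping the Throwable on the log call preserves the full trace in the executor logs while the wrapped RuntimeException still fails the task, which is the behaviour the patch appears to be after.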
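
The processVectors() loop that is re-indented here follows a fill-and-flush pattern: rows are appended to a VectorizedRowBatch, the batch is handed to reducer.processOp() whenever it reaches BATCH_SIZE, and any partial batch left when the iterator is drained is flushed once more. The following is a simplified, self-contained model of that control flow only; Batch, processBatch() and the string rows are stand-ins, not Hive classes, and the real code uses VectorizedBatchUtil.addRowToBatchFrom and setBatchSize:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    /** Simplified model of the fill-and-flush loop in processVectors(); not Hive code. */
    public class BatchFlushSketch {

      // Tiny on purpose so the example prints something; the real handler sizes its
      // batches from VectorizedRowBatch.
      static final int BATCH_SIZE = 4;

      /** Stand-in for VectorizedRowBatch: a fixed-capacity row buffer. */
      static final class Batch {
        final String[] rows = new String[BATCH_SIZE];
        int size = 0;
      }

      /** Stand-in for reducer.processOp(batch, tag). */
      static void processBatch(Batch batch) {
        System.out.println("flush " + batch.size + " rows: "
            + Arrays.asList(batch.rows).subList(0, batch.size));
      }

      /** Same shape as processVectors(): fill, flush when full, flush the remainder. */
      static void processValues(Iterator<String> values) {
        Batch batch = new Batch();
        int rowIdx = 0;
        while (values.hasNext()) {
          batch.rows[rowIdx] = values.next();   // addRowToBatchFrom(...) in the real code
          rowIdx++;
          if (rowIdx >= BATCH_SIZE) {
            batch.size = rowIdx;                // VectorizedBatchUtil.setBatchSize(batch, rowIdx)
            processBatch(batch);                // reducer.processOp(batch, tag)
            rowIdx = 0;
          }
        }
        if (rowIdx > 0) {                       // partial batch left after the iterator is drained
          batch.size = rowIdx;
          processBatch(batch);
        }
      }

      public static void main(String[] args) {
        List<String> values = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
        processValues(values.iterator());       // two full batches, then a partial one
      }
    }

Running main prints two full batches of four rows followed by a final partial batch of two, which is exactly the flush order the handler's `if (rowIdx >= BATCH_SIZE)` and trailing `if (rowIdx > 0)` checks rely on.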