diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 8251900..1cc14be 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -95,6 +95,7 @@
   // number of columns pertaining to keys in a vectorized row batch
   private int keysColumnOffset;
   private static final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
+  private static final int BATCH_BYTES = VectorizedRowBatch.DEFAULT_BYTES;
   private StructObjectInspector keyStructInspector;
   private StructObjectInspector[] valueStructInspectors;
   /* this is only used in the error code path */
@@ -372,6 +373,7 @@ public void processRow(Object key, final Object value) throws IOException {
     }
 
     int rowIdx = 0;
+    int batchBytes = 0;
     try {
       while (values.hasNext()) {
         /* deserialize value into columns */
@@ -380,11 +382,13 @@ public void processRow(Object key, final Object value) throws IOException {
 
         VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag], rowIdx,
             keysColumnOffset, batch, buffer);
+        batchBytes += valueWritable.getLength();
         rowIdx++;
-        if (rowIdx >= BATCH_SIZE) {
+        if (rowIdx >= BATCH_SIZE || batchBytes > BATCH_BYTES) {
           VectorizedBatchUtil.setBatchSize(batch, rowIdx);
           reducer.process(batch, tag);
           rowIdx = 0;
+          batchBytes = 0;
           if (isLogInfoEnabled) {
             logMemoryInfo();
           }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index 8cd49c5..d94d5b5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -103,6 +103,8 @@
   // number of columns pertaining to keys in a vectorized row batch
   private int firstValueColumnOffset;
 
+  private final int BATCH_BYTES = VectorizedRowBatch.DEFAULT_BYTES;
+
   private StructObjectInspector keyStructInspector;
   private StructObjectInspector valueStructInspectors;
 
@@ -432,6 +434,7 @@ private void processVectorGroup(BytesWritable keyWritable,
     final int maxSize = batch.getMaxSize();
     Preconditions.checkState(maxSize > 0);
     int rowIdx = 0;
+    int batchBytes = keyBytes.length;
     try {
       for (Object value : values) {
         if (valueLazyBinaryDeserializeToRow != null) {
@@ -439,6 +442,7 @@ private void processVectorGroup(BytesWritable keyWritable,
           BytesWritable valueWritable = (BytesWritable) value;
           byte[] valueBytes = valueWritable.getBytes();
           int valueLength = valueWritable.getLength();
+          batchBytes += valueLength;
 
           // l4j.info("ReduceRecordSource processVectorGroup valueBytes " + valueLength + " " +
           //     VectorizedBatchUtil.displayBytes(valueBytes, 0, valueLength));
@@ -447,7 +451,7 @@ private void processVectorGroup(BytesWritable keyWritable,
           valueLazyBinaryDeserializeToRow.deserialize(batch, rowIdx);
         }
         rowIdx++;
-        if (rowIdx >= maxSize) {
+        if (rowIdx >= maxSize || batchBytes >= BATCH_BYTES) {
 
           // Batch is full.
           batch.size = rowIdx;
@@ -459,6 +463,7 @@ private void processVectorGroup(BytesWritable keyWritable,
             batch.cols[i].reset();
           }
           rowIdx = 0;
+          batchBytes = 0;
         }
       }
       if (rowIdx > 0) {
diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
index 0235ffc..4e36f0b 100644
--- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
+++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
@@ -59,6 +59,11 @@
    */
   public static final int DEFAULT_SIZE = 1024;
 
+  /*
+   * This number is a safety limit for 32MB of writables.
+   */
+  public static final int DEFAULT_BYTES = 32 * 1024 * 1024;
+
   /**
    * Return a batch with the specified number of columns.
    * This is the standard constructor -- all batches should be the same size
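Both record handlers apply the same pattern: in addition to the existing row-count cap (DEFAULT_SIZE, 1024 rows), they accumulate the length of the serialized writables and flush the batch early once that total crosses DEFAULT_BYTES (32 MB); the Tez path also seeds the counter with the key bytes. The standalone sketch below illustrates only that dual flush condition. It is not Hive code, and the names ByteBoundedBatcher, addRow, and flush are hypothetical; only the two constants mirror the values introduced by this patch.

```java
/**
 * Minimal sketch of the dual flush condition added by this patch.
 * All names here (ByteBoundedBatcher, addRow, flush) are hypothetical;
 * only the constants mirror VectorizedRowBatch.DEFAULT_SIZE / DEFAULT_BYTES.
 */
public class ByteBoundedBatcher {

  private static final int BATCH_SIZE = 1024;              // rows, as in DEFAULT_SIZE
  private static final int BATCH_BYTES = 32 * 1024 * 1024; // bytes, as in DEFAULT_BYTES

  private int rowIdx = 0;
  private long batchBytes = 0;

  /** Add one serialized row; flush when either limit is reached. */
  public void addRow(byte[] serializedRow) {
    batchBytes += serializedRow.length;
    rowIdx++;
    // Flush on whichever limit is hit first: too many rows, or rows large
    // enough that 1024 of them would not fit comfortably in memory.
    if (rowIdx >= BATCH_SIZE || batchBytes >= BATCH_BYTES) {
      flush();
    }
  }

  /** Hypothetical stand-in for reducer.process(batch, tag) plus a batch reset. */
  private void flush() {
    System.out.printf("flushing batch: %d rows, %d bytes%n", rowIdx, batchBytes);
    rowIdx = 0;
    batchBytes = 0;
  }

  public static void main(String[] args) {
    ByteBoundedBatcher batcher = new ByteBoundedBatcher();
    // A handful of 8 MB rows trips the byte limit long before the 1024-row limit.
    for (int i = 0; i < 10; i++) {
      batcher.addRow(new byte[8 * 1024 * 1024]);
    }
  }
}
```

Note that the check runs after the current row has already been added to the batch (the Spark handler uses a strict `>`, the Tez handler `>=`), so a batch can overshoot the 32 MB target by at most one value; the limit bounds growth rather than enforcing a hard ceiling.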