Details
Description
Executing the query:
select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as count_order from lineitem_1_row where l_shipdate <= date_sub('1998-12-01', '90') group by l_returnflag, l_linestatus ;
twice (in succession) results in CodeGenerator cache misses in BOTH executions. Since the query is identical, I would expect the same code to be generated and the second execution to hit the cache.
It turns out that the generated code is not textually identical between the two runs, which causes the lookup in the CodeGenerator cache to miss. The two versions of the code are nevertheless semantically equivalent.
Below is a portion of the generated code for the two runs of the query:
run-1:
import java.nio.ByteBuffer; import java.nio.ByteOrder; import scala.collection.Iterator; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; public SpecificColumnarIterator generate(Object[] references) { return new SpecificColumnarIterator(); } class SpecificColumnarIterator extends org.apache.spark.sql.execution.columnar.ColumnarIterator { private ByteOrder nativeOrder = null; private byte[][] buffers = null; private UnsafeRow unsafeRow = new UnsafeRow(7); private BufferHolder bufferHolder = new BufferHolder(unsafeRow); private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); private MutableUnsafeRow mutableRow = null; private int currentRow = 0; private int numRowsInBatch = 0; private scala.collection.Iterator input = null; private DataType[] columnTypes = null; private int[] columnIndexes = null; private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor1; private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor2; private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor3; private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor4; private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor5; private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor6; public SpecificColumnarIterator() { this.nativeOrder = ByteOrder.nativeOrder(); this.buffers = new byte[7][]; this.mutableRow = new MutableUnsafeRow(rowWriter); } public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) { this.input = input; this.columnTypes = columnTypes; this.columnIndexes = columnIndexes; } public boolean hasNext() { if (currentRow < numRowsInBatch) { return 
true; } if (!input.hasNext()) { return false; } org.apache.spark.sql.execution.columnar.CachedBatch batch = (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); currentRow = 0; numRowsInBatch = batch.numRows(); for (int i = 0; i < columnIndexes.length; i ++) { buffers[i] = batch.buffers()[columnIndexes[i]]; } accessor = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder)); accessor1 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder)); accessor2 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder)); accessor3 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder)); accessor4 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder)); accessor5 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder)); accessor6 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder)); return hasNext(); } public InternalRow next() { currentRow += 1; bufferHolder.reset(); rowWriter.zeroOutNullBytes(); accessor.extractTo(mutableRow, 0); accessor1.extractTo(mutableRow, 1); accessor2.extractTo(mutableRow, 2); accessor3.extractTo(mutableRow, 3); accessor4.extractTo(mutableRow, 4); accessor5.extractTo(mutableRow, 5); accessor6.extractTo(mutableRow, 6); unsafeRow.setTotalSize(bufferHolder.totalSize()); return unsafeRow; } }
run-2:
import java.nio.ByteBuffer; import java.nio.ByteOrder; import scala.collection.Iterator; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; public SpecificColumnarIterator generate(Object[] references) { return new SpecificColumnarIterator(); } class SpecificColumnarIterator extends org.apache.spark.sql.execution.columnar.ColumnarIterator { private ByteOrder nativeOrder = null; private byte[][] buffers = null; private UnsafeRow unsafeRow = new UnsafeRow(7); private BufferHolder bufferHolder = new BufferHolder(unsafeRow); private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7); private MutableUnsafeRow mutableRow = null; private int currentRow = 0; private int numRowsInBatch = 0; private scala.collection.Iterator input = null; private DataType[] columnTypes = null; private int[] columnIndexes = null; private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor; private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor1; private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor2; private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor3; private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor4; private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor5; private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor6; public SpecificColumnarIterator() { this.nativeOrder = ByteOrder.nativeOrder(); this.buffers = new byte[7][]; this.mutableRow = new MutableUnsafeRow(rowWriter); } public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) { this.input = input; this.columnTypes = columnTypes; this.columnIndexes = columnIndexes; } public boolean hasNext() { if (currentRow < numRowsInBatch) { return 
true; } if (!input.hasNext()) { return false; } org.apache.spark.sql.execution.columnar.CachedBatch batch = (org.apache.spark.sql.execution.columnar.CachedBatch) input.next(); currentRow = 0; numRowsInBatch = batch.numRows(); for (int i = 0; i < columnIndexes.length; i ++) { buffers[i] = batch.buffers()[columnIndexes[i]]; } accessor = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder)); accessor1 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder)); accessor2 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder)); accessor3 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder)); accessor4 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder)); accessor5 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder)); accessor6 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder)); return hasNext(); } public InternalRow next() { currentRow += 1; bufferHolder.reset(); rowWriter.zeroOutNullBytes(); accessor.extractTo(mutableRow, 0); accessor1.extractTo(mutableRow, 1); accessor2.extractTo(mutableRow, 2); accessor3.extractTo(mutableRow, 3); accessor4.extractTo(mutableRow, 4); accessor5.extractTo(mutableRow, 5); accessor6.extractTo(mutableRow, 6); unsafeRow.setTotalSize(bufferHolder.totalSize()); return unsafeRow; } }
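Diffing the two files reveals that the "accessor*" variable definitions are permuted: the same set of accessor names is generated, but the column accessor types assigned to them differ between runs. For example, accessor1 is a DoubleColumnAccessor in run-1 but a StringColumnAccessor in run-2, while accessor6 is a StringColumnAccessor in run-1 but a DoubleColumnAccessor in run-2. The corresponding assignments in hasNext() differ in the same way; each accessor name is still bound to the same buffer index in both runs.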