SPARK-18394: Executing the same query twice in a row results in CodeGenerator cache misses


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.3.0
    • Component/s: SQL
    • Labels: None
    • Environment: HiveThriftServer2 running on branch-2.0 on Mac laptop

    Description

      Executing the query:

      select
          l_returnflag,
          l_linestatus,
          sum(l_quantity) as sum_qty,
          sum(l_extendedprice) as sum_base_price,
          sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
          sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
          avg(l_quantity) as avg_qty,
          avg(l_extendedprice) as avg_price,
          avg(l_discount) as avg_disc,
          count(*) as count_order
      from
          lineitem_1_row
      where
          l_shipdate <= date_sub('1998-12-01', '90')
      group by
          l_returnflag,
          l_linestatus
      ;
      

      twice (in succession) will result in CodeGenerator cache misses in BOTH executions. Since the query is identical, I would expect the same code to be generated.

      It turns out that the generated code is not exactly the same, which causes cache misses when performing the lookup in the CodeGenerator cache, even though the two versions are semantically equivalent.
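      To make the failure mode concrete, below is a minimal sketch (assuming, as a simplification, that the compiled-class cache is keyed on the generated source text; the object CodegenCacheSketch and its members are hypothetical illustrations, not Spark's actual CodeGenerator API). Two semantically equivalent programs that differ only in the order of their accessor declarations hash to different keys, so the second execution compiles from scratch:

      import scala.collection.mutable

      object CodegenCacheSketch {
        // Cache keyed on the raw generated source text; any textual difference is a miss.
        private val cache = mutable.Map.empty[String, AnyRef]
        var compilations = 0

        def compile(sourceCode: String): AnyRef =
          cache.getOrElseUpdate(sourceCode, {
            compilations += 1     // a real implementation would invoke the Java compiler here
            new Object            // stand-in for the compiled class
          })

        def main(args: Array[String]): Unit = {
          // Same query, but the accessor declarations come out in a different order per run.
          val run1 = Seq(
            "DoubleColumnAccessor accessor;",
            "StringColumnAccessor accessor1;").mkString("\n")
          val run2 = Seq(
            "StringColumnAccessor accessor;",
            "DoubleColumnAccessor accessor1;").mkString("\n")
          compile(run1)
          compile(run2)           // equivalent code, different text => cache miss
          println(s"compilations = $compilations") // prints 2, not 1
        }
      }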

      Below is a portion of the generated code from two runs of the query:

      run-1:

      import java.nio.ByteBuffer;
      import java.nio.ByteOrder;
      import scala.collection.Iterator;
      import org.apache.spark.sql.types.DataType;
      import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
      import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
      import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;
      
      public SpecificColumnarIterator generate(Object[] references) {
      return new SpecificColumnarIterator();
      }
      
      class SpecificColumnarIterator extends org.apache.spark.sql.execution.columnar.ColumnarIterator {
      
      private ByteOrder nativeOrder = null;
      private byte[][] buffers = null;
      private UnsafeRow unsafeRow = new UnsafeRow(7);
      private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
      private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7);
      private MutableUnsafeRow mutableRow = null;
      
      private int currentRow = 0;
      private int numRowsInBatch = 0;
      
      private scala.collection.Iterator input = null;
      private DataType[] columnTypes = null;
      private int[] columnIndexes = null;
      
      private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor;
      private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor1;
      private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor2;
      private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor3;
      private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor4;
      private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor5;
      private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor6;
      
      public SpecificColumnarIterator() {
      this.nativeOrder = ByteOrder.nativeOrder();
      this.buffers = new byte[7][];
      this.mutableRow = new MutableUnsafeRow(rowWriter);
      }
      
      public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
      this.input = input;
      this.columnTypes = columnTypes;
      this.columnIndexes = columnIndexes;
      }
      
      
      
      public boolean hasNext() {
      if (currentRow < numRowsInBatch) {
      return true;
      }
      if (!input.hasNext()) {
      return false;
      }
      
      org.apache.spark.sql.execution.columnar.CachedBatch batch = (org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
      currentRow = 0;
      numRowsInBatch = batch.numRows();
      for (int i = 0; i < columnIndexes.length; i ++) {
      buffers[i] = batch.buffers()[columnIndexes[i]];
      }
      accessor = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder));
      accessor1 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder));
      accessor2 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder));
      accessor3 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder));
      accessor4 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder));
      accessor5 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder));
      accessor6 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder));
      
      return hasNext();
      }
      
      public InternalRow next() {
      currentRow += 1;
      bufferHolder.reset();
      rowWriter.zeroOutNullBytes();
      accessor.extractTo(mutableRow, 0);
      accessor1.extractTo(mutableRow, 1);
      accessor2.extractTo(mutableRow, 2);
      accessor3.extractTo(mutableRow, 3);
      accessor4.extractTo(mutableRow, 4);
      accessor5.extractTo(mutableRow, 5);
      accessor6.extractTo(mutableRow, 6);
      unsafeRow.setTotalSize(bufferHolder.totalSize());
      return unsafeRow;
      }
      }
      

      run-2:

      import java.nio.ByteBuffer;
      import java.nio.ByteOrder;
      import scala.collection.Iterator;
      import org.apache.spark.sql.types.DataType;
      import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
      import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
      import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;
      
      public SpecificColumnarIterator generate(Object[] references) {
      return new SpecificColumnarIterator();
      }
      
      class SpecificColumnarIterator extends org.apache.spark.sql.execution.columnar.ColumnarIterator {
      
      private ByteOrder nativeOrder = null;
      private byte[][] buffers = null;
      private UnsafeRow unsafeRow = new UnsafeRow(7);
      private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
      private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 7);
      private MutableUnsafeRow mutableRow = null;
      
      private int currentRow = 0;
      private int numRowsInBatch = 0;
      
      private scala.collection.Iterator input = null;
      private DataType[] columnTypes = null;
      private int[] columnIndexes = null;
      
      private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor;
      private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor1;
      private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor2;
      private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor3;
      private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor4;
      private org.apache.spark.sql.execution.columnar.StringColumnAccessor accessor5;
      private org.apache.spark.sql.execution.columnar.DoubleColumnAccessor accessor6;
      
      public SpecificColumnarIterator() {
      this.nativeOrder = ByteOrder.nativeOrder();
      this.buffers = new byte[7][];
      this.mutableRow = new MutableUnsafeRow(rowWriter);
      }
      
      public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
      this.input = input;
      this.columnTypes = columnTypes;
      this.columnIndexes = columnIndexes;
      }
      
      
      
      public boolean hasNext() {
      if (currentRow < numRowsInBatch) {
      return true;
      }
      if (!input.hasNext()) {
      return false;
      }
      
      org.apache.spark.sql.execution.columnar.CachedBatch batch = (org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
      currentRow = 0;
      numRowsInBatch = batch.numRows();
      for (int i = 0; i < columnIndexes.length; i ++) {
      buffers[i] = batch.buffers()[columnIndexes[i]];
      }
      accessor = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[0]).order(nativeOrder));
      accessor1 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[1]).order(nativeOrder));
      accessor2 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[2]).order(nativeOrder));
      accessor3 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[3]).order(nativeOrder));
      accessor4 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[4]).order(nativeOrder));
      accessor5 = new org.apache.spark.sql.execution.columnar.StringColumnAccessor(ByteBuffer.wrap(buffers[5]).order(nativeOrder));
      accessor6 = new org.apache.spark.sql.execution.columnar.DoubleColumnAccessor(ByteBuffer.wrap(buffers[6]).order(nativeOrder));
      
      return hasNext();
      }
      
      public InternalRow next() {
      currentRow += 1;
      bufferHolder.reset();
      rowWriter.zeroOutNullBytes();
      accessor.extractTo(mutableRow, 0);
      accessor1.extractTo(mutableRow, 1);
      accessor2.extractTo(mutableRow, 2);
      accessor3.extractTo(mutableRow, 3);
      accessor4.extractTo(mutableRow, 4);
      accessor5.extractTo(mutableRow, 5);
      accessor6.extractTo(mutableRow, 6);
      unsafeRow.setTotalSize(bufferHolder.totalSize());
      return unsafeRow;
      }
      }
      

      Diffing the two outputs reveals that the "accessor*" variable declarations are permuted: both runs contain the same set of accessor types (four DoubleColumnAccessors and three StringColumnAccessors), but they are bound to different accessor slots.
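      For illustration only (this is a guess at the mechanism, not a confirmed root cause): a permutation like this can arise if the accessor declarations are emitted in the iteration order of a hash-based collection keyed by globally unique expression IDs, since fresh IDs are allocated on every run. The sketch below uses hypothetical names (Attr, generateDeclarations) to show how re-running the same generation step over the same columns can reorder the output:

      import scala.collection.mutable

      object AccessorOrderSketch {
        // Hypothetical stand-in for an attribute reference carrying a fresh, global expression ID.
        final case class Attr(name: String, exprId: Long)

        private var nextId = 0L
        private def freshId(): Long = { nextId += 1; nextId }

        // Emits one "accessor" line per column, but in the iteration order of a HashSet
        // whose elements hash on the ever-changing exprId rather than on column position.
        def generateDeclarations(columns: Seq[String]): Seq[String] = {
          val attrs = mutable.HashSet(columns.map(c => Attr(c, freshId())): _*)
          attrs.toSeq.zipWithIndex.map { case (a, i) => s"accessor$i -> ${a.name}" }
        }

        def main(args: Array[String]): Unit = {
          val cols = Seq("l_quantity", "l_extendedprice", "l_discount", "l_tax",
                         "l_returnflag", "l_linestatus", "l_shipdate")
          println(generateDeclarations(cols).mkString("\n"))
          println("----")
          // Same columns, but fresh expression IDs: the accessor-to-column binding may permute.
          println(generateDeclarations(cols).mkString("\n"))
        }
      }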


          People

            Assignee: maropu (Takeshi Yamamuro)
            Reporter: jwserencsa (Jonny Serencsa)
            Herman van Hövell
            Votes: 0
            Watchers: 8
