From 420637b6f668ce6e2d4564a681717e79ae4a6b0b Mon Sep 17 00:00:00 2001
From: Tim Armstrong <tarmstrong@cloudera.com>
Date: Wed, 11 Oct 2017 23:58:19 -0700
Subject: [PATCH] WIP: IMPALA-4123: Columnar decoding in Parquet

These changes should enable further optimizations because more time is
spent in simple kernel functions, e.g. UnpackAndDecode32Values() for
dictionary decompression.

Snappy decompression now seems to be the main CPU bottleneck for
decoding snappy-compressed Parquet.

Perf:
Running TPC-H at scale factor 60 on both uncompressed and
snappy-compressed Parquet showed a ~4% speedup overall.

Microbenchmarks show that scans doing only dictionary decoding
on uncompressed Parquet are ~75% faster:

   set mt_dop=1;
   select min(l_returnflag) from lineitem;

Testing:
The existing alltypes agg tests already cover a mix of null and non-null values.

Many tables have long runs of non-null values.

Added a test with long runs of null values.

Extended dict test to test longer runs.

TODO:
* Run code coverage build to confirm all code paths are tested
* Large chars table
* Coverage for timestamp validation with nulls and non-nulls
* Materialise pos slot in nested types

Change-Id: I8c03006981c46ef0dae30602f2b73c253d9b49ef
---
 be/src/exec/hdfs-parquet-scanner.cc                |   1 +
 be/src/exec/parquet-column-readers.cc              | 259 +++++++++++++++++++--
 be/src/exec/parquet-column-readers.h               |  18 +-
 be/src/runtime/tuple.cc                            |   9 +
 be/src/runtime/tuple.h                             |   6 +
 be/src/util/bit-packing.h                          |  16 +-
 be/src/util/bit-packing.inline.h                   |  26 ++-
 be/src/util/bit-stream-utils.h                     |   5 +-
 be/src/util/bit-stream-utils.inline.h              |   4 +-
 be/src/util/dict-encoding.h                        |  93 ++++++++
 be/src/util/dict-test.cc                           | 105 +++++++--
 be/src/util/rle-encoding.h                         |  42 ++--
 .../functional/functional_schema_template.sql      |  37 +++
 .../datasets/functional/schema_constraints.csv     |   4 +-
 .../queries/QueryTest/scanners-many-nulls.test     |  47 ++++
 tests/query_test/test_scanners.py                  |   8 +
 16 files changed, 598 insertions(+), 82 deletions(-)
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/scanners-many-nulls.test

diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index e3cac1c..fcb7570 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -347,6 +347,7 @@ Status HdfsParquetScanner::ProcessSplit() {
 }
 
 Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
+  DCHECK(parse_status_.ok()) << parse_status_.GetDetail();
   if (scan_node_->optimize_parquet_count_star()) {
     // Populate the single slot with the Parquet num rows statistic.
     int64_t tuple_buf_size;
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 74603e7..703ff2b 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -84,15 +84,13 @@ const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
 
 Status ParquetLevelDecoder::Init(const string& filename,
     Encoding::type encoding, MemPool* cache_pool, int cache_size,
-    int max_level, int num_buffered_values, uint8_t** data, int* data_size) {
+    int max_level, uint8_t** data, int* data_size) {
   DCHECK(*data != nullptr);
   DCHECK_GE(*data_size, 0);
-  DCHECK_GE(num_buffered_values, 0);
   DCHECK_GT(cache_size, 0);
   cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32);
   encoding_ = encoding;
   max_level_ = max_level;
-  num_buffered_values_ = num_buffered_values;
   filename_ = filename;
   RETURN_IF_ERROR(InitCache(cache_pool, cache_size));
 
@@ -162,6 +160,17 @@ inline int16_t ParquetLevelDecoder::ReadLevel() {
   return CacheGetNext();
 }
 
+int32_t ParquetLevelDecoder::NextRepeatedRunLength() {
+  if (CacheHasNext() || encoding_ != Encoding::RLE) return 0;
+  return rle_decoder_.NextNumRepeats();
+}
+
+uint8_t ParquetLevelDecoder::GetRepeatedValue(uint32_t num_to_consume) {
+  DCHECK(!CacheHasNext());
+  DCHECK_EQ(encoding_, Encoding::RLE);
+  return rle_decoder_.GetRepeatedValue(num_to_consume);
+}
+
 Status ParquetLevelDecoder::CacheNextBatch(int vals_remaining) {
   /// Fill the cache completely if there are enough values remaining.
   /// Otherwise don't try to read more values than are left.
@@ -169,7 +178,7 @@ Status ParquetLevelDecoder::CacheNextBatch(int vals_remaining) {
   if (max_level_ > 0) {
     if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_) ||
           num_cached_levels_ < batch_size)) {
-      return Status(decoding_error_code_, num_buffered_values_, filename_);
+      return Status(decoding_error_code_, vals_remaining, filename_);
     }
   } else {
     // No levels to read, e.g., because the field is required. The cache was
@@ -276,6 +285,29 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
       int* RESTRICT num_values) RESTRICT;
 
+  /// Fast path for MaterializeValueBatch() that materializes values for a run of
+  /// repeated definition levels. Read up to 'max_values' values into 'tuple_mem',
+  /// returning the number of values materialised in 'num_values'.
+  bool MaterializeValueBatchRepeatedDefLevel(int max_values, int tuple_size,
+      uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT;
+
+  /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem'.
+  bool ReadSlots(
+      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
+
+  /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem', when
+  /// conversion is needed.
+  bool ReadAndConvertSlots(
+      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
+
+  /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem', when
+  /// conversion is not needed.
+  bool ReadSlotsNoConversion(
+      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
+
+  /// Read 'num_to_read' position values into a batch of tuples starting at 'tuple_mem'.
+  void ReadPositions(int64_t num_to_read, int tuple_size, uint8_t* tuple_mem) RESTRICT;
+
   virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
       DictDecoderBase** decoder) override {
     if (!dict_decoder_.template Reset<PARQUET_TYPE>(values, size, fixed_len_size_)) {
@@ -318,6 +350,11 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   inline ALWAYS_INLINE bool DecodeValue(uint8_t** data, const uint8_t* data_end,
       InternalType* RESTRICT val) RESTRICT;
 
+  /// Decode multiple values into 'out_vals' with a stride of 'stride' bytes. Return
+  /// false and set 'parse_error_' if there is an error decoding any value.
+  inline ALWAYS_INLINE bool DecodeValues(
+      int64_t stride, int64_t count, uint8_t* RESTRICT out_vals) RESTRICT;
+
   /// Most column readers never require conversion, so we can avoid branches by
   /// returning constant false. Column readers for types that require conversion
   /// must specialize this function.
@@ -374,6 +411,9 @@ class ScalarColumnReader : public BaseScalarColumnReader {
 
   /// Query-global timezone used as local timezone when executing the query.
   const Timezone& local_time_zone_;
+
+  /// Allocated from parent_->perm_pool_ if NeedsConversion() is true and null otherwise.
+  uint8_t* conversion_buffer_ = nullptr;
 };
 
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
@@ -423,6 +463,17 @@ Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPag
     }
     RETURN_IF_ERROR(dict_decoder_.SetData(data, size));
   }
+  // Allocate a temporary buffer to hold InternalType values if we need to convert
+  // before writing to the final slot.
+  if (NeedsConversionInline() && conversion_buffer_ == nullptr) {
+    int64_t buffer_size = sizeof(InternalType) * parent_->state_->batch_size();
+    conversion_buffer_ =
+        parent_->perm_pool_->TryAllocateAligned(buffer_size, alignof(InternalType));
+    if (conversion_buffer_ == nullptr) {
+      return parent_->perm_pool_->mem_tracker()->MemLimitExceeded(parent_->state_,
+          "Failed to allocate conversion buffer in Parquet scanner", buffer_size);
+    }
+  }
   // TODO: Perform filter selectivity checks here.
   return Status::OK();
 }
@@ -489,6 +540,7 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValueBatc
       int vals_to_add = min(num_buffered_values_, max_values - val_count);
       val_count += vals_to_add;
       num_buffered_values_ -= vals_to_add;
+      DCHECK_GE(num_buffered_values_, 0);
       continue;
     }
     // Fill the rep level cache if needed. We are flattening out the fields of the
@@ -500,21 +552,29 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValueBatc
       if (UNLIKELY(!parent_->parse_status_.ok())) return false;
     }
 
-    // Fill def level cache if needed.
-    if (!def_levels_.CacheHasNext()) {
-      // TODO: add a fast path here if there's a run of repeated values.
-      parent_->parse_status_.MergeStatus(
-          def_levels_.CacheNextBatch(num_buffered_values_));
-      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-    }
-
-    // Read data page and cached levels to materialize values.
+    const int remaining_val_capacity = max_values - val_count;
     uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
-    int remaining_val_capacity = max_values - val_count;
-    int ret_val_count = 0;
-    continue_execution = MaterializeValueBatch<IN_COLLECTION>(
-        remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
-    val_count += ret_val_count;
+    if (def_levels_.NextRepeatedRunLength() > 0) {
+      // Fast path to materialize a run of values with the same definition level. This
+      // avoids checking for NULL/not-NULL for every value.
+      int ret_val_count = 0;
+      continue_execution = MaterializeValueBatchRepeatedDefLevel(remaining_val_capacity,
+          tuple_size, next_tuple, &ret_val_count);
+      val_count += ret_val_count;
+    } else {
+      // We don't have a repeated run - cache def levels and process value-by-value.
+      if (!def_levels_.CacheHasNext()) {
+        parent_->parse_status_.MergeStatus(
+            def_levels_.CacheNextBatch(num_buffered_values_));
+        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+      }
+
+      // Read data page and cached levels to materialize values.
+      int ret_val_count = 0;
+      continue_execution = MaterializeValueBatch<IN_COLLECTION>(
+          remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+      val_count += ret_val_count;
+    }
     if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
       continue_execution &= ColReaderDebugAction(&val_count);
     }
@@ -535,6 +595,10 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeVa
   const int cache_start_idx = def_levels_.CacheCurrIdx();
   uint8_t* curr_tuple = tuple_mem;
   int val_count = 0;
+  // If the file is corrupt, we may have more cached def levels than values in the page.
+  // Don't attempt to consume more than 'num_buffered_values_' levels in that case to
+  // avoid problems.
+  max_values = min(max_values, num_buffered_values_);
   while (def_levels_.CacheHasNext() && val_count < max_values) {
     Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
     int def_level = def_levels_.CacheGetNext();
@@ -564,11 +628,65 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeVa
     ++val_count;
   }
   num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
+  DCHECK_GE(num_buffered_values_, 0);
   *num_values = val_count;
   return true;
 }
 
+// Note that the structure of this function is very similar to MaterializeValueBatch()
+// above, except it is unrolled to operate on multiple values at a time.
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE,
+    MATERIALIZED>::MaterializeValueBatchRepeatedDefLevel(int max_values, int tuple_size,
+    uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
+  DCHECK_GT(num_buffered_values_, 0);
+  if (pos_slot_desc_ != nullptr) DCHECK(rep_levels_.CacheHasNext());
+  int32_t def_level_repeats = def_levels_.NextRepeatedRunLength();
+  DCHECK_GT(def_level_repeats, 0);
+  // Peek at the def level. We don't know how many we'll consume until after
+  // materializing the batch.
+  uint8_t def_level = def_levels_.GetRepeatedValue(0);
+  int32_t num_def_levels_to_consume = 0;
+
+  if (def_level < def_level_of_immediate_repeated_ancestor()) {
+    DCHECK_GT(max_rep_level_, 0) << "Only possible if in a collection.";
+    // A containing repeated field is empty or NULL. We don't need to return any values
+    // but need to advance any rep levels.
+    if (pos_slot_desc_ != nullptr) {
+      num_def_levels_to_consume =
+          min<uint32_t>(def_level_repeats, rep_levels_.CacheRemaining());
+      rep_levels_.CacheSkipLevels(num_def_levels_to_consume);
+    } else {
+      num_def_levels_to_consume = def_level_repeats;
+    }
+    *num_values = 0;
+  } else {
+    // Cannot consume more levels than allowed by buffered input values and output space.
+    num_def_levels_to_consume = min(num_buffered_values_,
+          min(max_values, def_level_repeats));
+    if (pos_slot_desc_ != nullptr) {
+      ReadPositions(num_def_levels_to_consume, tuple_size, tuple_mem);
+    }
+    if (MATERIALIZED) {
+      if (def_level >= max_def_level()) {
+        if (!ReadSlots(num_def_levels_to_consume, tuple_size, tuple_mem)) {
+          return false;
+        }
+      } else {
+        Tuple::SetNullIndicators(
+            null_indicator_offset_, num_def_levels_to_consume, tuple_size, tuple_mem);
+      }
+    }
+    *num_values = num_def_levels_to_consume;
+  }
+  // We now know how many we actually consumed.
+  def_levels_.GetRepeatedValue(num_def_levels_to_consume);
+  num_buffered_values_ -= num_def_levels_to_consume;
+  DCHECK_GE(num_buffered_values_, 0);
+  return true;
+}
+
+template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 template <bool IN_COLLECTION>
 bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
     int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
@@ -618,6 +736,68 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlot(
 }
 
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlots(
+    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
+  return NeedsConversionInline() ?
+      ReadAndConvertSlots(num_to_read, tuple_size, tuple_mem) :
+      ReadSlotsNoConversion(num_to_read, tuple_size, tuple_mem);
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadAndConvertSlots(
+    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
+  DCHECK(NeedsConversionInline());
+  DCHECK(conversion_buffer_ != nullptr);
+  // Decode into the conversion buffer before doing the conversion into the output tuples.
+  if (!DecodeValues(sizeof(InternalType), num_to_read, conversion_buffer_)) {
+    return false;
+  }
+
+  InternalType* curr_val = reinterpret_cast<InternalType*>(conversion_buffer_);
+  uint8_t* curr_tuple = tuple_mem;
+  for (int64_t i = 0; i < num_to_read; ++i, ++curr_val, curr_tuple += tuple_size) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
+    if (NeedsValidationInline()
+        && UNLIKELY(!ValidateValue(curr_val))) {
+      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+      // The value is invalid but execution should continue - set the null indicator and
+      // skip conversion.
+      tuple->SetNull(null_indicator_offset_);
+      continue;
+    }
+    if (UNLIKELY(!ConvertSlot(curr_val, tuple->GetSlot(tuple_offset_)))) {
+      return false;
+    }
+  }
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlotsNoConversion(
+    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
+  DCHECK(!NeedsConversionInline());
+  // No conversion needed - decode directly into the output slots.
+  if (!DecodeValues(tuple_size, num_to_read, tuple_mem + tuple_offset_)) {
+    return false;
+  }
+  if (NeedsValidationInline()) {
+    // Validate the written slots.
+    uint8_t* curr_tuple = tuple_mem;
+    for (int64_t i = 0; i < num_to_read; ++i, curr_tuple += tuple_size) {
+      Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
+      InternalType* val = static_cast<InternalType*>(tuple->GetSlot(tuple_offset_));
+      if (UNLIKELY(!ValidateValue(val))) {
+        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+        // The value is invalid but execution should continue - set the null indicator and
+        // skip conversion.
+        tuple->SetNull(null_indicator_offset_);
+      }
+    }
+  }
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 template <Encoding::type ENCODING>
 bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
     uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
@@ -642,6 +822,30 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
 }
 
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValues(
+    int64_t stride, int64_t count, uint8_t* RESTRICT out_vals) RESTRICT {
+  InternalType* first_slot = reinterpret_cast<InternalType*>(out_vals);
+  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+    if (UNLIKELY(!dict_decoder_.GetNextValues(first_slot, stride, count))) {
+      SetDictDecodeError();
+      return false;
+    }
+  } else {
+    DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+    uint8_t* curr_val = out_vals;
+    // Hoist data_ into a local variable to prevent GCC from storing it every loop
+    // iteration.
+    uint8_t* data = data_;
+    for (int64_t i = 0; i < count; ++i, curr_val += stride) {
+      InternalType* val = reinterpret_cast<InternalType*>(curr_val);
+      if (UNLIKELY(!DecodeValue<Encoding::PLAIN>(&data, data_end_, val))) return false;
+    }
+    data_ = data;
+  }
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>
     ::ReadPositionBatched(int16_t rep_level, int64_t* pos) {
   // Reset position counter if we are at the start of a new parent collection.
@@ -649,7 +853,18 @@ void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>
   *pos = pos_current_value_++;
 }
 
-template <>
+template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadPositions(
+    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
+  int pos_slot_offset = pos_slot_desc()->tuple_offset();
+  for (int64_t i = 0; i < num_to_read; ++i, tuple_mem += tuple_size) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem);
+    ReadPositionBatched(rep_levels_.CacheGetNext(),
+        reinterpret_cast<int64_t*>(tuple->GetSlot(pos_slot_offset)));
+  }
+}
+
+template<>
 inline bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>
 ::NeedsConversionInline() const {
   return needs_conversion_;
@@ -1324,14 +1539,13 @@ Status BaseScalarColumnReader::ReadDataPage() {
     RETURN_IF_ERROR(rep_levels_.Init(filename(),
         current_page_header_.data_page_header.repetition_level_encoding,
         parent_->perm_pool_.get(), parent_->state_->batch_size(),
-        max_rep_level(), num_buffered_values_,
-        &data_, &data_size));
+        max_rep_level(), &data_, &data_size));
 
     // Initialize the definition level data
     RETURN_IF_ERROR(def_levels_.Init(filename(),
         current_page_header_.data_page_header.definition_level_encoding,
         parent_->perm_pool_.get(), parent_->state_->batch_size(),
-        max_def_level(), num_buffered_values_, &data_, &data_size));
+        max_def_level(), &data_, &data_size));
 
     // Data can be empty if the column contains all NULLs
     RETURN_IF_ERROR(InitDataPage(data_, data_size));
@@ -1361,6 +1575,7 @@ bool BaseScalarColumnReader::NextLevels() {
     if (!NextPage()) return parent_->parse_status_.ok();
   }
   --num_buffered_values_;
+  DCHECK_GE(num_buffered_values_, 0);
 
   // Definition level is not present if column and any containing structs are required.
   def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 790bde4..b4fe024 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -36,9 +36,6 @@ class MemPool;
 /// Level values are unsigned 8-bit integers because we support a maximum nesting
 /// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up
 /// populating the level cache (e.g., with RLE we can memset() repeated values).
-///
-/// TODO: expose whether we're in a run of repeated values so that callers can
-/// optimise for that case.
 class ParquetLevelDecoder {
  public:
   ParquetLevelDecoder(bool is_def_level_decoder)
@@ -50,13 +47,21 @@ class ParquetLevelDecoder {
   /// encoding requires reading metadata from the page header. 'cache_size' will be
   /// rounded up to a multiple of 32 internally.
   Status Init(const string& filename, parquet::Encoding::type encoding,
-      MemPool* cache_pool, int cache_size, int max_level, int num_buffered_values,
-      uint8_t** data, int* data_size);
+      MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* data_size);
 
   /// Returns the next level or INVALID_LEVEL if there was an error. Not as efficient
   /// as batched methods.
   inline int16_t ReadLevel();
 
+  /// If the next value is part of a repeated run and is not cached, return the length
+  /// of the repeated run. Otherwise return 0.
+  int32_t NextRepeatedRunLength();
+
+  /// Get the value of the repeated run (if NextRepeatedRunLength() > 0) and consume
+  /// 'num_to_consume' items in the run. Not valid to call if there are cached levels
+  /// that have not been consumed.
+  uint8_t GetRepeatedValue(uint32_t num_to_consume);
+
   /// Decodes and caches the next batch of levels given that there are 'vals_remaining'
   /// values left to decode in the page. Resets members associated with the cache.
   /// Returns a non-ok status if there was a problem decoding a level, if a level was
@@ -116,9 +121,6 @@ class ParquetLevelDecoder {
   /// a multiple of 32 to allow reading directly from 'bit_reader_' in batches.
   int cache_size_ = 0;
 
-  /// Number of remaining data values in the current data page.
-  int num_buffered_values_ = 0;
-
   /// Name of the parquet file. Used for reporting level decoding errors.
   string filename_;
 
diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc
index 0061419..d2159a7 100644
--- a/be/src/runtime/tuple.cc
+++ b/be/src/runtime/tuple.cc
@@ -205,6 +205,15 @@ void Tuple::ConvertOffsetsToPointers(const TupleDescriptor& desc, uint8_t* tuple
   }
 }
 
+void Tuple::SetNullIndicators(const NullIndicatorOffset& offset,
+      int64_t num_tuples, int64_t tuple_stride, uint8_t* tuple_mem) {
+  NullIndicatorOffset local_offset = offset;
+  for (int64_t i = 0; i < num_tuples; ++i) {
+    reinterpret_cast<Tuple*>(tuple_mem)->SetNull(local_offset);
+    tuple_mem += tuple_stride;
+  }
+}
+
 template <bool COLLECT_STRING_VALS, bool NO_POOL>
 void Tuple::MaterializeExprs(TupleRow* row, const TupleDescriptor& desc,
     ScalarExprEvaluator* const* evals, MemPool* pool,
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 91517f1..ad36f02 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -233,6 +233,12 @@ class Tuple {
     return (*null_indicator_byte & offset.bit_mask) != 0;
   }
 
+  /// Set the null indicators on 'num_tuples' tuples. The first tuple is stored at
+  /// 'tuple_mem' and subsequent tuples must be stored at a stride of 'tuple_stride'
+  /// bytes.
+  static void SetNullIndicators(const NullIndicatorOffset& offset,
+      int64_t num_tuples, int64_t tuple_stride, uint8_t* tuple_mem);
+
   void* GetSlot(int offset) {
     DCHECK(offset != -1); // -1 offset indicates non-materialized slot
     return reinterpret_cast<char*>(this) + offset;
diff --git a/be/src/util/bit-packing.h b/be/src/util/bit-packing.h
index 05036db..38b39e2 100644
--- a/be/src/util/bit-packing.h
+++ b/be/src/util/bit-packing.h
@@ -68,19 +68,20 @@ class BitPacking {
       int64_t in_bytes, int64_t num_values, OutType* __restrict__ out);
 
   /// Unpack values as above, treating them as unsigned integers, and decode them
-  /// using the provided dict. Sets 'decode_error' to true if one of the packed
-  /// values was greater than 'dict_len'. Does not modify 'decode_error' on success.
+  /// using the provided dict. Writes them to 'out' with a stride of 'stride' bytes.
+  /// Sets 'decode_error' to true if one of the packed values was greater than 'dict_len'.
+  /// Does not modify 'decode_error' on success.
   template <typename OutType>
   static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues(int bit_width,
       const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
-      int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+      int64_t dict_len, int64_t num_values, OutType* __restrict__ out, int64_t stride,
       bool* __restrict__ decode_error);
 
   /// Same as above, templated by BIT_WIDTH.
   template <typename OutType, int BIT_WIDTH>
   static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues(
       const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
-      int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+      int64_t dict_len, int64_t num_values, OutType* __restrict__ out, int64_t stride,
       bool* __restrict__ decode_error);
 
   /// Unpack exactly 32 values of 'bit_width' from 'in' to 'out'. 'in' must point to
@@ -100,13 +101,14 @@ class BitPacking {
   template <typename OutType>
   static const uint8_t* UnpackAndDecode32Values(int bit_width,
       const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
-      int64_t dict_len, OutType* __restrict__ out, bool* __restrict__ decode_error);
+      int64_t dict_len, OutType* __restrict__ out, int64_t stride,
+      bool* __restrict__ decode_error);
 
   /// Same as UnpackAndDecode32Values() but templated by BIT_WIDTH.
   template <typename OutType, int BIT_WIDTH>
   static const uint8_t* UnpackAndDecode32Values(const uint8_t* __restrict__ in,
       int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len,
-      OutType* __restrict__ out, bool* __restrict__ decode_error);
+      OutType* __restrict__ out, int64_t stride, bool* __restrict__ decode_error);
 
   /// Unpacks 'num_values' values with the given BIT_WIDTH from 'in' to 'out'.
   /// 'num_values' must be at most 31. 'in' must point to 'in_bytes' of addressable
@@ -121,7 +123,7 @@ class BitPacking {
   template <typename OutType, int BIT_WIDTH>
   static const uint8_t* UnpackAndDecodeUpTo31Values(const uint8_t* __restrict__ in,
       int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len, int num_values,
-      OutType* __restrict__ out, bool* __restrict__ decode_error);
+      OutType* __restrict__ out, int64_t stride, bool* __restrict__ decode_error);
 
  private:
   /// Compute the number of values with the given bit width that can be unpacked from
diff --git a/be/src/util/bit-packing.inline.h b/be/src/util/bit-packing.inline.h
index 9a43644..6fa31cc 100644
--- a/be/src/util/bit-packing.inline.h
+++ b/be/src/util/bit-packing.inline.h
@@ -90,13 +90,13 @@ std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues(
 template <typename OutType>
 std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues(int bit_width,
     const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
-    int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+    int64_t dict_len, int64_t num_values, OutType* __restrict__ out, int64_t stride,
     bool* __restrict__ decode_error) {
 #pragma push_macro("UNPACK_VALUES_CASE")
 #define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
   case i:                                       \
     return UnpackAndDecodeValues<OutType, i>(   \
-        in, in_bytes, dict, dict_len, num_values, out, decode_error);
+        in, in_bytes, dict, dict_len, num_values, out, stride, decode_error);
 
   switch (bit_width) {
     // Expand cases from 0 to 32.
@@ -111,25 +111,27 @@ std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues(int bit_wid
 template <typename OutType, int BIT_WIDTH>
 std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues(
     const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
-    int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+    int64_t dict_len, int64_t num_values, OutType* __restrict__ out, int64_t stride,
     bool* __restrict__ decode_error) {
   constexpr int BATCH_SIZE = 32;
   const int64_t values_to_read = NumValuesToUnpack(BIT_WIDTH, in_bytes, num_values);
   const int64_t batches_to_read = values_to_read / BATCH_SIZE;
   const int64_t remainder_values = values_to_read % BATCH_SIZE;
   const uint8_t* in_pos = in;
-  OutType* out_pos = out;
+  uint8_t* out_pos = reinterpret_cast<uint8_t*>(out);
   // First unpack as many full batches as possible.
   for (int64_t i = 0; i < batches_to_read; ++i) {
     in_pos = UnpackAndDecode32Values<OutType, BIT_WIDTH>(
-        in_pos, in_bytes, dict, dict_len, out_pos, decode_error);
-    out_pos += BATCH_SIZE;
+        in_pos, in_bytes, dict, dict_len, reinterpret_cast<OutType*>(out_pos), stride,
+        decode_error);
+    out_pos += stride * BATCH_SIZE;
     in_bytes -= (BATCH_SIZE * BIT_WIDTH) / CHAR_BIT;
   }
   // Then unpack the final partial batch.
   if (remainder_values > 0) {
     in_pos = UnpackAndDecodeUpTo31Values<OutType, BIT_WIDTH>(
-        in_pos, in_bytes, dict, dict_len, remainder_values, out_pos, decode_error);
+        in_pos, in_bytes, dict, dict_len, remainder_values,
+        reinterpret_cast<OutType*>(out_pos), stride, decode_error);
   }
   return std::make_pair(in_pos, values_to_read);
 }
@@ -237,7 +239,7 @@ const uint8_t* BitPacking::Unpack32Values(int bit_width, const uint8_t* __restri
 template <typename OutType, int BIT_WIDTH>
 const uint8_t* BitPacking::UnpackAndDecode32Values(const uint8_t* __restrict__ in,
     int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len,
-    OutType* __restrict__ out, bool* __restrict__ decode_error) {
+    OutType* __restrict__ out, int64_t stride, bool* __restrict__ decode_error) {
   static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
   static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
   constexpr int BYTES_TO_READ = BitUtil::RoundUpNumBytes(32 * BIT_WIDTH);
@@ -250,7 +252,8 @@ const uint8_t* BitPacking::UnpackAndDecode32Values(const uint8_t* __restrict__ i
 #define DECODE_VALUE_CALL(ignore1, i, ignore2)               \
   {                                                          \
     uint32_t idx = UnpackValue<BIT_WIDTH, i>(in);            \
-    DecodeValue(dict, dict_len, idx, &out[i], decode_error); \
+    uint8_t* out_pos = reinterpret_cast<uint8_t*>(out) + i * stride; \
+    DecodeValue(dict, dict_len, idx, reinterpret_cast<OutType*>(out_pos), decode_error); \
   }
 
   BOOST_PP_REPEAT_FROM_TO(0, 32, DECODE_VALUE_CALL, ignore);
@@ -301,7 +304,7 @@ const uint8_t* BitPacking::UnpackUpTo31Values(const uint8_t* __restrict__ in,
 template <typename OutType, int BIT_WIDTH>
 const uint8_t* BitPacking::UnpackAndDecodeUpTo31Values(const uint8_t* __restrict__ in,
       int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len, int num_values,
-      OutType* __restrict__ out, bool* __restrict__ decode_error) {
+      OutType* __restrict__ out, int64_t stride, bool* __restrict__ decode_error) {
   static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
   static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
   constexpr int MAX_BATCH_SIZE = 31;
@@ -326,7 +329,8 @@ const uint8_t* BitPacking::UnpackAndDecodeUpTo31Values(const uint8_t* __restrict
 #define DECODE_VALUES_CASE(ignore1, i, ignore2)                   \
   case 31 - i: {                                                  \
     uint32_t idx = UnpackValue<BIT_WIDTH, 30 - i>(in_buffer);     \
-    DecodeValue(dict, dict_len, idx, &out[30 - i], decode_error); \
+    uint8_t* out_pos = reinterpret_cast<uint8_t*>(out) + (30 - i) * stride; \
+    DecodeValue(dict, dict_len, idx, reinterpret_cast<OutType*>(out_pos), decode_error); \
   }
 
   // Use switch with fall-through cases to minimise branching.
diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h
index 67f1a00..2276e4f 100644
--- a/be/src/util/bit-stream-utils.h
+++ b/be/src/util/bit-stream-utils.h
@@ -136,10 +136,11 @@ class BatchedBitReader {
   /// Unpack bit-packed values in the same way as UnpackBatch() and decode them using the
   /// dictionary 'dict' with 'dict_len' entries. Return -1 if a decoding error is
   /// encountered, i.e. if the bit-packed values are not valid indices in 'dict'.
-  /// Otherwise returns the number of values decoded.
+  /// Otherwise returns the number of values decoded. The values are written to 'v' with
+  /// a stride of 'stride' bytes.
   template<typename T>
   int UnpackAndDecodeBatch(
-      int bit_width, T* dict, int64_t dict_len, int num_values, T* v);
+      int bit_width, T* dict, int64_t dict_len, int num_values, T* v, int64_t stride);
 
   /// Reads an unpacked 'num_bytes'-sized value from the buffer and stores it in 'v'. T
   /// needs to be a little-endian native type and big enough to store 'num_bytes'.
diff --git a/be/src/util/bit-stream-utils.inline.h b/be/src/util/bit-stream-utils.inline.h
index 371a81d..b6ccdbb 100644
--- a/be/src/util/bit-stream-utils.inline.h
+++ b/be/src/util/bit-stream-utils.inline.h
@@ -104,7 +104,7 @@ inline int BatchedBitReader::UnpackBatch(int bit_width, int num_values, T* v) {
 
 template<typename T>
 inline int BatchedBitReader::UnpackAndDecodeBatch(
-      int bit_width, T* dict, int64_t dict_len, int num_values, T* v){
+    int bit_width, T* dict, int64_t dict_len, int num_values, T* v, int64_t stride) {
   DCHECK(buffer_pos_ != nullptr);
   DCHECK_GE(bit_width, 0);
   DCHECK_LE(bit_width, MAX_BITWIDTH);
@@ -114,7 +114,7 @@ inline int BatchedBitReader::UnpackAndDecodeBatch(
   int64_t num_read;
   bool decode_error = false;
   std::tie(new_buffer_pos, num_read) = BitPacking::UnpackAndDecodeValues(bit_width,
-      buffer_pos_, bytes_left(), dict, dict_len, num_values, v, &decode_error);
+      buffer_pos_, bytes_left(), dict, dict_len, num_values, v, stride, &decode_error);
   if (UNLIKELY(decode_error)) return -1;
   buffer_pos_ = new_buffer_pos;
   DCHECK_LE(buffer_pos_, buffer_end_);
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index a76d9b5..d8b0384 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -348,6 +348,11 @@ class DictDecoder : public DictDecoderBase {
   /// the string data is from the dictionary buffer passed into the c'tor.
   bool GetNextValue(T* value) WARN_UNUSED_RESULT;
 
+  /// Batched version of GetNextValue(). Reads the next 'count' values into
+  /// 'first_value'. Returns false if the data was invalid and 'count' values could not
+  /// be successfully read. 'stride' is the stride in bytes between each subsequent value.
+  bool GetNextValues(T* first_value, int64_t stride, int count) WARN_UNUSED_RESULT;
+
   /// This function returns the size in bytes of the dictionary vector.
   /// It is used by dict-test.cc for validation of bytes consumed against
   /// memory tracked.
@@ -365,6 +370,11 @@ class DictDecoder : public DictDecoderBase {
   /// 'next_literal_idx_'.
   T decoded_values_[DICT_DECODER_BUFFER_SIZE];
 
+  /// Copy as many literal values as possible, up to 'max_to_copy', from
+  /// 'decoded_values_' to '*output' spaced 'stride' bytes apart. Return the number
+  /// copied and advance '*output'.
+  uint32_t CopyLiteralsToOutput(uint32_t max_to_copy, uint8_t** output, int64_t stride);
+
   /// Slow path for GetNextValue() where we need to decode new values. Should not be
   /// inlined everywhere.
   bool DecodeNextValue(T* value);
@@ -448,6 +458,89 @@ ALWAYS_INLINE inline bool DictDecoder<T>::GetNextValue(T* value) {
 }
 
 template <typename T>
+ALWAYS_INLINE inline bool DictDecoder<T>::GetNextValues(
+    T* first_value, int64_t stride, int count) {
+  DCHECK_GE(count, 0);
+  uint8_t* curr_value = reinterpret_cast<uint8_t*>(first_value);
+  if (num_repeats_ > 0) {
+    // Consume any already-decoded repeated value.
+    int num_to_copy = std::min<uint32_t>(num_repeats_, count);
+    T repeated_val = decoded_values_[0];
+    for (int i = 0; i < num_to_copy; ++i) {
+      memcpy(curr_value, &repeated_val, sizeof(T));
+      curr_value += stride;
+    }
+    count -= num_to_copy;
+    num_repeats_ -= num_to_copy;
+  } else if (next_literal_idx_ < num_literal_values_) {
+    // Consume any already-decoded literal values.
+    count -= CopyLiteralsToOutput(count, &curr_value, stride);
+  }
+  DCHECK_GE(count, 0);
+  while (count > 0) {
+    uint32_t num_repeats = data_decoder_.NextNumRepeats();
+    if (num_repeats > 0) {
+      // Decode repeats directly to the output.
+      uint32_t num_repeats_to_consume = std::min<uint32_t>(num_repeats, count);
+      uint32_t idx = data_decoder_.GetRepeatedValue(num_repeats_to_consume);
+      if (UNLIKELY(idx >= dict_.size())) return false;
+      T repeated_val = dict_[idx];
+      for (int i = 0; i < num_repeats_to_consume; ++i) {
+        memcpy(curr_value, &repeated_val, sizeof(T));
+        curr_value += stride;
+      }
+      count -= num_repeats_to_consume;
+    } else {
+      // Decode as many literals as possible directly to the output, buffer the rest.
+      uint32_t num_literals = data_decoder_.NextNumLiterals();
+      if (UNLIKELY(num_literals == 0)) return false;
+      // Case 1: decode the whole literal run directly to the output.
+      // Case 2: decode none or some of the run to the output, buffer some remaining.
+      if (count >= num_literals) { // Case 1
+        if (UNLIKELY(!data_decoder_.DecodeLiteralValues(num_literals, dict_.data(),
+              dict_.size(), reinterpret_cast<T*>(curr_value), stride))) {
+          return false;
+        }
+        count -= num_literals;
+        curr_value += stride * num_literals;
+      } else { // Case 2
+        uint32_t num_to_decode = BitUtil::RoundDown(count, 32);
+        if (UNLIKELY(!data_decoder_.DecodeLiteralValues(num_to_decode, dict_.data(),
+              dict_.size(), reinterpret_cast<T*>(curr_value), stride))) {
+          return false;
+        }
+        curr_value += stride * num_to_decode;
+        count -= num_to_decode;
+        DCHECK_GE(count, 0);
+        if (count > 0) {
+          if (UNLIKELY(!DecodeNextValue(reinterpret_cast<T*>(curr_value)))) return false;
+          curr_value += stride;
+          --count;
+          // Consume any already-decoded literal values.
+          count -= CopyLiteralsToOutput(count, &curr_value, stride);
+        }
+        return true;
+      }
+    }
+  }
+  return true;
+}
+
+template <typename T>
+uint32_t DictDecoder<T>::CopyLiteralsToOutput(
+    uint32_t max_to_copy, uint8_t** output, int64_t stride) {
+  uint32_t num_to_copy =
+      std::min<uint32_t>(num_literal_values_ - next_literal_idx_, max_to_copy);
+  uint8_t* curr_value = *output;
+  for (int i = 0; i < num_to_copy; ++i) {
+    memcpy(curr_value, &decoded_values_[next_literal_idx_++], sizeof(T));
+    curr_value += stride;
+  }
+  *output = curr_value;
+  return num_to_copy;
+}
+
+template <typename T>
 bool DictDecoder<T>::DecodeNextValue(T* value) {
   // IMPALA-959: Use memcpy() instead of '=' to set *value: addresses are not always 16
   // byte aligned for Decimal16Values.
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index cf438f6..f984103 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -25,14 +25,38 @@
 #include "runtime/string-value.inline.h"
 #include "runtime/timestamp-value.h"
 #include "testutil/gtest-util.h"
+#include "testutil/rand-util.h"
 #include "util/dict-encoding.h"
 
 #include "common/names.h"
 
+using std::mt19937;
+using std::uniform_int_distribution;
+
 namespace impala {
+class DictTest : public ::testing::Test {
+ public:
+  virtual void SetUp() { RandTestUtil::SeedRng("DICT_TEST_SEED", &rng_); }
+
+  virtual void TearDown() {}
+
+ protected:
+  template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+  void ValidateDict(const vector<InternalType>& values,
+      const vector<InternalType>& dict_values, int fixed_buffer_byte_size);
+
+  template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+  void TestNumbers(int max_value, int repeat, int value_byte_size);
+
+  template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+  void TestNumbers(int value_byte_size);
 
-template<typename InternalType, parquet::Type::type PARQUET_TYPE>
-void ValidateDict(const vector<InternalType>& values,
+  /// Per-test random number generator. Seeded before every test.
+  mt19937 rng_;
+};
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+void DictTest::ValidateDict(const vector<InternalType>& values,
     const vector<InternalType>& dict_values, int fixed_buffer_byte_size) {
   set<InternalType> values_set(values.begin(), values.end());
 
@@ -78,10 +102,32 @@ void ValidateDict(const vector<InternalType>& values,
     ASSERT_TRUE(decoder.GetNextValue(&j));
     EXPECT_EQ(i, j);
   }
+
+  // Test reading via batched interface.
+  for (int stride : {1, 2, 3}) {
+    for (int attempt = 0; attempt < 10; ++attempt) {
+      ASSERT_OK(decoder.SetData(data_buffer, data_len));
+      int num_left = values.size();
+      while (num_left > 0) {
+        // Read a random fraction of the remaining data in the dictionary.
+        int num_to_read = uniform_int_distribution<int>(1, num_left)(rng_);
+        LOG(INFO) << "stride=" << stride << " attempt=" << attempt
+                   << " num_to_read=" << num_to_read;
+        vector<InternalType> output(num_to_read * stride);
+        int byte_stride = stride * sizeof(InternalType);
+        ASSERT_TRUE(decoder.GetNextValues(output.data(), byte_stride, num_to_read));
+        for (int i = 0; i < num_to_read; ++i) {
+          ASSERT_EQ(values[values.size() - num_left + i], output[i * stride]);
+        }
+        num_left -= num_to_read;
+      }
+    }
+  }
+
   pool.FreeAll();
 }
 
-TEST(DictTest, TestStrings) {
+TEST_F(DictTest, TestStrings) {
   StringValue sv1("hello world");
   StringValue sv2("foo");
   StringValue sv3("bar");
@@ -109,7 +155,7 @@ TEST(DictTest, TestStrings) {
   ValidateDict<StringValue, parquet::Type::BYTE_ARRAY>(values, dict_values, -1);
 }
 
-TEST(DictTest, TestTimestamps) {
+TEST_F(DictTest, TestTimestamps) {
   TimestampValue tv1 = TimestampValue::Parse("2011-01-01 09:01:01");
   TimestampValue tv2 = TimestampValue::Parse("2012-01-01 09:01:01");
   TimestampValue tv3 = TimestampValue::Parse("2011-01-01 09:01:02");
@@ -136,10 +182,13 @@ void IncrementValue(InternalType* t) { ++(*t); }
 
 template <> void IncrementValue(Decimal4Value* t) { ++(t->value()); }
 template <> void IncrementValue(Decimal8Value* t) { ++(t->value()); }
-template <> void IncrementValue(Decimal16Value* t) { ++(t->value()); }
+template <>
+void IncrementValue(Decimal16Value* t) {
+  ++(t->value());
+}
 
-template<typename InternalType, parquet::Type::type PARQUET_TYPE>
-void TestNumbers(int max_value, int repeat, int value_byte_size) {
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+void DictTest::TestNumbers(int max_value, int repeat, int value_byte_size) {
   vector<InternalType> values;
   vector<InternalType> dict_values;
   for (InternalType val = 0; val < max_value; IncrementValue(&val)) {
@@ -152,15 +201,15 @@ void TestNumbers(int max_value, int repeat, int value_byte_size) {
   ValidateDict<InternalType, PARQUET_TYPE>(values, dict_values, value_byte_size);
 }
 
-template<typename InternalType, parquet::Type::type PARQUET_TYPE>
-void TestNumbers(int value_byte_size) {
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+void DictTest::TestNumbers(int value_byte_size) {
   TestNumbers<InternalType, PARQUET_TYPE>(100, 1, value_byte_size);
   TestNumbers<InternalType, PARQUET_TYPE>(1, 100, value_byte_size);
   TestNumbers<InternalType, PARQUET_TYPE>(1, 1, value_byte_size);
   TestNumbers<InternalType, PARQUET_TYPE>(1, 2, value_byte_size);
 }
 
-TEST(DictTest, TestNumbers) {
+TEST_F(DictTest, TestNumbers) {
   TestNumbers<int8_t, parquet::Type::INT32>(
       ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_TINYINT)));
   TestNumbers<int16_t, parquet::Type::INT32>(
@@ -181,7 +230,31 @@ TEST(DictTest, TestNumbers) {
   }
 }
 
-TEST(DictTest, TestInvalidStrings) {
+TEST_F(DictTest, TestRandomIntegers) {
+  // Test longer, random sequences of encoding integers with a variety of run lengths.
+  const int NUM_RUNS = 10000;
+  for (int num_distinct_values : {2, 100, 40000}) {
+    LOG(INFO) << "num_distinct_values: " << num_distinct_values;
+    vector<int32_t> values;
+    vector<int32_t> dict_values;
+    set<int32_t> dict_values_set;
+    for (int i = 0; i < NUM_RUNS; ++i) {
+      int32_t value = uniform_int_distribution<int>(1, num_distinct_values)(rng_);
+      if (dict_values_set.find(value) == dict_values_set.end()) {
+        // Add any newly-generated values to the dictionary.
+        dict_values.push_back(value);
+        dict_values_set.insert(value);
+      }
+      // Most values are single non-repeated values.
+      bool repeated_run = uniform_int_distribution<int>(0, 10)(rng_) == 0;
+      int run_len = repeated_run ? uniform_int_distribution<int>(1, 100)(rng_) : 1;
+      for (int j = 0; j < run_len; ++j) values.push_back(value);
+    }
+    ValidateDict<int32_t, parquet::Type::INT32>(values, dict_values, sizeof(int32_t));
+  }
+}
+
+TEST_F(DictTest, TestInvalidStrings) {
   uint8_t buffer[sizeof(int32_t) + 10];
   int32_t len = -10;
   memcpy(buffer, &len, sizeof(int32_t));
@@ -190,11 +263,11 @@ TEST(DictTest, TestInvalidStrings) {
   // the decoder should fail.
   MemTracker tracker;
   DictDecoder<StringValue> decoder(&tracker);
-  ASSERT_FALSE(decoder.template Reset<parquet::Type::BYTE_ARRAY>(buffer, sizeof(buffer),
-      0));
+  ASSERT_FALSE(
+      decoder.template Reset<parquet::Type::BYTE_ARRAY>(buffer, sizeof(buffer), 0));
 }
 
-TEST(DictTest, TestStringBufferOverrun) {
+TEST_F(DictTest, TestStringBufferOverrun) {
   // Test string length past end of buffer.
   uint8_t buffer[sizeof(int32_t) + 10];
   int32_t len = 100;
@@ -211,7 +284,7 @@ TEST(DictTest, TestStringBufferOverrun) {
 // Make sure that SetData() resets the dictionary decoder, including the embedded RLE
 // decoder to a clean state, even if the input is not fully consumed. The RLE decoder
 // has various state that needs to be reset.
-TEST(DictTest, SetDataAfterPartialRead) {
+TEST_F(DictTest, SetDataAfterPartialRead) {
   int bytes_alloc = 0;
   MemTracker tracker;
   MemTracker track_encoder;
@@ -252,7 +325,7 @@ TEST(DictTest, SetDataAfterPartialRead) {
 }
 
 // Test handling of decode errors from out-of-range values.
-TEST(DictTest, DecodeErrors) {
+TEST_F(DictTest, DecodeErrors) {
   int bytes_alloc = 0;
   MemTracker tracker;
   MemTracker track_encoder;
diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h
index babb0fd..278dc01 100644
--- a/be/src/util/rle-encoding.h
+++ b/be/src/util/rle-encoding.h
@@ -104,7 +104,8 @@ class RleBatchDecoder {
 
   /// Get the value of the current repeated run and consume the given number of repeats.
   /// Only valid to call when NextNumRepeats() > 0. The given number of repeats cannot
-  /// be greater than the remaining number of repeats in the run.
+  /// be greater than the remaining number of repeats in the run. 'num_repeats_to_consume'
+  /// can be set to 0 to peek at the value without consuming repeats.
   T GetRepeatedValue(int32_t num_repeats_to_consume);
 
   /// Return the size of the current literal run. Returns zero if the current run is
@@ -119,7 +120,8 @@ class RleBatchDecoder {
   bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT;
 
   /// Consume 'num_literals_to_consume' literals from the current literal run,
-  /// decoding them using 'dict' and outputting them to 'values'.
+  /// decoding them using 'dict' and outputting them to 'values' with a stride of
+  /// 'stride' bytes. 'stride' defaults to sizeof(OutType).
   /// 'num_literals_to_consume' must be <= NextNumLiterals(). Returns true if
   /// the requested number of literals were successfully read or false if an error
   /// was encountered, e.g. the input was truncated or the value was not present
@@ -127,6 +129,9 @@ class RleBatchDecoder {
   /// to read from a new buffer.
   template <typename OutType>
   bool DecodeLiteralValues(int32_t num_literals_to_consume, OutType* dict,
+      int64_t dict_len, OutType* values, int64_t stride) WARN_UNUSED_RESULT;
+  template <typename OutType>
+  bool DecodeLiteralValues(int32_t num_literals_to_consume, OutType* dict,
       int64_t dict_len, OutType* values) WARN_UNUSED_RESULT;
 
   /// Convenience method to get the next value. Not efficient. Returns true on success
@@ -188,8 +193,8 @@ class RleBatchDecoder {
   /// 'literal_count_'. Returns the number of literals outputted or 0 if a
   /// decoding error is encountered.
   template <typename OutType>
-  int32_t DecodeBufferedLiterals(
-      int32_t max_to_output, OutType* dict, int64_t dict_len, OutType* values);
+  int32_t DecodeBufferedLiterals(int32_t max_to_output, OutType* dict, int64_t dict_len,
+      OutType* values, int64_t stride);
 };
 
 /// Class to incrementally build the rle data.   This class does not allocate any memory.
@@ -494,7 +499,7 @@ inline int32_t RleBatchDecoder<T>::NextNumRepeats() {
 
 template <typename T>
 inline T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) {
-  DCHECK_GT(num_repeats_to_consume, 0);
+  DCHECK_GE(num_repeats_to_consume, 0);
   DCHECK_GE(repeat_count_, num_repeats_to_consume);
   repeat_count_ -= num_repeats_to_consume;
   return repeated_value_;
@@ -548,13 +553,21 @@ template <typename T>
 template <typename OutType>
 inline bool RleBatchDecoder<T>::DecodeLiteralValues(
     int32_t num_literals_to_consume, OutType* dict, int64_t dict_len, OutType* values) {
+  return DecodeLiteralValues(
+      num_literals_to_consume, dict, dict_len, values, sizeof(OutType));
+}
+
+template <typename T>
+template <typename OutType>
+inline bool RleBatchDecoder<T>::DecodeLiteralValues(int32_t num_literals_to_consume,
+    OutType* dict, int64_t dict_len, OutType* values, int64_t stride) {
   DCHECK_GE(num_literals_to_consume, 0);
   DCHECK_GE(literal_count_, num_literals_to_consume);
   int32_t num_consumed = 0;
   // Decode any buffered literals left over from previous calls.
   if (HaveBufferedLiterals()) {
     num_consumed =
-        DecodeBufferedLiterals(num_literals_to_consume, dict, dict_len, values);
+        DecodeBufferedLiterals(num_literals_to_consume, dict, dict_len, values, stride);
     if (UNLIKELY(num_consumed == 0)) return false;
   }
 
@@ -565,8 +578,9 @@ inline bool RleBatchDecoder<T>::DecodeLiteralValues(
   int32_t num_to_bypass =
       std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32));
   if (num_to_bypass > 0) {
-    int num_read = bit_reader_.UnpackAndDecodeBatch(
-        bit_width_, dict, dict_len, num_to_bypass, values + num_consumed);
+    uint8_t* values_pos = reinterpret_cast<uint8_t*>(values) + num_consumed * stride;
+    int num_read = bit_reader_.UnpackAndDecodeBatch(bit_width_, dict, dict_len,
+        num_to_bypass, reinterpret_cast<OutType*>(values_pos), stride);
     // If we couldn't read the expected number, that means the input was truncated.
     if (num_read < num_to_bypass) return false;
     literal_count_ -= num_to_bypass;
@@ -578,8 +592,9 @@ inline bool RleBatchDecoder<T>::DecodeLiteralValues(
     // We weren't able to copy all the literals requested directly from the input.
     // Buffer literals and copy over the requested number.
     if (UNLIKELY(!FillLiteralBuffer())) return false;
-    int32_t num_copied =
-        DecodeBufferedLiterals(num_remaining, dict, dict_len, values + num_consumed);
+    uint8_t* values_pos = reinterpret_cast<uint8_t*>(values) + num_consumed * stride;
+    int32_t num_copied = DecodeBufferedLiterals(
+        num_remaining, dict, dict_len, reinterpret_cast<OutType*>(values_pos), stride);
     if (UNLIKELY(num_copied == 0)) return false;
     DCHECK_EQ(num_copied, num_remaining) << "Should have buffered enough literals";
   }
@@ -655,14 +670,15 @@ inline int32_t RleBatchDecoder<T>::OutputBufferedLiterals(
 
 template <typename T>
 template <typename OutType>
-inline int32_t RleBatchDecoder<T>::DecodeBufferedLiterals(
-    int32_t max_to_output, OutType* dict, int64_t dict_len, OutType* values) {
+inline int32_t RleBatchDecoder<T>::DecodeBufferedLiterals(int32_t max_to_output,
+    OutType* dict, int64_t dict_len, OutType* values, int64_t stride) {
   int32_t num_to_output =
       std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_);
   for (int32_t i = 0; i < num_to_output; ++i) {
     T idx = literal_buffer_[literal_buffer_pos_ + i];
     if (UNLIKELY(idx < 0 || idx >= dict_len)) return 0;
-    memcpy(&values[i], &dict[idx], sizeof(OutType));
+    uint8_t* values_pos = reinterpret_cast<uint8_t*>(values) + i * stride;
+    memcpy(reinterpret_cast<OutType*>(values_pos), &dict[idx], sizeof(OutType));
   }
   literal_buffer_pos_ += num_to_output;
   literal_count_ -= num_to_output;
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index cfd9030..a90f0be 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -2194,3 +2194,40 @@ CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (i1 integer)
 STORED AS {file_format}
 TBLPROPERTIES('skip.header.line.count'='2');
 ====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+manynulls
+---- COLUMNS
+id int
+nullcol int
+---- ALTER
+-- Ensure the nulls are clustered together.
+ALTER TABLE {table_name} SORT BY (id)
+---- CREATE_KUDU
+DROP VIEW IF EXISTS {db_name}{db_suffix}.{table_name};
+DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name}_idx;
+
+CREATE TABLE {db_name}{db_suffix}.{table_name}_idx (
+  kudu_idx BIGINT PRIMARY KEY,
+  id INT,
+  nullcol INT NULL
+)
+PARTITION BY HASH (kudu_idx) PARTITIONS 3 STORED AS KUDU;
+CREATE VIEW {db_name}{db_suffix}.{table_name} AS
+SELECT id, nullcol
+FROM {db_name}{db_suffix}.{table_name}_idx;
+---- DEPENDENT_LOAD
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
+SELECT id, nullcol
+FROM {db_name}.{table_name};
+---- DEPENDENT_LOAD_KUDU
+INSERT into TABLE {db_name}{db_suffix}.{table_name}_idx
+SELECT row_number() over (order by id),
+       id, nullcol
+FROM {db_name}.{table_name};
+---- LOAD
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
+SELECT id, if((id div 500) % 2 = 0, NULL, id) as nullcol
+FROM functional.alltypesagg;
+====
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index baf0306..9e79a93 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -102,10 +102,11 @@ table_name:view_view, constraint:restrict_to, table_format:seq/snap/block
 table_name:subquery_view, constraint:restrict_to, table_format:seq/snap/block
 table_name:subquery_view, constraint:restrict_to, table_format:rc/none/none
 
-# liketbl and tblwithraggedcolumns all have
+# liketbl, tblwithraggedcolumns and manynulls all have
 # NULLs in primary key columns. hbase does not support
 # writing NULLs to primary key columns.
 table_name:liketbl, constraint:exclude, table_format:hbase/none/none
+table_name:manynulls, constraint:exclude, table_format:hbase/none/none
 table_name:tblwithraggedcolumns, constraint:exclude, table_format:hbase/none/none
 
 # Tables with only one column are not supported in hbase.
@@ -193,6 +194,7 @@ table_name:nulltable, constraint:only, table_format:kudu/none/none
 table_name:nullescapedtable, constraint:only, table_format:kudu/none/none
 table_name:decimal_tbl, constraint:only, table_format:kudu/none/none
 table_name:decimal_tiny, constraint:only, table_format:kudu/none/none
+table_name:manynulls, constraint:only, table_format:kudu/none/none
 
 # Skipping header lines is only effective with text tables
 table_name:table_with_header, constraint:restrict_to, table_format:text/none/none
diff --git a/testdata/workloads/functional-query/queries/QueryTest/scanners-many-nulls.test b/testdata/workloads/functional-query/queries/QueryTest/scanners-many-nulls.test
new file mode 100644
index 0000000..a82af23
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/scanners-many-nulls.test
@@ -0,0 +1,47 @@
+====
+---- QUERY
+# Test that we materialize the right number of nulls.
+select count(*),
+  count(id),
+  count(nullcol),
+  sum(nullcol)
+from manynulls
+---- RESULTS
+11000,11000,5500,28870000
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT
+====
+---- QUERY
+# Spot check some values.
+select id, nullcol
+from manynulls
+where id >= 4490 and id <= 4510
+order by id
+---- RESULTS
+4490,NULL
+4490,NULL
+4491,NULL
+4492,NULL
+4493,NULL
+4494,NULL
+4495,NULL
+4496,NULL
+4497,NULL
+4498,NULL
+4499,NULL
+4500,4500
+4500,4500
+4501,4501
+4502,4502
+4503,4503
+4504,4504
+4505,4505
+4506,4506
+4507,4507
+4508,4508
+4509,4509
+4510,4510
+4510,4510
+---- TYPES
+INT,INT
+====
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 8ee0eb7..453ef06 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -92,6 +92,14 @@ class TestScannersAllTableFormats(ImpalaTestSuite):
     new_vector.get_value('exec_option')['debug_action'] = vector.get_value('debug_action')
     self.run_test_case('QueryTest/scanners', new_vector)
 
+  def test_many_nulls(self, vector):
+    if vector.get_value('table_format').file_format == 'hbase':
+      # manynulls table not loaded for HBase
+      pytest.skip()
+    new_vector = deepcopy(vector)
+    new_vector.get_value('exec_option')['batch_size'] = vector.get_value('batch_size')
+    self.run_test_case('QueryTest/scanners-many-nulls', new_vector)
+
   def test_hdfs_scanner_profile(self, vector):
     if vector.get_value('table_format').file_format in ('kudu', 'hbase') or \
        vector.get_value('exec_option')['num_nodes'] != 0:
-- 
2.7.4

