diff --git a/be/src/runtime/sorter-internal.h b/be/src/runtime/sorter-internal.h index ea8275a639..64bdb462ca 100644 --- a/be/src/runtime/sorter-internal.h +++ b/be/src/runtime/sorter-internal.h @@ -375,6 +375,7 @@ class Sorter::TupleIterator { /// arguments to avoid redundantly storing the same values in multiple iterators in /// perf-critical algorithms. void Next(Sorter::Run* run, int tuple_size); + void Seek(Sorter::Run* run, int tuple_size, int64_t index); /// The reverse of Next(). Can advance one before the first tuple in the run, but it /// is invalid to dereference 'tuple_' in that case. @@ -487,6 +488,10 @@ class Sorter::TupleSorter { /// query is cancelled. Status SortHelper(TupleIterator begin, TupleIterator end); + bool CheckSorted(TupleIterator begin, TupleIterator end); + + bool ProbeSorted(TupleIterator begin, TupleIterator end, int64_t nSkip); + /// Select a pivot to partition [begin, end). Tuple* SelectPivot(TupleIterator begin, TupleIterator end); diff --git a/be/src/runtime/sorter-ir.cc b/be/src/runtime/sorter-ir.cc index 4da6bdd6eb..21c13eaf34 100644 --- a/be/src/runtime/sorter-ir.cc +++ b/be/src/runtime/sorter-ir.cc @@ -55,6 +55,21 @@ void Sorter::TupleIterator::Next(Sorter::Run* run, int tuple_size) { if (UNLIKELY(index_ >= buffer_end_index_)) NextPage(run); } +void Sorter::TupleIterator::Seek(Sorter::Run* run, int tuple_size, int64_t index) { + DCHECK_LT(index, run->num_tuples()) << "Can only advance one past end of run"; + if(index < buffer_start_index_ || index >= buffer_end_index_) { + page_index_ = index / run->page_capacity_; + DCHECK_LT(page_index_, run->fixed_len_pages_.size()); + // If the last page is full, position to the last page instead + if(page_index_ == run->fixed_len_pages_.size()) --page_index_; + buffer_start_index_ = page_index_ * run->page_capacity_; + buffer_end_index_ = buffer_start_index_ + run->page_capacity_; + } + + index_ = index; + tuple_ = run->fixed_len_pages_[page_index_].data() + (tuple_size * (index_ - buffer_start_index_)); +} + void Sorter::TupleIterator::Prev(Sorter::Run* run, int tuple_size) { DCHECK_GE(index_, 0) << "Can only advance one before start of run"; tuple_ -= tuple_size; @@ -156,7 +171,41 @@ Status Sorter::TupleSorter::InsertionSort(const TupleIterator& begin, } +bool Sorter::TupleSorter::ProbeSorted(TupleIterator begin, TupleIterator end, + int64_t nSkip) { + Run* run = run_; + int tuple_size = tuple_size_; + for (;;) { + int64_t seekIndex = begin.index() + nSkip; + if(seekIndex + nSkip >= end.index()) { // Last probe in range + seekIndex = end.index() - 1; // Seek to last record instead + } + const Tuple* prev = begin.tuple(); + begin.Seek(run, tuple_size, seekIndex); + if (Less(begin.row(), reinterpret_cast(&prev))) return false; + if (seekIndex + 1 >= end.index()) return true; + } +} + +bool Sorter::TupleSorter::CheckSorted(TupleIterator begin, TupleIterator end) { + Run* run = run_; + int tuple_size = tuple_size_; + int64_t nRecs = end.index() - begin.index(); + if(nRecs <= 1) return true; + int64_t checkInterval = nRecs / 100; // 1% + if(checkInterval < 10) checkInterval = 10; + if (!ProbeSorted(begin, end, checkInterval)) return false; + + for (;;) { + const Tuple* prev = begin.tuple(); + begin.Next(run, tuple_size); + if (begin.index() >= end.index()) return true; + if (Less(begin.row(), reinterpret_cast(&prev))) return false; + } +} + Status Sorter::TupleSorter::SortHelper(TupleIterator begin, TupleIterator end) { + if (CheckSorted(begin, end)) return Status::OK(); // Use insertion sort for smaller sequences. while (end.index() - begin.index() > INSERTION_THRESHOLD) { // Select a pivot and call Partition() to split the tuples in [begin, end) into two