From 8b1f1ba8c708fbf17cb2d352f8a053848b070ce8 Mon Sep 17 00:00:00 2001
From: Tim Armstrong <tarmstrong@cloudera.com>
Date: Thu, 28 Jun 2018 15:14:22 -0700
Subject: [PATCH] Add start and end time counters to several plan nodes

TODO: set end time at end of subplan
---
 be/src/exec/blocking-join-node.cc         |  2 ++
 be/src/exec/exchange-node.cc              | 14 ++++++++--
 be/src/exec/exec-node.cc                  |  3 +++
 be/src/exec/exec-node.h                   | 43 +++++++++++++++++++++++++++++++
 be/src/exec/hdfs-scan-node-mt.cc          |  8 +++++-
 be/src/exec/hdfs-scan-node.cc             |  2 ++
 be/src/exec/nested-loop-join-node.cc      |  2 ++
 be/src/exec/partitioned-hash-join-node.cc |  2 ++
 be/src/exec/select-node.cc                |  2 ++
 be/src/exec/sort-node.cc                  |  5 ++++
 be/src/exec/topn-node.cc                  |  4 +++
 11 files changed, 84 insertions(+), 3 deletions(-)

diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index e9281c6..36e7177 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -281,6 +281,7 @@ Status BlockingJoinNode::GetFirstProbeRow(RuntimeState* state) {
 template <bool ASYNC_BUILD>
 Status BlockingJoinNode::SendBuildInputToSink(RuntimeState* state,
     DataSink* build_sink) {
+  SetOpenStartTime(state);
   {
     SCOPED_TIMER(build_timer_);
     RETURN_IF_ERROR(build_sink->Open(state));
@@ -309,6 +310,7 @@ Status BlockingJoinNode::SendBuildInputToSink(RuntimeState* state,
     SCOPED_TIMER(build_timer_);
     RETURN_IF_ERROR(build_sink->FlushFinal(state));
   }
+  SetOpenEndTime(state);
   return Status::OK();
 }
 
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index affd93c..9988017 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -158,15 +158,21 @@ Status ExchangeNode::FillInputRowBatch(RuntimeState* state) {
 Status ExchangeNode::GetNext(RuntimeState* state, RowBatch* output_batch, bool* eos) {
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  SetGetNextStartTime(state);
   if (ReachedLimit()) {
     stream_recvr_->TransferAllResources(output_batch);
+    SetGetNextEndTime(state);
     *eos = true;
     return Status::OK();
   } else {
     *eos = false;
   }
 
-  if (is_merging_) return GetNextMerging(state, output_batch, eos);
+  if (is_merging_) {
+    RETURN_IF_ERROR(GetNextMerging(state, output_batch, eos));
+    if (*eos) SetGetNextEndTime(state);
+    return Status::OK();
+  }
 
   while (true) {
     {
@@ -194,6 +200,7 @@ Status ExchangeNode::GetNext(RuntimeState* state, RowBatch* output_batch, bool*
       if (ReachedLimit()) {
         stream_recvr_->TransferAllResources(output_batch);
         *eos = true;
+        SetGetNextEndTime(state);
         return Status::OK();
       }
       if (output_batch->AtCapacity()) return Status::OK();
@@ -203,7 +210,10 @@ Status ExchangeNode::GetNext(RuntimeState* state, RowBatch* output_batch, bool*
     stream_recvr_->TransferAllResources(output_batch);
     RETURN_IF_ERROR(FillInputRowBatch(state));
     *eos = (input_batch_ == NULL);
-    if (*eos) return Status::OK();
+    if (*eos) {
+      SetGetNextEndTime(state);
+      return Status::OK();
+    }
     next_row_idx_ = 0;
     DCHECK(input_batch_->row_desc()->LayoutIsPrefixOf(*output_batch->row_desc()));
   }
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index d8495ac..7e63987 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -156,6 +156,9 @@ Status ExecNode::Reset(RuntimeState* state, RowBatch* row_batch) {
 void ExecNode::Close(RuntimeState* state) {
   if (is_closed_) return;
   is_closed_ = true;
+  // Ensure that the end time is set, e.g. if an ancestor node hit a limit or we hit an
+  // error.
+  SetGetNextEndTime(state);
 
   if (rows_returned_counter_ != NULL) {
     COUNTER_SET(rows_returned_counter_, num_rows_returned_);
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 1bbb095..9d17f7f 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -30,7 +30,9 @@
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/descriptors.h" // for RowDescriptor
 #include "runtime/reservation-manager.h"
+#include "runtime/runtime-state.h" // TODO: remove
 #include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 namespace impala {
 
@@ -248,6 +250,11 @@ class ExecNode {
   TPlanNodeType::type type_;
   ObjectPool* pool_;
 
+  RuntimeProfile::Counter* getnext_start_time_ = nullptr;
+  RuntimeProfile::Counter* open_start_time_ = nullptr;
+  RuntimeProfile::Counter* getnext_end_time_ = nullptr;
+  RuntimeProfile::Counter* open_end_time_ = nullptr;
+
   /// Conjuncts and their evaluators in this node. 'conjuncts_' live in the
   /// query-state's object pool while the evaluators live in this exec node's
   /// object pool.
@@ -296,6 +303,42 @@ class ExecNode {
   /// If true, codegen should be disabled for this exec node.
   const bool disable_codegen_;
 
+  // TODO: turn this into a scoped object that takes eos as an argument and
+  // calls SetGetNextStartTime() and SetGetNextEndTime() if *eos && !IsInSubplan().
+  void SetGetNextStartTime(RuntimeState* state) {
+    if (getnext_start_time_ != nullptr) return;
+    getnext_start_time_ = ADD_COUNTER(runtime_profile_, "GetNextStartTime", TUnit::TIME_US);
+    int64_t start;
+    state->utc_timestamp()->UtcToUnixTimeMicros(&start);
+    COUNTER_SET(getnext_start_time_, GetCurrentTimeMicros() - start);
+  }
+
+  void SetGetNextEndTime(RuntimeState* state) {
+    if (getnext_end_time_ != nullptr) return;
+    getnext_end_time_ = ADD_COUNTER(runtime_profile_, "GetNextEndTime", TUnit::TIME_US);
+    int64_t start;
+    state->utc_timestamp()->UtcToUnixTimeMicros(&start);
+    COUNTER_SET(getnext_end_time_, GetCurrentTimeMicros() - start);
+  }
+
+  // TODO: turn this into a scoped object that calls SetOpenStartTime() and
+  // SetOpenEndTime() if !IsInSubplan().
+  void SetOpenStartTime(RuntimeState* state) {
+    if (open_start_time_ != nullptr) return;
+    open_start_time_ = ADD_COUNTER(runtime_profile_, "OpenStartTime", TUnit::TIME_US);
+    int64_t start;
+    state->utc_timestamp()->UtcToUnixTimeMicros(&start);
+    COUNTER_SET(open_start_time_, GetCurrentTimeMicros() - start);
+  }
+
+  void SetOpenEndTime(RuntimeState* state) {
+    if (open_end_time_ != nullptr) return;
+    open_end_time_ = ADD_COUNTER(runtime_profile_, "OpenEndTime", TUnit::TIME_US);
+    int64_t start;
+    state->utc_timestamp()->UtcToUnixTimeMicros(&start);
+    COUNTER_SET(open_end_time_, GetCurrentTimeMicros() - start);
+  }
+
   /// Create a single exec node derived from thrift node; place exec node in 'pool'.
   static Status CreateNode(ObjectPool* pool, const TPlanNode& tnode,
       const DescriptorTbl& descs, ExecNode** node,
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 4e59e0a..ee91f55 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -69,6 +69,7 @@ Status HdfsScanNodeMt::Open(RuntimeState* state) {
 
 Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  SetGetNextStartTime(state);
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
@@ -83,6 +84,7 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
     int64_t scanner_reservation = buffer_pool_client()->GetReservation();
     RETURN_IF_ERROR(StartNextScanRange(&scanner_reservation, &scan_range_));
     if (scan_range_ == nullptr) {
+      SetGetNextEndTime(state);
       *eos = true;
       StopAndFinalizeCounters();
       return Status::OK();
@@ -120,10 +122,13 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
     scanner_->Close(row_batch);
     scanner_.reset();
     *eos = true;
   }
   COUNTER_SET(rows_returned_counter_, num_rows_returned_);
 
-  if (*eos) StopAndFinalizeCounters();
+  if (*eos) {
+    SetGetNextEndTime(state);
+    StopAndFinalizeCounters();
+  }
   return Status::OK();
 }
 
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 04b6589..be58439 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -77,6 +77,7 @@ HdfsScanNode::~HdfsScanNode() {
 
 Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  SetGetNextStartTime(state);
 
   if (!initial_ranges_issued_) {
     // We do this in GetNext() to maximise the amount of work we can do while waiting for
@@ -105,6 +106,7 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
 
   Status status = GetNextInternal(state, row_batch, eos);
   if (!status.ok() || *eos) {
+    SetGetNextEndTime(state);
     unique_lock<mutex> l(lock_);
     lock_guard<SpinLock> l2(file_type_counts_);
     StopAndFinalizeCounters();
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index 1e1face..9970187 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -198,6 +198,7 @@ Status NestedLoopJoinNode::GetNext(RuntimeState* state, RowBatch* output_batch,
     bool* eos) {
   DCHECK(!output_batch->AtCapacity());
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  SetGetNextStartTime(state);
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
@@ -251,6 +252,7 @@ end:
     *eos = true;
     probe_batch_->TransferResourceOwnership(output_batch);
     build_batches_->TransferResourceOwnership(output_batch);
+    SetGetNextEndTime(state);
   }
   return Status::OK();
 }
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 2ef5c08..1100ac5 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -494,6 +494,7 @@ int PartitionedHashJoinNode::ProcessProbeBatch(
 Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch,
     bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  SetGetNextStartTime(state);
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   DCHECK(!out_batch->AtCapacity());
 
@@ -645,6 +646,7 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
 
   num_rows_returned_ += num_rows_added;
   COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  if (*eos) SetGetNextEndTime(state);
   return Status::OK();
 }
 
diff --git a/be/src/exec/select-node.cc b/be/src/exec/select-node.cc
index d5cb7f0..464c88a 100644
--- a/be/src/exec/select-node.cc
+++ b/be/src/exec/select-node.cc
@@ -84,6 +84,7 @@ Status SelectNode::Open(RuntimeState* state) {
 
 Status SelectNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  SetGetNextStartTime(state);
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   // start (or continue) consuming row batches from child
   do {
@@ -108,6 +109,7 @@ Status SelectNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
       child_row_batch_->Reset();
     }
   } while (!*eos && !row_batch->AtCapacity());
+  if (*eos) SetGetNextEndTime(state);
   return Status::OK();
 }
 
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 3a70c29..9221589 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -71,6 +71,7 @@ void SortNode::Codegen(RuntimeState* state) {
 
 Status SortNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  SetOpenStartTime(state);
   RETURN_IF_ERROR(ExecNode::Open(state));
   RETURN_IF_ERROR(child(0)->Open(state));
   // Claim reservation after the child has been opened to reduce the peak reservation
@@ -85,17 +86,20 @@ Status SortNode::Open(RuntimeState* state) {
   // The child has been opened and the sorter created. Sort the input.
   // The final merge is done on-demand as rows are requested in GetNext().
   RETURN_IF_ERROR(SortInput(state));
+  SetOpenEndTime(state);
   return Status::OK();
 }
 
 Status SortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  SetGetNextStartTime(state);
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
 
   if (ReachedLimit()) {
     *eos = true;
+    SetGetNextEndTime(state);
     return Status::OK();
   } else {
     *eos = false;
@@ -135,6 +139,7 @@ Status SortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   if (ReachedLimit()) {
     row_batch->set_num_rows(row_batch->num_rows() - (num_rows_returned_ - limit_));
     *eos = true;
+    SetGetNextEndTime(state);
   }
 
   COUNTER_SET(rows_returned_counter_, num_rows_returned_);
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index 961416f..24cef2a 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -136,6 +136,7 @@ void TopNNode::Codegen(RuntimeState* state) {
 
 Status TopNNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  SetOpenStartTime(state);
   RETURN_IF_ERROR(ExecNode::Open(state));
   RETURN_IF_ERROR(
       tuple_row_less_than_->Open(pool_, state, expr_perm_pool(), expr_results_pool()));
@@ -178,11 +179,13 @@ Status TopNNode::Open(RuntimeState* state) {
   // Unless we are inside a subplan expecting to call Open()/GetNext() on the child
   // again, the child can be closed at this point.
   if (!IsInSubplan()) child(0)->Close(state);
+  SetOpenEndTime(state);
   return Status::OK();
 }
 
 Status TopNNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  SetGetNextStartTime(state);
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
@@ -209,6 +212,7 @@ Status TopNNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   // inside a subplan, we might choose to only selectively transfer, e.g., when the
   // block(s) in the pool are all full or when the pool has reached a certain size.
   if (*eos) row_batch->tuple_data_pool()->AcquireData(tuple_pool_.get(), false);
+  if (*eos) SetGetNextEndTime(state);
   return Status::OK();
 }
 
-- 
2.7.4

