From 2cb86db8e96f55173ec7dba86089ec3161a46145 Mon Sep 17 00:00:00 2001
From: KathySun-c <kathy.sun@cloudera.com>
Date: Wed, 07 Sep 2016 15:03:21 -0700
Subject: [PATCH] [Draft] Add Nested Value for System table

This is a prototype of nested value.

we could handle queries like:
1.
select * from system.metrics m,
m.nested_value;
2.
select key, nested_value.value
from system.metrics.nested_value;

Change-Id: Ib2f993e77f3ef59056bad9294168540d07be5f36
---

diff --git a/be/src/exec/system-table-scan-node.cc b/be/src/exec/system-table-scan-node.cc
index 2e12def..2fc3cde 100644
--- a/be/src/exec/system-table-scan-node.cc
+++ b/be/src/exec/system-table-scan-node.cc
@@ -51,7 +51,7 @@
     num_rows_(0) {
   table_name_ = tnode.system_table_scan_node.table_name;
   // currently, we only have metrics table in system db
-  DCHECK(table_name_ == TSystemTableName::type::METRICS);
+  DCHECK_EQ(table_name_, TSystemTableName::type::METRICS);
 }
 
 SystemTableScanNode::~SystemTableScanNode() {}
@@ -73,10 +73,6 @@
     case TSystemTableName::METRICS:
       scanner_.reset(new MetricScanner());
       break;
-    case TSystemTableName::QUERIES:
-      // Could add QueryScanner class for exposing impala queries.
-      return Status(Substitute("QueryScanner NYI"));
-      break;
     default:
       return Status(Substitute("Unknown table type: $0", table_name_));
   }
@@ -86,9 +82,10 @@
 
 Status SystemTableScanNode::MaterializeNextTuple(MemPool* tuple_pool, Tuple* tuple) {
   tuple->Init(tuple_desc_->byte_size());
-  scanner_->MaterializeNextTuple(tuple_pool, tuple, tuple_desc_);
+  RETURN_IF_ERROR(scanner_->MaterializeNextTuple(tuple_pool, tuple, tuple_desc_));
   return Status::OK();
 }
+
 
 Status SystemTableScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
@@ -104,7 +101,8 @@
   int64_t tuple_buffer_size;
   uint8_t* tuple_buffer;
   RETURN_IF_ERROR(
-      row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buffer_size, &tuple_buffer));
+      row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buffer_size,
+          &tuple_buffer));
   Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
   ExprContext** ctxs = &conjunct_ctxs_[0];
   int num_ctxs = conjunct_ctxs_.size();
@@ -113,19 +111,53 @@
 
   // copy rows until we hit the limit/capacity or until we exhaust input batch
   while (!ReachedLimit() && !row_batch->AtCapacity() && !scanner_->eos_) {
-    RETURN_IF_ERROR(MaterializeNextTuple(row_batch->tuple_data_pool(), tuple));
-    int row_idx = row_batch->AddRow();
-    TupleRow* tuple_row = row_batch->GetRow(row_idx);
-    tuple_row->SetTuple(0, tuple);
+    if (!tuple_desc_->tuple_path().empty()) {
+      map<string, string> value_map;
+      scanner_->getMapValue(value_map);
 
-    if (ExecNode::EvalConjuncts(ctxs, num_ctxs, tuple_row)) {
-      row_batch->CommitLastRow();
-      tuple = reinterpret_cast<Tuple*>(
-          reinterpret_cast<uint8_t*>(tuple) + tuple_desc_->byte_size());
-      ++num_rows_returned_;
+      // iterate every map, add several rows
+      for (auto itr : value_map) {
+        const string& key = itr.first;
+        const SlotDescriptor* inner_slot_desc = tuple_desc_->slots()[0];
+        scanner_->WriteStringSlot(key, tuple, inner_slot_desc,
+            row_batch->tuple_data_pool());
+
+        const string& value = itr.second;
+        inner_slot_desc = tuple_desc_->slots()[1];
+        scanner_->WriteStringSlot(value, tuple, inner_slot_desc,
+            row_batch->tuple_data_pool());
+
+        //======================================
+        int row_idx = row_batch->AddRow();
+        TupleRow* tuple_row = row_batch->GetRow(row_idx);
+        tuple_row->SetTuple(0, tuple);
+
+        if (ExecNode::EvalConjuncts(ctxs, num_ctxs, tuple_row)) {
+          row_batch->CommitLastRow();
+          tuple = reinterpret_cast<Tuple*>(reinterpret_cast<uint8_t*>(tuple)
+              + tuple_desc_->byte_size());
+          ++num_rows_returned_;
+        }
+      }
+      LOG(INFO) << "metric idx= " << scanner_->next_row_idx_;
+
+      ++(scanner_->next_row_idx_);
+      scanner_->CheckEOS();
+    } else {
+      RETURN_IF_ERROR(
+          MaterializeNextTuple(row_batch->tuple_data_pool(), tuple));
+      int row_idx = row_batch->AddRow();
+      TupleRow* tuple_row = row_batch->GetRow(row_idx);
+      tuple_row->SetTuple(0, tuple);
+
+      if (ExecNode::EvalConjuncts(ctxs, num_ctxs, tuple_row)) {
+        row_batch->CommitLastRow();
+        tuple = reinterpret_cast<Tuple*>(reinterpret_cast<uint8_t*>(tuple)
+            + tuple_desc_->byte_size());
+        ++num_rows_returned_;
+      }
+      LOG(INFO) << "metric idx= " << scanner_->next_row_idx_;
     }
-    ++(scanner_->next_row_idx_);
-    scanner_->CheckEOS();
   }
   COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   if (ReachedLimit() || row_batch->AtCapacity() || scanner_->eos_) {
diff --git a/be/src/exec/system-table-scan-node.h b/be/src/exec/system-table-scan-node.h
index beb7d29..2e1f23d 100644
--- a/be/src/exec/system-table-scan-node.h
+++ b/be/src/exec/system-table-scan-node.h
@@ -28,19 +28,24 @@
 namespace impala {
 class Tuple;
 
+/// A scan node that exposes Impala system state as a virtual table.
+///
+/// Different SystemTableScanner subclasses gather data from different Impala subsystems and
+/// materialize it into Tuples.
+/// e.g MetricScanner expose impala metrics as a table.
 class SystemTableScanNode : public ScanNode {
  public:
   SystemTableScanNode(
       ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
   ~SystemTableScanNode();
 
-  /// Create  schema and columns to slots mapping.
+  /// Create schema and columns to slots mapping.
   virtual Status Prepare(RuntimeState* state);
 
-  /// Start  scan.
+  /// Start scan.
   virtual Status Open(RuntimeState* state);
 
-  /// Fill the next row batch by fetching more data from metrics.
+  /// Fill the next row batch by fetching more data from system table.
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
 
   /// NYI
@@ -59,14 +64,14 @@
   /// Descriptor of tuples read from SystemTable
   const TupleDescriptor* tuple_desc_;
 
-  // enum table type: e.g. metrics, queries
+  // Enum table type: e.g. metrics, queries
   TSystemTableName::type table_name_;
 
   /// The number of rows in input_batch_->rows. The data source should have set
   /// TRowBatch.num_rows, but we compute it just in case they haven't.
   int num_rows_;
 
-  /// Materializes the next row (next_row_idx_) into tuple.
+  /// Materializes the next row into tuple.
   Status MaterializeNextTuple(MemPool* mem_pool, Tuple* tuple);
 };
 }
diff --git a/be/src/exec/system-table-scanner.cc b/be/src/exec/system-table-scanner.cc
index bf8b315..5445d69 100644
--- a/be/src/exec/system-table-scanner.cc
+++ b/be/src/exec/system-table-scanner.cc
@@ -24,9 +24,16 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "runtime/collection-value-builder.h"
+#include "runtime/collection-value.h"
+
 #include "runtime/tuple-row.h"
 #include "util/metrics.h"
+#include "util/collection-metrics.h"
+#include "util/histogram-metric.h"
 #include "util/periodic-counter-updater.h"
+#include "testutil/gtest-util.h"
+
 
 #include <gutil/strings/substitute.h>
 #include <algorithm>
@@ -34,9 +41,10 @@
 
 using strings::Substitute;
 namespace impala {
+
 const string ERROR_MEM_LIMIT_EXCEEDED =
-    "SystemTableScanNode::$0() failed to allocate "
-    "$1 bytes for $2.";
+    "SystemTableScanNode::$0() failed to allocate $1 bytes for $2.";
+
 Status MetricScanner::Open() {
   MetricGroup* all_metrics = ExecEnv::GetInstance()->metrics();
   std::stack<MetricGroup*> groups;
@@ -66,9 +74,11 @@
   METRIC_NAME,
   HUMAN_READABLE,
   DESCRIPTION,
+  MAP,
 };
 
-Status WriteStringSlot(const string& slot_value, MemPool* tuple_pool, void* slot) {
+Status MetricScanner::WriteStringSlot(const string& slot_value, Tuple* tuple, const SlotDescriptor* slot_desc, MemPool* tuple_pool) {
+  void* slot = tuple->GetSlot(slot_desc->tuple_offset());
   size_t value_size = slot_value.size();
   char* buffer = reinterpret_cast<char*>(tuple_pool->TryAllocate(value_size));
   if (UNLIKELY(buffer == NULL)) {
@@ -82,6 +92,84 @@
   return Status::OK();
 }
 
+Status MetricScanner::WriteMapSlot(
+    map<string, string> value_map, Tuple* tuple, const SlotDescriptor* slot_desc, MemPool* tuple_pool){
+  int size = value_map.size();
+  const TupleDescriptor* item_desc= slot_desc->collection_item_descriptor();
+  CollectionValue cv;
+  CollectionValueBuilder builder(&cv, *item_desc, tuple_pool, NULL, size);
+  Tuple* tuple_mem;
+  int n;
+  builder.GetFreeMemory(&tuple_mem, &n);
+  //  EXPECT_OK(builder.GetFreeMemory(&tuple_mem, &n));
+  //  ASSERT_GE(n, size);
+  for (auto itr:value_map){
+    const string& key = itr.first;
+    const SlotDescriptor* inner_slot_desc = item_desc->slots()[0];
+    WriteStringSlot(key, tuple_mem, inner_slot_desc, tuple_pool);
+
+    const string& value = itr.second;
+    inner_slot_desc = item_desc->slots()[1];
+    WriteStringSlot(value, tuple_mem, inner_slot_desc, tuple_pool);
+    tuple_mem += item_desc->byte_size();
+  }
+  builder.CommitTuples(size);
+  void* slot = tuple->GetSlot(slot_desc->tuple_offset());
+  reinterpret_cast<CollectionValue*>(slot)->num_tuples = cv.num_tuples;
+  reinterpret_cast<CollectionValue*>(slot)->ptr = cv.ptr;
+
+  return Status::OK();
+}
+
+void MetricScanner::getMapValue(map<string, string>& value_map) {
+  if(StatsMetric<double, StatsType::ALL> * stats_metric = dynamic_cast<StatsMetric<double, StatsType::ALL>*>(metric_pool_[next_row_idx_].metric_)){
+    value_map["MIN"] =  boost::accumulators::min(stats_metric->acc_);
+    value_map["MAX"] =  boost::accumulators::max(stats_metric->acc_);
+    value_map["MEAN"] = boost::accumulators::mean(stats_metric->acc_);
+    value_map["STDDEV"] = sqrt(boost::accumulators::variance(stats_metric->acc_));
+    value_map["COUNT"] = boost::accumulators::count(stats_metric->acc_);
+    value_map["LAST"] = stats_metric->value_;
+
+    LOG(INFO)<< "###################StatsMetric###############";
+
+    LOG(INFO)<< "MIN = "<< value_map["MIN"];
+    LOG(INFO)<< "MAX = "<< value_map["MAX"];
+    LOG(INFO)<< "MEAN = "<< value_map["MEAN"];
+    LOG(INFO)<< "STDDEV = "<< value_map["STDDEV"];
+    LOG(INFO)<< "COUNT = "<< value_map["COUNT"];
+    LOG(INFO)<< "LAST = "<< value_map["LAST"];
+
+  }
+  else if (HistogramMetric* histogram_metric = dynamic_cast<HistogramMetric*>(metric_pool_[next_row_idx_].metric_)){
+    //need to_string()?
+    value_map["25th %-ile"] =  histogram_metric->histogram_.ValueAtPercentile(25);
+    value_map["50th %-ile"] =  histogram_metric->histogram_.ValueAtPercentile(50);
+    value_map["75th %-ile"] =  histogram_metric->histogram_.ValueAtPercentile(75);
+    value_map["90th %-ile"] =  histogram_metric->histogram_.ValueAtPercentile(90);
+    value_map["95th %-ile"] =  histogram_metric->histogram_.ValueAtPercentile(95);
+    value_map["99.9th %-ile"] =  histogram_metric->histogram_.ValueAtPercentile(99.9);
+    value_map["count"] =  histogram_metric->histogram_.TotalCount();
+    LOG(INFO)<< "###################StatsMetric###############";
+
+     LOG(INFO)<< "25th %-ile"<< value_map["25th %-ile"];
+     LOG(INFO)<< "50th %-ile"<< value_map["50th %-ile"];
+     LOG(INFO)<< "COUNT = "<< value_map["COUNT"];
+  }
+  else if (IntGauge* simple_metric = dynamic_cast<IntGauge*>(metric_pool_[next_row_idx_].metric_)) {
+    LOG(INFO)<< "###################simple_metric###############";
+
+    value_map["value"] = std::to_string(simple_metric->value());
+  }
+  else if (SetMetric<string>* set_metric = dynamic_cast<SetMetric<string>*>(metric_pool_[next_row_idx_].metric_)){
+    LOG(INFO)<< "###################SetMetric###############";
+    int counter = 0;
+    for(auto item : set_metric->value_){
+      value_map[std::to_string(counter++)] = item;
+    }
+  }
+
+}
+
 Status MetricScanner::MaterializeNextTuple(
     MemPool* tuple_pool, Tuple* tuple, const TupleDescriptor* tuple_desc_) {
   const TNetworkAddress& backend_address = ExecEnv::GetInstance()->backend_address();
@@ -91,31 +179,39 @@
   const string& metric_name = metric_pool_[next_row_idx_].metric_->key();
   const string& human_readable = metric_pool_[next_row_idx_].metric_->ToHumanReadable();
   const string& description = metric_pool_[next_row_idx_].metric_->description();
+  map<string, string> value_map;
+  getMapValue(value_map);
+
+  typedef class SimpleMetric<int64_t, TMetricKind::GAUGE> IntGauge;
+
 
   for (int i = 0; i < tuple_desc_->slots().size(); ++i) {
     const SlotDescriptor* slot_desc = tuple_desc_->slots()[i];
-    void* slot = tuple->GetSlot(slot_desc->tuple_offset());
-
     switch (slot_desc->col_pos()) {
       case IMPALAD_ADDRESS:
-        RETURN_IF_ERROR(WriteStringSlot(node_address, tuple_pool, slot));
+        RETURN_IF_ERROR(WriteStringSlot(node_address, tuple, slot_desc, tuple_pool));
         break;
       case METRIC_GROUP:
-        RETURN_IF_ERROR(WriteStringSlot(group_name, tuple_pool, slot));
+        RETURN_IF_ERROR(WriteStringSlot(group_name, tuple, slot_desc, tuple_pool));
         break;
       case METRIC_NAME:
-        RETURN_IF_ERROR(WriteStringSlot(group_name, tuple_pool, slot));
+        RETURN_IF_ERROR(WriteStringSlot(metric_name, tuple, slot_desc, tuple_pool));
         break;
       case HUMAN_READABLE:
-        RETURN_IF_ERROR(WriteStringSlot(human_readable, tuple_pool, slot));
+        RETURN_IF_ERROR(WriteStringSlot(human_readable, tuple, slot_desc, tuple_pool));
         break;
       case DESCRIPTION:
-        RETURN_IF_ERROR(WriteStringSlot(description, tuple_pool, slot));
+        RETURN_IF_ERROR(WriteStringSlot(description, tuple, slot_desc, tuple_pool));
+        break;
+      case MAP:
+        RETURN_IF_ERROR(WriteMapSlot(value_map, tuple, slot_desc, tuple_pool));
         break;
       default:
-        return Status(Substitute("column error"));
+	DCHECK(false) << "Unknown column position " << slot_desc->col_pos();
     }
   }
+  ++next_row_idx_;
+  if (next_row_idx_ >= metric_pool_.size()) setEos(true);
   return Status::OK();
 }
 
diff --git a/be/src/exec/system-table-scanner.h b/be/src/exec/system-table-scanner.h
index caa8de2..9f37209 100644
--- a/be/src/exec/system-table-scanner.h
+++ b/be/src/exec/system-table-scanner.h
@@ -36,6 +36,9 @@
   SystemTableScanner() : eos_(false), next_row_idx_(0){};
   virtual ~SystemTableScanner(){};
 
+  bool isEos() const { return eos_; }
+  void setEos(bool eos) {  eos_ = eos; }
+
   /// Start scan, load data needed
   virtual Status Open() = 0;
 
@@ -46,6 +49,15 @@
   /// Set eos = true when all items have been scanned
   virtual void CheckEOS() = 0;
 
+  virtual Status WriteStringSlot(
+      const string& slot_value, Tuple* tuple, const SlotDescriptor* slot_desc, MemPool* tuple_pool) = 0;
+
+  virtual Status WriteMapSlot(
+      std::map<string, string> value_map, Tuple* tuple, const SlotDescriptor* slot_desc, MemPool* tuple_pool) = 0;
+
+  virtual void getMapValue(std::map<string, string>& value_map) = 0;
+
+// private:
   /// if true, nothing left to return in getNext() in SystemTableScanNode
   bool eos_;
 
@@ -70,8 +82,19 @@
       MemPool* tuple_pool, Tuple* tuple, const TupleDescriptor* tuple_desc_);
 
   virtual void CheckEOS() {
-    if (next_row_idx_ >= metric_pool_.size()) eos_ = true;
+    if (next_row_idx_ >= metric_pool_.size()) setEos(true);
   }
+
+  virtual Status WriteStringSlot(
+      const string& slot_value, Tuple* tuple,
+      const SlotDescriptor* slot_desc, MemPool* tuple_pool);
+
+  virtual Status WriteMapSlot(
+      std::map<string, string> value_map, Tuple* tuple,
+      const SlotDescriptor* slot_desc, MemPool* tuple_pool);
+
+  virtual void getMapValue(std::map<string, string>& value_map);
+
 };
 
 } /* namespace impala */
diff --git a/be/src/util/collection-metrics.h b/be/src/util/collection-metrics.h
index 1081c33..d5b99fb 100644
--- a/be/src/util/collection-metrics.h
+++ b/be/src/util/collection-metrics.h
@@ -104,7 +104,7 @@
     return out.str();
   }
 
- private:
+// private:
   /// Lock protecting the set
   boost::mutex lock_;
 
@@ -268,6 +268,7 @@
   boost::mutex lock_;
 
   /// The last value
+ public :
   T value_;
 
   /// The set of accumulators that update the statistics on each Update()
diff --git a/be/src/util/histogram-metric.h b/be/src/util/histogram-metric.h
index 211deee..4057fa8 100644
--- a/be/src/util/histogram-metric.h
+++ b/be/src/util/histogram-metric.h
@@ -85,7 +85,7 @@
     return out.str();
   }
 
- private:
+// private:
   HdrHistogram histogram_;
 
   const TUnit::type unit_;
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/SystemDb.java b/fe/src/main/java/com/cloudera/impala/catalog/SystemDb.java
index a2419a4..6a362fa 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/SystemDb.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/SystemDb.java
@@ -37,6 +37,8 @@
     table.addColumn(new Column("name", Type.STRING, 2));
     table.addColumn(new Column("value", Type.STRING, 3));
     table.addColumn(new Column("description", Type.STRING, 4));
+    table.addColumn(new Column("nested_value", new MapType(Type.STRING, Type.STRING), 5)); //????
+
     addTable(table);
   }
 
diff --git a/fe/src/main/java/com/cloudera/impala/planner/SystemTableScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/SystemTableScanNode.java
index 6c0174a..fa3d2ae 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/SystemTableScanNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/SystemTableScanNode.java
@@ -23,7 +23,9 @@
 import org.slf4j.LoggerFactory;
 
 import com.cloudera.impala.analysis.Analyzer;
+import com.cloudera.impala.analysis.SlotDescriptor;
 import com.cloudera.impala.analysis.TupleDescriptor;
+import com.cloudera.impala.catalog.Column;
 import com.cloudera.impala.catalog.SystemTable;
 import com.cloudera.impala.common.ImpalaException;
 import com.cloudera.impala.common.InternalException;
@@ -39,6 +41,7 @@
 import com.cloudera.impala.thrift.TScanRangeLocations;
 import com.cloudera.impala.thrift.TSystemTableScanNode;
 import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 public class SystemTableScanNode extends ScanNode {
@@ -142,4 +145,34 @@
     }
     return output.toString();
   }
+  
+
+  @Override
+  protected void checkForSupportedFileFormats() {
+    Preconditions.checkNotNull(desc_);
+    Preconditions.checkNotNull(desc_.getTable());
+    Column firstComplexTypedCol = null;
+    for (Column col : desc_.getTable().getColumns()) {
+      if (col.getType().isComplexType()) {
+        firstComplexTypedCol = col;
+        break;
+      }
+    }
+    if (firstComplexTypedCol == null)
+      return;
+
+    boolean referencesComplexTypedCol = false;
+    for (SlotDescriptor slotDesc : desc_.getSlots()) {
+      if (!slotDesc.isMaterialized())
+        continue;
+      if (slotDesc.getType().isComplexType() || slotDesc.getColumn() == null) {
+        referencesComplexTypedCol = true;
+        break;
+      }
+    }
+    if (referencesComplexTypedCol) {
+      return;
+    }
+  }
+  
 }
