From d556ca9bbd7b56e350f629948da38d9f1f5aff3a Mon Sep 17 00:00:00 2001
From: kathy.sun <kathy.sun@cloudera.com>
Date: Mon, 25 Jul 2016 14:27:16 -0700
Subject: [PATCH] PREVIEW: IMPALA-4024: "system" db with metric table

TODO:
* This seems to basically work with some gaps
* We get the warning about missing stats in the explain plan,
  which we should fix or suppress.

This is to expose metadata (the current status of Impala, e.g. Impala metrics)
as system tables, so that users can query it via SQL.
Currently, only the metrics table is included.
We could add other tables later, e.g. a queries table.

You can run impala-shell.sh and type query like:
select * from system.metrics;
select name, value, description from system.metrics;

Testing:
TODO: add more end-to-end and planner tests

Change-Id: Id0ea3cec4dabb3217a4f4647df2c4aa6fc8f4fbc
---

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index b5f7d3e..32d0137 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -88,6 +88,8 @@
   singular-row-src-node.cc
   sort-node.cc
   subplan-node.cc
+  system-table-scan-node.cc
+  system-table-scanner.cc
   text-converter.cc
   topn-node.cc
   topn-node-ir.cc
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index aaca8be..cf36bc6 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -27,8 +27,6 @@
 #include "codegen/llvm-codegen.h"
 #include "common/object-pool.h"
 #include "common/status.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
 #include "exec/analytic-eval-node.h"
 #include "exec/data-source-scan-node.h"
 #include "exec/empty-set-node.h"
@@ -47,10 +45,13 @@
 #include "exec/singular-row-src-node.h"
 #include "exec/sort-node.h"
 #include "exec/subplan-node.h"
+#include "exec/system-table-scan-node.h"
 #include "exec/topn-node.h"
 #include "exec/union-node.h"
 #include "exec/unnest-node.h"
 #include "exprs/expr.h"
+#include "exprs/scalar-expr-evaluator.h"
+#include "exprs/scalar-expr.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec-env.h"
@@ -411,6 +412,9 @@
     case TPlanNodeType::UNNEST_NODE:
       *node = pool->Add(new UnnestNode(pool, tnode, descs));
       break;
+    case TPlanNodeType::SYSTEM_TABLE_SCAN_NODE:
+      *node = pool->Add(new SystemTableScanNode(pool, tnode, descs));
+      break;
     default:
       map<int, const char*>::const_iterator i =
           _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
diff --git a/be/src/exec/system-table-scan-node.cc b/be/src/exec/system-table-scan-node.cc
new file mode 100644
index 0000000..b408887
--- /dev/null
+++ b/be/src/exec/system-table-scan-node.cc
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/system-table-scan-node.h"
+
+#include <algorithm>
+#include <vector>
+
+#include <boost/algorithm/string/replace.hpp>
+#include <gutil/strings/substitute.h>
+#include <rapidjson/rapidjson.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/writer.h>
+
+#include "exec/system-table-scanner.h"
+#include "gen-cpp/PlanNodes_types.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec-env.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "util/metrics.h"
+
+#include "common/names.h"
+
+using boost::algorithm::replace_all_copy;
+using strings::Substitute;
+
+namespace impala {
+
+SystemTableScanNode::SystemTableScanNode(
+    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+  : ScanNode(pool, tnode, descs),
+    tuple_id_(tnode.system_table_scan_node.tuple_id),
+    table_name_(tnode.system_table_scan_node.table_name) {
+  // currently, we only have metrics table in system db
+  DCHECK_EQ(table_name_, TSystemTableName::type::METRICS);
+}
+
+SystemTableScanNode::~SystemTableScanNode() {}
+
+Status SystemTableScanNode::Prepare(RuntimeState* state) {
+  RETURN_IF_ERROR(ScanNode::Prepare(state));
+  tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
+  DCHECK(tuple_desc_ != nullptr);
+  return Status::OK();
+}
+
+Status SystemTableScanNode::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(ExecNode::Open(state));
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+
+  switch (table_name_) {
+    case TSystemTableName::METRICS:
+      scanner_.reset(new MetricScanner(state));
+      break;
+    default:
+      return Status(Substitute("Unknown table type: $0", table_name_));
+  }
+  RETURN_IF_ERROR(scanner_->Open());
+  return Status::OK();
+}
+
+Status SystemTableScanNode::MaterializeNextTuple(MemPool* tuple_pool, Tuple* tuple) {
+  tuple->Init(tuple_desc_->byte_size());
+  RETURN_IF_ERROR(scanner_->MaterializeNextTuple(tuple_pool, tuple, tuple_desc_));
+  return Status::OK();
+}
+
+Status SystemTableScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+
+  int64_t tuple_buffer_size;
+  uint8_t* tuple_mem;
+  RETURN_IF_ERROR(
+      row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buffer_size, &tuple_mem));
+
+  SCOPED_TIMER(materialize_tuple_timer());
+
+  // copy rows until we hit the limit/capacity or until we exhaust input batch
+  while (!ReachedLimit() && !row_batch->AtCapacity() && !scanner_->eos()) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem);
+    RETURN_IF_ERROR(MaterializeNextTuple(row_batch->tuple_data_pool(), tuple));
+    TupleRow* tuple_row = row_batch->GetRow(row_batch->AddRow());
+    tuple_row->SetTuple(0, tuple);
+
+    if (ExecNode::EvalConjuncts(
+            conjunct_evals_.data(), conjunct_evals_.size(), tuple_row)) {
+      row_batch->CommitLastRow();
+      tuple_mem += tuple_desc_->byte_size();
+      ++num_rows_returned_;
+    }
+  }
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  *eos = ReachedLimit() || scanner_->eos();
+  return Status::OK();
+}
+
+Status SystemTableScanNode::Reset(RuntimeState* state) {
+  DCHECK(false) << "NYI";
+  return Status("NYI");
+}
+
+void SystemTableScanNode::Close(RuntimeState* state) {
+  if (is_closed()) return;
+  ScanNode::Close(state);
+}
+
+} // namespace impala
diff --git a/be/src/exec/system-table-scan-node.h b/be/src/exec/system-table-scan-node.h
new file mode 100644
index 0000000..14ee493
--- /dev/null
+++ b/be/src/exec/system-table-scan-node.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_SYSTEM_TABLE_SCAN_NODE_H_
+#define IMPALA_EXEC_SYSTEM_TABLE_SCAN_NODE_H_
+
+#include <boost/scoped_ptr.hpp>
+#include <rapidjson/document.h>
+#include "exec/scan-node.h"
+
+namespace impala {
+class Tuple;
+class TupleDescriptor;
+class SystemTableScanner;
+
+class SystemTableScanNode : public ScanNode {
+  /// A scan node that exposes Impala system state as a table.
+  ///
+  /// Different SystemTableScanner subclasses gather data from different
+  /// Impala subsystems and materialize it into tuples.
+  /// For example, MetricScanner materializes the backend's metrics into
+  /// tuples.
+ public:
+  SystemTableScanNode(
+      ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  ~SystemTableScanNode();
+
+  /// Create schema and columns to slots mapping.
+  virtual Status Prepare(RuntimeState* state);
+
+  /// Start scan.
+  virtual Status Open(RuntimeState* state);
+
+  /// Fill the next row batch by fetching more data from metrics.
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
+
+  /// NYI
+  virtual Status Reset(RuntimeState* state);
+
+  /// Close after scan is finished
+  virtual void Close(RuntimeState* state);
+
+ private:
+  /// Used to gather this system table's data from the ExecEnv.
+  boost::scoped_ptr<SystemTableScanner> scanner_;
+
+  /// Tuple id resolved in Prepare() to set tuple_desc_
+  const int tuple_id_;
+
+  /// Descriptor of tuples read from SystemTable
+  const TupleDescriptor* tuple_desc_ = nullptr;
+
+  /// Which system table to scan, e.g. METRICS or QUERIES.
+  TSystemTableName::type table_name_;
+
+  /// Materializes the next row into tuple.
+  Status MaterializeNextTuple(MemPool* mem_pool, Tuple* tuple);
+};
+}
+#endif /* IMPALA_EXEC_SYSTEM_TABLE_SCAN_NODE_H_ */
diff --git a/be/src/exec/system-table-scanner.cc b/be/src/exec/system-table-scanner.cc
new file mode 100644
index 0000000..a251e86
--- /dev/null
+++ b/be/src/exec/system-table-scanner.cc
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/system-table-scanner.h"
+
+#include "common/names.h"
+#include "gen-cpp/PlanNodes_types.h"
+#include "runtime/exec-env.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "util/metrics.h"
+#include "util/network-util.h"
+#include "util/periodic-counter-updater.h"
+
+#include <algorithm>
+#include <boost/algorithm/string/replace.hpp>
+#include <gutil/strings/substitute.h>
+
+using strings::Substitute;
+namespace impala {
+
+const string ERROR_MEM_LIMIT_EXCEEDED =
+    "SystemTableScanNode::$0() failed to allocate $1 bytes for $2.";
+
+Status MetricScanner::Open() {
+  impalad_address_ = TNetworkAddressToString(ExecEnv::GetInstance()->backend_address());
+  MetricGroup* all_metrics = ExecEnv::GetInstance()->metrics();
+  std::stack<MetricGroup*> groups;
+  groups.push(all_metrics);
+  typedef std::map<std::string, MetricGroup*> ChildGroupMap;
+
+  while (!groups.empty()) {
+    // Depth-first traversal of children to flatten all metrics
+    MetricGroup* group = groups.top();
+    groups.pop();
+    for (auto child : group->getChildren()) {
+      groups.push(child.second);
+    }
+    for (auto it : group->getMetricMap()) {
+      MetricInfo metric_info;
+      metric_info.parent_ = group;
+      metric_info.metric_ = it.second;
+      metrics_.push_back(metric_info);
+    }
+  }
+  return Status::OK();
+}
+
+enum MetricsColumn {
+  IMPALAD_ADDRESS,
+  METRIC_GROUP,
+  METRIC_NAME,
+  HUMAN_READABLE,
+  DESCRIPTION,
+};
+
+Status SystemTableScanner::WriteStringSlot(
+    const char* data, int len, MemPool* pool, void* slot) {
+  char* buffer = reinterpret_cast<char*>(pool->TryAllocate(len));
+  if (UNLIKELY(buffer == nullptr)) {
+    string details =
+        Substitute(ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow", len, "string slot");
+    return pool->mem_tracker()->MemLimitExceeded(state_, details, len);
+  }
+  memcpy(buffer, data, len);
+  StringValue* sv = reinterpret_cast<StringValue*>(slot);
+  sv->ptr = buffer;
+  sv->len = len;
+  return Status::OK();
+}
+
+Status SystemTableScanner::WriteStringSlot(const string& str, MemPool* pool, void* slot) {
+  return WriteStringSlot(str.data(), str.size(), pool, slot);
+}
+
+Status MetricScanner::MaterializeNextTuple(
+    MemPool* pool, Tuple* tuple, const TupleDescriptor* tuple_desc_) {
+  const MetricInfo& metric = metrics_[next_metric_idx_];
+  for (int i = 0; i < tuple_desc_->slots().size(); ++i) {
+    const SlotDescriptor* slot_desc = tuple_desc_->slots()[i];
+    void* slot = tuple->GetSlot(slot_desc->tuple_offset());
+
+    switch (slot_desc->col_pos()) {
+      case IMPALAD_ADDRESS:
+        RETURN_IF_ERROR(WriteStringSlot(impalad_address_, pool, slot));
+        break;
+      case METRIC_GROUP:
+        RETURN_IF_ERROR(WriteStringSlot(metric.parent_->name(), pool, slot));
+        break;
+      case METRIC_NAME:
+        RETURN_IF_ERROR(WriteStringSlot(metric.metric_->key(), pool, slot));
+        break;
+      case HUMAN_READABLE:
+        RETURN_IF_ERROR(WriteStringSlot(metric.metric_->ToHumanReadable(), pool, slot));
+        break;
+      case DESCRIPTION:
+        RETURN_IF_ERROR(WriteStringSlot(metric.metric_->description(), pool, slot));
+        break;
+      default:
+        DCHECK(false) << "Unknown column position " << slot_desc->col_pos();
+    }
+  }
+  ++next_metric_idx_;
+  if (next_metric_idx_ >= metrics_.size()) eos_ = true;
+  return Status::OK();
+}
+
+} /* namespace impala */
diff --git a/be/src/exec/system-table-scanner.h b/be/src/exec/system-table-scanner.h
new file mode 100644
index 0000000..fc818e5
--- /dev/null
+++ b/be/src/exec/system-table-scanner.h
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_SYSTEM_TABLE_SCANNER_H_
+#define IMPALA_SYSTEM_TABLE_SCANNER_H_
+
+#include <vector>
+#include <boost/scoped_ptr.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "exec/scan-node.h"
+#include "runtime/descriptors.h"
+#include "util/metrics.h"
+
+namespace impala {
+
+/// SystemTableScanner is the generic interface for different implementations of
+/// system table scanning.
+class SystemTableScanner {
+ public:
+  SystemTableScanner(RuntimeState* state) : state_(state), eos_(false){};
+  virtual ~SystemTableScanner(){};
+
+  /// Start scan, load data needed
+  virtual Status Open() = 0;
+
+  /// Fill the next row batch by fetching more data from system table data source.
+  virtual Status MaterializeNextTuple(
+      MemPool* pool, Tuple* tuple, const TupleDescriptor* tuple_desc_) = 0;
+
+  bool eos() const { return eos_; }
+
+ protected:
+  /// Write a string value to a STRING slot, allocating memory from 'pool'. Returns
+  /// an error if memory cannot be allocated without exceeding a memory limit.
+  Status WriteStringSlot(const char* data, int len, MemPool* pool, void* slot);
+  Status WriteStringSlot(const std::string& str, MemPool* pool, void* slot);
+
+  RuntimeState* const state_;
+
+  /// If true, nothing is left to return from GetNext() in SystemTableScanNode.
+  bool eos_;
+};
+
+struct MetricInfo {
+  Metric* metric_;
+  MetricGroup* parent_;
+};
+
+class MetricScanner : public SystemTableScanner {
+ public:
+  MetricScanner(RuntimeState* state) : SystemTableScanner(state), next_metric_idx_(0){};
+
+  /// Start scan, load metrics into metrics_.
+  virtual Status Open();
+
+  /// Fill the next row batch by fetching more data from metrics_.
+  virtual Status MaterializeNextTuple(
+      MemPool* pool, Tuple* tuple, const TupleDescriptor* tuple_desc_);
+
+ private:
+  /// The address of the current backend, materialized as 'impalad_address'.
+  /// Populated in Open().
+  string impalad_address_;
+
+  /// All of the metrics for this backend. Populated in Open().
+  vector<MetricInfo> metrics_;
+
+  /// The index of the next metric to return.
+  int next_metric_idx_;
+};
+
+} /* namespace impala */
+
+#endif /* SYSTEM_TABLE_SCANNER_H_ */
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index ef553fb..87ff17b 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -276,6 +276,16 @@
   return out.str();
 }
 
+SystemTableDescriptor::SystemTableDescriptor(const TTableDescriptor& tdesc)
+  : TableDescriptor(tdesc), table_name_(tdesc.systemTable.table_name) {}
+
+string SystemTableDescriptor::DebugString() const {
+  stringstream out;
+  out << "SystemTable(" << TableDescriptor::DebugString() << " table=" << table_name_
+      << ")";
+  return out.str();
+}
+
 TupleDescriptor::TupleDescriptor(const TTupleDescriptor& tdesc)
   : id_(tdesc.id),
     table_desc_(NULL),
@@ -515,6 +525,9 @@
       case TTableType::KUDU_TABLE:
         desc = pool->Add(new KuduTableDescriptor(tdesc));
         break;
+      case TTableType::SYSTEM_TABLE:
+        desc = pool->Add(new SystemTableDescriptor(tdesc));
+        break;
       default:
         DCHECK(false) << "invalid table type: " << tdesc.tableType;
     }
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 572bfda..2df9e84 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -28,7 +28,8 @@
 #include "common/status.h"
 #include "runtime/types.h"
 
-#include "gen-cpp/Descriptors_types.h"  // for TTupleId
+#include "gen-cpp/Descriptors_types.h" // for TTupleId
+#include "gen-cpp/PlanNodes_types.h"
 #include "gen-cpp/Types_types.h"
 
 namespace llvm {
@@ -384,6 +385,16 @@
   std::vector<std::string> master_addresses_;
 };
 
+// Descriptor for a SystemTable
+class SystemTableDescriptor : public TableDescriptor {
+ public:
+  SystemTableDescriptor(const TTableDescriptor& tdesc);
+  virtual std::string DebugString() const;
+
+ private:
+  TSystemTableName::type table_name_;
+};
+
 class TupleDescriptor {
  public:
   int byte_size() const { return byte_size_; }
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 5cf0f01..9de815f 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -371,7 +371,7 @@
   // Add hosts of scan nodes.
   vector<TPlanNodeType::type> scan_node_types{TPlanNodeType::HDFS_SCAN_NODE,
       TPlanNodeType::HBASE_SCAN_NODE, TPlanNodeType::DATA_SOURCE_NODE,
-      TPlanNodeType::KUDU_SCAN_NODE};
+      TPlanNodeType::KUDU_SCAN_NODE, TPlanNodeType::SYSTEM_TABLE_SCAN_NODE};
   vector<TPlanNodeId> scan_node_ids;
   FindNodes(fragment.plan, scan_node_types, &scan_node_ids);
   vector<TNetworkAddress> scan_hosts;
@@ -645,7 +645,7 @@
 PlanNodeId Scheduler::FindLeftmostScan(const TPlan& plan) {
   vector<TPlanNodeType::type> scan_node_types{TPlanNodeType::HDFS_SCAN_NODE,
       TPlanNodeType::HBASE_SCAN_NODE, TPlanNodeType::DATA_SOURCE_NODE,
-      TPlanNodeType::KUDU_SCAN_NODE};
+      TPlanNodeType::KUDU_SCAN_NODE, TPlanNodeType::SYSTEM_TABLE_SCAN_NODE};
   return FindLeftmostNode(plan, scan_node_types);
 }
 
@@ -946,4 +946,8 @@
     executor_heap_.decrease(handle);
   }
 }
+
+void Scheduler::GetAllKnownBackends(list<TBackendDescriptor>* backends) {
+  executors_config_->GetAllBackends(backends);
+}
 }
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 2fe90b8..f4af666 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -72,9 +72,6 @@
  public:
   static const std::string IMPALA_MEMBERSHIP_TOPIC;
 
-  /// List of server descriptors.
-  typedef std::vector<TBackendDescriptor> BackendList;
-
   /// Initialize with a subscription manager that we can register with for updates to the
   /// set of available backends.
   ///  - backend_id - unique identifier for this Impala backend (usually a host:port)
@@ -96,6 +93,9 @@
   /// ranges in the query exec request.
   Status Schedule(QuerySchedule* schedule);
 
+  /// Return a list of all backends registered with the scheduler.
+  void GetAllKnownBackends(std::list<TBackendDescriptor>* backends);
+
  private:
   /// Map from a host's IP address to the next executor to be round-robin scheduled for
   /// that host (needed for setups with multiple executors on a single host)
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index b7a5e81..11ee617 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -40,6 +40,7 @@
 #include "runtime/mem-pool.h"
 #include "runtime/raw-value.h"
 #include "runtime/runtime-state.h"
+#include "scheduling/scheduler.h"
 #include "service/impala-server.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
@@ -445,6 +446,23 @@
   return result_bytes;
 }
 
+extern "C" JNIEXPORT jbyteArray JNICALL
+Java_com_cloudera_impala_service_FeSupport_NativeGetBackends(
+    JNIEnv* env, jclass caller_class) {
+  TBackendsList backends_container;
+  ExecEnv* exec_env = ExecEnv::GetInstance();
+
+  list<TBackendDescriptor> backends;
+  exec_env->scheduler()->GetAllKnownBackends(&backends);
+
+  vector<TBackendDescriptor> vec1(backends.begin(), backends.end());
+  backends_container.__set_backend_descs(vec1);
+  jbyteArray result_bytes = nullptr;
+  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &backends_container, &result_bytes), env,
+      JniUtil::internal_exc_class(), result_bytes);
+  return result_bytes;
+}
+
 namespace impala {
 
 static JNINativeMethod native_methods[] = {
@@ -468,6 +486,10 @@
     (char*)"NativePrioritizeLoad", (char*)"([B)[B",
     (void*)::Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad
   },
+  {
+    (char*)"NativeGetBackends", (char*)"()[B",
+    (void*)::Java_com_cloudera_impala_service_FeSupport_NativeGetBackends
+  },
 };
 
 void InitFeSupport(bool disable_codegen) {
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index 12d6df3..d43638e 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -348,6 +348,12 @@
 
   const std::string& name() const { return name_; }
 
+  typedef std::map<std::string, MetricGroup*> ChildGroupMap;
+  const ChildGroupMap& getChildren() const { return children_; }
+
+  typedef std::map<std::string, Metric*> MetricMap;
+  const MetricMap& getMetricMap() const { return metric_map_; }
+
  private:
   /// Pool containing all metric objects
   boost::scoped_ptr<ObjectPool> obj_pool_;
@@ -359,11 +365,9 @@
   SpinLock lock_;
 
   /// Contains all Metric objects, indexed by key
-  typedef std::map<std::string, Metric*> MetricMap;
   MetricMap metric_map_;
 
   /// All child metric groups
-  typedef std::map<std::string, MetricGroup*> ChildGroupMap;
   ChildGroupMap children_;
 
   /// Webserver callback for /metrics. Produces a tree of JSON values, each representing a
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 7894f75..b48a721 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -47,6 +47,7 @@
   VIEW,
   DATA_SOURCE_TABLE,
   KUDU_TABLE,
+  SYSTEM_TABLE
 }
 
 // TODO: Separate the storage engines (e.g. Kudu) from the file formats.
@@ -373,6 +374,16 @@
   4: required list<TKuduPartitionParam> partition_by
 }
 
+enum TSystemTableName {
+  METRICS,
+  QUERIES
+}
+
+// Represents a System Table
+struct TSystemTable {
+  1: required TSystemTableName table_name
+}
+
 // Represents a table or view.
 struct TTable {
   // Name of the parent database. Case insensitive, expected to be stored as lowercase.
@@ -417,6 +428,9 @@
 
   // Set iff this a kudu table
   13: optional TKuduTable kudu_table
+
+  // Set iff this is a system table
+  14: optional TSystemTable system_table
 }
 
 // Represents a database.
diff --git a/common/thrift/Descriptors.thrift b/common/thrift/Descriptors.thrift
index c3f1397..f9f08f3 100644
--- a/common/thrift/Descriptors.thrift
+++ b/common/thrift/Descriptors.thrift
@@ -69,6 +69,7 @@
   6: optional CatalogObjects.THBaseTable hbaseTable
   9: optional CatalogObjects.TDataSourceTable dataSourceTable
   10: optional CatalogObjects.TKuduTable kuduTable
+  11: optional CatalogObjects.TSystemTable systemTable
 
   // Unqualified name of table
   7: required string tableName
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index c04d08a..78d07c0 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -46,7 +46,8 @@
   SINGULAR_ROW_SRC_NODE,
   UNNEST_NODE,
   SUBPLAN_NODE,
-  KUDU_SCAN_NODE
+  KUDU_SCAN_NODE,
+  SYSTEM_TABLE_SCAN_NODE
 }
 
 // phases of an execution node
@@ -266,6 +267,11 @@
   // Indicates whether the MT scan node implementation should be used.
   // If this is true, then the MT_DOP query option must be > 0.
   2: optional bool use_mt_scan_node
+}
+
+struct TSystemTableScanNode {
+  1: required Types.TTupleId tuple_id
+  2: required CatalogObjects.TSystemTableName table_name
 }
 
 struct TEqJoinCondition {
@@ -535,22 +541,23 @@
   18: optional TExchangeNode exchange_node
   19: optional TAnalyticNode analytic_node
   20: optional TUnnestNode unnest_node
+  21: optional TSystemTableScanNode system_table_scan_node
 
   // Label that should be used to print this node to the user.
-  21: optional string label
+  22: optional string label
 
   // Additional details that should be printed to the user. This is node specific
   // e.g. table name, join strategy, etc.
-  22: optional string label_detail
+  23: optional string label_detail
 
   // Estimated execution stats generated by the planner.
-  23: optional ExecStats.TExecStats estimated_stats
+  24: optional ExecStats.TExecStats estimated_stats
 
   // Runtime filters assigned to this plan node
-  24: optional list<TRuntimeFilterDesc> runtime_filters
+  25: optional list<TRuntimeFilterDesc> runtime_filters
 
   // Resource profile for this plan node.
-  25: required TBackendResourceProfile resource_profile
+  26: required TBackendResourceProfile resource_profile
 }
 
 // A flattened representation of a tree of PlanNodes, obtained by depth-first
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index f04650e..3db64ce 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -73,6 +73,11 @@
   7: optional Types.TNetworkAddress krpc_address;
 }
 
+// a list of TBackendDescriptor
+struct TBackendsList {
+  1: required list<TBackendDescriptor> backend_descs;
+}
+
 // Description of a single entry in a topic
 struct TTopicItem {
   // Human-readable topic entry identifier
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index 53aa000..70cf76f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -584,6 +584,7 @@
    * Throws an AuthorizationException if the dbName is a system db
    * and the user is trying to modify it.
    * Returns true if this is a system db and the action is allowed.
+   * Returns false if authorization should be checked in the usual way.
    */
   private boolean checkSystemDbAccess(String dbName, Privilege privilege)
       throws AuthorizationException {
@@ -593,6 +594,9 @@
         case VIEW_METADATA:
         case ANY:
           return true;
+        case SELECT:
+          // Check authorization for SELECT on system tables in the usual way.
+          return false;
         default:
           throw new AuthorizationException("Cannot modify system database.");
       }
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index df5cf97..add1f04 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -45,6 +45,7 @@
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.ImpaladCatalog;
 import org.apache.impala.catalog.KuduTable;
+import org.apache.impala.catalog.SystemTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
@@ -609,7 +610,8 @@
       Preconditions.checkState(table instanceof HdfsTable ||
           table instanceof KuduTable ||
           table instanceof HBaseTable ||
-          table instanceof DataSourceTable);
+          table instanceof DataSourceTable ||
+          table instanceof SystemTable);
       return new BaseTableRef(tableRef, resolvedPath);
     } else {
       return new CollectionTableRef(tableRef, resolvedPath);
diff --git a/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java
index 6977f3b..d82794e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java
@@ -24,6 +24,7 @@
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.authorization.PrivilegeRequestBuilder;
 import org.apache.impala.catalog.StructType;
+import org.apache.impala.catalog.SystemTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.common.AnalysisException;
@@ -124,7 +125,15 @@
 
     table_ = path_.getRootTable();
     // Register authorization and audit events.
-    analyzer.getTable(table_.getTableName(), getPrivilegeRequirement());
+    Table table = analyzer.getTable(table_.getTableName(), getPrivilegeRequirement());
+
+    if (table instanceof SystemTable
+        && (outputStyle_ == TDescribeOutputStyle.FORMATTED
+        || outputStyle_ == TDescribeOutputStyle.EXTENDED)) {
+      throw new AnalysisException(String.format("User '%s' does not have " +
+          "privileges to see table on: 'system' database",
+          analyzer.getUser().getName()));
+    }
 
     // Describing a table.
     if (path_.destTable() != null) return;
diff --git a/fe/src/main/java/org/apache/impala/analysis/ShowCreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowCreateTableStmt.java
index d2b57e8..37d0656 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ShowCreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ShowCreateTableStmt.java
@@ -18,6 +18,7 @@
 package org.apache.impala.analysis;
 
 import org.apache.impala.authorization.Privilege;
+import org.apache.impala.catalog.SystemTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.View;
 import org.apache.impala.common.AnalysisException;
@@ -68,6 +69,10 @@
       // statement references a column by its implicitly defined column names.
       viewAnalyzer.setUseHiveColLabels(true);
       viewQuery.analyze(viewAnalyzer);
+    } else if (table instanceof SystemTable) {
+      throw new AnalysisException(String.format("User '%s' does not have " +
+          "privileges to see detailed information about table on: 'system' database",
+          analyzer.getUser().getName()));
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 994aa37..43f805c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -59,6 +59,7 @@
   public final static long INITIAL_CATALOG_VERSION = 0L;
   public static final String DEFAULT_DB = "default";
   public static final String BUILTINS_DB = "_impala_builtins";
+  public static final String SYSTEM_DB = "system";
 
   protected final MetaStoreClientPool metaStoreClientPool_ =
       new MetaStoreClientPool(0, 0);
@@ -77,6 +78,9 @@
   // DB that contains all builtins
   private static Db builtinsDb_;
 
+  // System database, containing system table, e.g. "metrics"
+  private static Db systemDb_;
+
   // Cache of data sources.
   protected final CatalogObjectCache<DataSource> dataSources_;
 
@@ -89,6 +93,8 @@
     dataSources_ = new CatalogObjectCache<DataSource>();
     builtinsDb_ = new BuiltinsDb(BUILTINS_DB, this);
     addDb(builtinsDb_);
+    systemDb_ = new SystemDb(SYSTEM_DB, this);
+    addDb(systemDb_);
   }
 
   /**
@@ -104,6 +110,7 @@
   }
 
   public Db getBuiltinsDb() { return builtinsDb_; }
+  public Db getSystemDb() { return systemDb_; }
 
   /**
    * Adds a new database to the catalog, replacing any existing database with the same
diff --git a/fe/src/main/java/org/apache/impala/catalog/SystemDb.java b/fe/src/main/java/org/apache/impala/catalog/SystemDb.java
new file mode 100644
index 0000000..141dcc8
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/SystemDb.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.Collections;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+
+public final class SystemDb extends Db {
+  public SystemDb(String name, Catalog catalog) {
+    super(name, catalog, createMetastoreDb(name));
+    addMetricsTable();
+    setIsSystemDb(true);
+  }
+
+  // Registers the "metrics" table exposing all Impala daemon metrics.
+  private void addMetricsTable() {
+    // One row per (impalad, metric): address, group, name, value, description.
+    Table table = new SystemTable(null, this, "metrics", "system");
+    table.addColumn(new Column("impalad_address", Type.STRING, 0));
+    table.addColumn(new Column("group", Type.STRING, 1));
+    table.addColumn(new Column("name", Type.STRING, 2));
+    table.addColumn(new Column("value", Type.STRING, 3));
+    table.addColumn(new Column("description", Type.STRING, 4));
+    addTable(table);
+  }
+
+  private static final String SYSTEM_DB_COMMENT = "System database for Impala cluster";
+
+  private static Database createMetastoreDb(String name) {
+    return new org.apache.hadoop.hive.metastore.api.Database(
+        name, SYSTEM_DB_COMMENT, "", Collections.<String, String>emptyMap());
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/SystemTable.java b/fe/src/main/java/org/apache/impala/catalog/SystemTable.java
new file mode 100644
index 0000000..8d28c88
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/SystemTable.java
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.HashMap;
+import java.util.Set;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TResultSet;
+import org.apache.impala.thrift.TResultSetMetadata;
+import org.apache.impala.thrift.TSystemTable;
+import org.apache.impala.thrift.TSystemTableName;
+import org.apache.impala.thrift.TTable;
+import org.apache.impala.thrift.TTableDescriptor;
+import org.apache.impala.thrift.TTableType;
+import org.apache.impala.util.TResultRowBuilder;
+import com.google.common.base.Preconditions;
+
+public final class SystemTable extends Table {
+  private static HashMap<String, TSystemTableName> SYSTEM_TABLE_NAME_MAP =
+      new HashMap<String, TSystemTableName>();
+  static {
+    SYSTEM_TABLE_NAME_MAP.put("metrics", TSystemTableName.METRICS);
+    SYSTEM_TABLE_NAME_MAP.put("queries", TSystemTableName.QUERIES);
+  }
+
+  public SystemTable(org.apache.hadoop.hive.metastore.api.Table msTable,
+      Db db, String name, String owner) {
+    super(msTable, db, name, owner);
+    systemTableName_ = SYSTEM_TABLE_NAME_MAP.get(name);
+  }
+
+  private TSystemTableName systemTableName_;
+  public TSystemTableName getSystemTableName() { return systemTableName_; }
+
+  @Override
+  public TTableDescriptor toThriftDescriptor(int tableId,
+      Set<Long> referencedPartitions) {
+    // Create thrift descriptors to send to the BE.
+    TTableDescriptor tableDescriptor =
+        new TTableDescriptor(tableId, TTableType.SYSTEM_TABLE, getTColumnDescriptors(),
+            numClusteringCols_, name_, db_.getName());
+    tableDescriptor.setSystemTable(getTSystemTable());
+    return tableDescriptor;
+  }
+
+  /**
+   * Returns a thrift structure for the system table.
+   */
+  private TSystemTable getTSystemTable() { return new TSystemTable(systemTableName_); }
+
+  @Override
+  public TCatalogObjectType getCatalogObjectType() {
+    return TCatalogObjectType.TABLE;
+  }
+
+  @Override
+  public void load(boolean reuseMetadata, IMetaStoreClient client,
+      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
+    // Table is always loaded.
+    Preconditions.checkState(false);
+  }
+
+  /**
+   * Returns a thrift structure representing the table.
+   */
+  @Override
+  public TTable toThrift() {
+    TTable table = super.toThrift();
+    table.setTable_type(TTableType.SYSTEM_TABLE);
+    table.setSystem_table(getTSystemTable());
+    return table;
+  }
+
+  /**
+   * Returns statistics on this table as a tabular result set. Used for the SHOW
+   * TABLE STATS statement. The schema of the returned TResultSet is set inside
+   * this method.
+   */
+  public TResultSet getTableStats() {
+    TResultSet result = new TResultSet();
+    TResultSetMetadata resultSchema = new TResultSetMetadata();
+    resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift()));
+    result.setSchema(resultSchema);
+    TResultRowBuilder rowBuilder = new TResultRowBuilder();
+    rowBuilder.add(getNumRows());
+    result.addToRows(rowBuilder.get());
+    return result;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index d72e818..a0bae05 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -37,7 +37,6 @@
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprId;
 import org.apache.impala.analysis.ExprSubstitutionMap;
-import org.apache.impala.analysis.FunctionCallExpr;
 import org.apache.impala.analysis.InlineViewRef;
 import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.NullLiteral;
@@ -48,7 +47,6 @@
 import org.apache.impala.analysis.SlotId;
 import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.TableRef;
-import org.apache.impala.analysis.TableSampleClause;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.analysis.TupleIsNullPredicate;
@@ -60,9 +58,9 @@
 import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.KuduTable;
+import org.apache.impala.catalog.SystemTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
-import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.NotImplementedException;
@@ -1307,10 +1305,14 @@
     } else if (table instanceof HBaseTable) {
       // HBase table
       scanNode = new HBaseScanNode(ctx_.getNextNodeId(), tblRef.getDesc());
-    } else if (tblRef.getTable() instanceof KuduTable) {
+    } else if (table instanceof KuduTable) {
       scanNode = new KuduScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), conjuncts);
       scanNode.init(analyzer);
       return scanNode;
+    } else if (table instanceof SystemTable) {
+      scanNode = new SystemTableScanNode(ctx_.getNextNodeId(), tblRef.getDesc());
+      scanNode.init(analyzer);
+      return scanNode;
     } else {
       throw new NotImplementedException(
           "Planning not implemented for table ref class: " + tblRef.getClass());
diff --git a/fe/src/main/java/org/apache/impala/planner/SystemTableScanNode.java b/fe/src/main/java/org/apache/impala/planner/SystemTableScanNode.java
new file mode 100644
index 0000000..0463fd4
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/SystemTableScanNode.java
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.TupleDescriptor;
+import org.apache.impala.catalog.SystemTable;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TBackendDescriptor;
+import org.apache.impala.thrift.TBackendsList;
+import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPlanNode;
+import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.thrift.TScanRange;
+import org.apache.impala.thrift.TScanRangeLocation;
+import org.apache.impala.thrift.TScanRangeLocationList;
+import org.apache.impala.thrift.TSystemTableName;
+import org.apache.impala.thrift.TSystemTableScanNode;
+
+public class SystemTableScanNode extends ScanNode {
+  private final static Logger LOG = LoggerFactory.getLogger(SystemTableScanNode.class);
+
+  /// Estimate for the number of rows returned per node for each system table.
+  private final static Map<TSystemTableName, Integer> ESTIMATED_NUM_ROWS_PER_NODE =
+      ImmutableMap.<TSystemTableName, Integer>builder()
+      .put(TSystemTableName.METRICS, 300).build();
+
+
+  public SystemTableScanNode(PlanNodeId id, TupleDescriptor desc) {
+    super(id, desc, "SCAN SYSTEM_TABLE");
+    table_ = (SystemTable) desc_.getTable();
+  }
+
+  private final SystemTable table_;
+
+  @Override
+  public void init(Analyzer analyzer) throws ImpalaException {
+    checkForSupportedFileFormats();
+    assignConjuncts(analyzer);
+    analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_);
+    conjuncts_ = orderConjunctsByCost(conjuncts_);
+    // materialize slots in remaining conjuncts_
+    analyzer.materializeSlots(conjuncts_);
+    computeMemLayout(analyzer);
+    computeScanRangeLocations(analyzer);
+    computeStats(analyzer);
+  }
+
+  /**
+   * Creates a single scan range for each backend node in the cluster.
+   */
+  private void computeScanRangeLocations(Analyzer analyzer) throws InternalException {
+    TBackendsList backendsList = FeSupport.GetBackends();
+    List<TBackendDescriptor> backends = backendsList.getBackend_descs();
+
+    scanRanges_ = Lists.newArrayList();
+    for (TBackendDescriptor backend: backends) {
+      TNetworkAddress networkAddress = backend.getAddress();
+      // Translate from network address to the global (to this request) host index.
+      int globalHostIdx = analyzer.getHostIndex().getIndex(networkAddress);
+
+      TScanRangeLocationList scanRangeLocations = new TScanRangeLocationList();
+      scanRangeLocations.scan_range = new TScanRange();
+      scanRangeLocations.locations =
+          Lists.newArrayList(new TScanRangeLocation(globalHostIdx));
+      scanRanges_.add(scanRangeLocations);
+    }
+  }
+
+  @Override
+  public void computeStats(Analyzer analyzer) {
+    super.computeStats(analyzer);
+    numNodes_ = scanRanges_.size();
+    // getOrDefault() guards against NPE for system tables missing an estimate entry.
+    int rowsPerNode = ESTIMATED_NUM_ROWS_PER_NODE.getOrDefault(table_.getSystemTableName(), 100);
+    int numRowsEstimate = numNodes_ * rowsPerNode;
+    inputCardinality_ = numRowsEstimate;
+    cardinality_ = numRowsEstimate;
+    cardinality_ *= computeSelectivity();
+    cardinality_ = Math.max(1, cardinality_);
+    cardinality_ = capAtLimit(cardinality_);
+    LOG.debug("inputCardinality=" + inputCardinality_);
+    LOG.debug("cardinality=" + cardinality_);
+    LOG.debug(" #nodes=" + numNodes_);
+  }
+
+  @Override
+  protected String debugString() {
+    return Objects.toStringHelper(this)
+        .add("tid", desc_.getId().asInt())
+        .add("TblName", desc_.getTable().getFullName())
+        .addValue(super.debugString())
+        .toString();
+  }
+
+  @Override
+  protected void toThrift(TPlanNode msg) {
+    msg.node_type = TPlanNodeType.SYSTEM_TABLE_SCAN_NODE;
+    msg.system_table_scan_node =
+        new TSystemTableScanNode(desc_.getId().asInt(), table_.getSystemTableName());
+  }
+
+  @Override
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
+    // System scan nodes should use limited resources.
+    nodeResourceProfile_ = ResourceProfile.noReservation(10L * 1024L * 1024L);
+  }
+
+  @Override
+  protected String getNodeExplainString(
+      String prefix, String detailPrefix, TExplainLevel detailLevel) {
+    StringBuilder output = new StringBuilder();
+    String aliasStr = "";
+    if (!table_.getFullName().equalsIgnoreCase(desc_.getAlias())
+        && !table_.getName().equalsIgnoreCase(desc_.getAlias())) {
+      aliasStr = " " + desc_.getAlias();
+    }
+
+    output.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(), displayName_,
+        table_.getFullName(), aliasStr));
+
+    if (!conjuncts_.isEmpty()) {
+      output.append(prefix + "predicates: " + getExplainString(conjuncts_) + "\n");
+    }
+
+    // Add table and column stats in verbose mode.
+    if (detailLevel == TExplainLevel.VERBOSE) {
+      output.append(getStatsExplainString(prefix, detailLevel));
+      output.append("\n");
+    }
+    return output.toString();
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index 8b87962..3a788ab 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -34,6 +34,7 @@
 import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.common.InternalException;
+import org.apache.impala.thrift.TBackendsList;
 import org.apache.impala.thrift.TCacheJarParams;
 import org.apache.impala.thrift.TCacheJarResult;
 import org.apache.impala.thrift.TCatalogObject;
@@ -83,6 +84,9 @@
   // we make make all RPCs in the BE layer instead of calling the Catalog Server
   // using Java Thrift bindings.
   public native static byte[] NativePrioritizeLoad(byte[] thriftReq);
+
+  // Returns a serialized TBackendsList of all known backend descriptors.
+  public native static byte[] NativeGetBackends();
 
   /**
    * Locally caches the jar at the specified HDFS location.
@@ -261,6 +265,20 @@
     }
   }
 
+  public static TBackendsList GetBackends() throws InternalException {
+    try {
+      byte[] result = NativeGetBackends();
+      Preconditions.checkNotNull(result);
+      TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+      TBackendsList backendsList = new TBackendsList();
+      deserializer.deserialize(backendsList, result);
+      return backendsList;
+    } catch (TException e) {
+      throw new InternalException(
+          "Error retrieving backends list: " + e.getMessage(), e);
+    }
+  }
+
   /**
    * This function should only be called explicitly by the FeSupport to ensure that
    * native functions are loaded.
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 63941c1..06522b0 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -76,6 +76,7 @@
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.ImpaladCatalog;
 import org.apache.impala.catalog.KuduTable;
+import org.apache.impala.catalog.SystemTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
@@ -730,6 +731,8 @@
       } else {
         return ((KuduTable) table).getTableStats();
       }
+    } else if (table instanceof SystemTable) {
+      return ((SystemTable) table).getTableStats();
     } else {
       throw new InternalException("Invalid table class: " + table.getClass());
     }
diff --git a/fe/src/test/resources/authz-policy.ini.template b/fe/src/test/resources/authz-policy.ini.template
index 57db74c..72da191 100644
--- a/fe/src/test/resources/authz-policy.ini.template
+++ b/fe/src/test/resources/authz-policy.ini.template
@@ -24,7 +24,7 @@
           select_functional_alltypesagg, insert_functional_alltypes,\
           select_functional_complex_view, select_functional_view_view,\
           insert_parquet, new_table_uri, tpch_data_uri, select_column_level_functional,\
-          upper_case_uri
+          upper_case_uri, select_system_database_metrics
 auth_to_local_group = test_role
 server_admin = all_server
 
@@ -46,6 +46,8 @@
     server=server1->db=functional->table=complex_view->action=select
 select_functional_view_view =\
     server=server1->db=functional->table=view_view->action=select
+select_system_database_metrics =\
+    server=server1->db=system->table=metrics->action=select
 insert_parquet = server=server1->db=functional_parquet->table=*->action=insert
 select_column_level_functional =\
     server=server1->db=functional->table=alltypessmall->column=id->action=select,\
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/system-db.test b/testdata/workloads/functional-planner/queries/PlannerTest/system-db.test
new file mode 100644
index 0000000..29b79f4
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/system-db.test
@@ -0,0 +1,8 @@
+# Basic test with system table scan
+select * from system.metrics
+---- PLAN
+00:SCAN SYSTEM_TABLE [system.metrics]
+---- DISTRIBUTEDPLAN
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN SYSTEM_TABLE [system.metrics]
diff --git a/testdata/workloads/functional-query/queries/QueryTest/system-database.test b/testdata/workloads/functional-query/queries/QueryTest/system-database.test
new file mode 100644
index 0000000..d5b2107
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/system-database.test
@@ -0,0 +1,10 @@
+====
+---- QUERY
+select count(*)
+from system.metrics
+where name="tcmalloc.bytes-in-use"
+---- RESULTS
+3
+---- TYPES
+BIGINT
+====
diff --git a/tests/metadata/test_metadata_query_statements.py b/tests/metadata/test_metadata_query_statements.py
index bf21ecb..37be475 100644
--- a/tests/metadata/test_metadata_query_statements.py
+++ b/tests/metadata/test_metadata_query_statements.py
@@ -177,3 +177,8 @@
     self.cleanup_db('impala_test_desc_db2')
     self.cleanup_db('impala_test_desc_db3')
     self.cleanup_db('impala_test_desc_db4')
+
+  # TODO: should this be restricted to the three-node minicluster?
+  def test_system_db(self, vector):
+    self.run_test_case('QueryTest/system-database', new_vector)
+
