From 7ea6af6c7216812145d2f0b9653adb77b2026a7b Mon Sep 17 00:00:00 2001
From: Alex Behm <alex.behm@cloudera.com>
Date: Mon, 5 Feb 2018 18:47:45 -0800
Subject: [PATCH] Projection prototype.

Change-Id: I5dc56a75233ecd339d54bcc25225cc8152a18e6c
---
 be/src/service/query-options.cc                    |   5 +
 be/src/service/query-options.h                     |   4 +-
 common/thrift/DataSinks.thrift                     |   7 ++
 common/thrift/ImpalaInternalService.thrift         |   3 +
 common/thrift/ImpalaService.thrift                 |   3 +
 .../main/java/org/apache/impala/analysis/Expr.java |  17 +++-
 .../org/apache/impala/analysis/OrderByElement.java |   2 +-
 .../java/org/apache/impala/analysis/SortInfo.java  |  17 +---
 .../org/apache/impala/planner/AggregationNode.java |  32 +++++-
 .../apache/impala/planner/AnalyticEvalNode.java    |  45 +++++++-
 .../apache/impala/planner/DistributedPlanner.java  |   8 +-
 .../org/apache/impala/planner/ExchangeNode.java    |  68 +++++++------
 .../java/org/apache/impala/planner/JoinNode.java   |  39 ++++++-
 .../java/org/apache/impala/planner/PlanNode.java   |  15 +++
 .../java/org/apache/impala/planner/Planner.java    | 113 +++++++++++++++++++--
 .../org/apache/impala/planner/ProjectionInfo.java  |  37 +++++++
 .../impala/planner/RuntimeFilterGenerator.java     |   6 +-
 .../java/org/apache/impala/planner/ScanNode.java   |  14 +++
 .../java/org/apache/impala/planner/SelectNode.java |   5 +-
 .../java/org/apache/impala/planner/SortNode.java   |  30 +++++-
 .../java/org/apache/impala/planner/UnionNode.java  |  52 +++++++++-
 .../org/apache/impala/planner/PlannerTest.java     |  21 ++++
 22 files changed, 458 insertions(+), 85 deletions(-)
 create mode 100644 fe/src/main/java/org/apache/impala/planner/ProjectionInfo.java

diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 3c56f89..5548fe8 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -632,6 +632,11 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_exec_time_limit_s(time_limit);
         break;
       }
+      case TImpalaQueryOptions::ENABLE_PROJECTION_TRIMMING: {
+        query_options->__set_enable_projection_trimming(
+            iequals(value, "true") || iequals(value, "1"));
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 2280cff..45300ad 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::EXEC_TIME_LIMIT_S + 1);\
+      TImpalaQueryOptions::ENABLE_PROJECTION_TRIMMING + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS,\
@@ -129,6 +129,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(compute_stats_min_sample_size, COMPUTE_STATS_MIN_SAMPLE_SIZE,\
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(exec_time_limit_s, EXEC_TIME_LIMIT_S, TQueryOptionLevel::REGULAR)\
+  QUERY_OPT_FN(enable_projection_trimming, ENABLE_PROJECTION_TRIMMING,\
+      TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 1b26c92..e07c703 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -43,6 +43,11 @@ enum TTableSinkType {
   KUDU
 }
 
+struct TProjectionInfo {
+  1: list<Exprs.TExpr> src_exprs
+  2: Types.TTupleId dst_tid
+}
+
 // Sink which forwards data to a remote plan fragment,
 // according to the given output partition specification
 // (ie, the m:1 part of an m:n data stream)
@@ -54,6 +59,8 @@ struct TDataStreamSink {
   // If the partitioning type is UNPARTITIONED, the output is broadcast
   // to each destination host.
   2: required Partitions.TDataPartition output_partition
+
+  3: optional TProjectionInfo projection
 }
 
 // Creates a new Hdfs files according to the evaluation of the partitionKeyExprs,
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index dc37fc2..f56a9d0 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -272,6 +272,9 @@ struct TQueryOptions {
   // not include time spent in planning, scheduling or admission control. A value of 0
   // means no time limit.
   63: optional i32 exec_time_limit_s = 0;
+
+  // TODO(Alex): Comment
+  64: optional bool enable_projection_trimming = true;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 356f5e5..0b7b38e 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -283,6 +283,9 @@ enum TImpalaQueryOptions {
   // not include time spent in planning, scheduling or admission control. A value of 0
   // means no time limit.
   EXEC_TIME_LIMIT_S,
+
+  // TODO(Alex): Comment
+  ENABLE_PROJECTION_TRIMMING,
 }
 
 // The summary of a DML statement.
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index a235662..5d9f63e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -20,6 +20,7 @@ package org.apache.impala.analysis;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
@@ -1093,6 +1094,18 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
     return true;
   }
 
+  public static <T extends Expr> boolean isExprListBoundByTupleIds(
+      Collection<T> exprs, List<TupleId> tids) {
+    for (Expr e: exprs) if (!e.isBoundByTupleIds(tids)) return false;
+    return true;
+  }
+
+  public static <T extends Expr> boolean isExprListBound(
+      Collection<T> exprs, TupleId tid) {
+    for (Expr e: exprs) if (!e.isBound(tid)) return false;
+    return true;
+  }
+
   /**
    * Returns true if expr is fully bound by slotIds, otherwise false.
    */
@@ -1127,9 +1140,7 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
   public static <C extends Expr> void getIds(List<? extends Expr> exprs,
       List<TupleId> tupleIds, List<SlotId> slotIds) {
     if (exprs == null) return;
-    for (Expr e: exprs) {
-      e.getIds(tupleIds, slotIds);
-    }
+    for (Expr e: exprs) e.getIds(tupleIds, slotIds);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/analysis/OrderByElement.java b/fe/src/main/java/org/apache/impala/analysis/OrderByElement.java
index 4dd90c1..722cfba 100644
--- a/fe/src/main/java/org/apache/impala/analysis/OrderByElement.java
+++ b/fe/src/main/java/org/apache/impala/analysis/OrderByElement.java
@@ -121,7 +121,7 @@ public class OrderByElement {
       ExprSubstitutionMap smap, Analyzer analyzer) {
     List<OrderByElement> result = Lists.newArrayListWithCapacity(src.size());
     for (OrderByElement element: src) {
-      result.add(new OrderByElement(element.getExpr().substitute(smap, analyzer, false),
+      result.add(new OrderByElement(element.getExpr().substitute(smap, analyzer, true),
           element.isAsc_, element.nullsFirstParam_));
     }
     return result;
diff --git a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
index 745de6d..a830858 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
@@ -16,13 +16,11 @@
 // under the License.
 
 package org.apache.impala.analysis;
-import org.apache.impala.common.TreeNode;
-
-import java.util.ArrayList;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Set;
 
+import org.apache.impala.common.TreeNode;
+
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
@@ -51,7 +49,7 @@ public class SortInfo {
   private final List<Boolean> nullsFirstParams_;
   // Subset of ordering exprs that are materialized. Populated in
   // createMaterializedOrderExprs(), used for EXPLAIN output.
-  private List<Expr> materializedOrderingExprs_;
+  private final List<Expr> materializedOrderingExprs_;
   // The single tuple that is materialized, sorted, and output by a sort operator
   // (i.e. SortNode or TopNNode)
   private TupleDescriptor sortTupleDesc_;
@@ -150,15 +148,6 @@ public class SortInfo {
     orderingExprs_ = Expr.substituteList(orderingExprs_, smap, analyzer, false);
   }
 
-  /**
-   * Asserts that all ordering exprs are bound by the sort tuple.
-   */
-  public void checkConsistency() {
-    for (Expr orderingExpr: orderingExprs_) {
-      Preconditions.checkState(orderingExpr.isBound(sortTupleDesc_.getId()));
-    }
-  }
-
   @Override
   public SortInfo clone() { return new SortInfo(this); }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index c31f448..020e700 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -21,14 +21,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.impala.analysis.AggregateInfo;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.FunctionCallExpr;
 import org.apache.impala.analysis.SlotId;
+import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.thrift.TAggregationNode;
 import org.apache.impala.thrift.TExplainLevel;
@@ -37,6 +36,8 @@ import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.util.BitUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -343,4 +344,29 @@ public class AggregationNode extends PlanNode {
         .setSpillableBufferBytes(bufferSize)
         .setMaxRowBufferBytes(maxRowBufferSize).build();
   }
+
+  @Override
+  public void getExprs(List<Expr> exprs) {
+    super.getExprs(exprs);
+    exprs.addAll(aggInfo_.getAggregateExprs());
+    exprs.addAll(aggInfo_.getGroupingExprs());
+  }
+
+  @Override
+  public void substitute(ExprSubstitutionMap smap, Analyzer analyzer)
+      throws ImpalaException {
+    super.substitute(smap, analyzer);
+    aggInfo_.substitute(smap, analyzer);
+    aggInfo_.checkConsistency();
+  }
+
+  @Override
+  public void validateExprs() {
+    super.validateExprs();
+    Preconditions.checkState(Expr.isExprListBoundByTupleIds(
+        aggInfo_.getAggregateExprs(), getChild(0).getTupleIds()));
+    Preconditions.checkState(Expr.isExprListBoundByTupleIds(
+        aggInfo_.getGroupingExprs(), getChild(0).getTupleIds()));
+    aggInfo_.checkConsistency();
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
index 1ce4884..33733f0 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
@@ -19,20 +19,21 @@ package org.apache.impala.planner;
 
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.impala.analysis.AnalyticWindow;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.OrderByElement;
 import org.apache.impala.analysis.TupleDescriptor;
+import org.apache.impala.common.ImpalaException;
 import org.apache.impala.thrift.TAnalyticNode;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -64,8 +65,8 @@ public class AnalyticEvalNode extends PlanNode {
 
   // predicates constructed from partitionExprs_/orderingExprs_ to
   // compare input to buffered tuples
-  private final Expr partitionByEq_;
-  private final Expr orderByEq_;
+  private Expr partitionByEq_;
+  private Expr orderByEq_;
   private final TupleDescriptor bufferedTupleDesc_;
 
   public AnalyticEvalNode(
@@ -259,4 +260,38 @@ public class AnalyticEvalNode extends PlanNode {
         .setMinReservationBytes(perInstanceMinBufferBytes)
         .setSpillableBufferBytes(bufferSize).setMaxRowBufferBytes(bufferSize).build();
   }
+
+  @Override
+  public void getExprs(List<Expr> exprs) {
+    super.getExprs(exprs);
+    exprs.addAll(analyticFnCalls_);
+    exprs.addAll(substitutedPartitionExprs_);
+    if (partitionByEq_ != null) exprs.add(partitionByEq_);
+    if (orderByEq_ != null) exprs.add(orderByEq_);
+    for (OrderByElement e: orderByElements_) exprs.add(e.getExpr());
+  }
+
+  @Override
+  public void substitute(ExprSubstitutionMap smap, Analyzer analyzer)
+      throws ImpalaException {
+    super.substitute(smap, analyzer);
+    analyticFnCalls_ = Expr.substituteList(analyticFnCalls_, smap, analyzer, true);
+    substitutedPartitionExprs_ =
+        Expr.substituteList(substitutedPartitionExprs_, smap, analyzer, true);
+    orderByElements_ = OrderByElement.substitute(orderByElements_, smap, analyzer);
+    if (partitionByEq_ != null) partitionByEq_ = partitionByEq_.substitute(smap, analyzer, true);
+    if (orderByEq_ != null) orderByEq_ = orderByEq_.substitute(smap, analyzer, true);
+  }
+
+  @Override
+  public void validateExprs() {
+    super.validateExprs();
+    Preconditions.checkState(Expr.isExprListBoundByTupleIds(
+        analyticFnCalls_, getChild(0).getTupleIds()));
+    Preconditions.checkState(Expr.isExprListBoundByTupleIds(
+        substitutedPartitionExprs_, getChild(0).getTupleIds()));
+    for (OrderByElement e: orderByElements_) {
+      Preconditions.checkState(e.getExpr().isBoundByTupleIds(getChild(0).getTupleIds()));
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 241a71e..5fc877d 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -26,13 +26,11 @@ import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.JoinOperator;
-import static org.apache.impala.analysis.JoinOperator.*;
 import org.apache.impala.analysis.QueryStmt;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.planner.JoinNode.DistributionMode;
-import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
 import org.apache.impala.util.KuduUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -395,11 +393,11 @@ public class DistributedPlanner {
     // The new fragment is hash-partitioned on the lhs input join exprs.
     ExchangeNode lhsExchange =
         new ExchangeNode(ctx_.getNextNodeId(), leftChildFragment.getPlanRoot());
-    lhsExchange.computeStats(null);
+    lhsExchange.computeStats(ctx_.getRootAnalyzer());
     node.setChild(0, lhsExchange);
     ExchangeNode rhsExchange =
         new ExchangeNode(ctx_.getNextNodeId(), rightChildFragment.getPlanRoot());
-    rhsExchange.computeStats(null);
+    rhsExchange.computeStats(ctx_.getRootAnalyzer());
     node.setChild(1, rhsExchange);
 
     // Connect the child fragments in a new fragment, and set the data partition
@@ -1033,7 +1031,7 @@ public class DistributedPlanner {
     // Set limit, offset and merge parameters in the exchange node.
     exchNode.unsetLimit();
     if (hasLimit) exchNode.setLimit(limit);
-    exchNode.setMergeInfo(node.getSortInfo(), offset);
+    exchNode.setMergeInfo(node.getSortInfo().clone(), offset);
 
     // Child nodes should not process the offset. If there is a limit,
     // the child nodes need only return (offset + limit) rows.
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index 87d2fd2..ba3aa4d 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -17,8 +17,11 @@
 
 package org.apache.impala.planner;
 
+import java.util.List;
+
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.SortInfo;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.common.ImpalaException;
@@ -28,6 +31,7 @@ import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSortInfo;
+
 import com.google.common.base.Preconditions;
 
 /**
@@ -51,7 +55,7 @@ public class ExchangeNode extends PlanNode {
 
   // The parameters based on which sorted input streams are merged by this
   // exchange node. Null if this exchange does not merge sorted streams
-  private SortInfo mergeInfo_;
+  public SortInfo mergeInfo_;
 
   // Offset after which the exchange begins returning rows. Currently valid
   // only if mergeInfo_ is non-null, i.e. this is a merging exchange node.
@@ -82,37 +86,11 @@ public class ExchangeNode extends PlanNode {
 
   @Override
   public void computeStats(Analyzer analyzer) {
-    Preconditions.checkState(!children_.isEmpty(),
-        "ExchangeNode must have at least one child");
-    cardinality_ = 0;
-    for (PlanNode child: children_) {
-      if (child.getCardinality() == -1) {
-        cardinality_ = -1;
-        break;
-      }
-      cardinality_ = checkedAdd(cardinality_, child.getCardinality());
-    }
-
-    if (hasLimit()) {
-      if (cardinality_ == -1) {
-        cardinality_ = limit_;
-      } else {
-        cardinality_ = Math.min(limit_, cardinality_);
-      }
-    }
-
+    super.computeStats(analyzer);
+    Preconditions.checkState(children_.size() == 1);
+    cardinality_ = capAtLimit(children_.get(0).getCardinality());
     // Apply the offset correction if there's a valid cardinality
-    if (cardinality_ > -1) {
-      cardinality_ = Math.max(0, cardinality_ - offset_);
-    }
-
-    // Pick the max numNodes_ and avgRowSize_ of all children.
-    numNodes_ = Integer.MIN_VALUE;
-    avgRowSize_ = Integer.MIN_VALUE;
-    for (PlanNode child: children_) {
-      numNodes_ = Math.max(child.numNodes_, numNodes_);
-      avgRowSize_ = Math.max(child.avgRowSize_, avgRowSize_);
-    }
+    if (cardinality_ > -1) cardinality_ = Math.max(0, cardinality_ - offset_);
   }
 
   /**
@@ -210,4 +188,32 @@ public class ExchangeNode extends PlanNode {
       msg.exchange_node.setOffset(offset_);
     }
   }
+
+  @Override
+  public void getExprs(List<Expr> exprs) {
+    DataPartition partition = getChild(0).getFragment().getOutputPartition();
+    exprs.addAll(partition.getPartitionExprs());
+    if (mergeInfo_ != null) exprs.addAll(mergeInfo_.getOrderingExprs());
+  }
+
+  @Override
+  public void substitute(ExprSubstitutionMap smap, Analyzer analyzer)
+      throws ImpalaException {
+    super.substitute(smap, analyzer);
+    DataPartition partition = getChild(0).getFragment().getOutputPartition();
+    if (partition.isHashPartitioned()) partition.substitute(smap, analyzer);
+    if (mergeInfo_ != null) mergeInfo_.substituteOrderingExprs(smap, analyzer);
+  }
+
+  @Override
+  public void validateExprs() {
+    super.validateExprs();
+    DataPartition partition = getChild(0).getFragment().getOutputPartition();
+    Preconditions.checkState(
+        Expr.isExprListBoundByTupleIds(partition.getPartitionExprs(), tupleIds_));
+    if (mergeInfo_ != null) {
+      Preconditions.checkState(
+          Expr.isExprListBoundByTupleIds(mergeInfo_.getOrderingExprs(), tupleIds_));
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index 030d706..a50c1aa 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.SlotRef;
@@ -32,8 +33,8 @@ import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
 import org.apache.impala.thrift.TJoinDistributionMode;
-import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TQueryOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -690,4 +691,40 @@ public abstract class JoinNode extends PlanNode {
         finishedBuildProfile.sum(probeSideProfile.postOpenProfile);
     return new ExecPhaseResourceProfiles(duringOpenProfile, probePhaseProfile);
   }
+
+  @Override
+  public void getExprs(List<Expr> exprs) {
+    super.getExprs(exprs);
+    exprs.addAll(eqJoinConjuncts_);
+    exprs.addAll(otherJoinConjuncts_);
+  }
+
+  @Override
+  public void substitute(ExprSubstitutionMap smap, Analyzer analyzer)
+      throws ImpalaException {
+    super.substitute(smap, analyzer);
+    List<BinaryPredicate> newEqJoinConjuncts = Lists.newArrayList();
+    for (BinaryPredicate bp: eqJoinConjuncts_) {
+      newEqJoinConjuncts.add((BinaryPredicate) bp.substitute(smap, analyzer, true));
+    }
+    eqJoinConjuncts_ = newEqJoinConjuncts;
+    otherJoinConjuncts_ = Expr.substituteList(otherJoinConjuncts_, smap, analyzer, true);
+  }
+
+  @Override
+  public void validateExprs() {
+    super.validateExprs();
+    // TODO(Alex): Refine comment.
+    // Semi joins output a subset of its child tuples. The join conjuncts are evaluated
+    // over the child tuples.
+    List<TupleId> childTids = Lists.newArrayList();
+    for (PlanNode child: children_) childTids.addAll(child.getTupleIds());
+    Preconditions.checkState(
+        Expr.isExprListBoundByTupleIds(eqJoinConjuncts_, childTids));
+    Preconditions.checkState(
+        Expr.isExprListBoundByTupleIds(otherJoinConjuncts_, childTids));
+    for (RuntimeFilter rf: runtimeFilters_) {
+      Preconditions.checkState(rf.getSrcExpr().isBoundByTupleIds(childTids));
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index a14c89a..3fc59c1 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -802,4 +802,19 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   public void setDisableCodegen(boolean disableCodegen) {
     disableCodegen_ = disableCodegen;
   }
+
+  // TODO(Alex): Need a better name.
+  public void getExprs(List<Expr> exprs) {
+    exprs.addAll(conjuncts_);
+  }
+
+  // TODO(Alex): Implement all of these.
+  public void substitute(ExprSubstitutionMap smap, Analyzer analyzer) throws ImpalaException {
+    conjuncts_ = Expr.substituteList(conjuncts_, smap, analyzer, true);
+    for (RuntimeFilter rf: runtimeFilters_) rf.substituteSrcExpr(smap, analyzer);
+  }
+
+  public void validateExprs() {
+    Preconditions.checkState(Expr.isExprListBoundByTupleIds(conjuncts_, tupleIds_));
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 6bef39f..ad663e8 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -30,14 +30,20 @@ import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.QueryStmt;
+import org.apache.impala.analysis.SlotDescriptor;
+import org.apache.impala.analysis.SlotId;
+import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.SortInfo;
+import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
+import org.apache.impala.analysis.TupleIsNullPredicate;
 import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.common.TreeNode;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TQueryCtx;
@@ -53,6 +59,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
 
 /**
@@ -119,14 +126,7 @@ public class Planner {
       fragments = distributedPlanner.createPlanFragments(singleNodePlan);
     }
 
-    // Create runtime filters.
     PlanFragment rootFragment = fragments.get(fragments.size() - 1);
-    if (ctx_.getQueryOptions().getRuntime_filter_mode() != TRuntimeFilterMode.OFF) {
-      RuntimeFilterGenerator.generateRuntimeFilters(ctx_, rootFragment.getPlanRoot());
-      ctx_.getTimeline().markEvent("Runtime filters computed");
-    }
-
-    rootFragment.verifyTree();
     ExprSubstitutionMap rootNodeSmap = rootFragment.getPlanRoot().getOutputSmap();
     List<Expr> resultExprs = null;
     if (ctx_.isInsertOrCtas()) {
@@ -158,6 +158,24 @@ public class Planner {
     }
     rootFragment.setOutputExprs(resultExprs);
 
+    // Create runtime filters. This step relies on the value transfer graph. It
+    // must be performed before projection trimming which may add new tuples/slots
+    // and substitute join exprs without updating the value transfer graph.
+    if (ctx_.getQueryOptions().getRuntime_filter_mode() != TRuntimeFilterMode.OFF) {
+      RuntimeFilterGenerator.generateRuntimeFilters(ctx_, rootFragment.getPlanRoot());
+      ctx_.getTimeline().markEvent("Runtime filters computed");
+    }
+    rootFragment.verifyTree();
+
+    // Apply projection.
+    if (ctx_.getQueryOptions().enable_projection_trimming) {
+      List<Expr> parentExprs = Lists.newArrayList(resultExprs);
+      projectSlots(rootFragment.getPlanRoot(), parentExprs, ctx_.getRootAnalyzer());
+      rootNodeSmap = rootFragment.getPlanRoot().getOutputSmap();
+      resultExprs = Expr.substituteList(resultExprs, rootNodeSmap, ctx_.getRootAnalyzer(), true);
+      rootFragment.setOutputExprs(resultExprs);
+    }
+
     // The check for disabling codegen uses estimates of rows per node so must be done
     // on the distributed plan.
     checkForDisableCodegen(rootFragment.getPlanRoot());
@@ -221,6 +239,87 @@ public class Planner {
     return fragments;
   }
 
+  private ProjectionInfo computeProjection(
+      List<TupleId> tids, List<Expr> exprs, Analyzer analyzer) {
+    // Slot ids required to evaluate exprs.
+    List<SlotId> projectedSids = Lists.newArrayList();
+    Expr.getIds(exprs, null, projectedSids);
+
+    int numMaterializedSlots = 0;
+    List<SlotDescriptor> usedSlotDescs = Lists.newArrayList();
+    for (TupleId tid: tids) {
+      TupleDescriptor tupleDesc = analyzer.getTupleDesc(tid);
+      for (SlotDescriptor slotDesc: tupleDesc.getSlots()) {
+        if (projectedSids.contains(slotDesc.getId())) {
+          Preconditions.checkState(slotDesc.isMaterialized());
+          usedSlotDescs.add(slotDesc);
+        }
+        if (slotDesc.isMaterialized()) ++numMaterializedSlots;
+      }
+    }
+    // Only apply projection if necessary.
+    if (usedSlotDescs.size() == numMaterializedSlots) return null;
+    Preconditions.checkState(usedSlotDescs.size() < numMaterializedSlots);
+
+    TupleDescriptor projectedTuple =
+        analyzer.getDescTbl().createTupleDescriptor("projection");
+    ExprSubstitutionMap projectionSmap = new ExprSubstitutionMap();
+    for (SlotDescriptor slotDesc: usedSlotDescs) {
+      SlotDescriptor projectedSlotDesc = analyzer.copySlotDescriptor(slotDesc, projectedTuple);
+      projectedSlotDesc.setIsMaterialized(true);
+      projectedSlotDesc.setSourceExpr(new SlotRef(slotDesc));
+      projectionSmap.put(new SlotRef(slotDesc), new SlotRef(projectedSlotDesc));
+    }
+
+    // Materialize TupleIsNullPredicates.
+    List<TupleIsNullPredicate> tupleIsNullPreds = Lists.newArrayList();
+    TreeNode.collect(exprs, Predicates.instanceOf(TupleIsNullPredicate.class), tupleIsNullPreds);
+    Expr.removeDuplicates(tupleIsNullPreds);
+    for (TupleIsNullPredicate tupleIsNullPred: tupleIsNullPreds) {
+      SlotDescriptor slotDesc = analyzer.addSlotDescriptor(projectedTuple);
+      slotDesc.initFromExpr(tupleIsNullPred);
+      projectionSmap.put(tupleIsNullPred.clone(), new SlotRef(slotDesc));
+    }
+
+    projectedTuple.computeMemLayout();
+    return new ProjectionInfo(projectedTuple, projectionSmap);
+  }
+
+  public void projectSlots(PlanNode node, List<Expr> parentExprs, Analyzer analyzer) throws ImpalaException {
+    List<Expr> localParentExprs = Lists.newArrayList(parentExprs);
+    node.getExprs(parentExprs);
+    // TODO: Add optimizations to clear parentExprs.
+    for (PlanNode child: node.getChildren()) projectSlots(child, parentExprs, analyzer);
+
+    ProjectionInfo projection = null;
+    if (node instanceof ExchangeNode) {
+      node.getExprs(localParentExprs);
+      ExchangeNode exchNode = (ExchangeNode) node;
+      List<Expr> substLocalParentExprs = Expr.substituteList(
+          localParentExprs, exchNode.getChild(0).getOutputSmap(), analyzer, true);
+      projection = computeProjection(exchNode.getChild(0).getTupleIds(), substLocalParentExprs, analyzer);
+      if (projection != null) {
+        PlanFragment inputFragment = node.getChild(0).getFragment();
+        // TODO(Alex): Force no passthrough.
+        UnionNode unionNode = UnionNode.createProjection(ctx_.getNextNodeId(),
+            projection.tupleDesc.getId(), projection.smap.getRhs(), false);
+        unionNode.addChild(inputFragment.getPlanRoot(), projection.smap.getLhs());
+        unionNode.setOutputSmap(ExprSubstitutionMap.compose(inputFragment.getPlanRoot().getOutputSmap(), projection.smap, analyzer));
+        unionNode.init(analyzer);
+        inputFragment.setPlanRoot(unionNode);
+        node.setChild(0, unionNode);
+      }
+    }
+
+    // Compute row composition, apply child smaps and re-compute stats.
+    ExprSubstitutionMap smap = node.getCombinedChildSmap();
+    node.substitute(smap, analyzer);
+    node.setOutputSmap(smap);
+    node.computeTupleIds();
+    node.validateExprs(); // for debugging
+    node.computeStats(analyzer);
+  }
+
   /**
    * Return a list of plans, each represented by the root of their fragment trees.
    * TODO: roll into createPlan()
diff --git a/fe/src/main/java/org/apache/impala/planner/ProjectionInfo.java b/fe/src/main/java/org/apache/impala/planner/ProjectionInfo.java
new file mode 100644
index 0000000..af2257d
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/ProjectionInfo.java
@@ -0,0 +1,37 @@
+// Copyright 2015 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.impala.planner;
+
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.ExprSubstitutionMap;
+import org.apache.impala.analysis.TupleDescriptor;
+import org.apache.impala.thrift.TProjectionInfo;
+
+public class ProjectionInfo {
+  public final TupleDescriptor tupleDesc;
+  public final ExprSubstitutionMap smap;
+
+  public ProjectionInfo(TupleDescriptor tupleDesc, ExprSubstitutionMap smap) {
+    this.tupleDesc = tupleDesc;
+    this.smap = smap;
+  }
+
+  public TProjectionInfo toThrift() {
+    TProjectionInfo result = new TProjectionInfo();
+    for (Expr e: smap.getLhs()) result.addToSrc_exprs(e.treeToThrift());
+    result.setDst_tid(tupleDesc.getId().asInt());
+    return result;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 89f14d1..a0eb708 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -159,7 +159,7 @@ public final class RuntimeFilterGenerator {
     // Join node that builds the filter
     private final JoinNode src_;
     // Expr (rhs of join predicate) on which the filter is built
-    private final Expr srcExpr_;
+    private Expr srcExpr_;
     // Expr (lhs of join predicate) from which the targetExprs_ are generated.
     private final Expr origTargetExpr_;
     // The operator comparing 'srcExpr_' and 'origTargetExpr_'.
@@ -274,6 +274,10 @@ public final class RuntimeFilterGenerator {
     public void markFinalized() { finalized_ = true; }
     public boolean isFinalized() { return finalized_; }
 
+    public void substituteSrcExpr(ExprSubstitutionMap smap, Analyzer analyzer) {
+      srcExpr_ = srcExpr_.substitute(smap, analyzer, true);
+    }
+
     /**
      * Serializes a runtime filter to Thrift.
      */
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index eea9c50..a787f4d 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -19,12 +19,15 @@ package org.apache.impala.planner;
 
 import java.util.List;
 
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.NotImplementedException;
+import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.impala.thrift.TTableStats;
@@ -227,4 +230,15 @@ abstract public class ScanNode extends PlanNode {
    * engine.
    */
   public boolean hasStorageLayerConjuncts() { return false; }
+
+  @Override
+  public void substitute(ExprSubstitutionMap smap, Analyzer analyzer) {
+  }
+
+  @Override
+  public void validateExprs() {
+    for (RuntimeFilter rf: runtimeFilters_) {
+      Preconditions.checkState(rf.getTargetExpr(id_).isBoundByTupleIds(tupleIds_));
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/SelectNode.java b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
index 3ffc975..a5de672 100644
--- a/fe/src/main/java/org/apache/impala/planner/SelectNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
@@ -19,15 +19,14 @@ package org.apache.impala.planner;
 
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index 3ca50e0..28dec9e 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -19,15 +19,13 @@ package org.apache.impala.planner;
 
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.SortInfo;
+import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
@@ -36,6 +34,9 @@ import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSortInfo;
 import org.apache.impala.thrift.TSortNode;
 import org.apache.impala.thrift.TSortType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -72,7 +73,7 @@ public class SortNode extends PlanNode {
   protected long offset_;
 
   // The type of sort. Determines the exec node used in the BE.
-  private TSortType type_;
+  private final TSortType type_;
 
   /**
    * Creates a new SortNode that implements a partial sort.
@@ -153,7 +154,6 @@ public class SortNode extends PlanNode {
     outputSmap_ = ExprSubstitutionMap.compose(childSmap, outputSmap_, analyzer);
 
     info_.substituteOrderingExprs(outputSmap_, analyzer);
-    info_.checkConsistency();
 
     if (LOG.isTraceEnabled()) {
       LOG.trace("sort id " + tupleIds_.get(0).toString() + " smap: "
@@ -314,4 +314,24 @@ public class SortNode extends PlanNode {
       return "SORT";
     }
   }
+
+  @Override
+  public void substitute(ExprSubstitutionMap smap, Analyzer analyzer)
+      throws ImpalaException {
+    super.substitute(smap, analyzer);
+    resolvedTupleExprs_ = Expr.substituteList(resolvedTupleExprs_, smap, analyzer, true);
+  }
+
+  @Override
+  public void getExprs(List<Expr> exprs) {
+    exprs.addAll(resolvedTupleExprs_);
+  }
+
+  @Override
+  public void validateExprs() {
+    Preconditions.checkState(
+        Expr.isExprListBoundByTupleIds(resolvedTupleExprs_, getChild(0).getTupleIds()));
+    Preconditions.checkState(
+        Expr.isExprListBoundByTupleIds(info_.getOrderingExprs(), tupleIds_));
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
index 52ce508..ac1f7e5 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
@@ -22,10 +22,12 @@ import java.util.List;
 
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
-import org.apache.impala.analysis.TupleDescriptor;
-import org.apache.impala.analysis.TupleId;
+import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.SlotRef;
+import org.apache.impala.analysis.TupleDescriptor;
+import org.apache.impala.analysis.TupleId;
+import org.apache.impala.common.ImpalaException;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TPlanNode;
@@ -84,14 +86,24 @@ public class UnionNode extends PlanNode {
     isInSubplan_ = false;
   }
 
-  protected UnionNode(PlanNodeId id, TupleId tupleId,
-        List<Expr> unionResultExprs, boolean isInSubplan) {
-    super(id, tupleId.asList(), "UNION");
+  private UnionNode(String displayName, PlanNodeId id, TupleId tupleId,
+      List<Expr> unionResultExprs, boolean isInSubplan) {
+    super(id, tupleId.asList(), displayName);
     unionResultExprs_ = unionResultExprs;
     tupleId_ = tupleId;
     isInSubplan_ = isInSubplan;
   }
 
+  protected UnionNode(PlanNodeId id, TupleId tupleId,
+        List<Expr> unionResultExprs, boolean isInSubplan) {
+    this("UNION", id, tupleId, unionResultExprs, isInSubplan);
+  }
+
+  public static UnionNode createProjection(PlanNodeId id, TupleId tupleId,
+      List<Expr> unionResultExprs, boolean isInSubplan) {
+    return new UnionNode("PROJECT", id, tupleId, unionResultExprs, isInSubplan);
+  }
+
   public void addConstExprList(List<Expr> exprs) { constExprLists_.add(exprs); }
 
   /**
@@ -321,4 +333,34 @@ public class UnionNode extends PlanNode {
     }
     return output.toString();
   }
+
+  @Override
+  public void getExprs(List<Expr> exprs) {
+    for (List<Expr> childExprs: materializedResultExprLists_) exprs.addAll(childExprs);
+  }
+
+  @Override
+  public void substitute(ExprSubstitutionMap smap, Analyzer analyzer)
+      throws ImpalaException {
+    super.substitute(smap, analyzer);
+    for (int i = 0; i < materializedResultExprLists_.size(); ++i) {
+      List<Expr> origExprs = materializedResultExprLists_.get(i);
+      List<Expr> substExps = Expr.substituteList(origExprs, smap, analyzer, true);
+      materializedResultExprLists_.set(i, substExps);
+    }
+  }
+
+  @Override
+  public void validateExprs() {
+    super.validateExprs();
+    Preconditions.checkState(materializedResultExprLists_.size() == children_.size());
+    for (int i = 0; i < materializedResultExprLists_.size(); ++i) {
+      List<Expr> exprs = materializedResultExprLists_.get(i);
+      Preconditions.checkState(
+          Expr.isExprListBoundByTupleIds(exprs, getChild(i).getTupleIds()));
+    }
+    for (List<Expr> constExprs: constExprLists_) {
+      for (Expr e: constExprs) Preconditions.checkState(e.isConstant());
+    }
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 5dbba75..5d2fd00 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -40,6 +40,27 @@ import com.google.common.collect.Lists;
 public class PlannerTest extends PlannerTestBase {
 
   @Test
+  public void testFix() {
+    TQueryOptions options = defaultQueryOptions();
+    options.setExplain_level(TExplainLevel.EXTENDED);
+    runPlannerTestFile("fix", options);
+  }
+
+  @Test
+  public void testFix2() {
+    TQueryOptions options = defaultQueryOptions();
+    options.setExplain_level(TExplainLevel.EXTENDED);
+    runPlannerTestFile("fix2", options);
+  }
+
+  @Test
+  public void testFix3() {
+    TQueryOptions options = defaultQueryOptions();
+    options.setExplain_level(TExplainLevel.EXTENDED);
+    runPlannerTestFile("fix3", options);
+  }
+
+  @Test
   public void testPredicatePropagation() {
     runPlannerTestFile("predicate-propagation");
   }
-- 
2.7.4

