diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index dff3b0f..f7bca95 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -24,8 +24,12 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.hive.ql.exec.DagUtils;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hive.spark.client.SparkClientUtilities;
 import org.apache.spark.util.CallSite;
 import org.slf4j.Logger;
@@ -129,6 +133,17 @@ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) thr
     refreshLocalResources(sparkWork, hiveConf);
     JobConf jobConf = new JobConf(hiveConf);
 
+    //predicate push down
+    for (BaseWork work : sparkWork.getAllWork()) {
+      Set<Operator<?>> operatorSet = work.getAllOperators();
+      for (Operator<?> source : operatorSet) {
+        if (source instanceof TableScanOperator) {
+          TableScanOperator ts = (TableScanOperator) source;
+          HiveInputFormat.pushFilters(jobConf, ts, null);
+        }
+      }
+    }
+
     // Create temporary scratch dir
     Path emptyScratchDir;
     emptyScratchDir = ctx.getMRTmpPath();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index e8f39ae..fe7d624 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -28,11 +28,15 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.NullScanFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -205,6 +209,17 @@ private SparkJobRef submit(final DriverContext driverContext, final SparkWork sp
     //update the credential provider location in the jobConf
     HiveConfUtil.updateJobCredentialProviders(jobConf);
 
+    //predicate push down
+    for (BaseWork work : sparkWork.getAllWork()) {
+      Set<Operator<?>> operatorSet = work.getAllOperators();
+      for (Operator<?> source : operatorSet) {
+        if (source instanceof TableScanOperator) {
+          TableScanOperator ts = (TableScanOperator) source;
+          HiveInputFormat.pushFilters(jobConf, ts, null);
+        }
+      }
+    }
+
     // Create temporary scratch dir
     final Path emptyScratchDir = ctx.getMRTmpPath();
     FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
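
Note for reviewers: the pushFilters(...) calls added above serialize each TableScanOperator's filter predicate into the shared JobConf so that input formats can evaluate it at read time. As a minimal sketch (not part of this patch), the snippet below reads those properties back; PushedFilterProbe is a hypothetical name used only for illustration, while the property constants are the ones defined in org.apache.hadoop.hive.ql.plan.TableScanDesc.

// Minimal sketch, not part of the patch: inspect what pushFilters(...) wrote
// into the JobConf. PushedFilterProbe is a hypothetical helper class.
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.mapred.JobConf;

public class PushedFilterProbe {
  public static void logPushedFilter(JobConf jobConf) {
    // Human-readable form of the pushed predicate, e.g. "(key > 10)".
    String filterText = jobConf.get(TableScanDesc.FILTER_TEXT_CONF_STR);
    // Serialized expression tree that filter-aware readers deserialize.
    String filterExpr = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
    if (filterExpr != null) {
      System.out.println("Filter pushed down: " + filterText);
    } else {
      System.out.println("No filter was pushed down for this scan.");
    }
  }
}

Filter-aware readers consume these same properties; ORC, for instance, builds its search argument from the serialized expression stored under hive.io.filter.expr.serialized.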