diff --git a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java
index 0685790..a0ff062 100644
--- a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java
+++ b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java
@@ -19,6 +19,7 @@
 package org.apache.hive.hcatalog.pig;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
@@ -31,6 +32,7 @@
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.Credentials;
@@ -161,6 +163,12 @@ public void setLocation(String location, Job job) throws IOException {
     if (requiredFieldsInfo != null) {
       // convert to hcatschema and pass to HCatInputFormat
       try {
+        //push down projections to columnar store works for RCFile and ORCFile
+        ArrayList<Integer> list = new ArrayList<Integer>(requiredFieldsInfo.getFields().size());
+        for (RequiredField rf : requiredFieldsInfo.getFields()) {
+          list.add(rf.getIndex());
+        }
+        ColumnProjectionUtils.appendReadColumns(job.getConfiguration(), list);
         outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(), signature, this.getClass());
         HCatInputFormat.setOutputSchema(job, outputSchema);
       } catch (Exception e) {
@@ -170,6 +178,7 @@ public void setLocation(String location, Job job) throws IOException {
       // else - this means pig's optimizer never invoked the pushProjection
       // method - so we need all fields and hence we should not call the
       // setOutputSchema on HCatInputFormat
+      ColumnProjectionUtils.setReadAllColumns(job.getConfiguration());
       if (HCatUtil.checkJobContextIfRunningFromBackend(job)) {
         try {
           HCatSchema hcatTableSchema = (HCatSchema) udfProps.get(HCatConstants.HCAT_TABLE_SCHEMA);
diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
index 45a219c..c5a3543 100644
--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
@@ -19,6 +19,8 @@
 package org.apache.hive.hcatalog.pig;
 
 import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.sql.Date;
@@ -34,7 +36,9 @@
 import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
@@ -44,6 +48,7 @@
 import org.apache.hadoop.hive.ql.io.StorageFormats;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.Shell;
 
@@ -60,6 +65,9 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.PigRunner;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
 
 import org.joda.time.DateTime;
 import org.junit.After;
@@ -482,6 +490,38 @@ public void testProjectionsBasic() throws IOException {
   }
 
   @Test
+  public void testColumnarStorePushdown() throws Exception {
+    String PIGOUTPUT_DIR = TEST_DATA_DIR + "/colpushdownop";
+    String PIG_FILE = "test.pig";
+    String expectedCols = "0,1";
+    PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+    w.println("A = load '" + COMPLEX_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
+    w.println("B = foreach A generate name,studentid;");
+    w.println("C = filter B by name is not null;");
+    w.println("store C into '" + PIGOUTPUT_DIR + "' using PigStorage();");
+    w.close();
+
+    try {
+      String[] args = { "-x", "local", PIG_FILE };
+      PigStats stats = PigRunner.run(args, null);
+      //Pig script was successful
+      assertTrue(stats.isSuccessful());
+      //Single MapReduce job is launched
+      OutputStats outstats = stats.getOutputStats().get(0);
+      assertTrue(outstats != null);
+      assertEquals(expectedCols, outstats.getConf()
+        .get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+      //delete output file on exit
+      FileSystem fs = FileSystem.get(outstats.getConf());
+      if (fs.exists(new Path(PIGOUTPUT_DIR))) {
+        fs.delete(new Path(PIGOUTPUT_DIR), true);
+      }
+    } finally {
+      new File(PIG_FILE).delete();
+    }
+  }
+
+  @Test
   public void testGetInputBytes() throws Exception {
     assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
     File file = new File(TEST_WAREHOUSE_DIR + "/" + SPECIFIC_SIZE_TABLE + "/part-m-00000");