diff --git a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java
index 0685790..bb5dd4f 100644
--- a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java
+++ b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java
@@ -19,6 +19,7 @@
 package org.apache.hive.hcatalog.pig;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
@@ -31,6 +32,7 @@
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.Credentials;
@@ -161,6 +163,12 @@ public void setLocation(String location, Job job) throws IOException {
     if (requiredFieldsInfo != null) {
       // convert to hcatschema and pass to HCatInputFormat
       try {
+        // push down projections to the columnar store; works for RCFile and ORC
+        ArrayList<Integer> list = new ArrayList<Integer>(requiredFieldsInfo.getFields().size());
+        for (RequiredField rf : requiredFieldsInfo.getFields()) {
+          list.add(rf.getIndex());
+        }
+        ColumnProjectionUtils.setReadColumns(job.getConfiguration(), list);
         outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(), signature, this.getClass());
         HCatInputFormat.setOutputSchema(job, outputSchema);
       } catch (Exception e) {
@@ -170,6 +178,7 @@ public void setLocation(String location, Job job) throws IOException {
       // else - this means pig's optimizer never invoked the pushProjection
       // method - so we need all fields and hence we should not call the
       // setOutputSchema on HCatInputFormat
+      ColumnProjectionUtils.setReadAllColumns(job.getConfiguration());
       if (HCatUtil.checkJobContextIfRunningFromBackend(job)) {
         try {
           HCatSchema hcatTableSchema = (HCatSchema) udfProps.get(HCatConstants.HCAT_TABLE_SCHEMA);
diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
index 45a219c..28e711a 100644
--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
@@ -19,7 +19,9 @@
 package org.apache.hive.hcatalog.pig;
 
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.io.RandomAccessFile;
 import java.sql.Date;
 import java.sql.Timestamp;
@@ -34,7 +36,9 @@ import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
@@ -44,30 +48,30 @@
 import org.apache.hadoop.hive.ql.io.StorageFormats;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapreduce.Job;
-
 import org.apache.hadoop.util.Shell;
 
 import org.apache.hive.hcatalog.HcatTestUtils;
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.hive.hcatalog.common.HCatConstants;
 import org.apache.hive.hcatalog.data.Pair;
 import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-
 import org.apache.pig.ExecType;
+import org.apache.pig.PigRunner;
 import org.apache.pig.PigServer;
 import org.apache.pig.ResourceStatistics;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.joda.time.DateTime;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,6 +101,7 @@
       add("testReadDataBasic");
       add("testReadPartitionedBasic");
       add("testProjectionsBasic");
+      add("testColumnarStorePushdown2");
     }});
   }};
 
@@ -207,8 +212,8 @@ public void setup() throws Exception {
     HcatTestUtils.createTestDataFile(BASIC_FILE_NAME, input);
     HcatTestUtils.createTestDataFile(COMPLEX_FILE_NAME,
       new String[]{
-        //"Henry Jekyll\t42\t(415-253-6367,hjekyll@contemporary.edu.uk)\t{(PHARMACOLOGY),(PSYCHIATRY)},[PHARMACOLOGY#A-,PSYCHIATRY#B+],{(415-253-6367,cell),(408-253-6367,landline)}",
-        //"Edward Hyde\t1337\t(415-253-6367,anonymous@b44chan.org)\t{(CREATIVE_WRITING),(COPYRIGHT_LAW)},[CREATIVE_WRITING#A+,COPYRIGHT_LAW#D],{(415-253-6367,cell),(408-253-6367,landline)}",
+        "Henry Jekyll\t42\t(415-253-6367,hjekyll@contemporary.edu.uk)\t{(PHARMACOLOGY),(PSYCHIATRY)}\t[PHARMACOLOGY#A-,PSYCHIATRY#B+]\t{(415-253-6367,cell),(408-253-6367,landline)}",
+        "Edward Hyde\t1337\t(415-253-6367,anonymous@b44chan.org)\t{(CREATIVE_WRITING),(COPYRIGHT_LAW)}\t[CREATIVE_WRITING#A+,COPYRIGHT_LAW#D]\t{(415-253-6367,cell),(408-253-6367,landline)}",
       }
     );
     PigServer server = new PigServer(ExecType.LOCAL);
@@ -482,6 +487,61 @@ public void testProjectionsBasic() throws IOException {
   }
 
   @Test
+  public void testColumnarStorePushdown() throws Exception {
+    String PIGOUTPUT_DIR = TEST_DATA_DIR + "/colpushdownop";
+    String PIG_FILE = "test.pig";
+    String expectedCols = "0,1";
+    PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+    w.println("A = load '" + COMPLEX_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
+    w.println("B = foreach A generate name,studentid;");
+    w.println("C = filter B by name is not null;");
+    w.println("store C into '" + PIGOUTPUT_DIR + "' using PigStorage();");
+    w.close();
+
+    try {
+      String[] args = { "-x", "local", PIG_FILE };
+      PigStats stats = PigRunner.run(args, null);
+      // Pig script was successful
+      assertTrue(stats.isSuccessful());
+      // a single MapReduce job is launched
+      OutputStats outstats = stats.getOutputStats().get(0);
+      assertTrue(outstats != null);
+      assertEquals(expectedCols, outstats.getConf()
+        .get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+      // delete output file on exit
+      FileSystem fs = FileSystem.get(outstats.getConf());
+      if (fs.exists(new Path(PIGOUTPUT_DIR))) {
+        fs.delete(new Path(PIGOUTPUT_DIR), true);
+      }
+    } finally {
+      new File(PIG_FILE).delete();
+    }
+  }
+
+  /**
+   * Tests the failure case caused by HIVE-10752
+   * @throws Exception
+   */
+  @Test
+  public void testColumnarStorePushdown2() throws Exception {
+    assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
+
+    PigServer server = new PigServer(ExecType.LOCAL);
+    server.registerQuery("A = load '" + COMPLEX_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
+    server.registerQuery("B = load '" + COMPLEX_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
+    server.registerQuery("C = join A by name, B by name;");
+    server.registerQuery("D = foreach C generate B::studentid;");
+    server.registerQuery("E = ORDER D by studentid asc;");
+
+    Iterator<Tuple> iter = server.openIterator("E");
+    Tuple t = iter.next();
+    assertEquals(42, t.get(0));
+
+    t = iter.next();
+    assertEquals(1337, t.get(0));
+  }
+
+  @Test
   public void testGetInputBytes() throws Exception {
     assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
     File file = new File(TEST_WAREHOUSE_DIR + "/" + SPECIFIC_SIZE_TABLE + "/part-m-00000");
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
index cbad3b2..9be62f1 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
@@ -72,8 +72,6 @@ public static void appendReadColumnIDs(Configuration conf, List<Integer> ids) {
     appendReadColumns(conf, ids);
   }
 
-
-
   /**
    * Sets the READ_ALL_COLUMNS flag and removes any previously
    * set column ids.
@@ -91,6 +89,15 @@ public static boolean isReadAllColumns(Configuration conf) {
   }
 
   /**
+   * Sets the READ_ALL_COLUMNS flag to false and overwrites column ids
+   * with the provided list.
+   */
+  public static void setReadColumns(Configuration conf, List<Integer> ids) {
+    setReadColumnIDConf(conf, READ_COLUMN_IDS_CONF_STR_DEFAULT);
+    appendReadColumns(conf, ids);
+  }
+
+  /**
    * Appends read columns' ids (start from zero). Once a column
    * is included in the list, a underlying record reader of a columnar file format
    * (e.g. RCFile and ORC) can know what columns are needed.
@@ -99,7 +106,7 @@ public static void appendReadColumns(Configuration conf, List<Integer> ids) {
     String id = toReadColumnIDString(ids);
    String old = conf.get(READ_COLUMN_IDS_CONF_STR, null);
     String newConfStr = id;
-    if (old != null) {
+    if (old != null && !old.isEmpty()) {
       newConfStr = newConfStr + StringUtils.COMMA_STR + old;
     }
     setReadColumnIDConf(conf, newConfStr);