commit 8e03b6c70b56814c3d380ac7e3c5b575de00eb38
Author: Owen O'Malley
Date:   Wed Sep 17 14:52:10 2014 -0700

    HIVE-6936. pass table properties to file formats.

diff --git ql/pom.xml ql/pom.xml
index 85d2a5f..b9f87d7 100644
--- ql/pom.xml
+++ ql/pom.xml
@@ -575,6 +575,7 @@
                   <include>com.twitter:parquet-hadoop-bundle</include>
                   <include>org.apache.thrift:libthrift</include>
                   <include>commons-lang:commons-lang</include>
+                  <include>org.apache.commons:commons-lang3</include>
                   <include>org.jodd:jodd-core</include>
                   <include>org.json:json</include>
                   <include>org.apache.avro:avro</include>
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index c00ff59..3147ca4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -49,7 +50,6 @@
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -61,11 +61,8 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveTypeEntry;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.InputFormat;
@@ -83,6 +80,9 @@
   static Log LOG = LogFactory.getLog(FetchOperator.class.getName());
   static LogHelper console = new LogHelper(LOG);
 
+  public static final String FETCH_OPERATOR_DIRECTORY_LIST =
+      "hive.complete.dir.list";
+
   private boolean isNativeTable;
   private FetchWork work;
   protected Operator<?> operator; // operator tree for processing the row further (optional)
@@ -353,6 +353,7 @@ private void getNextPath() throws Exception {
       }
       return;
     } else {
+      setFetchOperatorContext(job, work.getPartDir());
       iterPath = work.getPartDir().iterator();
       iterPartDesc = work.getPartDesc().iterator();
     }
@@ -381,6 +382,30 @@
   }
 
   /**
+   * Set the context for this fetch operator into the jobconf.
+   * This helps InputFormats make decisions based on the scope of the complete
+   * operation.
+   * @param conf the configuration to modify
+   * @param partDirs the list of partition directories
+   */
+  static void setFetchOperatorContext(JobConf conf,
+      ArrayList<Path> partDirs) {
+    if (partDirs != null) {
+      StringBuilder buff = new StringBuilder();
+      boolean first = true;
+      for (Path p : partDirs) {
+        if (first) {
+          first = false;
+        } else {
+          buff.append('\t');
+        }
+        buff.append(StringEscapeUtils.escapeJava(p.toString()));
+      }
+      conf.set(FETCH_OPERATOR_DIRECTORY_LIST, buff.toString());
+    }
+  }
+
+  /**
    * A cache of Object Inspector Settable Properties.
    */
   private static Map<ObjectInspector, Boolean> oiSettableProperties = new HashMap<ObjectInspector, Boolean>();
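An InputFormat that wants to use this context has to reverse the encoding above: split on the tab separator, then undo the Java escaping. A minimal consumer sketch, assuming only commons-lang3 on the classpath; the class and method names here are illustrative, not part of this patch:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class FetchContextReader {
  /**
   * Recovers the partition directories stored by setFetchOperatorContext.
   * Tabs and backslashes inside a path were escaped with escapeJava, so
   * splitting on a literal tab is unambiguous and unescapeJava restores
   * each original path.
   */
  static List<Path> getCompleteDirs(JobConf conf) {
    List<Path> dirs = new ArrayList<Path>();
    String list = conf.get("hive.complete.dir.list");
    if (list != null && !list.isEmpty()) {
      for (String dir : list.split("\t")) {
        dirs.add(new Path(StringEscapeUtils.unescapeJava(dir)));
      }
    }
    return dirs;
  }
}
```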
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index 487bb33..2c9e81f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -697,6 +697,7 @@ public MergeQueue(String alias, FetchWork fetchWork, JobConf jobConf,
     // But if hive supports assigning bucket number for each partition, this can vary
     public void setupContext(List<Path> paths) throws HiveException {
       int segmentLen = paths.size();
+      FetchOperator.setFetchOperatorContext(jobConf, fetchWork.getPartDir());
       FetchOperator[] segments = segmentsForSize(segmentLen);
       for (int i = 0; i < segmentLen; i++) {
         Path path = paths.get(i);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 973923c..537ed2a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -27,6 +27,7 @@
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.WordUtils;
+import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -2275,13 +2276,15 @@ public static int getDefaultNotificationInterval(Configuration hconf) {
    *          configuration which receives configured properties
    */
   public static void copyTableJobPropertiesToConf(TableDesc tbl, JobConf job) {
-    String bucketString = tbl.getProperties()
-        .getProperty(hive_metastoreConstants.BUCKET_COUNT);
-    // copy the bucket count
-    if (bucketString != null) {
-      job.set(hive_metastoreConstants.BUCKET_COUNT, bucketString);
+    Properties tblProperties = tbl.getProperties();
+    for (String name : tblProperties.stringPropertyNames()) {
+      if (job.get(name) == null) {
+        String val = (String) tblProperties.get(name);
+        if (val != null) {
+          job.set(name, StringEscapeUtils.escapeJava(val));
+        }
+      }
     }
-
     Map<String, String> jobProperties = tbl.getJobProperties();
     if (jobProperties == null) {
       return;
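Because copyTableJobPropertiesToConf now escapes every value with escapeJava before setting it, a file format that reads a table property back out of the job configuration should unescape it. Note also the `job.get(name) == null` guard: a key already present in the job configuration wins over the table property, so table properties cannot silently override engine settings. A sketch of the read side, assuming commons-lang3; the class name and the `my.format.delimiter` property are hypothetical, not part of this patch:

```java
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.hadoop.mapred.JobConf;

public class TablePropertyReader {
  /**
   * Reads a table property that copyTableJobPropertiesToConf copied into
   * the job. A literal tab in the original value arrives as the two
   * characters '\' and 't' until it is unescaped here.
   */
  static String getTableProperty(JobConf job, String name, String deflt) {
    String raw = job.get(name);
    return raw == null ? deflt : StringEscapeUtils.unescapeJava(raw);
  }

  // Example: String delim = getTableProperty(job, "my.format.delimiter", "|");
}
```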
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index bbde09c..640a9f9 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -18,17 +18,24 @@
 package org.apache.hadoop.hive.ql.exec;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
@@ -42,6 +49,10 @@
 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.processors.CommandProcessor;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -49,8 +60,14 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.junit.Test;
 
 /**
  * TestOperators.
@@ -274,7 +291,7 @@ public void testScriptOperator() throws Throwable {
           cd, sop);
 
       op.initialize(new JobConf(TestOperators.class),
-          new ObjectInspector[] {r[0].oi});
+          new ObjectInspector[]{r[0].oi});
 
       // evaluate on row
       for (int i = 0; i < 5; i++) {
@@ -379,4 +396,82 @@ public void testMapOperator() throws Throwable {
       throw (e);
     }
   }
+
+  @Test
+  public void testFetchOperatorContextQuoting() throws Exception {
+    JobConf conf = new JobConf();
+    ArrayList<Path> list = new ArrayList<Path>();
+    list.add(new Path("hdfs://nn.example.com/fi\tl\\e\t1"));
+    list.add(new Path("hdfs://nn.example.com/file\t2"));
+    list.add(new Path("file:/file3"));
+    FetchOperator.setFetchOperatorContext(conf, list);
+    String[] parts =
+        conf.get(FetchOperator.FETCH_OPERATOR_DIRECTORY_LIST).split("\t");
+    assertEquals(3, parts.length);
+    assertEquals("hdfs://nn.example.com/fi\\tl\\\\e\\t1", parts[0]);
+    assertEquals("hdfs://nn.example.com/file\\t2", parts[1]);
+    assertEquals("file:/file3", parts[2]);
+  }
+
+  /**
+   * A custom input format that checks to make sure that the fetch operator
+   * sets the required attributes.
+   */
+  public static class CustomInFmt extends TextInputFormat {
+
+    @Override
+    public InputSplit[] getSplits(JobConf job, int splits) throws IOException {
+
+      // ensure that the table properties were copied
+      assertEquals("val1", job.get("myprop1"));
+      assertEquals("val2", job.get("myprop2"));
+
+      // ensure that both of the partitions are in the complete list.
+      String[] dirs = job.get("hive.complete.dir.list").split("\t");
+      assertEquals(2, dirs.length);
+      assertEquals(true, dirs[0].endsWith("/state=CA"));
+      assertEquals(true, dirs[1].endsWith("/state=OR"));
+      return super.getSplits(job, splits);
+    }
+  }
+
+  @Test
+  public void testFetchOperatorContext() throws Exception {
+    HiveConf conf = new HiveConf();
+    conf.set("hive.support.concurrency", "false");
+    SessionState.start(conf);
+    String cmd = "create table fetchOp (id int, name string) " +
+        "partitioned by (state string) " +
+        "row format delimited fields terminated by '|' " +
+        "stored as " +
+        "inputformat 'org.apache.hadoop.hive.ql.exec.TestOperators$CustomInFmt' " +
+        "outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' " +
+        "tblproperties ('myprop1'='val1', 'myprop2' = 'val2')";
+    Driver driver = new Driver();
+    driver.init();
+    CommandProcessorResponse response = driver.run(cmd);
+    assertEquals(0, response.getResponseCode());
+    List<String> result = new ArrayList<String>();
+
+    cmd = "load data local inpath '../data/files/employee.dat' " +
+        "overwrite into table fetchOp partition (state='CA')";
+    driver.init();
+    response = driver.run(cmd);
+    assertEquals(0, response.getResponseCode());
+
+    cmd = "load data local inpath '../data/files/employee2.dat' " +
+        "overwrite into table fetchOp partition (state='OR')";
+    driver.init();
+    response = driver.run(cmd);
+    assertEquals(0, response.getResponseCode());
+
+    cmd = "select * from fetchOp";
+    driver.init();
+    driver.setMaxRows(500);
+    response = driver.run(cmd);
+    assertEquals(0, response.getResponseCode());
+    driver.getResults(result);
+    assertEquals(20, result.size());
+    driver.close();
+  }
 }
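The expected strings in testFetchOperatorContextQuoting follow directly from the escapeJava contract: a literal tab becomes the two characters `\t` and a backslash doubles, so the escaped form never contains a real tab and the tab-separated directory list stays unambiguous. A self-contained round-trip check of that reasoning, assuming only commons-lang3; this snippet is illustrative, not part of the patch:

```java
import org.apache.commons.lang3.StringEscapeUtils;

public class EscapeRoundTrip {
  public static void main(String[] args) {
    // The trickiest path from the test: it contains both tabs and a backslash.
    String path = "hdfs://nn.example.com/fi\tl\\e\t1";
    String escaped = StringEscapeUtils.escapeJava(path);
    // No literal tab survives escaping, so '\t' is a safe list separator.
    assert escaped.indexOf('\t') == -1;
    assert escaped.equals("hdfs://nn.example.com/fi\\tl\\\\e\\t1");
    // unescapeJava inverts the encoding exactly.
    assert path.equals(StringEscapeUtils.unescapeJava(escaped));
  }
}
```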