diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
index 412a046..5aecd68 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
@@ -97,6 +97,7 @@ KW_EXPORT: 'EXPORT';
 KW_IMPORT: 'IMPORT';
 KW_DATA: 'DATA';
 KW_INPATH: 'INPATH';
+KW_INSTREAM: 'INSTREAM';
 KW_IS: 'IS';
 KW_NULL: 'NULL';
 KW_CREATE: 'CREATE';
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index f934ac4..740fc00 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -399,6 +399,7 @@ import java.util.HashMap;
     xlateMap.put("KW_LOAD", "LOAD");
     xlateMap.put("KW_DATA", "DATA");
     xlateMap.put("KW_INPATH", "INPATH");
+    xlateMap.put("KW_INSTREAM", "INSTREAM");
     xlateMap.put("KW_IS", "IS");
     xlateMap.put("KW_NULL", "NULL");
     xlateMap.put("KW_CREATE", "CREATE");
@@ -644,8 +645,8 @@ execStatement
 loadStatement
 @init { pushMsg("load statement", state); }
 @after { popMsg(state); }
-    : KW_LOAD KW_DATA (islocal=KW_LOCAL)? KW_INPATH (path=StringLiteral) (isoverwrite=KW_OVERWRITE)? KW_INTO KW_TABLE (tab=tableOrPartition)
-    -> ^(TOK_LOAD $path $tab $islocal? $isoverwrite?)
+    : KW_LOAD KW_DATA (islocal=KW_LOCAL)? (KW_INPATH|isstream=KW_INSTREAM) (path=StringLiteral) (isoverwrite=KW_OVERWRITE)? KW_INTO KW_TABLE (tab=tableOrPartition)
+    -> ^(TOK_LOAD $path $tab $islocal? $isoverwrite? $isstream?)
     ;
 
 exportStatement
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index 8bd24d3..8b8b47a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.antlr.runtime.tree.Tree;
-import org.apache.commons.httpclient.URIException;
 import org.apache.commons.httpclient.util.URIUtil;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
@@ -41,6 +40,8 @@
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
@@ -55,9 +56,6 @@
  */
 public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
 
-  private boolean isLocal;
-  private boolean isOverWrite;
-
   public LoadSemanticAnalyzer(HiveConf conf) throws SemanticException {
     super(conf);
   }
@@ -85,7 +83,7 @@ public boolean accept(Path p) {
     return (srcs);
   }
 
-  private URI initializeFromURI(String fromPath) throws IOException,
+  private URI initializeFromURI(String fromPath, boolean isLocal) throws IOException,
       URISyntaxException {
     URI fromURI = new Path(fromPath).toUri();
 
@@ -172,29 +170,35 @@ private void applyConstraints(URI fromURI, URI toURI, Tree ast,
 
   @Override
   public void analyzeInternal(ASTNode ast) throws SemanticException {
-    isLocal = false;
-    isOverWrite = false;
+    // AST shape: ^(TOK_LOAD $path $tab $islocal? $isoverwrite? $isstream?)
+
+    boolean isLocal = false;
+    boolean isOverWrite = false;
+    boolean isStream = false;
+
     Tree fromTree = ast.getChild(0);
     Tree tableTree = ast.getChild(1);
 
-    if (ast.getChildCount() == 4) {
-      isLocal = true;
-      isOverWrite = true;
-    }
-
-    if (ast.getChildCount() == 3) {
-      if (ast.getChild(2).getText().toLowerCase().equals("local")) {
+    for (int i = 2; i < ast.getChildCount(); i++) {
+      String text = ast.getChild(i).getText();
+      if (text.equalsIgnoreCase("LOCAL")) {
         isLocal = true;
-      } else {
+      } else if (text.equalsIgnoreCase("INSTREAM")) {
+        isStream = true;
+      } else if (text.equalsIgnoreCase("OVERWRITE")) {
         isOverWrite = true;
       }
     }
 
+    if (isStream && !conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+      throw new SemanticException("'INSTREAM' is allowed only for test mode");
+    }
+    isLocal |= isStream; // inline data is spooled to a local temp file, so it loads as LOCAL
     // initialize load path
     URI fromURI;
     try {
-      String fromPath = stripQuotes(fromTree.getText());
-      fromURI = initializeFromURI(fromPath);
+      String fromPath = getFromPath(fromTree.getText(), isStream);
+      fromURI = initializeFromURI(fromPath, isLocal);
     } catch (IOException e) {
       throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e
           .getMessage()), e);
@@ -315,4 +319,37 @@ else if (statTask != null) {
       childTask.addDependentTask(statTask);
     }
   }
+
+  /**
+   * Resolves the source path of a LOAD statement.
+   *
+   * For a regular load the quoted literal is the path itself. For INSTREAM the literal is
+   * the data: it is spooled into a local temporary file and that file's absolute path is
+   * returned, so the caller can go on treating it as a path.
+   *
+   * @param value the string literal of the statement, still wrapped in its quotes
+   * @param instream true if the literal is inline data rather than a path
+   * @return the path to load from
+   * @throws IOException if the temporary file cannot be created or written
+   */
+  private String getFromPath(String value, boolean instream) throws IOException {
+    String path = stripQuotes(value);
+    if (instream) {
+      // only the two-character sequence backslash + 'n' is unescaped, into a line feed
+      path = path.replaceAll("\\\\n", "\n");
+      File temp = File.createTempFile("instream", null);
+      // the file is still needed after this method returns, so it cannot be removed here;
+      // register it for cleanup at JVM exit instead of leaking it in the temp directory
+      temp.deleteOnExit();
+      FileOutputStream fout = new FileOutputStream(temp);
+      try {
+        // explicit charset: the platform default would make the loaded bytes host dependent
+        fout.write(path.getBytes("UTF-8"));
+      } finally {
+        fout.close();
+      }
+      path = temp.getAbsolutePath();
+    }
+    return path;
+  }
 }
diff --git ql/src/test/queries/clientpositive/load_instream.q ql/src/test/queries/clientpositive/load_instream.q
new file mode 100644
index 0000000..8b4b1c5
--- /dev/null
+++ ql/src/test/queries/clientpositive/load_instream.q
@@ -0,0 +1,5 @@
+create table kv_temp (key string, value string) ROW FORMAT delimited fields terminated by ' ' STORED AS TEXTFILE;
+
+load data local instream 'key value\nkey2 value2' into table kv_temp;
+
+select * from kv_temp;
diff --git ql/src/test/results/clientpositive/load_instream.q.out ql/src/test/results/clientpositive/load_instream.q.out
new file mode 100644
index 0000000..a213f71
--- /dev/null
+++ ql/src/test/results/clientpositive/load_instream.q.out
@@ -0,0 +1,25 @@
+PREHOOK: query: create table kv_temp (key string, value string) ROW FORMAT delimited fields terminated by ' ' STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: create table kv_temp (key string, value string) ROW FORMAT delimited fields terminated by ' ' STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@kv_temp
+PREHOOK: query: load data local instream 'key value\nkey2 value2' into table kv_temp
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@kv_temp
+POSTHOOK: query: load data local instream 'key value\nkey2 value2' into table kv_temp
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@kv_temp
+PREHOOK: query: select * from kv_temp
+PREHOOK: type: QUERY
+PREHOOK: Input: default@kv_temp
+#### A masked pattern was here ####
+POSTHOOK: query: select * from kv_temp
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@kv_temp
+#### A masked pattern was here ####
+key value
+key2 value2