diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 39e5bf1..e6aa247 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -223,10 +223,10 @@ public String getName() {
         String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
         rcwork.setDistCpDoAsUser(distCpDoAsUser);
       }
-      copyTask = TaskFactory.get(rcwork, conf);
+      copyTask = TaskFactory.get(rcwork, conf, true);
     } else {
       LOG.debug("ReplCopyTask:\tcwork");
-      copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf);
+      copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf, true);
     }
     return copyTask;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 6193b90..ab495cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -160,7 +160,9 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext
     this.queryPlan = queryPlan;
     setInitialized();
     this.queryState = queryState;
-    this.conf = queryState.getConf();
+    if (null == this.conf) {
+      this.conf = queryState.getConf();
+    }
     this.driverContext = driverContext;
     console = new LogHelper(LOG);
   }
@@ -422,7 +424,9 @@ public boolean isRunnable() {
     return isrunnable;
   }
 
-
+  public void setConf(HiveConf conf) {
+    this.conf = conf;
+  }
 
   public void setWork(T work) {
     this.work = work;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index e1969bb..f341cec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -154,10 +154,13 @@ public static void resetId() {
   }
 
   @SafeVarargs
-  public static <T extends Serializable> Task<T> get(T work, HiveConf conf,
-      Task<? extends Serializable>... tasklist) {
+  public static <T extends Serializable> Task<T> get(T work, HiveConf conf, boolean setConf,
+      Task<? extends Serializable>... tasklist) {
     Task<T> ret = get((Class<T>) work.getClass(), conf);
     ret.setWork(work);
+    if (setConf && (null != conf)) {
+      ret.setConf(conf);
+    }
     if (tasklist.length == 0) {
       return (ret);
     }
@@ -170,6 +173,12 @@ public static void resetId() {
     return (ret);
   }
 
+  @SafeVarargs
+  public static <T extends Serializable> Task<T> get(T work, HiveConf conf,
+      Task<? extends Serializable>... tasklist) {
+    return get(work, conf, false, tasklist);
+  }
+
   public static <T extends Serializable> Task<T> getAndMakeChild(T work, HiveConf conf,
       Task<? extends Serializable>... tasklist) {
     Task<T> ret = get((Class<T>) work.getClass(), conf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
index 4d8d06a..bf5c819 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
@@ -287,7 +287,7 @@ private void createBuilderTask(List<Task<? extends Serializable>> rootTasks,
        use loadTask as dependencyCollection
      */
     if (shouldCreateAnotherLoadTask) {
-      Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf);
+      Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf, true);
       dependency(rootTasks, loadTask);
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index d238833..020a300 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -330,6 +330,7 @@ TOK_DBPROPLIST;
 TOK_ALTERDATABASE_PROPERTIES;
 TOK_ALTERDATABASE_OWNER;
 TOK_ALTERDATABASE_LOCATION;
+TOK_DBNAME;
 TOK_TABNAME;
 TOK_TABSRC;
 TOK_RESTRICT;
@@ -393,6 +394,8 @@ TOK_DELETE;
 TOK_REPL_DUMP;
 TOK_REPL_LOAD;
 TOK_REPL_STATUS;
+TOK_REPL_CONFIG;
+TOK_REPL_CONFIG_LIST;
 TOK_TO;
 TOK_ONLY;
 TOK_SUMMARY;
@@ -842,9 +845,24 @@ replLoadStatement
     : KW_REPL KW_LOAD ((dbName=identifier) (DOT tblName=identifier)?)?
       KW_FROM (path=StringLiteral)
-      -> ^(TOK_REPL_LOAD $path $dbName? $tblName?)
+      (KW_WITH replConf=replConfigs)?
+      -> ^(TOK_REPL_LOAD $path ^(TOK_DBNAME $dbName)? ^(TOK_TABNAME $tblName)? $replConf?)
     ;
 
+replConfigs
+@init { pushMsg("repl configurations", state); }
+@after { popMsg(state); }
+    :
+      LPAREN replConfigsList RPAREN -> ^(TOK_REPL_CONFIG replConfigsList)
+    ;
+
+replConfigsList
+@init { pushMsg("repl configurations list", state); }
+@after { popMsg(state); }
+    :
+      keyValueProperty (COMMA keyValueProperty)* -> ^(TOK_REPL_CONFIG_LIST keyValueProperty+)
+    ;
+
 replStatusStatement
 @init { pushMsg("replication load statement", state); }
 @after { popMsg(state); }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index e7e7f9b..ade47ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -22,6 +22,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
@@ -56,11 +57,14 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_LOAD;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_STATUS;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TABNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TO;
 
 public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
@@ -73,6 +77,9 @@ Licensed to the Apache Software Foundation (ASF) under one
   private Integer maxEventLimit;
   // Base path for REPL LOAD
   private String path;
+  // Added conf member to set the REPL command specific config entries without affecting the configs
+  // of any other queries running in the session
+  private HiveConf conf;
   private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
 
@@ -82,6 +89,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 
   ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
+    this.conf = new HiveConf(super.conf);
   }
 
   @Override
@@ -189,14 +197,30 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
   }
 
   // REPL LOAD
-  private void initReplLoad(ASTNode ast) {
-    int numChildren = ast.getChildCount();
+  private void initReplLoad(ASTNode ast) throws SemanticException {
     path = PlanUtils.stripQuotes(ast.getChild(0).getText());
-    if (numChildren > 1) {
-      dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(1).getText());
-    }
-    if (numChildren > 2) {
-      tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(2).getText());
+    int numChildren = ast.getChildCount();
+    for (int i = 1; i < numChildren; i++) {
+      ASTNode childNode = (ASTNode) ast.getChild(i);
+      switch (childNode.getToken().getType()) {
+      case TOK_DBNAME:
+        dbNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
+        break;
+      case TOK_TABNAME:
+        tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
+        break;
+      case TOK_REPL_CONFIG:
+        Map<String, String> replConfigs
+            = DDLSemanticAnalyzer.getProps((ASTNode) childNode.getChild(0));
+        if (null != replConfigs) {
+          for (Map.Entry<String, String> config : replConfigs.entrySet()) {
+            conf.set(config.getKey(), config.getValue());
+          }
+        }
+        break;
+      default:
+        throw new SemanticException("Unrecognized token in REPL LOAD statement");
+      }
     }
   }
 
@@ -289,7 +313,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
           tblNameOrPattern, SessionState.get().getLineageState(),
           SessionState.get().getTxnMgr().getCurrentTxnId());
-      rootTasks.add(TaskFactory.get(replLoadWork, conf));
+      rootTasks.add(TaskFactory.get(replLoadWork, conf, true));
       return;
     }
 
@@ -320,7 +344,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
     ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
         SessionState.get().getLineageState(),
         SessionState.get().getTxnMgr().getCurrentTxnId());
-    rootTasks.add(TaskFactory.get(replLoadWork, conf));
+    rootTasks.add(TaskFactory.get(replLoadWork, conf, true));
 //
 //    for (FileStatus dir : dirsInLoadPath) {
 //      analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
index 17cf4d0..3305998 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
@@ -191,11 +191,14 @@ public void testReplLoadParse() throws Exception {
     ParseDriver pd = new ParseDriver();
     ASTNode root;
     ASTNode child;
+    ASTNode subChild;
+    ASTNode configNode;
     String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
     Path dumpRoot = new Path(replRoot, "next");
     System.out.println(replRoot);
     System.out.println(dumpRoot);
     String newDB = "default_bak";
+    String newDB2 = "default_bak_2";
     String query = "repl load from '" + dumpRoot.toString() + "'";
     root = (ASTNode) pd.parse(query).getChild(0);
@@ -213,8 +216,42 @@ public void testReplLoadParse() throws Exception {
     assertEquals(child.getText(), "'" + dumpRoot.toString() + "'");
     assertEquals(child.getChildCount(), 0);
     child = (ASTNode) root.getChild(1);
-    assertEquals(child.getText(), newDB);
+    assertEquals(child.getText(), "TOK_DBNAME");
+    assertEquals(child.getChildCount(), 1);
+    subChild = (ASTNode) child.getChild(0);
+    assertEquals(subChild.getText(), newDB);
+    assertEquals(subChild.getChildCount(), 0);
+
+    query = "repl load " + newDB2 + " from '" + dumpRoot.toString()
+        + "' with ('mapred.job.queue.name'='repl','hive.repl.approx.max.load.tasks'='100')";
+    root = (ASTNode) pd.parse(query).getChild(0);
+    assertEquals(root.getText(), "TOK_REPL_LOAD");
+    assertEquals(root.getChildCount(), 3);
+    child = (ASTNode) root.getChild(0);
+    assertEquals(child.getText(), "'" + dumpRoot.toString() + "'");
     assertEquals(child.getChildCount(), 0);
+    child = (ASTNode) root.getChild(1);
+    assertEquals(child.getText(), "TOK_DBNAME");
+    assertEquals(child.getChildCount(), 1);
+    subChild = (ASTNode) child.getChild(0);
+    assertEquals(subChild.getText(), newDB2);
+    assertEquals(subChild.getChildCount(), 0);
+    child = (ASTNode) root.getChild(2);
+    assertEquals(child.getText(), "TOK_REPL_CONFIG");
+    assertEquals(child.getChildCount(), 1);
+    subChild = (ASTNode) child.getChild(0);
+    assertEquals(subChild.getText(), "TOK_REPL_CONFIG_LIST");
+    assertEquals(subChild.getChildCount(), 2);
+    configNode = (ASTNode) subChild.getChild(0);
+    assertEquals(configNode.getText(), "TOK_TABLEPROPERTY");
+    assertEquals(configNode.getChildCount(), 2);
+    assertEquals(configNode.getChild(0).getText(), "'mapred.job.queue.name'");
+    assertEquals(configNode.getChild(1).getText(), "'repl'");
+    configNode = (ASTNode) subChild.getChild(1);
+    assertEquals(configNode.getText(), "TOK_TABLEPROPERTY");
+    assertEquals(configNode.getChildCount(), 2);
+    assertEquals(configNode.getChild(0).getText(), "'hive.repl.approx.max.load.tasks'");
+    assertEquals(configNode.getChild(1).getText(), "'100'");
   }
 
   //@Test