diff --git a/beeline/src/java/org/apache/hive/beeline/BeeLine.java b/beeline/src/java/org/apache/hive/beeline/BeeLine.java index b7d2f2e..c5ca772 100644 --- a/beeline/src/java/org/apache/hive/beeline/BeeLine.java +++ b/beeline/src/java/org/apache/hive/beeline/BeeLine.java @@ -87,6 +87,7 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.io.IOUtils; import org.apache.hive.beeline.cli.CliOptionsProcessor; import org.apache.hive.jdbc.Utils; @@ -901,27 +902,6 @@ private int executeFile(String fileName, boolean isSourceCMD) { } } - private boolean isSourceCMD(String cmd) { - if (cmd == null || cmd.isEmpty()) - return false; - String[] tokens = tokenizeCmd(cmd); - return tokens[0].equalsIgnoreCase("!source"); - } - - private boolean sourceFile(String cmd) { - String[] tokens = tokenizeCmd(cmd); - String cmd_1 = getFirstCmd(cmd, tokens[0].length()); - File sourceFile = new File(cmd_1); - if (!sourceFile.isFile()) { - return false; - } else { - boolean ret = (executeFile(cmd_1, true) == ERRNO_OK); - // For source command, we should not exit even when meeting some empty line. - setExit(false); - return ret; - } - } - private int execute(ConsoleReader reader, boolean exitOnError) { String line; while (!exit) { @@ -933,10 +913,6 @@ private int execute(ConsoleReader reader, boolean exitOnError) { // trim line line = (line == null) ? null : line.trim(); - if (!isBeeLine) { - line = cliToBeelineCmd(line); - } - if (!dispatch(line) && exitOnError) { return ERRNO_OTHER; } @@ -1043,31 +1019,6 @@ void usage() { } /** - * Extract and clean up the first command in the input. 
- */ - private String getFirstCmd(String cmd, int length) { - return cmd.substring(length).trim(); - } - - private String cliToBeelineCmd(String cmd) { - if (cmd == null) - return null; - String[] tokens = tokenizeCmd(cmd); - if (cmd.equalsIgnoreCase("quit") || cmd.equalsIgnoreCase("exit") - || cmd.equalsIgnoreCase("quit;") || cmd.equals("exit;")) { - return null; - } else if (tokens[0].equalsIgnoreCase("source")) { - return COMMAND_PREFIX + cmd; - } else if (cmd.startsWith("!")) { - String shell_cmd = cmd.substring(1); - return "!sh " + shell_cmd; - } else { // local mode - // command like dfs - return cmd; - } - } - - /** * Dispatch the specified line to the appropriate {@link CommandHandler}. * * @param line @@ -1089,10 +1040,6 @@ boolean dispatch(String line) { return true; } - if(isSourceCMD(line)){ - return sourceFile(line); - } - line = line.trim(); // save it to the current script, if any @@ -2034,6 +1981,10 @@ void setCompletions() throws SQLException, IOException { } } + public HiveConf getHiveConf(){ + return commands.getHiveConf(); + } + public BeeLineOpts getOpts() { return opts; } @@ -2121,4 +2072,12 @@ void setBatch(List batch) { protected Reflector getReflector() { return reflector; } + + public boolean isBeeLine() { + return isBeeLine; + } + + public void setBeeLine(boolean isBeeLine) { + this.isBeeLine = isBeeLine; + } } diff --git a/beeline/src/java/org/apache/hive/beeline/Commands.java b/beeline/src/java/org/apache/hive/beeline/Commands.java index a42baa3..8ae5466 100644 --- a/beeline/src/java/org/apache/hive/beeline/Commands.java +++ b/beeline/src/java/org/apache/hive/beeline/Commands.java @@ -22,6 +22,10 @@ */ package org.apache.hive.beeline; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveVariableSource; +import org.apache.hadoop.hive.conf.SystemVariables; +import org.apache.hadoop.hive.conf.VariableSubstitution; import org.apache.hadoop.io.IOUtils; import java.io.BufferedReader; @@ -32,7 +36,6 @@ import 
java.io.InputStream; import java.io.InputStreamReader; import java.lang.reflect.Method; -import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; import java.sql.CallableStatement; @@ -44,9 +47,11 @@ import java.sql.Statement; import java.sql.SQLWarning; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.TreeSet; @@ -709,6 +714,19 @@ public boolean sql(String line) { return execute(line, false); } + private String substitution(HiveConf conf, String line) { + if (!beeLine.isBeeLine()) { + // Substitution is only supported in non-beeline mode. + //return new BeelineVariableSubstitution(beeLine).substitute(conf, line); + return new VariableSubstitution(new HiveVariableSource() { + @Override public Map getHiveVariable() { + return beeLine.getCommands().getHiveVariables(); + } + }).substitute(conf, line); + } + return line; + } + public boolean sh(String line) { if (line == null || line.length() == 0) { return false; @@ -719,9 +737,7 @@ public boolean sh(String line) { } line = line.substring("sh".length()).trim(); - - // Support variable substitution. HIVE-6791. 
-    // line = new VariableSubstitution().substitute(new HiveConf(BeeLine.class), line.trim());
+    line = substitution(getHiveConf(), line.trim());
 
     try {
       ShellCmdExecutor executor = new ShellCmdExecutor(line, beeLine.getOutputStream(),
@@ -743,6 +759,272 @@ public boolean call(String line) {
     return execute(line, true);
   }
 
+  public Map<String, String> getHiveVariables() {
+    Map<String, String> result = new HashMap<>();
+    Statement stmnt = null;
+    boolean hasResults;
+    try {
+      stmnt = beeLine.createStatement();
+      hasResults = stmnt.execute("set");
+      if (hasResults) {
+        ResultSet rs = stmnt.getResultSet();
+        BufferedRows rows = new BufferedRows(beeLine, rs);
+
+        while (rows.hasNext()) {
+          Rows.Row row = (Rows.Row) rows.next();
+          if (!row.isMeta) {
+            result.put(row.values[0], row.values[1]);
+          }
+        }
+      }
+    } catch (SQLException e) {
+      beeLine.error(e);
+    }
+    return result;
+  }
+
+  public HiveConf getHiveConf() {
+    HiveConf conf = new HiveConf();
+    Statement stmnt;
+    boolean hasResults;
+    try {
+      stmnt = beeLine.createStatement();
+      hasResults = stmnt.execute("set");
+      if (hasResults) {
+        ResultSet rs = stmnt.getResultSet();
+        BufferedRows rows = new BufferedRows(beeLine, rs);
+        while (rows.hasNext()) {
+          addConf((Rows.Row) rows.next(), conf);
+        }
+      }
+    } catch (SQLException e) {
+      beeLine.error(e);
+    }
+    return conf;
+  }
+
+  private void addConf(Rows.Row r, HiveConf hiveConf) {
+    if (r.isMeta) {
+      return;
+    }
+    if (r.values == null || r.values[0] == null || r.values[0].isEmpty()) {
+      return;
+    }
+    String val = r.values[0];
+    if (r.values[0].startsWith(SystemVariables.SYSTEM_PREFIX) || r.values[0].startsWith(SystemVariables.ENV_PREFIX)) {
+      return;
+    } else {
+      String[] kv = val.split("=",2);
+      if (kv.length == 2) { hiveConf.set(kv[0], kv[1]); }
+    }
+  }
+
+  /**
+   * Extract and clean up the first command in the input.
+   */
+  private String getFirstCmd(String cmd, int length) {
+    return cmd.substring(length).trim();
+  }
+
+  private String[] tokenizeCmd(String cmd) {
+    return cmd.split("\\s+");
+  }
+
+  private boolean isSourceCMD(String cmd) {
+    if (cmd == null || cmd.isEmpty())
+      return false;
+    String[] tokens = tokenizeCmd(cmd);
+    return tokens[0].equalsIgnoreCase("!source");
+  }
+
+  private boolean sourceFile(String cmd) {
+    String[] tokens = tokenizeCmd(cmd);
+    String cmd_1 = getFirstCmd(cmd, tokens[0].length());
+
+    cmd_1 = substitution(getHiveConf(), cmd_1);
+    File sourceFile = new File(cmd_1);
+    if (!sourceFile.isFile()) {
+      return false;
+    } else {
+      boolean ret;
+      try {
+        ret = sourceFileInternal(sourceFile, true);
+      } catch (IOException e) {
+        beeLine.error(e);
+        return false;
+      }
+      return ret;
+    }
+  }
+
+  private boolean sourceFileInternal(File sourceFile, boolean call) throws IOException {
+    BufferedReader reader = new BufferedReader(new FileReader(sourceFile));
+    String extra = reader.readLine();
+    String lines = null;
+    while (extra != null) {
+      if (beeLine.isComment(extra)) {
+        extra = reader.readLine(); continue;
+      }
+      if (lines == null) {
+        lines = extra;
+      } else {
+        lines += "\n" + extra;
+      }
+      extra = reader.readLine();
+    }
+    String[] cmds = (lines == null) ? new String[0] : lines.split(";");
+    for (String c : cmds) {
+      if (!executeInternal(c, false)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private String cliToBeelineCmd(String cmd) {
+    if (cmd == null)
+      return null;
+    String[] tokens = tokenizeCmd(cmd);
+    if (tokens[0].equalsIgnoreCase("source")) {
+      return BeeLine.COMMAND_PREFIX + cmd;
+    } else if (cmd.startsWith("!")) {
+      String shell_cmd = cmd.substring(1);
+      return "!sh " + shell_cmd;
+    } else { // local mode
+      // command like dfs
+      return cmd;
+    }
+  }
+
+  // Returns false only when an error occurred during execution of the sql; the sql should follow
+  // the rules of beeline.
+ private boolean executeInternal(String sql, boolean call){ + if (sql == null || sql.length() == 0) { + return true; + } + + if (beeLine.isComment(sql)) { + //skip this and rest cmds in the line + return true; + } + + if (sql.startsWith(BeeLine.COMMAND_PREFIX)) { + sql = sql.substring(1); + } + + String prefix = call ? "call" : "sql"; + + if (sql.startsWith(prefix)) { + sql = sql.substring(prefix.length()); + } + + // batch statements? + if (beeLine.getBatch() != null) { + beeLine.getBatch().add(sql); + return true; + } + + try { + Statement stmnt = null; + boolean hasResults; + Thread logThread = null; + + try { + long start = System.currentTimeMillis(); + + if (call) { + stmnt = beeLine.getDatabaseConnection().getConnection().prepareCall(sql); + hasResults = ((CallableStatement) stmnt).execute(); + } else { + stmnt = beeLine.createStatement(); + if (beeLine.getOpts().isSilent()) { + hasResults = stmnt.execute(sql); + } else { + logThread = new Thread(createLogRunnable(stmnt)); + logThread.setDaemon(true); + logThread.start(); + hasResults = stmnt.execute(sql); + logThread.interrupt(); + logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT); + } + } + + beeLine.showWarnings(); + + if (hasResults) { + do { + ResultSet rs = stmnt.getResultSet(); + try { + int count = beeLine.print(rs); + long end = System.currentTimeMillis(); + + beeLine.info( + beeLine.loc("rows-selected", count) + " " + beeLine.locElapsedTime(end - start)); + } finally { + if (logThread != null) { + logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT); + showRemainingLogsIfAny(stmnt); + logThread = null; + } + rs.close(); + } + } while (BeeLine.getMoreResults(stmnt)); + } else { + int count = stmnt.getUpdateCount(); + long end = System.currentTimeMillis(); + beeLine.info( + beeLine.loc("rows-affected", count) + " " + beeLine.locElapsedTime(end - start)); + } + } finally { + if (logThread != null) { + if (!logThread.isInterrupted()) { + logThread.interrupt(); + } + 
logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT); + showRemainingLogsIfAny(stmnt); + } + if (stmnt != null) { + stmnt.close(); + } + } + } catch (Exception e) { + return beeLine.error(e); + } + beeLine.showWarnings(); + return true; + } + + public String handleMultiLineCmd(String line) throws IOException { + //When using -e, console reader is not initialized and command is a single line + while (beeLine.getConsoleReader() != null && !(line.trim().endsWith(";")) && beeLine.getOpts() + .isAllowMultiLineCommand()) { + + if (!beeLine.getOpts().isSilent()) { + StringBuilder prompt = new StringBuilder(beeLine.getPrompt()); + for (int i = 0; i < prompt.length() - 1; i++) { + if (prompt.charAt(i) != '>') { + prompt.setCharAt(i, i % 2 == 0 ? '.' : ' '); + } + } + } + + String extra; + if (beeLine.getOpts().isSilent() && beeLine.getOpts().getScriptFile() != null) { + extra = beeLine.getConsoleReader().readLine(null, jline.console.ConsoleReader.NULL_MASK); + } else { + extra = beeLine.getConsoleReader().readLine(beeLine.getPrompt()); + } + + if (extra == null) { //it happens when using -f and the line of cmds does not end with ; + break; + } + if (!beeLine.isComment(extra)) { + line += "\n" + extra; + } + } + return line; + } + private boolean execute(String line, boolean call) { if (line == null || line.length() == 0) { return false; // ??? @@ -756,33 +1038,7 @@ private boolean execute(String line, boolean call) { // use multiple lines for statements not terminated by ";" try { - //When using -e, console reader is not initialized and command is a single line - while (beeLine.getConsoleReader() != null && !(line.trim().endsWith(";")) - && beeLine.getOpts().isAllowMultiLineCommand()) { - - if (!beeLine.getOpts().isSilent()) { - StringBuilder prompt = new StringBuilder(beeLine.getPrompt()); - for (int i = 0; i < prompt.length() - 1; i++) { - if (prompt.charAt(i) != '>') { - prompt.setCharAt(i, i % 2 == 0 ? '.' 
: ' '); - } - } - } - - String extra = null; - if (beeLine.getOpts().isSilent() && beeLine.getOpts().getScriptFile() != null) { - extra = beeLine.getConsoleReader().readLine(null, jline.console.ConsoleReader.NULL_MASK); - } else { - extra = beeLine.getConsoleReader().readLine(beeLine.getPrompt()); - } - - if (extra == null) { //it happens when using -f and the line of cmds does not end with ; - break; - } - if (!beeLine.isComment(extra)) { - line += "\n" + extra; - } - } + line = handleMultiLineCmd(line); } catch (Exception e) { beeLine.handleException(e); } @@ -796,93 +1052,23 @@ private boolean execute(String line, boolean call) { for (int i = 0; i < cmds.length; i++) { String sql = cmds[i].trim(); if (sql.length() != 0) { - if (beeLine.isComment(sql)) { - //skip this and rest cmds in the line - break; - } - if (sql.startsWith(BeeLine.COMMAND_PREFIX)) { - sql = sql.substring(1); - } - - String prefix = call ? "call" : "sql"; - - if (sql.startsWith(prefix)) { - sql = sql.substring(prefix.length()); + if (!beeLine.isBeeLine()) { + sql = cliToBeelineCmd(sql); + if (sql.equalsIgnoreCase("quit") || sql.equalsIgnoreCase("exit")) { + beeLine.setExit(true); + return true; + } } - // batch statements? 
- if (beeLine.getBatch() != null) { - beeLine.getBatch().add(sql); + // is source CMD + if (isSourceCMD(sql)) { + sourceFile(sql); continue; } - try { - Statement stmnt = null; - boolean hasResults; - Thread logThread = null; - - try { - long start = System.currentTimeMillis(); - - if (call) { - stmnt = beeLine.getDatabaseConnection().getConnection().prepareCall(sql); - hasResults = ((CallableStatement) stmnt).execute(); - } else { - stmnt = beeLine.createStatement(); - if (beeLine.getOpts().isSilent()) { - hasResults = stmnt.execute(sql); - } else { - logThread = new Thread(createLogRunnable(stmnt)); - logThread.setDaemon(true); - logThread.start(); - hasResults = stmnt.execute(sql); - logThread.interrupt(); - logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT); - } - } - - beeLine.showWarnings(); - - if (hasResults) { - do { - ResultSet rs = stmnt.getResultSet(); - try { - int count = beeLine.print(rs); - long end = System.currentTimeMillis(); - - beeLine.info(beeLine.loc("rows-selected", count) + " " - + beeLine.locElapsedTime(end - start)); - } finally { - if (logThread != null) { - logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT); - showRemainingLogsIfAny(stmnt); - logThread = null; - } - rs.close(); - } - } while (BeeLine.getMoreResults(stmnt)); - } else { - int count = stmnt.getUpdateCount(); - long end = System.currentTimeMillis(); - beeLine.info(beeLine.loc("rows-affected", count) - + " " + beeLine.locElapsedTime(end - start)); - } - } finally { - if (logThread != null) { - if (!logThread.isInterrupted()) { - logThread.interrupt(); - } - logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT); - showRemainingLogsIfAny(stmnt); - } - if (stmnt != null) { - stmnt.close(); - } - } - } catch (Exception e) { - return beeLine.error(e); + if (!executeInternal(sql, call)) { + return false; } - beeLine.showWarnings(); } } return true; diff --git a/beeline/src/test/org/apache/hive/beeline/cli/TestHiveCli.java 
b/beeline/src/test/org/apache/hive/beeline/cli/TestHiveCli.java index 6cbb030..8764f7e 100644 --- a/beeline/src/test/org/apache/hive/beeline/cli/TestHiveCli.java +++ b/beeline/src/test/org/apache/hive/beeline/cli/TestHiveCli.java @@ -42,6 +42,10 @@ private final static String SOURCE_CONTEXT = "create table if not exists test.testSrcTbl(a string, b string);"; + private final static String SOURCE_CONTEXT2 = + "create table if not exists test.testSrcTbl2(a string);"; + private final static String SOURCE_CONTEXT3 = + "create table if not exists test.testSrcTbl3(a string);"; final static String CMD = "create database if not exists test;\ncreate table if not exists test.testTbl(a string, b " + "string);\n"; @@ -60,7 +64,7 @@ private void executeCMD(String[] args, String input, int retCode) { inputStream = IOUtils.toInputStream(input); } ret = cli.runWithArgs(args, inputStream); - } catch (IOException e) { + } catch (Throwable e) { LOG.error("Failed to execute command due to the error: " + e); } finally { if (retCode != ret) { @@ -78,7 +82,7 @@ private void verifyCMD(String CMD, String keywords, OutputStream os, String[] op } @Test public void testInValidCmd() { - verifyCMD("!lss\n", "Failed to execute lss", errS, null, ERRNO_OK); + verifyCMD("!lss\n", "Unknown command: lss", errS, null, ERRNO_OK); } @Test public void testHelp() { @@ -86,7 +90,7 @@ private void verifyCMD(String CMD, String keywords, OutputStream os, String[] op } @Test public void testInvalidDatabaseOptions() { - verifyCMD("\nshow tables\nquit\n", "Database does not exist: invalidDB", errS, + verifyCMD("\nshow tables;\nquit;\n", "Database does not exist: invalidDB", errS, new String[] { "--database", "invalidDB" }, ERRNO_OK); } @@ -97,8 +101,16 @@ private void verifyCMD(String CMD, String keywords, OutputStream os, String[] op @Test public void testSourceCmd() { File f = generateTmpFile(SOURCE_CONTEXT); - verifyCMD("source " + f.getPath() + "\n" + "desc testSrcTbl\n" + "quit\n", "col_name", os, + 
verifyCMD("source " + f.getPath() + ";" + "desc testSrcTbl;\nquit;\n", "col_name", os, new String[] { "--database", "test" }, ERRNO_OK); + f.delete(); + } + + @Test public void testSourceCmd2() { + File f = generateTmpFile(SOURCE_CONTEXT3); + verifyCMD("source " + f.getPath() + ";" + "desc testSrcTbl3;\nquit;\n", "col_name", os, + new String[] { "--database", "test" }, ERRNO_OK); + f.delete(); } @Test public void testSqlFromCmd() { @@ -119,6 +131,19 @@ private void verifyCMD(String CMD, String keywords, OutputStream os, String[] op verifyCMD(null, "Unrecognized option: -k", errS, new String[] { "-k" }, ERRNO_ARGS); } + @Test public void testVariables() { + verifyCMD("set system:xxx=5;\nset system:yyy=${system:xxx};\nset system:yyy;", "", os, null, + ERRNO_OK); + } + + @Test public void testVariablesForSource() { + File f = generateTmpFile(SOURCE_CONTEXT2); + verifyCMD( + "set hiveconf:zzz=" + f.getAbsolutePath() + ";\nsource ${hiveconf:zzz};\ndesc testSrcTbl2;", + "col_name", os, new String[] { "--database", "test" }, ERRNO_OK); + f.delete(); + } + private void redirectOutputStream() { // Setup output stream to redirect output to os = new ByteArrayOutputStream(); diff --git a/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java b/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java index d62fd5c..1106b80 100644 --- a/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java +++ b/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java @@ -61,7 +61,9 @@ import org.apache.hadoop.hive.common.io.CachingPrintStream; import org.apache.hadoop.hive.common.io.FetchConverter; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveVariableSource; import org.apache.hadoop.hive.conf.Validator; +import org.apache.hadoop.hive.conf.VariableSubstitution; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; @@ -69,7 +71,6 @@ import 
org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper; import org.apache.hadoop.hive.ql.exec.tez.TezJobExecHelper; import org.apache.hadoop.hive.ql.parse.HiveParser; -import org.apache.hadoop.hive.ql.parse.VariableSubstitution; import org.apache.hadoop.hive.ql.processors.CommandProcessor; import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -127,7 +128,11 @@ public int processCmd(String cmd) { } else if (tokens[0].equalsIgnoreCase("source")) { String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length()); - cmd_1 = new VariableSubstitution().substitute(ss.getConf(), cmd_1); + cmd_1 = new VariableSubstitution(new HiveVariableSource() { + @Override public Map getHiveVariable() { + return SessionState.get().getHiveVariables(); + } + }).substitute(ss.getConf(), cmd_1); File sourceFile = new File(cmd_1); if (! sourceFile.isFile()){ @@ -145,7 +150,11 @@ public int processCmd(String cmd) { } else if (cmd_trimmed.startsWith("!")) { String shell_cmd = cmd_trimmed.substring(1); - shell_cmd = new VariableSubstitution().substitute(ss.getConf(), shell_cmd); + shell_cmd = new VariableSubstitution(new HiveVariableSource() { + @Override public Map getHiveVariable() { + return SessionState.get().getHiveVariables(); + } + }).substitute(ss.getConf(), shell_cmd); // shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'"; try { @@ -671,7 +680,11 @@ public int run(String[] args) throws Exception { // read prompt configuration and substitute variables. 
prompt = conf.getVar(HiveConf.ConfVars.CLIPROMPT); - prompt = new VariableSubstitution().substitute(conf, prompt); + prompt = new VariableSubstitution(new HiveVariableSource() { + @Override public Map getHiveVariable() { + return SessionState.get().getHiveVariables(); + } + }).substitute(conf, prompt); prompt2 = spacesForString(prompt); SessionState.start(ss); diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveVariableSource.java b/common/src/java/org/apache/hadoop/hive/conf/HiveVariableSource.java new file mode 100644 index 0000000..86f7a9c --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveVariableSource.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.conf; + +import java.util.Map; + +public interface HiveVariableSource { + Map getHiveVariable(); +} diff --git a/common/src/java/org/apache/hadoop/hive/conf/VariableSubstitution.java b/common/src/java/org/apache/hadoop/hive/conf/VariableSubstitution.java new file mode 100644 index 0000000..72fd0fc --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/conf/VariableSubstitution.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.conf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import java.util.Map; + +public class VariableSubstitution extends SystemVariables{ + private static final Log l4j = LogFactory.getLog(VariableSubstitution.class); + + private HiveVariableSource hiveHiveVariableSource; + + public VariableSubstitution(HiveVariableSource hiveHiveVariableSource) { + this.hiveHiveVariableSource = hiveHiveVariableSource; + } + + @Override + protected String getSubstitute(Configuration conf, String var) { + String val = super.getSubstitute(conf, var); + if (val == null && hiveHiveVariableSource != null) { + Map vars = hiveHiveVariableSource.getHiveVariable(); + if (var.startsWith(HIVEVAR_PREFIX)) { + val = vars.get(var.substring(HIVEVAR_PREFIX.length())); + } else { + val = vars.get(var); + } + } + return val; + } + + public String substitute(HiveConf conf, String expr) { + if (expr == null) { + return expr; + } + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEVARIABLESUBSTITUTE)) { + l4j.debug("Substitution is on: " + expr); + } else { + return expr; + } + int depth = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVEVARIABLESUBSTITUTEDEPTH); + return substitute(conf, expr, depth); + } +} diff --git a/common/src/test/org/apache/hadoop/hive/conf/TestVariableSubstitution.java b/common/src/test/org/apache/hadoop/hive/conf/TestVariableSubstitution.java new file mode 100644 index 0000000..faa07a7 --- /dev/null +++ b/common/src/test/org/apache/hadoop/hive/conf/TestVariableSubstitution.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.conf; + +import junit.framework.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class TestVariableSubstitution { + private static class LocalMySource { + final Map map = new HashMap<>(); + + public void put(String k, String v) { + map.put(k, v); + } + + public String get(String k) { + return map.get(k); + } + } + + private static LocalMySource getMySource() { + return localSource.get(); + } + + private static ThreadLocal localSource = new ThreadLocal() { + @Override protected LocalMySource initialValue() { + return new LocalMySource(); + } + }; + + @Test public void testVariableSource() throws InterruptedException { + final VariableSubstitution variableSubstitution = + new VariableSubstitution(new HiveVariableSource() { + @Override public Map getHiveVariable() { + return TestVariableSubstitution.getMySource().map; + } + }); + + String v = variableSubstitution.substitute(new HiveConf(), "${a}"); + Assert.assertEquals("${a}", v); + TestVariableSubstitution.getMySource().put("a", "b"); + v = variableSubstitution.substitute(new HiveConf(), "${a}"); + Assert.assertEquals("b", v); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 338e755..f6320ba 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -40,6 +40,8 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.conf.HiveVariableSource; +import 
org.apache.hadoop.hive.conf.VariableSubstitution; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -96,7 +98,6 @@ import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.parse.VariableSubstitution; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -385,7 +386,11 @@ public int compile(String command, boolean resetTaskIds) { SessionState.get().setupQueryCurrentTimestamp(); try { - command = new VariableSubstitution().substitute(conf,command); + command = new VariableSubstitution(new HiveVariableSource() { + @Override public Map getHiveVariable() { + return SessionState.get().getHiveVariables(); + } + }).substitute(conf,command); ctx = new Context(conf); ctx.setTryCount(getTryCount()); ctx.setCmd(command); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java index a5f0a7f..06e1245 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java @@ -28,12 +28,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.conf.HiveVariableSource; +import org.apache.hadoop.hive.conf.VariableSubstitution; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.metadata.HiveException; 
-import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.serdeConstants; @@ -216,10 +217,10 @@ private String genPartValueString (String partKey, String partVal) throws Semant //for other usually not used types, just quote the value returnVal = "'" + partVal + "'"; } - + return returnVal; } - + private String getColTypeOf (String partKey) throws SemanticException{ for (FieldSchema fs : tbl.getPartitionKeys()) { @@ -333,7 +334,11 @@ private String genRewrittenQuery(List colNames, int numBitVectors, Map getHiveVariable() { + return SessionState.get().getHiveVariables(); + } + }).substitute(conf, rewrittenQuery); return rewrittenQuery; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/VariableSubstitution.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/VariableSubstitution.java deleted file mode 100644 index e8b1d96..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/VariableSubstitution.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.parse; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.conf.SystemVariables; - -import java.util.Map; - -public class VariableSubstitution extends SystemVariables { - - private static final Log l4j = LogFactory.getLog(VariableSubstitution.class); - - @Override - protected String getSubstitute(Configuration conf, String var) { - String val = super.getSubstitute(conf, var); - if (val == null && SessionState.get() != null) { - Map vars = SessionState.get().getHiveVariables(); - if (var.startsWith(HIVEVAR_PREFIX)) { - val = vars.get(var.substring(HIVEVAR_PREFIX.length())); - } else { - val = vars.get(var); - } - } - return val; - } - - public String substitute(HiveConf conf, String expr) { - if (expr == null) { - return expr; - } - if (HiveConf.getBoolVar(conf, ConfVars.HIVEVARIABLESUBSTITUTE)) { - l4j.debug("Substitution is on: " + expr); - } else { - return expr; - } - int depth = HiveConf.getIntVar(conf, ConfVars.HIVEVARIABLESUBSTITUTEDEPTH); - return substitute(conf, expr, depth); - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/AddResourceProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/AddResourceProcessor.java index 0558c53..64822a9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/processors/AddResourceProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/AddResourceProcessor.java @@ -19,11 +19,13 @@ package org.apache.hadoop.hive.ql.processors; import java.util.Arrays; +import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.ql.parse.VariableSubstitution; +import 
org.apache.hadoop.hive.conf.HiveVariableSource; +import org.apache.hadoop.hive.conf.VariableSubstitution; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; @@ -45,7 +47,11 @@ public void init() { @Override public CommandProcessorResponse run(String command) { SessionState ss = SessionState.get(); - command = new VariableSubstitution().substitute(ss.getConf(),command); + command = new VariableSubstitution(new HiveVariableSource() { + @Override public Map getHiveVariable() { + return SessionState.get().getHiveVariables(); + } + }).substitute(ss.getConf(),command); String[] tokens = command.split("\\s+"); SessionState.ResourceType t; if (tokens.length < 2 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/CompileProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/CompileProcessor.java index 25ce168..0986209 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/processors/CompileProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/CompileProcessor.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.charset.Charset; import java.util.Arrays; +import java.util.Map; import java.util.StringTokenizer; import java.util.concurrent.atomic.AtomicInteger; @@ -32,9 +33,10 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveVariableSource; +import org.apache.hadoop.hive.conf.VariableSubstitution; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.parse.VariableSubstitution; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; import org.apache.hadoop.hive.ql.session.SessionState; import 
org.apache.hadoop.hive.ql.session.SessionState.LogHelper; @@ -142,7 +144,11 @@ public CommandProcessorResponse run(String command) throws CommandNeedRetryExcep @VisibleForTesting void parse(SessionState ss) throws CompileProcessorException { if (ss != null){ - command = new VariableSubstitution().substitute(ss.getConf(), command); + command = new VariableSubstitution(new HiveVariableSource() { + @Override public Map getHiveVariable() { + return SessionState.get().getHiveVariables(); + } + }).substitute(ss.getConf(), command); } if (command == null || command.length() == 0) { throw new CompileProcessorException("Command was empty"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/DeleteResourceProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/DeleteResourceProcessor.java index 9052c82..7246f4a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/processors/DeleteResourceProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/DeleteResourceProcessor.java @@ -19,11 +19,13 @@ package org.apache.hadoop.hive.ql.processors; import java.util.Arrays; +import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.ql.parse.VariableSubstitution; +import org.apache.hadoop.hive.conf.HiveVariableSource; +import org.apache.hadoop.hive.conf.VariableSubstitution; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; @@ -44,7 +46,11 @@ public void init() { @Override public CommandProcessorResponse run(String command) { SessionState ss = SessionState.get(); - command = new VariableSubstitution().substitute(ss.getConf(),command); + command = new VariableSubstitution(new HiveVariableSource() { + @Override public Map getHiveVariable() { + return SessionState.get().getHiveVariables(); 
+ } + }).substitute(ss.getConf(),command); String[] tokens = command.split("\\s+"); SessionState.ResourceType t; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/DfsProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/DfsProcessor.java index cc0414d..a6905c9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/processors/DfsProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/DfsProcessor.java @@ -20,14 +20,16 @@ import java.io.PrintStream; import java.util.Arrays; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.hive.conf.HiveVariableSource; +import org.apache.hadoop.hive.conf.VariableSubstitution; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; -import org.apache.hadoop.hive.ql.parse.VariableSubstitution; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; @@ -65,7 +67,11 @@ public CommandProcessorResponse run(String command) { try { SessionState ss = SessionState.get(); - command = new VariableSubstitution().substitute(ss.getConf(),command); + command = new VariableSubstitution(new HiveVariableSource() { + @Override public Map getHiveVariable() { + return SessionState.get().getHiveVariables(); + } + }).substitute(ss.getConf(), command); String[] tokens = command.split("\\s+"); CommandProcessorResponse authErrResp = diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java index bc9254c..9f01a4e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java @@ -30,10 +30,11 
@@ import java.util.TreeMap; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveVariableSource; +import org.apache.hadoop.hive.conf.VariableSubstitution; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.parse.VariableSubstitution; import org.apache.hadoop.hive.ql.session.SessionState; /** @@ -121,17 +122,30 @@ public static int setVariable(String varname, String varvalue) throws Exception return 1; } else if (varname.startsWith(SYSTEM_PREFIX)){ String propName = varname.substring(SYSTEM_PREFIX.length()); - System.getProperties().setProperty(propName, new VariableSubstitution().substitute(ss.getConf(),varvalue)); + System.getProperties() + .setProperty(propName, new VariableSubstitution(new HiveVariableSource() { + @Override public Map getHiveVariable() { + return SessionState.get().getHiveVariables(); + } + }).substitute(ss.getConf(), varvalue)); } else if (varname.startsWith(HIVECONF_PREFIX)){ String propName = varname.substring(HIVECONF_PREFIX.length()); setConf(varname, propName, varvalue, false); } else if (varname.startsWith(HIVEVAR_PREFIX)) { String propName = varname.substring(HIVEVAR_PREFIX.length()); - ss.getHiveVariables().put(propName, new VariableSubstitution().substitute(ss.getConf(),varvalue)); + ss.getHiveVariables().put(propName, new VariableSubstitution(new HiveVariableSource() { + @Override public Map getHiveVariable() { + return SessionState.get().getHiveVariables(); + } + }).substitute(ss.getConf(), varvalue)); } else if (varname.startsWith(METACONF_PREFIX)) { String propName = varname.substring(METACONF_PREFIX.length()); Hive hive = Hive.get(ss.getConf()); - hive.setMetaConf(propName, new VariableSubstitution().substitute(ss.getConf(), varvalue)); + hive.setMetaConf(propName, new VariableSubstitution(new HiveVariableSource() { + @Override public Map getHiveVariable() { + 
return SessionState.get().getHiveVariables(); + } + }).substitute(ss.getConf(), varvalue)); } else { setConf(varname, varname, varvalue, true); } @@ -142,7 +156,11 @@ public static int setVariable(String varname, String varvalue) throws Exception private static void setConf(String varname, String key, String varvalue, boolean register) throws IllegalArgumentException { HiveConf conf = SessionState.get().getConf(); - String value = new VariableSubstitution().substitute(conf, varvalue); + String value = new VariableSubstitution(new HiveVariableSource() { + @Override public Map getHiveVariable() { + return SessionState.get().getHiveVariables(); + } + }).substitute(conf, varvalue); if (conf.getBoolVar(HiveConf.ConfVars.HIVECONFVALIDATION)) { HiveConf.ConfVars confVars = HiveConf.getConfVars(key); if (confVars != null) { diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 33ee16b..937077c 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -32,6 +32,8 @@ import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveVariableSource; +import org.apache.hadoop.hive.conf.VariableSubstitution; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.CommandNeedRetryException; @@ -40,7 +42,6 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.parse.VariableSubstitution; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.OperationLog; import org.apache.hadoop.hive.ql.session.SessionState; @@ -106,7 +107,11 @@ 
public void prepare(HiveConf sqlOperationConf) throws HiveSQLException { // For now, we disable the test attempts. driver.setTryCount(Integer.MAX_VALUE); - String subStatement = new VariableSubstitution().substitute(sqlOperationConf, statement); + String subStatement = new VariableSubstitution(new HiveVariableSource() { + @Override public Map getHiveVariable() { + return SessionState.get().getHiveVariables(); + } + }).substitute(sqlOperationConf, statement); response = driver.compileAndRespond(subStatement); if (0 != response.getResponseCode()) { throw toSQLException("Error while compiling statement", response);