Index: service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
===================================================================
--- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (.../code/hive-0.14.0.2.2.0.0/service/src/java/org/apache/hive/service/cli) (revision 2244)
+++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (.../branches/hive-0.14-hiveserver2/service/src/java/org/apache/hive/service/cli) (working copy)
@@ -20,10 +20,15 @@
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import javax.security.auth.login.LoginException;
@@ -41,6 +46,11 @@
 import org.apache.thrift.TException;
 import org.apache.thrift.server.TServer;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+
 /**
  * ThriftCLIService.
  *
@@ -241,9 +251,11 @@
       LOG.warn("Error opening session: ", e);
       resp.setStatus(HiveSQLException.toTStatus(e));
     }
+
+    return resp;
   }
 
   private String getIpAddress() {
     String clientIpAddress;
     // Http transport mode.
Index: service/src/java/org/apache/hive/service/cli/CLIService.java
===================================================================
--- service/src/java/org/apache/hive/service/cli/CLIService.java (.../code/hive-0.14.0.2.2.0.0/service/src/java/org/apache/hive/service/cli) (revision 2244)
+++ service/src/java/org/apache/hive/service/cli/CLIService.java (.../branches/hive-0.14-hiveserver2/service/src/java/org/apache/hive/service/cli) (working copy)
@@ -18,13 +18,21 @@
 package org.apache.hive.service.cli;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import javax.security.auth.login.LoginException;
@@ -45,6 +53,8 @@
 import org.apache.hive.service.cli.session.SessionManager;
 import org.apache.hive.service.cli.thrift.TProtocolVersion;
 import org.apache.hive.service.server.HiveServer2;
+import org.apache.hive.service.cli.session.HiveSession;
 
 /**
  * CLIService.
@@ -137,7 +147,63 @@
   public synchronized void stop() {
     super.stop();
   }
+
+  private void initCustomUDF(SessionHandle sessionHandle) throws HiveSQLException {
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_CUSTOMIZED_UDF_ENABLED)) {
+      String jarsPath = hiveConf.getVar(ConfVars.HIVE_SERVER2_CUSTOMIZED_UDF_JARS_PATH);
+      jarsPath = hiveConf.getVar(ConfVars.HADOOPFS) + jarsPath;
+      if (!jarsPath.endsWith("/")) {
+        jarsPath += "/";
+      }
+
+      // The UDF definition file lives next to hive-site.xml.
+      String hiveConfLocation = hiveConf.getHiveSiteLocation().getPath();
+      String udfPropertyPath = hiveConfLocation.substring(0, hiveConfLocation.lastIndexOf('/') + 1)
+          + hiveConf.getVar(ConfVars.HIVE_SERVER2_CUSTOMIZED_UDF_PROPERTIES);
+      InputStream inputStream = null;
+      try {
+        inputStream = new FileInputStream(new File(udfPropertyPath));
+      } catch (FileNotFoundException e) {
+        LOG.error("Custom UDF properties file not found: " + udfPropertyPath, e);
+        return;
+      }
+
+      Properties properties = new Properties();
+      try {
+        properties.load(inputStream);
+      } catch (IOException e) {
+        LOG.error("Failed to load custom UDF properties from " + udfPropertyPath, e);
+      }
+      Iterator itr = properties.entrySet().iterator();
+      while (itr.hasNext()) {
+        Entry e = (Entry) itr.next();
+        String func_name = (String) e.getKey();
+        LOG.debug("e.getKey() - " + func_name);
+        String jar_class = (String) e.getValue();
+        LOG.debug("e.getValue() - " + jar_class);
+        Pattern p = Pattern.compile("\\{(\\S+),(\\S+)\\}");
+        Matcher m = p.matcher(jar_class);
+        if (m.matches()) {
+          String jar_name = m.group(1);
+          String class_name = m.group(2);
+          StringBuilder statement_pre = new StringBuilder();
+          statement_pre.append(" CREATE TEMPORARY FUNCTION ").append(func_name).append(" AS '")
+              .append(class_name).append("' USING JAR '").append(jarsPath).append(jar_name).append("'");
+          LOG.debug("statement_pre:" + statement_pre);
+          this.executeStatement(sessionHandle, statement_pre.toString(), null);
+        } else {
+          LOG.warn("bad format definition of udf: " + func_name + " = " + jar_class);
+        }
+      }
+      try {
+        inputStream.close();
+      } catch (IOException e) {
+        LOG.error("Failed to close custom UDF properties stream", e);
+      }
+    }
+  }
+
   /**
    * @deprecated Use {@link #openSession(TProtocolVersion, String, String, String, Map)}
    */
@@ -145,7 +211,8 @@
   public SessionHandle openSession(TProtocolVersion protocol, String username, String password,
       Map<String, String> configuration) throws HiveSQLException {
     SessionHandle sessionHandle = sessionManager.openSession(protocol, username, password, null, configuration, false, null);
     LOG.debug(sessionHandle + ": openSession()");
+    initCustomUDF(sessionHandle);
     return sessionHandle;
   }
@@ -159,6 +226,7 @@
     SessionHandle sessionHandle = sessionManager.openSession(protocol, username, password, null, configuration,
         true, delegationToken);
     LOG.debug(sessionHandle + ": openSessionWithImpersonation()");
+    initCustomUDF(sessionHandle);
     return sessionHandle;
   }
@@ -166,6 +234,7 @@
       Map<String, String> configuration) throws HiveSQLException {
     SessionHandle sessionHandle = sessionManager.openSession(protocol, username, password, ipAddress, configuration, false, null);
     LOG.debug(sessionHandle + ": openSession()");
+    initCustomUDF(sessionHandle);
     return sessionHandle;
   }
@@ -175,6 +244,7 @@
     SessionHandle sessionHandle = sessionManager.openSession(protocol, username, password, ipAddress, configuration,
         true, delegationToken);
     LOG.debug(sessionHandle + ": openSession()");
+    initCustomUDF(sessionHandle);
     return sessionHandle;
   }
@@ -185,7 +255,8 @@
   public SessionHandle openSession(String username, String password, Map<String, String> configuration)
       throws HiveSQLException {
     SessionHandle sessionHandle = sessionManager.openSession(SERVER_VERSION, username, password, null, configuration, false, null);
     LOG.debug(sessionHandle + ": openSession()");
+    initCustomUDF(sessionHandle);
     return sessionHandle;
   }
@@ -197,7 +268,8 @@
       String delegationToken) throws HiveSQLException {
     SessionHandle sessionHandle = sessionManager.openSession(SERVER_VERSION, username, password, null, configuration,
         true, delegationToken);
     LOG.debug(sessionHandle + ": openSession()");
+    initCustomUDF(sessionHandle);
     return sessionHandle;
   }
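
For reference, a minimal hive-udf.properties matching the func_name={udf_jar_name,udf_class_name} pattern that initCustomUDF parses. The function, jar, and class names below are made up for illustration; note that the regex \{(\S+),(\S+)\} rejects whitespace inside the braces:

    # hive-udf.properties -- one temporary function per line
    my_lower={my-udfs.jar,com.example.udf.MyLower}
    my_upper={my-udfs.jar,com.example.udf.MyUpper}

Assuming the default jars path /tmp/hive/udf and a default filesystem of hdfs://namenode:8020, the first entry would cause initCustomUDF to execute roughly:

    CREATE TEMPORARY FUNCTION my_lower AS 'com.example.udf.MyLower' USING JAR 'hdfs://namenode:8020/tmp/hive/udf/my-udfs.jar'
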
Index: cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
===================================================================
--- cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (.../code/hive-0.14.0.2.2.0.0/cli/src/java/org/apache/hadoop/hive/cli) (revision 2244)
+++ cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (.../branches/hive-0.14-hiveserver2/cli/src/java/org/apache/hadoop/hive/cli) (working copy)
@@ -22,8 +22,10 @@
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
@@ -30,8 +32,13 @@
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import jline.ArgumentCompletor;
 import jline.ArgumentCompletor.AbstractArgumentDelimiter;
@@ -728,14 +735,66 @@
     // Execute -i init files (always in silent mode)
     cli.processInitFiles(ss);
 
+    // Build the prefix statements that auto-load custom udfs.
+    StringBuilder pre_execute_str = new StringBuilder();
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_CUSTOMIZED_UDF_ENABLED)) {
+      String jarsPath = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_CUSTOMIZED_UDF_JARS_PATH);
+      jarsPath = HiveConf.getVar(conf, HiveConf.ConfVars.HADOOPFS) + jarsPath;
+      if (!jarsPath.endsWith("/")) {
+        jarsPath += "/";
+      }
+
+      // The UDF definition file lives next to hive-site.xml.
+      String hiveConfLocation = HiveConf.getHiveSiteLocation().getPath();
+      String udfPropertyPath = hiveConfLocation.substring(0, hiveConfLocation.lastIndexOf('/') + 1)
+          + HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_CUSTOMIZED_UDF_PROPERTIES);
+      InputStream inputStream = null;
+      try {
+        inputStream = new FileInputStream(new File(udfPropertyPath));
+      } catch (FileNotFoundException e) {
+        console.printError("custom udf properties file not found: " + udfPropertyPath);
+      }
+
+      if (inputStream != null) {
+        Properties properties = new Properties();
+        try {
+          properties.load(inputStream);
+        } catch (IOException e) {
+          console.printError("failed to load custom udf properties: " + e.getMessage());
+        }
+        Iterator itr = properties.entrySet().iterator();
+        while (itr.hasNext()) {
+          Entry e = (Entry) itr.next();
+          String func_name = (String) e.getKey();
+          String jar_class = (String) e.getValue();
+          Pattern p = Pattern.compile("\\{(\\S+),(\\S+)\\}");
+          Matcher m = p.matcher(jar_class);
+          if (m.matches()) {
+            String jar_name = m.group(1);
+            String class_name = m.group(2);
+            pre_execute_str.append(" CREATE TEMPORARY FUNCTION ").append(func_name).append(" AS '")
+                .append(class_name).append("' USING JAR '").append(jarsPath).append(jar_name)
+                .append("'").append(";");
+          } else {
+            console.printError("bad format definition of udf: " + func_name + " = " + jar_class);
+          }
+        }
+        try {
+          inputStream.close();
+        } catch (IOException e) {
+          console.printError("failed to close custom udf properties: " + e.getMessage());
+        }
+      }
+    }
+
     if (ss.execString != null) {
-      int cmdProcessStatus = cli.processLine(ss.execString);
+      int cmdProcessStatus = cli.processLine(pre_execute_str + ss.execString);
       return cmdProcessStatus;
     }
 
     try {
       if (ss.fileName != null) {
+        if (pre_execute_str.length() > 0) {
+          cli.processLine(pre_execute_str.toString());
+        }
         return cli.processFile(ss.fileName);
       }
     } catch (FileNotFoundException e) {
@@ -773,7 +832,11 @@
     String curDB = getFormattedDb(conf, ss);
     String curPrompt = prompt + curDB;
     String dbSpaces = spacesForString(curDB);
 
+    // Load custom udfs before entering the interactive loop.
+    if (pre_execute_str.length() > 0) {
+      ret = cli.processLine(pre_execute_str.toString(), true);
+      if (ret != 0) {
+        throw new Exception("load custom udf failed");
+      }
+    }
     while ((line = reader.readLine(curPrompt + "> ")) != null) {
       if (!prefix.equals("")) {
         prefix += '\n';
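
With the feature enabled, the CLI runs the generated CREATE TEMPORARY FUNCTION prefix on all three entry paths, so the functions are usable without manual registration. An illustrative session, reusing the hypothetical my_lower function from the example above:

    hive -e "SELECT my_lower(name) FROM people"   # -e: prefix is concatenated ahead of the query
    hive -f query.hql                             # -f: prefix is processed as its own line first
    hive                                          # interactive: prefix runs (silently) before the first prompt
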
USING JAR \'").append(jarsPath).append(jar_name).append("\'").append(";"); + + }else{ + console.printError("bad format defination of udf: " + func_name + " = " + jar_class); + + continue; + } + } + try{ + inputStream.close(); + }catch(IOException e){ + e.printStackTrace(); + } + } + if (ss.execString != null) { - int cmdProcessStatus = cli.processLine(ss.execString); + int cmdProcessStatus = cli.processLine(pre_execute_str + ss.execString); return cmdProcessStatus; } try { if (ss.fileName != null) { + cli.processLine(pre_execute_str.toString()); return cli.processFile(ss.fileName); } } catch (FileNotFoundException e) { @@ -773,7 +832,11 @@ String curDB = getFormattedDb(conf, ss); String curPrompt = prompt + curDB; String dbSpaces = spacesForString(curDB); - + // load custom udf + ret = cli.processLine(pre_execute_str.toString(), true); + if(ret != 0){ + throw new Exception("load custom udf failed: "); + } while ((line = reader.readLine(curPrompt + "> ")) != null) { if (!prefix.equals("")) { prefix += '\n'; Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (.../code/hive-0.14.0.2.2.0.0/common/src/java/org/apache/hadoop/hive/conf) (revision 2244) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (.../branches/hive-0.14-hiveserver2/common/src/java/org/apache/hadoop/hive/conf) (working copy) @@ -324,11 +324,18 @@ "Make column names unique in the result set by qualifying column names with table alias if needed.\n" + "Table alias will be added to column names for queries of type \"select *\" or \n" + "if query explicitly uses table alias \"select r1.x..\"."), + + HIVE_SERVER2_CUSTOMIZED_UDF_ENABLED("hive.server2.customized.udf.enabled", false, + "Whether to enable HiveServer2 or the Hive CLI to add a customized UDF from a jar file, default value is false"), + + HIVE_SERVER2_CUSTOMIZED_UDF_JARS_PATH("hive.server2.customized.udf.jars.path","/tmp/hive/udf","The path on hdfs" + + " which contains customized UDF jar files. The value pattern is [hdfs://namenode:namenode-port]/path/to/UDF/jar." + + " When hdfs:// missing, means the hdfs path is on the default cluster."), - // Hadoop Configuration Properties - // Properties with null values are ignored and exist only for the purpose of giving us - // a symbolic name to reference in the Hive source code. Properties with non-null - // values will override any values set in the underlying Hadoop configuration. + HIVE_SERVER2_CUSTOMIZED_UDF_PROPERTIES("hive.server2.customized.udf.properties","hive-udf.properties", + "The customized udf functions' defination file. It's must be under $HIVE_HOME/conf directory. The patern is as below." + + "func_name={udf_jar_name,udf_class_name}"), + HADOOPBIN("hadoop.bin.path", findHadoopBinary(), "", true), HIVE_FS_HAR_IMPL("fs.har.impl", "org.apache.hadoop.hive.shims.HiveHarFileSystem", "The implementation for accessing Hadoop Archives. Note that this won't be applicable to Hadoop versions less than 0.20"),