diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index bd90a30..50187ff 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -610,6 +610,13 @@ HIVE_ENTITY_SEPARATOR("hive.entity.separator", "@"), + HIVE_SERVER2_SERVERMODE("hive.server2.servermode", "thrift"), //thrift or http + HIVE_SERVER2_HTTP_PORT("hive.server2.http.port", 10000), + HIVE_SERVER2_HTTP_PATH1("hive.server2.http.path1", "servlets"), + HIVE_SERVER2_HTTP_PATH2("hive.server2.http.path2", "thrifths2"), + HIVE_SERVER2_HTTP_MIN_WORKER_THREADS("hive.server2.http.min.worker.threads", 5), + HIVE_SERVER2_HTTP_MAX_WORKER_THREADS("hive.server2.http.max.worker.threads", 100), + HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS("hive.server2.thrift.min.worker.threads", 5), HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS("hive.server2.thrift.max.worker.threads", 100), diff --git common/src/packages/win/template/conf/hive-site.xml common/src/packages/win/template/conf/hive-site.xml index 4ea60c8..d1b227a 100644 --- common/src/packages/win/template/conf/hive-site.xml +++ common/src/packages/win/template/conf/hive-site.xml @@ -13,7 +13,7 @@ hive.metastore.local - false + true @@ -95,9 +95,33 @@ + hive.server2.servermode + http + HiveServer server type: thrift or http + + + hive.server2.thrift.port 10001 HiveServer2 thrift port + + hive.server2.http.port + 10001 + HiveServer2 http port + + + + hive.server2.http.min.worker.threads + 5 + HiveServer2 http mode - min worker threads + + + + hive.server2.http.max.worker.threads + 100 + HiveServer2 http mode - max worker threads + + diff --git jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index 929b295..8cd3ba5 100644 --- jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -41,6 +41,7 @@ import 
javax.security.sasl.SaslException; +import org.apache.hadoop.hive.cli.HttpBasicAuthInterceptor; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.service.auth.KerberosSaslHelper; import org.apache.hive.service.auth.PlainSaslHelper; @@ -51,6 +52,7 @@ import org.apache.hive.service.cli.thrift.TOpenSessionResp; import org.apache.hive.service.cli.thrift.TProtocolVersion; import org.apache.hive.service.cli.thrift.TSessionHandle; +import org.apache.http.impl.client.DefaultHttpClient; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; @@ -71,6 +73,7 @@ private static final String HIVE_ANONYMOUS_USER = "anonymous"; private static final String HIVE_ANONYMOUS_PASSWD = "anonymous"; + private TTransport transport; private TCLIService.Iface client; private boolean isClosed = true; @@ -127,31 +130,57 @@ private void configureConnection(Utils.JdbcConnectionParams connParams) private void openTransport(String uri, String host, int port, Map sessConf ) throws SQLException { - transport = new TSocket(host, port); - - // handle secure connection if specified - if (!sessConf.containsKey(HIVE_AUTH_TYPE) - || !sessConf.get(HIVE_AUTH_TYPE).equals(HIVE_AUTH_SIMPLE)){ + + if(uri.startsWith(Utils.URL_PREFIX_HTTP) || uri.startsWith(Utils.URL_PREFIX_HTTPS)){ + DefaultHttpClient httpClient = new DefaultHttpClient(); + String httpUri = uri.substring(Utils.URL_PREFIX_COMMON.length()); + + String userName = sessConf.get(HIVE_AUTH_USER); + if ((userName == null) || userName.isEmpty()) { + userName = HIVE_ANONYMOUS_USER; + } + String passwd = sessConf.get(HIVE_AUTH_PASSWD); + if ((passwd == null) || passwd.isEmpty()) { + passwd = HIVE_ANONYMOUS_PASSWD; + } + httpClient.addRequestInterceptor(new HttpBasicAuthInterceptor(userName, passwd)); try { - if (sessConf.containsKey(HIVE_AUTH_PRINCIPAL)) { - transport = KerberosSaslHelper.getKerberosTransport( - sessConf.get(HIVE_AUTH_PRINCIPAL), host, 
transport); - } else { - String userName = sessConf.get(HIVE_AUTH_USER); - if ((userName == null) || userName.isEmpty()) { - userName = HIVE_ANONYMOUS_USER; - } - String passwd = sessConf.get(HIVE_AUTH_PASSWD); - if ((passwd == null) || passwd.isEmpty()) { - passwd = HIVE_ANONYMOUS_PASSWD; + transport = new org.apache.thrift.transport.THttpClient(httpUri, httpClient); + } + catch (TTransportException tte){ + throw new SQLException("Could not establish connection to " + uri + ". " + tte.getMessage(), " 08S01"); + } + } + else if(uri.startsWith(Utils.URL_PREFIX_SOCKET)){ + transport = new TSocket(host, port); + + // handle secure connection if specified + if (!sessConf.containsKey(HIVE_AUTH_TYPE) + || !sessConf.get(HIVE_AUTH_TYPE).equals(HIVE_AUTH_SIMPLE)){ + try { + if (sessConf.containsKey(HIVE_AUTH_PRINCIPAL)) { + transport = KerberosSaslHelper.getKerberosTransport( + sessConf.get(HIVE_AUTH_PRINCIPAL), host, transport); + } else { + String userName = sessConf.get(HIVE_AUTH_USER); + if ((userName == null) || userName.isEmpty()) { + userName = HIVE_ANONYMOUS_USER; + } + String passwd = sessConf.get(HIVE_AUTH_PASSWD); + if ((passwd == null) || passwd.isEmpty()) { + passwd = HIVE_ANONYMOUS_PASSWD; + } + transport = PlainSaslHelper.getPlainTransport(userName, passwd, transport); } - transport = PlainSaslHelper.getPlainTransport(userName, passwd, transport); + } catch (SaslException e) { + throw new SQLException("Could not establish secure connection to " + + uri + ": " + e.getMessage(), " 08S01"); } - } catch (SaslException e) { - throw new SQLException("Could not establish secure connection to " - + uri + ": " + e.getMessage(), " 08S01"); } } + else { + throw new SQLException("Unknown transport protocol prefix: " + uri); + } TProtocol protocol = new TBinaryProtocol(transport); client = new TCLIService.Client(protocol); diff --git jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java index 6093070..70fc5aa 100644 --- 
jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java +++ jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java @@ -28,6 +28,7 @@ import java.util.jar.Attributes; import java.util.jar.Manifest; import java.util.regex.Pattern; + /** * HiveDriver. * @@ -48,11 +49,6 @@ private static final boolean JDBC_COMPLIANT = false; /** - * The required prefix for the connection URL. - */ - private static final String URL_PREFIX = "jdbc:hive2://"; - - /** * If host is provided, without a port. */ private static final String DEFAULT_PORT = "10000"; @@ -97,7 +93,8 @@ public HiveDriver() { */ public boolean acceptsURL(String url) throws SQLException { - return Pattern.matches(URL_PREFIX + ".*", url); + + return Pattern.matches(Utils.URL_PREFIX_COMMON + ".*", url); } public Connection connect(String url, Properties info) throws SQLException { @@ -166,12 +163,13 @@ public int getMinorVersion() { return HiveDriver.getMinorDriverVersion(); } + //@@TODO -- thejas - do you know how this code gets used? It has its own parseURL & property extraction that differs from Utils.parseURL() public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { if (info == null) { info = new Properties(); } - if ((url != null) && url.startsWith(URL_PREFIX)) { + if ((url != null) && url.startsWith(Utils.URL_PREFIX_COMMON)) { info = parseURL(url, info); } @@ -220,16 +218,16 @@ private Properties parseURL(String url, Properties defaults) throws SQLException Properties urlProps = (defaults != null) ? 
new Properties(defaults) : new Properties(); - if (url == null || !url.startsWith(URL_PREFIX)) { + if (url == null || !url.startsWith(Utils.URL_PREFIX_COMMON)) { throw new SQLException("Invalid connection url: " + url); } - if (url.length() <= URL_PREFIX.length()) { + if (url.length() <= Utils.URL_PREFIX_COMMON.length()) { return urlProps; } // [hostname]:[port]/[db_name] - String connectionInfo = url.substring(URL_PREFIX.length()); + String connectionInfo = url.substring(Utils.URL_PREFIX_COMMON.length()); // [hostname]:[port] [db_name] String[] hostPortAndDatabase = connectionInfo.split("/", 2); diff --git jdbc/src/java/org/apache/hive/jdbc/Utils.java jdbc/src/java/org/apache/hive/jdbc/Utils.java index 2dd2ac7..0c8f57f 100644 --- jdbc/src/java/org/apache/hive/jdbc/Utils.java +++ jdbc/src/java/org/apache/hive/jdbc/Utils.java @@ -33,7 +33,11 @@ /** * The required prefix for the connection URL. */ - public static final String URL_PREFIX = "jdbc:hive2://"; + public static final String URL_PREFIX_COMMON = "jdbc:hive2:"; + public static final String URL_PREFIX_SOCKET = "jdbc:hive2://"; + public static final String URL_PREFIX_HTTP = "jdbc:hive2:http://"; + public static final String URL_PREFIX_HTTPS = "jdbc:hive2:https://"; + /** * If host is provided, without a port. @@ -179,17 +183,22 @@ public static void verifySuccess(TStatus status, boolean withInfo) throws SQLExc public static JdbcConnectionParams parseURL(String uri) throws IllegalArgumentException { JdbcConnectionParams connParams = new JdbcConnectionParams(); - if (!uri.startsWith(URL_PREFIX)) { + if (! (uri.startsWith(URL_PREFIX_SOCKET) || uri.startsWith(URL_PREFIX_HTTP) || uri.startsWith(URL_PREFIX_HTTPS))) { throw new IllegalArgumentException("Bad URL format"); } // Don't parse URL with no other configuration. 
- if (uri.equalsIgnoreCase(URL_PREFIX)) { + if (uri.equalsIgnoreCase(URL_PREFIX_SOCKET)) { connParams.setEmbeddedMode(true); return connParams; } - URI jdbcURI = URI.create(uri.substring(URI_JDBC_PREFIX.length())); - + URI jdbcURI; + if (uri.startsWith(URL_PREFIX_HTTP) || uri.startsWith(URL_PREFIX_HTTPS)) { + jdbcURI = URI.create(uri.substring(URL_PREFIX_COMMON.length())); // leave only http://... or https://... + } + else { + jdbcURI = URI.create(uri.substring(URI_JDBC_PREFIX.length())); // leave only hive2://.. + } connParams.setHost(jdbcURI.getHost()); if (connParams.getHost() == null) { connParams.setEmbeddedMode(true); @@ -205,7 +214,7 @@ public static JdbcConnectionParams parseURL(String uri) throws IllegalArgumentEx Pattern pattern = Pattern.compile("([^;]*)=([^;]*)[;]?"); // dbname and session settings - String sessVars = jdbcURI.getPath(); + String sessVars = jdbcURI.getPath(); // @@TODO:BUG HTTP connection strings are parsed as databasename=uriPathComponent. Q: how to specify path & database? 
if ((sessVars == null) || sessVars.isEmpty()) { connParams.setDbName(DEFAULT_DATABASE); } else { diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 95ebe02..82abb24 100644 --- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -26,6 +26,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.service.HttpServlet; +import org.apache.hadoop.hive.service.ThriftHive; +import org.apache.hadoop.hive.service.HiveServer.HiveServerHandler; +import org.apache.hadoop.hive.service.ThriftHive.Iface; import org.apache.hive.service.AbstractService; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.FetchOrientation; @@ -39,13 +43,21 @@ import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.TableSchema; import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TServlet; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransportFactory; +import org.mortbay.jetty.nio.SelectChannelConnector; +import org.mortbay.jetty.servlet.Context; +import org.mortbay.jetty.servlet.ServletHolder; +import org.mortbay.thread.QueuedThreadPool; +import org.apache.hadoop.util.Shell; /** * SQLService. 
@@ -362,48 +374,109 @@ public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { @Override public void run() { try { - hiveAuthFactory = new HiveAuthFactory(); - TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory(); - TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this); - - String portString = System.getenv("HIVE_SERVER2_THRIFT_PORT"); - if (portString != null) { - portNum = Integer.valueOf(portString); - } else { - portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT); + String serverMode = System.getenv("HIVE_SERVER2_SERVERMODE"); + if(serverMode == null){ + serverMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_SERVERMODE); } - String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST"); - if (hiveHost == null) { - hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST); - } + if(serverMode.equals("thrift")){ + hiveAuthFactory = new HiveAuthFactory(); + TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory(); + TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this); - if (hiveHost != null && !hiveHost.isEmpty()) { - serverAddress = new InetSocketAddress(hiveHost, portNum); - } else { - serverAddress = new InetSocketAddress(portNum); - } + String portString = System.getenv("HIVE_SERVER2_THRIFT_PORT"); + if (portString != null) { + portNum = Integer.valueOf(portString); + } else { + portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT); + } + + String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST"); + if (hiveHost == null) { + hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST); + } + if (hiveHost != null && !hiveHost.isEmpty()) { + serverAddress = new InetSocketAddress(hiveHost, portNum); + } else { + serverAddress = new InetSocketAddress(portNum); + } - minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS); - maxWorkerThreads = 
hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS); + minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS); + maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS); - TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(new TServerSocket(serverAddress)) - .processorFactory(processorFactory) - .transportFactory(transportFactory) - .protocolFactory(new TBinaryProtocol.Factory()) - .minWorkerThreads(minWorkerThreads) - .maxWorkerThreads(maxWorkerThreads); - server = new TThreadPoolServer(sargs); + TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(new TServerSocket(serverAddress)) + .processorFactory(processorFactory) + .transportFactory(transportFactory) + .protocolFactory(new TBinaryProtocol.Factory()) + .minWorkerThreads(minWorkerThreads) + .maxWorkerThreads(maxWorkerThreads); - LOG.info("ThriftSQLService listening on " + serverAddress); + server = new TThreadPoolServer(sargs); - server.serve(); - } catch (Throwable t) { + LOG.info("ThriftSQLService (tcp thrift mode) listening on " + serverAddress); + + server.serve(); + } + else if(serverMode.equals("http")){ + // Configure Jetty to serve http requests + // Example of a client connection URI: http://localhost:10000/servlets/thrifths2/ + // a gateway may cause actual target URI to differ, eg http://gateway:port/hive2/servlets/thrifths2/ + + String portString = System.getenv("HIVE_SERVER2_HTTP_PORT"); + if (portString != null) { + portNum = Integer.valueOf(portString); + } else { + portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_HTTP_PORT); + } + + minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_HTTP_MIN_WORKER_THREADS); + maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_HTTP_MAX_WORKER_THREADS); + + String path1 = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_HTTP_PATH1); + if(!path1.startsWith("/")){ + path1 = "/" + path1; + } + String path2 = 
hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_HTTP_PATH2); + if(!path2.startsWith("/")){ + path2 = "/" + path2; + } + + org.mortbay.jetty.Server httpServer = new org.mortbay.jetty.Server(); + + QueuedThreadPool threadPool = new QueuedThreadPool(); + threadPool.setMinThreads(minWorkerThreads); + threadPool.setMaxThreads(maxWorkerThreads); + httpServer.setThreadPool(threadPool); + SelectChannelConnector connector = new SelectChannelConnector(); + connector.setPort(portNum); + + connector.setReuseAddress(!Shell.WINDOWS); // Linux:yes, Windows:no + httpServer.addConnector(connector); + + org.apache.hive.service.cli.thrift.TCLIService.Processor processor = new org.apache.hive.service.cli.thrift.TCLIService.Processor(new EmbeddedThriftCLIService()); + TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); + TServlet thriftServlet = new HttpServlet(processor,protocolFactory); + final Context context = new Context(httpServer, path1, Context.SESSIONS); + context.addServlet(new ServletHolder(thriftServlet), path2); + + //TODO: check defaults: maxTimeout,keepalive,maxBodySize,bodyReceiveDuration, etc. + httpServer.start(); + String msg = "Starting HiveServer2 in Http mode on port " + portNum + + " path=" + path1 + path2 + + " with " + minWorkerThreads + ".." 
+ maxWorkerThreads + " worker threads"; + HiveServerHandler.LOG.info(msg); + httpServer.join(); + } + else { + throw new Exception("unknown serverMode: " + serverMode); + } + } + catch (Throwable t) { t.printStackTrace(); } } - } diff --git service/src/java/org/apache/hive/service/server/HiveServer2.java service/src/java/org/apache/hive/service/server/HiveServer2.java index 57bd346..6781eb0 100644 --- service/src/java/org/apache/hive/service/server/HiveServer2.java +++ service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -82,6 +82,7 @@ public static void main(String[] args) { System.exit(-1); } HiveConf hiveConf = new HiveConf(); + //@@TODO: consider adding -hiveconf x=y settings to hiveConf HiveServer2 server = new HiveServer2(); server.init(hiveConf); server.start();