diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bd90a30..50187ff 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -610,6 +610,13 @@
HIVE_ENTITY_SEPARATOR("hive.entity.separator", "@"),
+ HIVE_SERVER2_SERVERMODE("hive.server2.servermode", "thrift"), //thrift or http
+ HIVE_SERVER2_HTTP_PORT("hive.server2.http.port", 10000),
+ HIVE_SERVER2_HTTP_PATH1("hive.server2.http.path1", "servlets"),
+ HIVE_SERVER2_HTTP_PATH2("hive.server2.http.path2", "thrifths2"),
+ HIVE_SERVER2_HTTP_MIN_WORKER_THREADS("hive.server2.http.min.worker.threads", 5),
+ HIVE_SERVER2_HTTP_MAX_WORKER_THREADS("hive.server2.http.max.worker.threads", 100),
+
HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS("hive.server2.thrift.min.worker.threads", 5),
HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS("hive.server2.thrift.max.worker.threads", 100),
diff --git common/src/packages/win/template/conf/hive-site.xml common/src/packages/win/template/conf/hive-site.xml
index 4ea60c8..d1b227a 100644
--- common/src/packages/win/template/conf/hive-site.xml
+++ common/src/packages/win/template/conf/hive-site.xml
@@ -13,7 +13,7 @@
hive.metastore.local
- false
+ true
@@ -95,9 +95,33 @@
+ hive.server2.servermode
+ http
+ HiveServer server type: thrift or http
+
+
+
hive.server2.thrift.port
10001
HiveServer2 thrift port
+
+ hive.server2.http.port
+ 10001
+ HiveServer2 http port
+
+
+
+ hive.server2.http.min.worker.threads
+ 5
+ HiveServer2 http mode - min worker threads
+
+
+
+ hive.server2.http.max.worker.threads
+ 100
+ HiveServer2 http mode - max worker threads
+
+
diff --git jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index 929b295..8cd3ba5 100644
--- jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -41,6 +41,7 @@
import javax.security.sasl.SaslException;
+import org.apache.hadoop.hive.cli.HttpBasicAuthInterceptor;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.service.auth.KerberosSaslHelper;
import org.apache.hive.service.auth.PlainSaslHelper;
@@ -51,6 +52,7 @@
import org.apache.hive.service.cli.thrift.TOpenSessionResp;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
import org.apache.hive.service.cli.thrift.TSessionHandle;
+import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -71,6 +73,7 @@
private static final String HIVE_ANONYMOUS_USER = "anonymous";
private static final String HIVE_ANONYMOUS_PASSWD = "anonymous";
+
private TTransport transport;
private TCLIService.Iface client;
private boolean isClosed = true;
@@ -127,31 +130,57 @@ private void configureConnection(Utils.JdbcConnectionParams connParams)
private void openTransport(String uri, String host, int port, Map sessConf )
throws SQLException {
- transport = new TSocket(host, port);
-
- // handle secure connection if specified
- if (!sessConf.containsKey(HIVE_AUTH_TYPE)
- || !sessConf.get(HIVE_AUTH_TYPE).equals(HIVE_AUTH_SIMPLE)){
+
+ if(uri.startsWith(Utils.URL_PREFIX_HTTP) || uri.startsWith(Utils.URL_PREFIX_HTTPS)){
+ DefaultHttpClient httpClient = new DefaultHttpClient();
+ String httpUri = uri.substring(Utils.URL_PREFIX_COMMON.length());
+
+ String userName = sessConf.get(HIVE_AUTH_USER);
+ if ((userName == null) || userName.isEmpty()) {
+ userName = HIVE_ANONYMOUS_USER;
+ }
+ String passwd = sessConf.get(HIVE_AUTH_PASSWD);
+ if ((passwd == null) || passwd.isEmpty()) {
+ passwd = HIVE_ANONYMOUS_PASSWD;
+ }
+ httpClient.addRequestInterceptor(new HttpBasicAuthInterceptor(userName, passwd));
try {
- if (sessConf.containsKey(HIVE_AUTH_PRINCIPAL)) {
- transport = KerberosSaslHelper.getKerberosTransport(
- sessConf.get(HIVE_AUTH_PRINCIPAL), host, transport);
- } else {
- String userName = sessConf.get(HIVE_AUTH_USER);
- if ((userName == null) || userName.isEmpty()) {
- userName = HIVE_ANONYMOUS_USER;
- }
- String passwd = sessConf.get(HIVE_AUTH_PASSWD);
- if ((passwd == null) || passwd.isEmpty()) {
- passwd = HIVE_ANONYMOUS_PASSWD;
+ transport = new org.apache.thrift.transport.THttpClient(httpUri, httpClient);
+ }
+ catch (TTransportException tte){
+ throw new SQLException("Could not establish connection to " + uri + ". " + tte.getMessage(), " 08S01");
+ }
+ }
+ else if(uri.startsWith(Utils.URL_PREFIX_SOCKET)){
+ transport = new TSocket(host, port);
+
+ // handle secure connection if specified
+ if (!sessConf.containsKey(HIVE_AUTH_TYPE)
+ || !sessConf.get(HIVE_AUTH_TYPE).equals(HIVE_AUTH_SIMPLE)){
+ try {
+ if (sessConf.containsKey(HIVE_AUTH_PRINCIPAL)) {
+ transport = KerberosSaslHelper.getKerberosTransport(
+ sessConf.get(HIVE_AUTH_PRINCIPAL), host, transport);
+ } else {
+ String userName = sessConf.get(HIVE_AUTH_USER);
+ if ((userName == null) || userName.isEmpty()) {
+ userName = HIVE_ANONYMOUS_USER;
+ }
+ String passwd = sessConf.get(HIVE_AUTH_PASSWD);
+ if ((passwd == null) || passwd.isEmpty()) {
+ passwd = HIVE_ANONYMOUS_PASSWD;
+ }
+ transport = PlainSaslHelper.getPlainTransport(userName, passwd, transport);
}
- transport = PlainSaslHelper.getPlainTransport(userName, passwd, transport);
+ } catch (SaslException e) {
+ throw new SQLException("Could not establish secure connection to "
+ + uri + ": " + e.getMessage(), " 08S01");
}
- } catch (SaslException e) {
- throw new SQLException("Could not establish secure connection to "
- + uri + ": " + e.getMessage(), " 08S01");
}
}
+ else {
+ throw new SQLException("Unknown transport protocol prefix: " + uri);
+ }
TProtocol protocol = new TBinaryProtocol(transport);
client = new TCLIService.Client(protocol);
diff --git jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java
index 6093070..70fc5aa 100644
--- jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java
+++ jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java
@@ -28,6 +28,7 @@
import java.util.jar.Attributes;
import java.util.jar.Manifest;
import java.util.regex.Pattern;
+
/**
* HiveDriver.
*
@@ -48,11 +49,6 @@
private static final boolean JDBC_COMPLIANT = false;
/**
- * The required prefix for the connection URL.
- */
- private static final String URL_PREFIX = "jdbc:hive2://";
-
- /**
* If host is provided, without a port.
*/
private static final String DEFAULT_PORT = "10000";
@@ -97,7 +93,8 @@ public HiveDriver() {
*/
public boolean acceptsURL(String url) throws SQLException {
- return Pattern.matches(URL_PREFIX + ".*", url);
+
+ return Pattern.matches(Utils.URL_PREFIX_COMMON + ".*", url);
}
public Connection connect(String url, Properties info) throws SQLException {
@@ -166,12 +163,13 @@ public int getMinorVersion() {
return HiveDriver.getMinorDriverVersion();
}
+ //@@TODO -- thejas - do you know how this code gets used? It has its own parseURL & property extraction that differs from Utils.parseURL()
public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
if (info == null) {
info = new Properties();
}
- if ((url != null) && url.startsWith(URL_PREFIX)) {
+ if ((url != null) && url.startsWith(Utils.URL_PREFIX_COMMON)) {
info = parseURL(url, info);
}
@@ -220,16 +218,16 @@ private Properties parseURL(String url, Properties defaults) throws SQLException
Properties urlProps = (defaults != null) ? new Properties(defaults)
: new Properties();
- if (url == null || !url.startsWith(URL_PREFIX)) {
+ if (url == null || !url.startsWith(Utils.URL_PREFIX_COMMON)) {
throw new SQLException("Invalid connection url: " + url);
}
- if (url.length() <= URL_PREFIX.length()) {
+ if (url.length() <= Utils.URL_PREFIX_COMMON.length()) {
return urlProps;
}
// [hostname]:[port]/[db_name]
- String connectionInfo = url.substring(URL_PREFIX.length());
+ String connectionInfo = url.substring(Utils.URL_PREFIX_COMMON.length());
// [hostname]:[port] [db_name]
String[] hostPortAndDatabase = connectionInfo.split("/", 2);
diff --git jdbc/src/java/org/apache/hive/jdbc/Utils.java jdbc/src/java/org/apache/hive/jdbc/Utils.java
index 2dd2ac7..0c8f57f 100644
--- jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -33,7 +33,11 @@
/**
* The required prefix for the connection URL.
*/
- public static final String URL_PREFIX = "jdbc:hive2://";
+ public static final String URL_PREFIX_COMMON = "jdbc:hive2:";
+ public static final String URL_PREFIX_SOCKET = "jdbc:hive2://";
+ public static final String URL_PREFIX_HTTP = "jdbc:hive2:http://";
+ public static final String URL_PREFIX_HTTPS = "jdbc:hive2:https://";
+
/**
* If host is provided, without a port.
@@ -179,17 +183,22 @@ public static void verifySuccess(TStatus status, boolean withInfo) throws SQLExc
public static JdbcConnectionParams parseURL(String uri) throws IllegalArgumentException {
JdbcConnectionParams connParams = new JdbcConnectionParams();
- if (!uri.startsWith(URL_PREFIX)) {
+ if (! (uri.startsWith(URL_PREFIX_SOCKET) || uri.startsWith(URL_PREFIX_HTTP) || uri.startsWith(URL_PREFIX_HTTPS))) {
throw new IllegalArgumentException("Bad URL format");
}
// Don't parse URL with no other configuration.
- if (uri.equalsIgnoreCase(URL_PREFIX)) {
+ if (uri.equalsIgnoreCase(URL_PREFIX_SOCKET)) {
connParams.setEmbeddedMode(true);
return connParams;
}
- URI jdbcURI = URI.create(uri.substring(URI_JDBC_PREFIX.length()));
-
+ URI jdbcURI;
+ if (uri.startsWith(URL_PREFIX_HTTP) || uri.startsWith(URL_PREFIX_HTTPS)) {
+ jdbcURI = URI.create(uri.substring(URL_PREFIX_COMMON.length())); // leave only http://... or https://...
+ }
+ else {
+ jdbcURI = URI.create(uri.substring(URI_JDBC_PREFIX.length())); // leave only hive2://..
+ }
connParams.setHost(jdbcURI.getHost());
if (connParams.getHost() == null) {
connParams.setEmbeddedMode(true);
@@ -205,7 +214,7 @@ public static JdbcConnectionParams parseURL(String uri) throws IllegalArgumentEx
Pattern pattern = Pattern.compile("([^;]*)=([^;]*)[;]?");
// dbname and session settings
- String sessVars = jdbcURI.getPath();
+ String sessVars = jdbcURI.getPath(); // @@TODO:BUG HTTP connection strings are parsed as databasename=uriPathComponent. Q: how to specify path & database?
if ((sessVars == null) || sessVars.isEmpty()) {
connParams.setDbName(DEFAULT_DATABASE);
} else {
diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 95ebe02..82abb24 100644
--- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -26,6 +26,10 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.service.HttpServlet;
+import org.apache.hadoop.hive.service.ThriftHive;
+import org.apache.hadoop.hive.service.HiveServer.HiveServerHandler;
+import org.apache.hadoop.hive.service.ThriftHive.Iface;
import org.apache.hive.service.AbstractService;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.FetchOrientation;
@@ -39,13 +43,21 @@
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.TableSchema;
import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServlet;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportFactory;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.thread.QueuedThreadPool;
+import org.apache.hadoop.util.Shell;
/**
* SQLService.
@@ -362,48 +374,109 @@ public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
@Override
public void run() {
try {
- hiveAuthFactory = new HiveAuthFactory();
- TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
- TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
-
- String portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
- if (portString != null) {
- portNum = Integer.valueOf(portString);
- } else {
- portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
+ String serverMode = System.getenv("HIVE_SERVER2_SERVERMODE");
+ if(serverMode == null){
+ serverMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_SERVERMODE);
}
- String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
- if (hiveHost == null) {
- hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
- }
+ if(serverMode.equals("thrift")){
+ hiveAuthFactory = new HiveAuthFactory();
+ TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
+ TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
- if (hiveHost != null && !hiveHost.isEmpty()) {
- serverAddress = new InetSocketAddress(hiveHost, portNum);
- } else {
- serverAddress = new InetSocketAddress(portNum);
- }
+ String portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
+ if (portString != null) {
+ portNum = Integer.valueOf(portString);
+ } else {
+ portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
+ }
+
+ String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+ if (hiveHost == null) {
+ hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
+ }
+ if (hiveHost != null && !hiveHost.isEmpty()) {
+ serverAddress = new InetSocketAddress(hiveHost, portNum);
+ } else {
+ serverAddress = new InetSocketAddress(portNum);
+ }
- minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
- maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
+ minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
+ maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
- TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(new TServerSocket(serverAddress))
- .processorFactory(processorFactory)
- .transportFactory(transportFactory)
- .protocolFactory(new TBinaryProtocol.Factory())
- .minWorkerThreads(minWorkerThreads)
- .maxWorkerThreads(maxWorkerThreads);
- server = new TThreadPoolServer(sargs);
+ TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(new TServerSocket(serverAddress))
+ .processorFactory(processorFactory)
+ .transportFactory(transportFactory)
+ .protocolFactory(new TBinaryProtocol.Factory())
+ .minWorkerThreads(minWorkerThreads)
+ .maxWorkerThreads(maxWorkerThreads);
- LOG.info("ThriftSQLService listening on " + serverAddress);
+ server = new TThreadPoolServer(sargs);
- server.serve();
- } catch (Throwable t) {
+ LOG.info("ThriftSQLService (tcp thrift mode) listening on " + serverAddress);
+
+ server.serve();
+ }
+ if(serverMode.equals("http")){
+ // Configure Jetty to serve http requests
+ // Example of a client connection URI: http://localhost:10000/servlets/thrifths2/
+ // a gateway may cause actual target URI to differ, eg http://gateway:port/hive2/servlets/thrifths2/
+
+ String portString = System.getenv("HIVE_SERVER2_HTTP_PORT");
+ if (portString != null) {
+ portNum = Integer.valueOf(portString);
+ } else {
+ portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_HTTP_PORT);
+ }
+
+ minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_HTTP_MIN_WORKER_THREADS);
+ maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_HTTP_MAX_WORKER_THREADS);
+
+ String path1 = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_HTTP_PATH1);
+ if(!path1.startsWith("/")){
+ path1 = "/" + path1;
+ }
+ String path2 = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_HTTP_PATH2);
+ if(!path2.startsWith("/")){
+ path2 = "/" + path2;
+ }
+
+ org.mortbay.jetty.Server httpServer = new org.mortbay.jetty.Server();
+
+ QueuedThreadPool threadPool = new QueuedThreadPool();
+ threadPool.setMinThreads(minWorkerThreads);
+ threadPool.setMaxThreads(maxWorkerThreads);
+ httpServer.setThreadPool(threadPool);
+ SelectChannelConnector connector = new SelectChannelConnector();
+ connector.setPort(portNum);
+
+ connector.setReuseAddress(!Shell.WINDOWS); // Linux:yes, Windows:no
+ httpServer.addConnector(connector);
+
+ new org.apache.hive.service.cli.thrift.TCLIService.Processor(new EmbeddedThriftCLIService());
+ org.apache.hive.service.cli.thrift.TCLIService.Processor processor = new org.apache.hive.service.cli.thrift.TCLIService.Processor(new EmbeddedThriftCLIService());
+ TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
+ TServlet thriftServlet = new HttpServlet(processor,protocolFactory);
+ final Context context = new Context(httpServer, path1, Context.SESSIONS);
+ context.addServlet(new ServletHolder(thriftServlet), path2);
+
+ //TODO: check defaults: maxTimeout,keepalive,maxBodySize,bodyRecieveDuration, etc.
+ httpServer.start();
+ String msg = "Starting HiveServer2 in Http mode on port " + portNum +
+ " path=" + path1 + path2 +
+ " with " + minWorkerThreads + ".." + maxWorkerThreads + " worker threads";
+ HiveServerHandler.LOG.info(msg);
+ httpServer.join();
+ }
+ else {
+ throw new Exception("unknown serverMode: " + serverMode);
+ }
+ }
+ catch (Throwable t) {
t.printStackTrace();
}
}
-
}
diff --git service/src/java/org/apache/hive/service/server/HiveServer2.java service/src/java/org/apache/hive/service/server/HiveServer2.java
index 57bd346..6781eb0 100644
--- service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -82,6 +82,7 @@ public static void main(String[] args) {
System.exit(-1);
}
HiveConf hiveConf = new HiveConf();
+ //@@TODO: consider adding -hiveconf x=y settings to hiveConf
HiveServer2 server = new HiveServer2();
server.init(hiveConf);
server.start();