diff --git a/service/pom.xml b/service/pom.xml index c73a621b04..a75021ebeb 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -300,6 +300,18 @@ ${apache-directory-server.version} test + + org.powermock + powermock-module-junit4 + ${powermock.version} + test + + + org.powermock + powermock-api-mockito + ${powermock.version} + test + diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 2f3767fdee..5d81668441 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -106,6 +106,7 @@ import org.apache.hive.service.cli.thrift.ThriftHttpCLIService; import org.apache.hive.service.servlet.HS2LeadershipStatus; import org.apache.hive.service.servlet.HS2Peers; +import org.apache.hive.service.servlet.QueriesRESTfulAPIServlet; import org.apache.hive.service.servlet.QueryProfileServlet; import org.apache.http.StatusLine; import org.apache.http.client.methods.CloseableHttpResponse; @@ -170,6 +171,11 @@ public HiveServer2(PamAuthenticator pamAuthenticator) { this.pamAuthenticator = pamAuthenticator; } + @VisibleForTesting + public CLIService getCliService() { + return this.cliService; + } + @VisibleForTesting public void setPamAuthenticator(PamAuthenticator pamAuthenticator) { this.pamAuthenticator = pamAuthenticator; @@ -403,6 +409,7 @@ public void run() { webServer = builder.build(); webServer.addServlet("query_page", "/query_page", QueryProfileServlet.class); + webServer.addServlet("api", "/api/*", QueriesRESTfulAPIServlet.class); } } } catch (IOException ie) { diff --git a/service/src/java/org/apache/hive/service/servlet/QueriesRESTfulAPIServlet.java b/service/src/java/org/apache/hive/service/servlet/QueriesRESTfulAPIServlet.java new file mode 100644 index 0000000000..990ec6ef2e --- /dev/null +++ b/service/src/java/org/apache/hive/service/servlet/QueriesRESTfulAPIServlet.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.servlet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.QueryInfo; +import org.apache.hive.service.cli.operation.OperationManager; +import org.apache.hive.service.cli.session.HiveSession; +import org.apache.hive.service.cli.session.SessionManager; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.JsonProcessingException; +import org.codehaus.jackson.Version; +import org.codehaus.jackson.map.JsonSerializer; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializerProvider; +import org.codehaus.jackson.map.module.SimpleModule; + +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Collection; + +/** + * QueriesRESTfulAPIServlet. + * + */ +public class QueriesRESTfulAPIServlet extends HttpServlet { + private static final long serialVersionUID = 1L; + private static final Log LOG = LogFactory.getLog(QueriesRESTfulAPIServlet.class); + + private static final String API_V1 = "v1"; + private static final String REQ_QUERIES = "queries"; + private static final String REQ_SESSIONS = "sessions"; + private static final String REQ_ACTIVE = "active"; + private static final String REQ_HISTORICAL = "historical"; + + + @Override + public void doGet(HttpServletRequest request, HttpServletResponse response) + throws ServletException, IOException { + /* + Available endpoints are: + - /v1/queries/active + - /v1/queries/historical + - /v1/sessions + */ + + String pathInfo = request.getPathInfo(); + if (pathInfo == null || "/".equals(pathInfo)) { + sendError(response, HttpServletResponse.SC_BAD_REQUEST, "Path to the endpoint is missing"); + return; + } + + + String[] splits = pathInfo.split("/"); + if (splits.length < 3) { //expecting at least 2 parts in the path + sendError(response, HttpServletResponse.SC_BAD_REQUEST, "Expecting at least 2 parts in the path"); + return; + } + + ServletContext ctx = getServletContext(); + SessionManager sessionManager = + (SessionManager) ctx.getAttribute("hive.sm"); + OperationManager operationManager = sessionManager.getOperationManager(); + + String apiVersion = splits[1]; + if (apiVersion.equals(API_V1)) { + String reqType = splits[2]; + if (reqType.equals(REQ_QUERIES)) { + if (splits.length != 4) { + sendError(response, HttpServletResponse.SC_NOT_FOUND, + "Expecting 3 parts in the path: /v1/queries/active or /v1/queries/historical"); + return; + } + String queriesType = splits[3]; + if (queriesType.equals(REQ_ACTIVE)) { + Collection operations = operationManager.getLiveQueryInfos(); + LOG.info("Returning active SQL operations via the RESTful API"); + sendAsJson(response, operations); + } else if (queriesType.equals(REQ_HISTORICAL)) { + Collection operations = operationManager.getHistoricalQueryInfos(); + LOG.info("Returning historical SQL operations via the RESTful API"); + sendAsJson(response, operations); + } else { + sendError(response, HttpServletResponse.SC_BAD_REQUEST, "Unknown query type: " + queriesType); + return; + } + } else if (reqType.equals(REQ_SESSIONS)) { + Collection hiveSessions = sessionManager.getSessions(); + LOG.info("Returning active sessions via the RESTful API"); + sendAsJson(response, hiveSessions); + } else { // unrecognized request + sendError(response, HttpServletResponse.SC_NOT_FOUND, "Unknown request type: " + reqType); + return; + } + } else { // unrecognized API version + sendError(response, HttpServletResponse.SC_BAD_REQUEST, "This server only handles API v1"); + return; + } + } + + private void sendError(HttpServletResponse response, + Integer errorCode, + String message) { + response.setStatus(errorCode); + response.setContentType("application/json"); + response.setCharacterEncoding("UTF-8"); + try { + response.getWriter().write("{\"message\" : " + message + "}"); + } catch (IOException e) { + LOG.error("Caught an exception while writing an HTTP error status", e); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + } + + private void sendAsJson( + HttpServletResponse response, + Object obj) { + response.setContentType("application/json"); + response.setStatus(HttpServletResponse.SC_OK); + ObjectMapper mapper = new ObjectMapper(); + SimpleModule module = new SimpleModule("CustomSessionModule", new Version(1, 0, 0, null)); + module.addSerializer(HiveSession.class, new HiveSessionSerializer()); + mapper.registerModule(module); + + try { + PrintWriter out = response.getWriter(); + String objectAsJson = mapper.writeValueAsString(obj); + out.print(objectAsJson); + out.flush(); + out.close(); + } catch (IOException e) { + LOG.error("Caught an exception while writing an HTTP response", e); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + } + + private static class HiveSessionSerializer extends JsonSerializer { + @Override + public void serialize( + HiveSession hiveSession, + JsonGenerator jgen, + SerializerProvider serializerProvider) + throws IOException, JsonProcessingException { + long currentTime = System.currentTimeMillis(); + + jgen.writeStartObject(); + jgen.writeStringField("sessionId", hiveSession.getSessionHandle().getSessionId().toString()); + jgen.writeStringField("username", hiveSession.getUserName()); + jgen.writeStringField("ipAddress", hiveSession.getIpAddress()); + jgen.writeNumberField("operationCount", hiveSession.getOpenOperationCount()); + jgen.writeNumberField("activeTime", (currentTime - hiveSession.getCreationTime()) / 1000); + jgen.writeNumberField("idleTime", (currentTime - hiveSession.getLastAccessTime()) / 1000); + jgen.writeEndObject(); + } + } +} diff --git a/service/src/test/org/apache/hive/service/server/TestHS2HttpServer.java b/service/src/test/org/apache/hive/service/server/TestHS2HttpServer.java index bb6a2313f2..3047443aeb 100644 --- a/service/src/test/org/apache/hive/service/server/TestHS2HttpServer.java +++ b/service/src/test/org/apache/hive/service/server/TestHS2HttpServer.java @@ -18,52 +18,65 @@ package org.apache.hive.service.server; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.StringWriter; -import java.net.HttpURLConnection; -import java.net.URL; - +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertNotNull; - +import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.cli.OperationHandle; +import org.apache.hive.service.cli.SessionHandle; +import org.apache.hive.service.cli.session.SessionManager; +import org.apache.hive.service.rpc.thrift.TProtocolVersion; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.StringWriter; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; /** - * TestHS2HttpServer -- executes tests of HiveServer2 HTTP Server + * TestHS2HttpServer -- executes tests of HiveServer2 HTTP Server. */ public class TestHS2HttpServer { private static HiveServer2 hiveServer2 = null; + private static CLIService client = null; + private static SessionManager sm = null; private static HiveConf hiveConf = null; private static String metastorePasswd = "61ecbc41cdae3e6b32712a06c73606fa"; //random md5 private static Integer webUIPort = null; + private static String apiBaseURL = null; + @BeforeClass public static void beforeTests() throws Exception { webUIPort = MetaStoreTestUtils.findFreePortExcepting( Integer.valueOf(ConfVars.HIVE_SERVER2_WEBUI_PORT.getDefaultValue())); + apiBaseURL = "http://localhost:" + webUIPort + "/api/v1"; hiveConf = new HiveConf(); hiveConf.set(ConfVars.METASTOREPWD.varname, metastorePasswd); hiveConf.set(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname, webUIPort.toString()); - hiveConf - .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, - "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); Exception hs2Exception = null; boolean hs2Started = false; @@ -72,6 +85,7 @@ public static void beforeTests() throws Exception { hiveServer2 = new HiveServer2(); hiveServer2.init(hiveConf); hiveServer2.start(); + client = hiveServer2.getCliService(); Thread.sleep(5000); hs2Started = true; break; @@ -85,10 +99,10 @@ public static void beforeTests() throws Exception { webUIPort = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT); } } - if (!hs2Started) { - throw(hs2Exception); + throw (hs2Exception); } + sm = hiveServer2.getCliService().getSessionManager(); } @Test @@ -122,6 +136,121 @@ public void testBaseUrlResponseHeader() throws Exception{ assertNotNull(xContentTypeHeader); } + private BufferedReader getReaderForUrl(String urlString) throws Exception { + URL url = new URL(urlString); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); + BufferedReader reader = + new BufferedReader(new InputStreamReader(conn.getInputStream())); + return reader; + } + + private String readFromUrl(String urlString) throws Exception { + BufferedReader reader = getReaderForUrl(urlString); + StringBuilder response = new StringBuilder(); + String inputLine; + + while ((inputLine = reader.readLine()) != null) { + response.append(inputLine); + } + reader.close(); + return response.toString(); + } + + private static List getListOfNodes(String json) throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode rootNode = objectMapper.readTree(json); + + ArrayList nodes = new ArrayList<>(); + if (rootNode.isArray()) { + for (final JsonNode node : rootNode) { + nodes.add(node); + } + } + return nodes; + } + + @Test + public void testApiServletHistoricalQueries() throws Exception { + String historicalQueriesRoute = "/queries/historical"; + + final SessionHandle handle = + sm.openSession(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9, "user", "passw", "127.0.0.1", + new HashMap()); + + String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + + " = false"; + OperationHandle opHandle = client.executeStatement(handle, queryString, new HashMap()); + client.closeOperation(opHandle); + + opHandle = client.executeStatement(handle, "SELECT 1", new HashMap()); + client.closeOperation(opHandle); + + String queriesResponse = readFromUrl(apiBaseURL + historicalQueriesRoute); + List historicalQueries = getListOfNodes(queriesResponse); + Assert.assertTrue(historicalQueries.size() == 1); + + JsonNode historicalQuery = historicalQueries.get(0); + Assert.assertEquals(historicalQuery.path("running").asBoolean(), false); + Assert.assertEquals(historicalQuery.path("state").asText(), "FINISHED"); + Assert.assertTrue(historicalQuery.path("runtime").canConvertToInt()); + Assert.assertTrue(historicalQuery.path("queryDisplay").isObject()); + } + + @Test + public void testApiServletActiveSessions() throws Exception { + String sessionsRoute = "/sessions"; + + String initNoSessionsResponse = readFromUrl(apiBaseURL + sessionsRoute); + Assert.assertTrue("[]".equals(initNoSessionsResponse)); + + SessionHandle handle1 = + sm.openSession(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9, "user", "passw", "127.0.0.1", + new HashMap()); + + String oneSessionResponse = readFromUrl(apiBaseURL + sessionsRoute); + + List sessionNodes = getListOfNodes(oneSessionResponse); + Assert.assertEquals(sessionNodes.size(), 1); + + JsonNode session = sessionNodes.get(0); + Assert.assertEquals(session.path("sessionId").asText(), handle1.getSessionId().toString()); + Assert.assertEquals(session.path("username").asText(), "user"); + Assert.assertEquals(session.path("ipAddress").asText(), "127.0.0.1"); + Assert.assertEquals(session.path("operationCount").asInt(), 0); + Assert.assertTrue(session.path("activeTime").canConvertToInt()); + Assert.assertTrue(session.path("idleTime").canConvertToInt()); + + SessionHandle handle2 = sm.openSession(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9, "user", "passw", "127.0.0.1", + new HashMap()); + + String twoSessionsResponse = readFromUrl(apiBaseURL + sessionsRoute); + List twoSessionsNodes = getListOfNodes(twoSessionsResponse); + Assert.assertEquals(twoSessionsNodes.size(), 2); + + sm.closeSession(handle1); + sm.closeSession(handle2); + + String endNoSessionsResponse = readFromUrl(apiBaseURL + sessionsRoute); + Assert.assertTrue("[]".equals(endNoSessionsResponse)); + } + + @Test + public void testWrongApiVersion() throws Exception { + String wrongApiVersionUrl = "http://localhost:" + webUIPort + "/api/v2"; + URL url = new URL(wrongApiVersionUrl); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + Assert.assertEquals(HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode()); + } + + @Test + public void testWrongRoute() throws Exception { + String wrongRouteUrl = "http://localhost:" + webUIPort + "/api/v1/nonexistingRoute"; + URL url = new URL(wrongRouteUrl); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + Assert.assertEquals(HttpURLConnection.HTTP_NOT_FOUND, conn.getResponseCode()); + } + @Test public void testContextRootUrlRewrite() throws Exception { String datePattern = "[a-zA-Z]{3} [a-zA-Z]{3} [0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}"; @@ -145,18 +274,18 @@ public void testConfStrippedFromWebUI() throws Exception { CloseableHttpClient httpclient = null; try { httpclient = HttpClients.createDefault(); - HttpGet httpGet = new HttpGet("http://localhost:"+webUIPort+"/conf"); + HttpGet httpGet = new HttpGet("http://localhost:" + webUIPort + "/conf"); CloseableHttpResponse response1 = httpclient.execute(httpGet); try { HttpEntity entity1 = response1.getEntity(); BufferedReader br = new BufferedReader(new InputStreamReader(entity1.getContent())); String line; - while ((line = br.readLine())!= null) { - if (line.contains(metastorePasswd)){ + while ((line = br.readLine()) != null) { + if (line.contains(metastorePasswd)) { pwdValFound = line; } - if (line.contains(ConfVars.METASTOREPWD.varname)){ + if (line.contains(ConfVars.METASTOREPWD.varname)) { pwdKeyFound = line; } } @@ -165,7 +294,7 @@ public void testConfStrippedFromWebUI() throws Exception { response1.close(); } } finally { - if (httpclient != null){ + if (httpclient != null) { httpclient.close(); } }