diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 34df01e60e..154c6678da 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2691,6 +2691,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper." + "Overrides any explicit value set via the zookeeper.ssl.trustStore.password " + "system property (note the camelCase)."), + HIVE_ZOOKEEPER_KILLQUERY_ENABLE("hive.zookeeper.killquery.enable", true, + "Whether enabled kill query coordination with zookeeper, " + + "when hive.server2.support.dynamic.service.discovery is enabled."), + HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE("hive.zookeeper.killquery.namespace", "killQueries", + "When kill query coordination is enabled, uses this namespace for registering queries to kill with zookeeper"), // Transactions HIVE_TXN_MANAGER("hive.txn.manager", diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java index 3973ec9270..ff429a385e 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java @@ -784,6 +784,7 @@ public void run() { con2.close(); assertNotNull("tExecute", tExecuteHolder.throwable); + assertEquals("Query was cancelled. 
User invoked KILL QUERY", tExecuteHolder.throwable.getMessage()); assertNull("tCancel", tKillHolder.throwable); } diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java index 68a515ccbe..9441b47089 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java @@ -412,6 +412,7 @@ public void testKillQueryById() throws Exception { testKillQueryInternal(System.getProperty("user.name"), System.getProperty("user.name"), false, tExecuteHolder, tKillHolder); assertNotNull("tExecute", tExecuteHolder.throwable); + assertEquals("Query was cancelled. User invoked KILL QUERY", tExecuteHolder.throwable.getMessage()); assertNull("tCancel", tKillHolder.throwable); } @@ -431,6 +432,7 @@ public void testKillQueryByTagAdmin() throws Exception { ExceptionHolder tKillHolder = new ExceptionHolder(); testKillQueryInternal("user1", System.getProperty("user.name"), true, tExecuteHolder, tKillHolder); assertNotNull("tExecute", tExecuteHolder.throwable); + assertEquals("Query was cancelled. User invoked KILL QUERY", tExecuteHolder.throwable.getMessage()); assertNull("tCancel", tKillHolder.throwable); } @@ -440,6 +442,7 @@ public void testKillQueryByTagOwner() throws Exception { ExceptionHolder tKillHolder = new ExceptionHolder(); testKillQueryInternal("user1", "user1", true, tExecuteHolder, tKillHolder); assertNotNull("tExecute", tExecuteHolder.throwable); + assertEquals("Query was cancelled. 
User invoked KILL QUERY", tExecuteHolder.throwable.getMessage()); assertNull("tCancel", tKillHolder.throwable); } diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithServiceDiscovery.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithServiceDiscovery.java new file mode 100644 index 0000000000..f4ead9ebc5 --- /dev/null +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithServiceDiscovery.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hive.jdbc; + +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.registry.impl.ZkRegistryBase; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.URL; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +/** + * Test JDBC driver when two HS2 instance is running with service discovery enabled. + */ +public class TestJdbcWithServiceDiscovery { + + private static final Logger LOG = LoggerFactory.getLogger(TestJdbcWithServiceDiscovery.class); + private static final String tableName = "testJdbcMinihs2Tbl"; + private static final String testDbName = "testJdbcMinihs2"; + + private static TestingServer zkServer; + private static MiniHS2 miniHS2_1; + private static MiniHS2 miniHS2_2; + private static String miniHS2_1_directUrl; + private static String miniHS2_2_directUrl; + private static Path kvDataFilePath; + + @BeforeClass public static void setup() throws Exception { + MiniHS2.cleanupLocalDir(); + zkServer = new TestingServer(); + + // Create one MiniHS2 with Tez and one with Local FS only + HiveConf hiveConf1 = getTezConf(); + HiveConf hiveConf2 = new HiveConf(); + + setSDConfigs(hiveConf1); + setSDConfigs(hiveConf2); + + miniHS2_1 = new MiniHS2.Builder().withConf(hiveConf1).withMiniTez().build(); + miniHS2_2 = new MiniHS2.Builder().withConf(hiveConf2).cleanupLocalDirOnStartup(false).build(); + + 
Class.forName(MiniHS2.getJdbcDriverName()); + String instanceId1 = UUID.randomUUID().toString(); + miniHS2_1.start(getConfOverlay(instanceId1)); + miniHS2_1_directUrl = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort() + "/" + testDbName; + String instanceId2 = UUID.randomUUID().toString(); + miniHS2_2.start(getConfOverlay(instanceId2)); + miniHS2_2_directUrl = "jdbc:hive2://" + miniHS2_2.getHost() + ":" + miniHS2_2.getBinaryPort() + "/" + testDbName; + + String dataFileDir = hiveConf1.get("test.data.files").replace('\\', '/').replace("c:", ""); + kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + + setupDb(); + } + + /** + * SleepMsUDF + */ + public static class SleepMsUDF extends UDF { + public Integer evaluate(int value, int ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + // No-op + } + return value; + } + } + + public static void setupDb() throws Exception { + Connection conDefault = DriverManager + .getConnection("jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort() + "/default", + System.getProperty("user.name"), "bar"); + Statement stmt = conDefault.createStatement(); + String tblName = testDbName + "." 
+ tableName; + String udfName = SleepMsUDF.class.getName(); + stmt.execute("drop database if exists " + testDbName + " cascade"); + stmt.execute("create database " + testDbName); + stmt.execute("use " + testDbName); + stmt.execute("create table " + tblName + " (int_col int, value string) "); + stmt.execute("load data local inpath '" + kvDataFilePath.toString() + "' into table " + tblName); + stmt.execute("grant select on table " + tblName + " to role public"); + + stmt.close(); + conDefault.close(); + } + + @AfterClass public static void afterTest() throws Exception { + if ((miniHS2_1 != null) && miniHS2_1.isStarted()) { + try { + miniHS2_1.stop(); + } catch (Exception e) { + LOG.warn("Error why shutting down Hs2", e); + } + } + if ((miniHS2_2 != null) && miniHS2_2.isStarted()) { + try { + miniHS2_2.stop(); + } catch (Exception e) { + LOG.warn("Error why shutting down Hs2", e); + } + } + if (zkServer != null) { + zkServer.close(); + zkServer = null; + } + MiniHS2.cleanupLocalDir(); + } + + + private static HiveConf getTezConf() throws Exception { + String confDir = "../../data/conf/tez/"; + HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + HiveConf defaultConf = new HiveConf(); + defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + defaultConf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); + return defaultConf; + } + + private static void setSDConfigs(HiveConf conf) { + conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY, true); + conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, zkServer.getConnectString()); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE, false); 
+ conf.setTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, 2, TimeUnit.SECONDS); + conf.setTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, 100, TimeUnit.MILLISECONDS); + conf.setIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES, 1); + conf.setBoolVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE, true); + } + + private static Map getConfOverlay(final String instanceId) { + Map confOverlay = new HashMap<>(); + confOverlay.put("hive.server2.zookeeper.publish.configs", "true"); + confOverlay.put(ZkRegistryBase.UNIQUE_IDENTIFIER, instanceId); + return confOverlay; + } + + private static class ExceptionHolder { + Throwable throwable; + } + + private void executeQueryAndKill(Connection con1, Connection con2, ExceptionHolder tExecuteHolder, + ExceptionHolder tKillHolder) throws SQLException, InterruptedException { + final HiveStatement stmt = (HiveStatement) con1.createStatement(); + final Statement stmt2 = con2.createStatement(); + final StringBuffer stmtQueryId = new StringBuffer(); + + // Thread executing the query + Thread tExecute = new Thread(new Runnable() { + @Override public void run() { + try { + LOG.info("Executing waiting query."); + // The test table has 500 rows, so total query time should be ~ 500*500ms + stmt.executeAsync( + "select sleepMsUDF(t1.int_col, 10), t1.int_col, t2.int_col " + "from " + tableName + " t1 join " + + tableName + " t2 on t1.int_col = t2.int_col"); + stmtQueryId.append(stmt.getQueryId()); + stmt.getUpdateCount(); + } catch (SQLException e) { + tExecuteHolder.throwable = e; + } + } + }); + + tExecute.start(); + + // wait for other thread to create the stmt handle + int count = 0; + while (count < 10) { + try { + Thread.sleep(2000); + String queryId; + if (stmtQueryId.length() != 0) { + queryId = stmtQueryId.toString(); + } else { + count++; + continue; + } + + LOG.info("Killing query: " + queryId); + stmt2.execute("kill query '" + queryId + "'"); + stmt2.close(); + break; + } catch 
(SQLException e) { + LOG.warn("Exception when kill query", e); + tKillHolder.throwable = e; + break; + } + } + + tExecute.join(); + try { + stmt.close(); + con1.close(); + con2.close(); + } catch (Exception e) { + LOG.warn("Exception when close stmt and con", e); + } + } + + @Test public void testKillQueryWithSameServer() throws Exception { + Connection con1 = DriverManager.getConnection(miniHS2_1_directUrl, System.getProperty("user.name"), "bar"); + Connection con2 = DriverManager.getConnection(miniHS2_1_directUrl, System.getProperty("user.name"), "bar"); + + Statement stmt = con1.createStatement(); + stmt.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'"); + stmt.close(); + + ExceptionHolder tExecuteHolder = new ExceptionHolder(); + ExceptionHolder tKillHolder = new ExceptionHolder(); + + executeQueryAndKill(con1, con2, tExecuteHolder, tKillHolder); + + assertNotNull("tExecute", tExecuteHolder.throwable); + assertEquals("Query was cancelled. User invoked KILL QUERY", tExecuteHolder.throwable.getMessage()); + assertNull("tCancel", tKillHolder.throwable); + } + + @Test public void testKillQueryWithDifferentServer() throws Exception { + Connection con1 = DriverManager.getConnection(miniHS2_1_directUrl, System.getProperty("user.name"), "bar"); + Connection con2 = DriverManager.getConnection(miniHS2_2_directUrl, System.getProperty("user.name"), "bar"); + + Statement stmt = con1.createStatement(); + stmt.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'"); + stmt.close(); + + ExceptionHolder tExecuteHolder = new ExceptionHolder(); + ExceptionHolder tKillHolder = new ExceptionHolder(); + + executeQueryAndKill(con1, con2, tExecuteHolder, tKillHolder); + + assertNotNull("tExecute", tExecuteHolder.throwable); + assertEquals("Query was cancelled. 
User invoked KILL QUERY", tExecuteHolder.throwable.getMessage()); + assertNull("tCancel", tKillHolder.throwable); + } + + @Test public void testKillQueryWithDifferentServerZKTurnedOff() throws Exception { + Connection con1 = DriverManager.getConnection(miniHS2_1_directUrl, System.getProperty("user.name"), "bar"); + Connection con2 = DriverManager.getConnection(miniHS2_2_directUrl, System.getProperty("user.name"), "bar"); + + Statement stmt = con1.createStatement(); + stmt.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'"); + stmt.close(); + + stmt = con2.createStatement(); + stmt.execute("set hive.zookeeper.killquery.enable = false"); + stmt.close(); + + ExceptionHolder tExecuteHolder = new ExceptionHolder(); + ExceptionHolder tKillHolder = new ExceptionHolder(); + + executeQueryAndKill(con1, con2, tExecuteHolder, tKillHolder); + + assertNull("tExecute", tExecuteHolder.throwable); + assertNull("tCancel", tKillHolder.throwable); + } +} diff --git itests/hive-unit/src/test/java/org/apache/hive/service/server/TestKillQueryZookeeperManager.java itests/hive-unit/src/test/java/org/apache/hive/service/server/TestKillQueryZookeeperManager.java new file mode 100644 index 0000000000..7b9b08d815 --- /dev/null +++ itests/hive-unit/src/test/java/org/apache/hive/service/server/TestKillQueryZookeeperManager.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.service.server; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.TestingServer; +import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Tests for {@link KillQueryZookeeperManager}. 
+ */ +public class TestKillQueryZookeeperManager { + + private static final Logger LOG = LoggerFactory.getLogger(TestKillQueryZookeeperManager.class); + private final String BARRIER_ROOT_PATH = "/killqueries"; + private final String QUERYID = "QUERY1"; + private final String SERVER1 = "localhost:1234"; + private final String SERVER2 = "localhost:1235"; + private final String USER = "user"; + private final int TIMEOUT = 1000; + + TestingServer server; + + @Before public void setupZookeeper() throws Exception { + server = new TestingServer(); + } + + @After public void shutdown() { + if (server != null) { + CloseableUtils.closeQuietly(server); + } + } + + private CuratorFramework getClient() { + return CuratorFrameworkFactory.builder().connectString(server.getConnectString()).sessionTimeoutMs(TIMEOUT * 100) + .connectionTimeoutMs(TIMEOUT).retryPolicy(new RetryOneTime(1)).build(); + } + + @Test public void testBarrierServerCrash() throws Exception { + try (CuratorFramework client = getClient()) { + client.start(); + client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH); + final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier = + new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH); + barrier.setBarrier(QUERYID, SERVER1, USER, true); + + final ExecutorService service = Executors.newSingleThreadExecutor(); + Future future = service.submit(() -> { + Thread.sleep(TIMEOUT / 2); + server.stop(); + return null; + }); + + barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS); + future.get(); + Assert.fail(); + } catch (KeeperException.ConnectionLossException expected) { + // expected + } + } + + @Test public void testNoBarrier() throws Exception { + try (CuratorFramework client = getClient()) { + client.start(); + client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH); + IllegalStateException result = null; + final KillQueryZookeeperManager.KillQueryZookeeperBarrier 
barrier = + new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH); + try { + barrier.confirmProgress(SERVER1); + } catch (IllegalStateException e) { + result = e; + } + Assert.assertNotNull(result); + Assert.assertEquals("Barrier is not initialised", result.getMessage()); + } + } + + @Test public void testNo() throws Exception { + try (CuratorFramework client = getClient()) { + client.start(); + client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH); + final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier = + new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH); + barrier.setBarrier(QUERYID, SERVER1, USER, true); + + ExecutorService service = Executors.newSingleThreadExecutor(); + service.submit(() -> { + Thread.sleep(TIMEOUT / 2); + barrier.confirmNo(SERVER2); + return null; + }); + + Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS)); + } + } + + @Test public void testDone() throws Exception { + try (CuratorFramework client = getClient()) { + client.start(); + client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH); + final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier = + new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH); + barrier.setBarrier(QUERYID, SERVER1, USER, true); + + ExecutorService service = Executors.newSingleThreadExecutor(); + service.submit(() -> { + Thread.sleep(TIMEOUT / 2); + try { + barrier.confirmProgress(SERVER2); + Thread.sleep(TIMEOUT / 2); + barrier.confirmDone(SERVER2); + } catch (Exception e) { + LOG.error("Confirmation error", e); + } + return null; + }); + + Assert.assertTrue(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT, TimeUnit.MILLISECONDS)); + } + } + + @Test public void testFailed() throws Exception { + try (CuratorFramework client = getClient()) { + client.start(); + 
client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH); + final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier = + new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH); + barrier.setBarrier(QUERYID, SERVER1, USER, true); + + ExecutorService service = Executors.newSingleThreadExecutor(); + service.submit(() -> { + Thread.sleep(TIMEOUT / 2); + barrier.confirmProgress(SERVER2); + Thread.sleep(TIMEOUT / 2); + barrier.confirmFailed(SERVER2); + return null; + }); + + Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS)); + } + } + + @Test public void testConfirmTimeout() throws Exception { + try (CuratorFramework client = getClient()) { + client.start(); + client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH); + final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier = + new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH); + barrier.setBarrier(QUERYID, SERVER1, USER, true); + + Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS)); + } + } + + @Test public void testKillTimeout() throws Exception { + try (CuratorFramework client = getClient()) { + client.start(); + client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH); + final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier = + new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH); + barrier.setBarrier(QUERYID, SERVER1, USER, true); + ExecutorService service = Executors.newSingleThreadExecutor(); + service.submit(() -> { + Thread.sleep(TIMEOUT / 2); + barrier.confirmProgress(SERVER2); + // server died + return null; + }); + Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS)); + } + } +} diff --git itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java 
itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index 1b60a51ebd..0567c7264e 100644 --- itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -114,6 +114,10 @@ public Builder withMiniMR() { this.miniClusterType = MiniClusterType.MR; return this; } + public Builder withMiniTez() { + this.miniClusterType = MiniClusterType.TEZ; + return this; + } public Builder withMiniKdc(String serverPrincipal, String serverKeytab) { this.useMiniKdc = true; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 8becef1cd3..5b3fa8fed2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -842,8 +842,11 @@ private void addJarLRByClassName(String className, final Map clazz, final Map lrMap) throws IOException, LoginException { - final File jar = - new File(Utilities.jarFinderGetJar(clazz)); + String jarPath = Utilities.jarFinderGetJar(clazz); + if (jarPath == null) { + throw new IOException("Can't find jar for: " + clazz); + } + final File jar = new File(jarPath); final String localJarPath = jar.toURI().toURL().toExternalForm(); final LocalResource jarLr = createJarLocalResource(localJarPath); lrMap.put(DagUtils.getBaseName(jarLr), jarLr); diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 9e497545b5..9c7ee54ed2 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -76,6 +76,7 @@ import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.rpc.thrift.TProtocolVersion; import org.apache.hive.service.server.KillQueryImpl; +import 
org.apache.hive.service.server.KillQueryZookeeperManager; import org.apache.hive.service.server.ThreadWithGarbageCleanup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -167,7 +168,11 @@ public void open(Map sessionConfMap) throws HiveSQLException { } catch (Exception e) { throw new HiveSQLException(e); } - sessionState.setKillQuery(new KillQueryImpl(operationManager)); + KillQueryZookeeperManager killQueryZookeeperManager = null; + if (sessionManager != null) { + killQueryZookeeperManager = sessionManager.getKillQueryZookeeperManager(); + } + sessionState.setKillQuery(new KillQueryImpl(operationManager, killQueryZookeeperManager)); SessionState.start(sessionState); try { sessionState.loadAuxJars(); diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java index 277519cba5..57031f4350 100644 --- service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -54,6 +54,7 @@ import org.apache.hive.service.rpc.thrift.TOpenSessionReq; import org.apache.hive.service.rpc.thrift.TProtocolVersion; import org.apache.hive.service.server.HiveServer2; +import org.apache.hive.service.server.KillQueryZookeeperManager; import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,6 +86,7 @@ private int ipAddressLimit; private int userIpAddressLimit; private final OperationManager operationManager = new OperationManager(); + private KillQueryZookeeperManager killQueryZookeeperManager; private ThreadPoolExecutor backgroundOperationPool; private boolean isOperationLogEnabled; private File operationLogRootDir; @@ -114,6 +116,12 @@ public synchronized void init(HiveConf hiveConf) { } createBackgroundOperationPool(); addService(operationManager); + if 
(hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY) && + !hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE) && + hiveConf.getBoolVar(ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE)) { + killQueryZookeeperManager = new KillQueryZookeeperManager(operationManager, hiveServer2); + addService(killQueryZookeeperManager); + } initSessionImplClassName(); Metrics metrics = MetricsFactory.getInstance(); if(metrics != null){ @@ -625,6 +633,10 @@ public OperationManager getOperationManager() { return operationManager; } + public KillQueryZookeeperManager getKillQueryZookeeperManager() { + return killQueryZookeeperManager; + } + private static ThreadLocal threadLocalIpAddress = new ThreadLocal(); public static void setIpAddress(String ipAddress) { diff --git service/src/java/org/apache/hive/service/server/HiveServer2.java service/src/java/org/apache/hive/service/server/HiveServer2.java index 181ea5d6d5..fe5ee710bf 100644 --- service/src/java/org/apache/hive/service/server/HiveServer2.java +++ service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -622,7 +622,7 @@ public boolean isDeregisteredWithZooKeeper() { return false; } - private String getServerInstanceURI() throws Exception { + public String getServerInstanceURI() throws Exception { if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) { throw new Exception("Unable to get the server address; it hasn't been initialized yet."); } diff --git service/src/java/org/apache/hive/service/server/KillQueryImpl.java service/src/java/org/apache/hive/service/server/KillQueryImpl.java index 883e32bd2e..452ec17591 100644 --- service/src/java/org/apache/hive/service/server/KillQueryImpl.java +++ service/src/java/org/apache/hive/service/server/KillQueryImpl.java @@ -55,14 +55,19 @@ private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class); private final OperationManager operationManager; - private enum TagOrId {TAG, ID, UNKNOWN}; 
+ private final KillQueryZookeeperManager killQueryZookeeperManager; - public KillQueryImpl(OperationManager operationManager) { + private enum TagOrId {TAG, ID, UNKNOWN} + + + public KillQueryImpl(OperationManager operationManager, KillQueryZookeeperManager killQueryZookeeperManager) { this.operationManager = operationManager; + this.killQueryZookeeperManager = killQueryZookeeperManager; } - public static Set getChildYarnJobs(Configuration conf, String tag) throws IOException, YarnException { - Set childYarnJobs = new HashSet(); + public static Set getChildYarnJobs(Configuration conf, String tag, String doAs, boolean doAsAdmin) + throws IOException, YarnException { + Set childYarnJobs = new HashSet<>(); GetApplicationsRequest gar = GetApplicationsRequest.newInstance(); gar.setScope(ApplicationsRequestScope.OWN); gar.setApplicationTags(Collections.singleton(tag)); @@ -70,10 +75,18 @@ public KillQueryImpl(OperationManager operationManager) { ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class); GetApplicationsResponse apps = proxy.getApplications(gar); List appsList = apps.getApplicationList(); - for(ApplicationReport appReport : appsList) { - if (isAdmin() || appReport.getApplicationTags().contains(QueryState.USERID_TAG + "=" + SessionState.get() - .getUserName())) { + for (ApplicationReport appReport : appsList) { + if (doAsAdmin) { childYarnJobs.add(appReport.getApplicationId()); + } else if (StringUtils.isNotBlank(doAs)) { + if (appReport.getApplicationTags().contains(QueryState.USERID_TAG + "=" + doAs)) { + childYarnJobs.add(appReport.getApplicationId()); + } + } else { + if (isAdmin() || appReport.getApplicationTags() + .contains(QueryState.USERID_TAG + "=" + SessionState.get().getUserName())) { + childYarnJobs.add(appReport.getApplicationId()); + } } } @@ -86,13 +99,13 @@ public KillQueryImpl(OperationManager operationManager) { return childYarnJobs; } - public static void killChildYarnJobs(Configuration conf, 
String tag) { + public static void killChildYarnJobs(Configuration conf, String tag, String doAs, boolean doAsAdmin) { try { if (tag == null) { return; } LOG.info("Killing yarn jobs using query tag:" + tag); - Set childYarnJobs = getChildYarnJobs(conf, tag); + Set childYarnJobs = getChildYarnJobs(conf, tag, doAs, doAsAdmin); if (!childYarnJobs.isEmpty()) { YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); @@ -102,7 +115,7 @@ public static void killChildYarnJobs(Configuration conf, String tag) { } } } catch (IOException | YarnException ye) { - LOG.warn("Exception occurred while killing child job({})", ye); + LOG.warn("Exception occurred while killing child job({})", tag, ye); } } @@ -110,76 +123,123 @@ private static boolean isAdmin() { boolean isAdmin = false; if (SessionState.get().getAuthorizerV2() != null) { try { - SessionState.get().getAuthorizerV2().checkPrivileges(HiveOperationType.KILL_QUERY, - new ArrayList(), new ArrayList(), - new HiveAuthzContext.Builder().build()); + SessionState.get().getAuthorizerV2() + .checkPrivileges(HiveOperationType.KILL_QUERY, new ArrayList<>(), + new ArrayList<>(), new HiveAuthzContext.Builder().build()); isAdmin = true; } catch (Exception e) { + LOG.warn("Error while checking privileges", e); } } return isAdmin; } - private boolean cancelOperation(Operation operation, boolean isAdmin, String errMsg) throws - HiveSQLException { - if (isAdmin || operation.getParentSession().getUserName().equals(SessionState.get() - .getAuthenticator().getUserName())) { - OperationHandle handle = operation.getHandle(); - operationManager.cancelOperation(handle, errMsg); - return true; - } else { - return false; + private boolean cancelOperation(Operation operation, String doAs, boolean doAsAdmin, String errMsg) + throws HiveSQLException { + if (!doAsAdmin) { + if (!StringUtils.isBlank(doAs)) { + if (!operation.getParentSession().getUserName().equals(doAs)) { + return false; + } + } else { + if (!isAdmin() && 
!operation.getParentSession().getUserName() + .equals(SessionState.get().getAuthenticator().getUserName())) { + return false; + } + } + } + OperationHandle handle = operation.getHandle(); + operationManager.cancelOperation(handle, errMsg); + return true; + } + + public boolean isLocalQuery(String queryIdOrTag) { + TagOrId tagOrId = TagOrId.UNKNOWN; + if (operationManager.getOperationByQueryId(queryIdOrTag) != null) { + tagOrId = TagOrId.ID; + } else if (!operationManager.getOperationsByQueryTag(queryIdOrTag).isEmpty()) { + tagOrId = TagOrId.TAG; } + return tagOrId != TagOrId.UNKNOWN; } @Override public void killQuery(String queryIdOrTag, String errMsg, HiveConf conf) throws HiveException { - try { - TagOrId tagOrId = TagOrId.UNKNOWN; - Set operationsToKill = new HashSet(); - if (operationManager.getOperationByQueryId(queryIdOrTag) != null) { - operationsToKill.add(operationManager.getOperationByQueryId(queryIdOrTag)); - tagOrId = TagOrId.ID; - } else { - operationsToKill.addAll(operationManager.getOperationsByQueryTag(queryIdOrTag)); - if (!operationsToKill.isEmpty()) { - tagOrId = TagOrId.TAG; - } + killQuery(queryIdOrTag, errMsg, conf, false, null, false); + } + + public void killLocalQuery(String queryIdOrTag, HiveConf conf, String doAs, boolean doAsAdmin) + throws HiveException { + killQuery(queryIdOrTag, null, conf, true, doAs, doAsAdmin); + } + + private void killQuery(String queryIdOrTag, String errMsg, HiveConf conf, boolean onlyLocal, String doAs, + boolean doAsAdmin) throws HiveException { + errMsg = StringUtils.defaultString(errMsg, "User invoked KILL QUERY"); + TagOrId tagOrId = TagOrId.UNKNOWN; + Set operationsToKill = new HashSet<>(); + if (operationManager.getOperationByQueryId(queryIdOrTag) != null) { + operationsToKill.add(operationManager.getOperationByQueryId(queryIdOrTag)); + tagOrId = TagOrId.ID; + } else { + operationsToKill.addAll(operationManager.getOperationsByQueryTag(queryIdOrTag)); + if (!operationsToKill.isEmpty()) { + tagOrId = 
TagOrId.TAG; } - if (operationsToKill.isEmpty()) { - LOG.info("Query not found: " + queryIdOrTag); + } + if (operationsToKill.isEmpty()) { + LOG.debug("Query not found: " + queryIdOrTag); + } + if (tagOrId != TagOrId.UNKNOWN) { + killOperations(queryIdOrTag, errMsg, conf, tagOrId, operationsToKill, doAs, doAsAdmin); + } else { + if (!onlyLocal && killQueryZookeeperManager != null && + conf.getBoolVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE)) { + try { + LOG.debug("Killing query with zookeeper coordination: " + queryIdOrTag); + killQueryZookeeperManager + .killQuery(queryIdOrTag, SessionState.get().getAuthenticator().getUserName(), isAdmin()); + } catch (IOException e) { + LOG.error("Kill query failed for queryId: " + queryIdOrTag, e); + throw new HiveException(e.getMessage(), e); + } } - boolean admin = isAdmin(); - switch(tagOrId) { - case ID: - Operation operation = operationsToKill.iterator().next(); - boolean canceled = cancelOperation(operation, admin, errMsg); - if (canceled) { - String queryTag = operation.getQueryTag(); - if (queryTag == null) { - queryTag = queryIdOrTag; - } - killChildYarnJobs(conf, queryTag); - } else { - // no privilege to cancel - throw new HiveSQLException("No privilege to kill query id"); - } - break; - case TAG: - int numCanceled = 0; - for (Operation operationToKill : operationsToKill) { - if (cancelOperation(operationToKill, admin, errMsg)) { - numCanceled++; - } + } + } + + private void killOperations(String queryIdOrTag, String errMsg, HiveConf conf, TagOrId tagOrId, + Set operationsToKill, String doAs, boolean doAsAdmin) throws HiveException { + try { + switch (tagOrId) { + case ID: + Operation operation = operationsToKill.iterator().next(); + boolean canceled = cancelOperation(operation, doAs, doAsAdmin, errMsg); + if (canceled) { + String queryTag = operation.getQueryTag(); + if (queryTag == null) { + queryTag = queryIdOrTag; } - killChildYarnJobs(conf, queryIdOrTag); - if (numCanceled == 0) { - throw new 
HiveSQLException("No privilege to kill query tag"); + killChildYarnJobs(conf, queryTag, doAs, doAsAdmin); + } else { + // no privilege to cancel + throw new HiveSQLException("No privilege to kill query id"); + } + break; + case TAG: + int numCanceled = 0; + for (Operation operationToKill : operationsToKill) { + if (cancelOperation(operationToKill, doAs, doAsAdmin, errMsg)) { + numCanceled++; } - break; - case UNKNOWN: - killChildYarnJobs(conf, queryIdOrTag); - break; + } + if (numCanceled == 0) { + throw new HiveSQLException("No privilege to kill query tag"); + } else { + killChildYarnJobs(conf, queryIdOrTag, doAs, doAsAdmin); + } + break; + case UNKNOWN: + break; } } catch (HiveSQLException e) { LOG.error("Kill query failed for query " + queryIdOrTag, e); diff --git service/src/java/org/apache/hive/service/server/KillQueryZookeeperManager.java service/src/java/org/apache/hive/service/server/KillQueryZookeeperManager.java new file mode 100644 index 0000000000..72584066d2 --- /dev/null +++ service/src/java/org/apache/hive/service/server/KillQueryZookeeperManager.java @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hive.service.server;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.PathUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.registry.impl.ZookeeperUtils;
import org.apache.hive.service.AbstractService;
import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coordinates KILL QUERY requests across HiveServer2 instances through ZooKeeper. A kill request
 * for a query that is not running locally is posted as a barrier node under the kill-query
 * namespace; every other HS2 instance watches that namespace, kills the query if it owns it, and
 * confirms the outcome on the barrier.
 */
public class KillQueryZookeeperManager extends AbstractService {

  private static final Logger LOG = LoggerFactory.getLogger(KillQueryZookeeperManager.class);
  private static final String SASL_LOGIN_CONTEXT_NAME = "KillQueryZooKeeperClient";

  private CuratorFramework zooKeeperClient;
  private String zkPrincipal;
  private String zkKeytab;
  private String zkNameSpace;
  private final KillQueryImpl localKillQueryImpl;
  private final HiveServer2 hiveServer2;
  private HiveConf conf;

  // Path cache watching the kill-query namespace for incoming kill requests.
  private PathChildrenCache killQueryListener = null;

  public KillQueryZookeeperManager(OperationManager operationManager, HiveServer2 hiveServer2) {
    super(KillQueryZookeeperManager.class.getSimpleName());
    this.hiveServer2 = hiveServer2;
    localKillQueryImpl = new KillQueryImpl(operationManager, this);
  }

  @Override
  public synchronized void init(HiveConf conf) {
    this.conf = conf;
    zkNameSpace = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE);
    Preconditions.checkArgument(!StringUtils.isBlank(zkNameSpace),
        HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE.varname + " cannot be null or empty");
    this.zkPrincipal = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
    this.zkKeytab = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
    this.zooKeeperClient = conf.getZKConfig().getNewZookeeperClient(getACLProviderForZKPath("/" + zkNameSpace));
    this.zooKeeperClient.getConnectionStateListenable().addListener(new ZkConnectionStateListener());

    super.init(conf);
  }

  @Override
  public synchronized void start() {
    super.start();
    if (zooKeeperClient == null) {
      throw new RuntimeException("Failed start zookeeperClient in KillQueryZookeeperManager");
    }
    try {
      ZookeeperUtils.setupZookeeperAuth(this.getHiveConf(), SASL_LOGIN_CONTEXT_NAME, zkPrincipal, zkKeytab);
      zooKeeperClient.start();
      try {
        zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + zkNameSpace);
        if (ZookeeperUtils.isKerberosEnabled(conf)) {
          zooKeeperClient.setACL().withACL(createSecureAcls()).forPath("/" + zkNameSpace);
        }
        LOG.info("Created the root name space: " + zkNameSpace + " on ZooKeeper");
      } catch (KeeperException e) {
        // Another HS2 instance may have created the namespace already; that is expected.
        if (e.code() != KeeperException.Code.NODEEXISTS) {
          LOG.error("Unable to create namespace: " + zkNameSpace + " on ZooKeeper", e);
          throw e;
        }
      }

      killQueryListener = new PathChildrenCache(zooKeeperClient, "/" + zkNameSpace, false);
      killQueryListener.start(PathChildrenCache.StartMode.NORMAL);
      startListeningForQueries();
      // Init closeable utils in case register is not called (see HIVE-13322)
      CloseableUtils.class.getName();
    } catch (Exception e) {
      throw new RuntimeException("Failed start zookeeperClient in KillQueryZookeeperManager", e);
    }
  }

  /** ACL provider: world-readable + creator-all under the kill-query path when Kerberos is on. */
  private ACLProvider getACLProviderForZKPath(String zkPath) {
    final boolean isSecure = ZookeeperUtils.isKerberosEnabled(conf);
    return new ACLProvider() {
      @Override
      public List<ACL> getDefaultAcl() {
        // We always return something from getAclForPath so this should not happen.
        LOG.warn("getDefaultAcl was called");
        return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
      }

      @Override
      public List<ACL> getAclForPath(String path) {
        if (!isSecure || path == null || !path.contains(zkPath)) {
          // No security or the path is below the user path - full access.
          return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
        }
        return createSecureAcls();
      }
    };
  }

  private static List<ACL> createSecureAcls() {
    // Read all to the world
    List<ACL> nodeAcls = new ArrayList<>(ZooDefs.Ids.READ_ACL_UNSAFE);
    // Create/Delete/Write/Admin to creator
    nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
    return nodeAcls;
  }

  /** Registers the path-cache listener that reacts to kill-query barriers posted by other HS2s. */
  private void startListeningForQueries() {
    PathChildrenCacheListener listener = (client, pathChildrenCacheEvent) -> {
      if (pathChildrenCacheEvent.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
        KillQueryZookeeperBarrier barrier = new KillQueryZookeeperBarrier(zooKeeperClient, "/" + zkNameSpace,
            ZKPaths.getNodeFromPath(pathChildrenCacheEvent.getData().getPath()));

        KillQueryZookeeperData killQuery = barrier.getKillQueryData();
        // The barrier may already be gone, or the event may be for our own request.
        if (killQuery == null || getServerHost().equals(killQuery.getRequestingServer())) {
          return;
        }
        if (localKillQueryImpl.isLocalQuery(killQuery.getQueryId())) {
          LOG.info("Killing query with id {}", killQuery.getQueryId());
          barrier.confirmProgress(getServerHost());
          try {
            localKillQueryImpl
                .killLocalQuery(killQuery.getQueryId(), conf, killQuery.getDoAs(), killQuery.isDoAsAdmin());
            barrier.confirmDone(getServerHost());
          } catch (Exception e) {
            LOG.error("Unable to kill local query", e);
            barrier.confirmFailed(getServerHost());
          }
        } else {
          barrier.confirmNo(getServerHost());
        }
      }
    };
    killQueryListener.getListenable().addListener(listener);
  }

  @Override
  public synchronized void stop() {
    super.stop();
    CloseableUtils.closeQuietly(killQueryListener);
    CloseableUtils.closeQuietly(zooKeeperClient);
  }

  /** Returns the registered HS2 hosts, or an empty list when service discovery is off or HA-active-passive. */
  private List<String> getAllServerUrls() {
    List<String> serverHosts = new ArrayList<>();
    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)
        && !conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE)) {
      String zooKeeperNamespace = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
      try {
        serverHosts.addAll(zooKeeperClient.getChildren().forPath("/" + zooKeeperNamespace));
      } catch (Exception e) {
        LOG.error("Unable to get available server hosts", e);
      }
    }
    return serverHosts;
  }

  /** This server's instance URI with ':' stripped (it is the barrier-data field delimiter). */
  private String getServerHost() {
    if (hiveServer2 == null) {
      return "";
    }
    try {
      return removeDelimiter(hiveServer2.getServerInstanceURI());
    } catch (Exception e) {
      LOG.error("Unable to determine the server host", e);
      return "";
    }
  }

  // for debugging
  private static class ZkConnectionStateListener implements ConnectionStateListener {
    @Override
    public void stateChanged(final CuratorFramework curatorFramework, final ConnectionState connectionState) {
      LOG.info("Connection state change notification received. State: {}", connectionState);
    }
  }

  /**
   * Posts a kill-query request on ZooKeeper for the other HS2 instances. If service discovery is
   * not enabled or no other server is registered, does nothing. Otherwise posts the request and
   * waits for the other instances to confirm the kill or deny it.
   *
   * @param queryIdOrTag queryId or tag to kill
   * @param doAs user requesting the kill
   * @param doAsAdmin admin user requesting the kill (with KILLQUERY privilege)
   * @throws IOException If the kill query failed
   */
  public void killQuery(String queryIdOrTag, String doAs, boolean doAsAdmin) throws IOException {
    List<String> serverHosts = getAllServerUrls();
    if (serverHosts.size() < 2) {
      // Only this instance is registered: nobody else could own the query.
      return;
    }
    KillQueryZookeeperBarrier barrier = new KillQueryZookeeperBarrier(zooKeeperClient, "/" + zkNameSpace);
    boolean result;
    try {
      barrier.setBarrier(queryIdOrTag, hiveServer2.getServerInstanceURI(), doAs, doAsAdmin);
      LOG.info("Created kill query barrier in path: {} for queryId: {}", barrier.getBarrierPath(), queryIdOrTag);
      result = barrier.waitOnBarrier(serverHosts.size() - 1, 30, 180, TimeUnit.SECONDS);
    } catch (Exception e) {
      LOG.error("Unable to create Barrier on Zookeeper for KillQuery", e);
      throw new IOException(e);
    }
    if (!result) {
      throw new IOException("Unable to kill query on remote servers");
    }
  }

  /**
   * Data posted to ZooKeeper for a kill-query request. The fields are serialized with a ':'
   * delimiter, so every ':' inside a field value is removed before serialization.
   */
  public static class KillQueryZookeeperData {
    private String queryId;
    private String requestingServer;
    private String doAs;
    private boolean doAsAdmin;

    public KillQueryZookeeperData(String queryId, String requestingServer, String doAs, boolean doAsAdmin) {
      this.queryId = removeDelimiter(queryId);
      this.requestingServer = removeDelimiter(requestingServer);
      this.doAs = removeDelimiter(doAs);
      this.doAsAdmin = doAsAdmin;
    }

    public KillQueryZookeeperData(String data) {
      if (data == null) {
        return;
      }
      String[] elem = data.split(":");
      queryId = elem[0];
      requestingServer = elem[1];
      doAs = elem[2];
      doAsAdmin = Boolean.parseBoolean(elem[3]);
    }

    @Override
    public String toString() {
      return queryId + ":" + requestingServer + ":" + doAs + ":" + doAsAdmin;
    }

    public String getQueryId() {
      return queryId;
    }

    public String getRequestingServer() {
      return requestingServer;
    }

    public String getDoAs() {
      return doAs;
    }

    public boolean isDoAsAdmin() {
      return doAsAdmin;
    }
  }

  /**
   * ZooKeeper barrier for the kill-query operation. It posts a kill-query request on ZooKeeper and
   * waits until the given number of service instances respond. Implementation is based on
   * org.apache.curator.framework.recipes.barriers.DistributedBarrier.
   */
  public static class KillQueryZookeeperBarrier {
    private final CuratorFramework client;
    private final String barrierPath;
    private final Watcher watcher = new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        client.postSafeNotify(KillQueryZookeeperBarrier.this);
      }
    };

    /**
     * @param client client
     * @param barrierRootPath rootPath to put the barrier
     */
    public KillQueryZookeeperBarrier(CuratorFramework client, String barrierRootPath) {
      this(client, barrierRootPath, UUID.randomUUID().toString());
    }

    /**
     * @param client client
     * @param barrierRootPath rootPath to put the barrier
     * @param barrierPath name of the barrier
     */
    public KillQueryZookeeperBarrier(CuratorFramework client, String barrierRootPath, String barrierPath) {
      this.client = client;
      this.barrierPath = PathUtils.validatePath(barrierRootPath + "/" + barrierPath);
    }

    public String getBarrierPath() {
      return barrierPath;
    }

    /**
     * Utility to set the barrier node.
     *
     * @throws Exception errors
     */
    public synchronized void setBarrier(String queryId, String requestingServer, String doAs, boolean doAsAdmin)
        throws Exception {
      try {
        KillQueryZookeeperData data = new KillQueryZookeeperData(queryId, requestingServer, doAs, doAsAdmin);
        client.create().creatingParentContainersIfNeeded()
            .forPath(barrierPath, data.toString().getBytes(StandardCharsets.UTF_8));
      } catch (KeeperException.NodeExistsException e) {
        throw new IllegalStateException("Barrier with this path already exists");
      }
    }

    /** Reads the request payload from the barrier node, or null when the node no longer exists. */
    public synchronized KillQueryZookeeperData getKillQueryData() throws Exception {
      if (client.checkExists().forPath(barrierPath) != null) {
        byte[] data = client.getData().forPath(barrierPath);
        return new KillQueryZookeeperData(new String(data, StandardCharsets.UTF_8));
      }
      return null;
    }

    /**
     * Confirm not knowing the query with the queryId in the barrier.
     *
     * @param serverId The serverHost confirming the request
     * @throws Exception If confirmation failed
     */
    public synchronized void confirmNo(String serverId) throws Exception {
      if (client.checkExists().forPath(barrierPath) != null) {
        client.create().forPath(barrierPath + "/NO:" + serverId);
      } else {
        throw new IllegalStateException("Barrier is not initialised");
      }
    }

    /**
     * Confirm knowing the query with the queryId in the barrier and starting the kill query process.
     *
     * @param serverId The serverHost confirming the request
     * @throws Exception If confirmation failed
     */
    public synchronized void confirmProgress(String serverId) throws Exception {
      if (client.checkExists().forPath(barrierPath) != null) {
        client.create().forPath(barrierPath + "/PROGRESS:" + serverId);
      } else {
        throw new IllegalStateException("Barrier is not initialised");
      }
    }

    /**
     * Confirm killing the query with the queryId in the barrier.
     *
     * @param serverId The serverHost confirming the request
     * @throws Exception If confirmation failed
     */
    public synchronized void confirmDone(String serverId) throws Exception {
      if (client.checkExists().forPath(barrierPath) != null) {
        if (client.checkExists().forPath(barrierPath + "/PROGRESS:" + serverId) != null) {
          client.delete().forPath(barrierPath + "/PROGRESS:" + serverId);
        }
        client.create().forPath(barrierPath + "/DONE:" + serverId);
      } else {
        throw new IllegalStateException("Barrier is not initialised");
      }
    }

    /**
     * Confirm failure of killing the query with the queryId in the barrier.
     *
     * @param serverId The serverHost confirming the request
     * @throws Exception If confirmation failed
     */
    public synchronized void confirmFailed(String serverId) throws Exception {
      if (client.checkExists().forPath(barrierPath) != null) {
        if (client.checkExists().forPath(barrierPath + "/PROGRESS:" + serverId) != null) {
          client.delete().forPath(barrierPath + "/PROGRESS:" + serverId);
        }
        client.create().forPath(barrierPath + "/FAILED:" + serverId);
      } else {
        throw new IllegalStateException("Barrier is not initialised");
      }
    }

    /**
     * Wait for every server either to confirm killing the query or to confirm not knowing it.
     *
     * @param confirmationCount number of confirmations to wait for
     * @param maxWaitOnConfirmation confirmation waiting timeout for NO answers
     * @param maxWaitOnKill timeout for waiting on the actual kill query operation
     * @param unit time unit for timeouts
     * @return true if the kill was confirmed, false on timeout or if everybody voted NO
     * @throws Exception If confirmation failed
     */
    public synchronized boolean waitOnBarrier(int confirmationCount, long maxWaitOnConfirmation, long maxWaitOnKill,
        TimeUnit unit) throws Exception {
      long startMs = System.currentTimeMillis();
      long startKill = -1;
      long maxWaitMs = TimeUnit.MILLISECONDS.convert(maxWaitOnConfirmation, unit);
      long maxWaitOnKillMs = TimeUnit.MILLISECONDS.convert(maxWaitOnKill, unit);

      boolean progress = false;
      boolean result = false;
      while (true) {
        List<String> children = client.getChildren().usingWatcher(watcher).forPath(barrierPath);
        Boolean finished = null;
        for (String child : children) {
          if (child.startsWith("DONE")) {
            finished = Boolean.TRUE;
            break;
          }
          if (child.startsWith("FAILED")) {
            finished = Boolean.FALSE;
            break;
          }
          if (child.startsWith("PROGRESS")) {
            progress = true;
          }
        }
        if (finished != null) {
          // A server reported the final outcome: stop waiting immediately instead of
          // sleeping out the remaining kill timeout.
          result = finished;
          break;
        }
        if (progress) {
          // Some server owns the query: wait for the kill operation to finish.
          if (startKill < 0) {
            startKill = System.currentTimeMillis();
          }
          long elapsed = System.currentTimeMillis() - startKill;
          long thisWaitMs = maxWaitOnKillMs - elapsed;
          if (thisWaitMs <= 0) {
            break;
          }
          wait(thisWaitMs);
        } else {
          if (children.size() == confirmationCount) {
            // Everybody answered NO.
            result = false;
            break;
          }
          // Wait for confirmation
          long elapsed = System.currentTimeMillis() - startMs;
          long thisWaitMs = maxWaitMs - elapsed;
          if (thisWaitMs <= 0) {
            break;
          }
          wait(thisWaitMs);
        }
      }
      client.delete().deletingChildrenIfNeeded().forPath(barrierPath);
      return result;
    }
  }

  /** Strips the ':' field delimiter from a value before it is embedded in barrier data. */
  private static String removeDelimiter(String in) {
    if (in == null) {
      return null;
    }
    return in.replaceAll(":", "");
  }
}