diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 73f185a1f3..313b9050fe 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2691,6 +2691,11 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Truststore password when using a client-side certificate with TLS connectivity to ZooKeeper." + "Overrides any explicit value set via the zookeeper.ssl.trustStore.password " + "system property (note the camelCase)."), + HIVE_ZOOKEEPER_KILLQUERY_ENABLE("hive.zookeeper.killquery.enable", true, + "Whether to enable kill query coordination with ZooKeeper " + + "when hive.server2.support.dynamic.service.discovery is enabled."), + HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE("hive.zookeeper.killquery.namespace", "killQueries", + "When kill query coordination is enabled, this namespace is used for registering queries to kill with ZooKeeper."), // Transactions HIVE_TXN_MANAGER("hive.txn.manager", diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java index 3973ec9270..45b22f9514 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java @@ -40,6 +40,7 @@ import java.util.UUID; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.ddl.process.kill.KillQueriesOperation; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; @@ -784,6 +785,8 @@ public void run() { con2.close(); assertNotNull("tExecute", tExecuteHolder.throwable); + assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " + KillQueriesOperation.KILL_QUERY_MESSAGE, + tExecuteHolder.throwable.getMessage()); assertNull("tCancel", tKillHolder.throwable); } diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java index 68a515ccbe..1aab03d08f 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.llap.FieldDesc; import org.apache.hadoop.hive.llap.LlapBaseInputFormat; import org.apache.hadoop.hive.llap.Row; +import org.apache.hadoop.hive.ql.ddl.process.kill.KillQueriesOperation; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -412,6 +413,8 @@ public void testKillQueryById() throws Exception { testKillQueryInternal(System.getProperty("user.name"), System.getProperty("user.name"), false, tExecuteHolder, tKillHolder); assertNotNull("tExecute", tExecuteHolder.throwable); + assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " + KillQueriesOperation.KILL_QUERY_MESSAGE, + tExecuteHolder.throwable.getMessage()); assertNull("tCancel", tKillHolder.throwable); } @@ -431,6 +434,8 @@ public void testKillQueryByTagAdmin() throws Exception { ExceptionHolder tKillHolder = new ExceptionHolder(); testKillQueryInternal("user1", System.getProperty("user.name"), true, tExecuteHolder, tKillHolder); assertNotNull("tExecute", tExecuteHolder.throwable); + assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " +
KillQueriesOperation.KILL_QUERY_MESSAGE, + tExecuteHolder.throwable.getMessage()); assertNull("tCancel", tKillHolder.throwable); } @@ -440,6 +445,8 @@ public void testKillQueryByTagOwner() throws Exception { ExceptionHolder tKillHolder = new ExceptionHolder(); testKillQueryInternal("user1", "user1", true, tExecuteHolder, tKillHolder); assertNotNull("tExecute", tExecuteHolder.throwable); + assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " + KillQueriesOperation.KILL_QUERY_MESSAGE, + tExecuteHolder.throwable.getMessage()); assertNull("tCancel", tKillHolder.throwable); } diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithServiceDiscovery.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithServiceDiscovery.java new file mode 100644 index 0000000000..1621e7e52c --- /dev/null +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithServiceDiscovery.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.jdbc; + +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ddl.process.kill.KillQueriesOperation; +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.registry.impl.ZkRegistryBase; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.URL; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Test the JDBC driver when two HS2 instances are running with service discovery enabled.
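+ * One instance runs on Tez and the other on the local filesystem only; the tests cover killing a query through the same server, through the other server, with the ZooKeeper coordination turned off, and with an unknown query id.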
+ */ +public class TestJdbcWithServiceDiscovery { + + private static final Logger LOG = LoggerFactory.getLogger(TestJdbcWithServiceDiscovery.class); + private static final String TABLE_NAME = "testJdbcMinihs2Tbl"; + private static final String DB_NAME = "testJdbcMinihs2"; + private static final String REMOTE_ERROR_MESSAGE = "Unable to kill query locally or on remote servers."; + + private static TestingServer zkServer; + private static MiniHS2 miniHS2server1; + private static MiniHS2 miniHS2server2; + private static String miniHS2directUrl1; + private static String miniHS2directUrl2; + private static Path kvDataFilePath; + + @BeforeClass + public static void setup() throws Exception { + MiniHS2.cleanupLocalDir(); + zkServer = new TestingServer(); + + // Create one MiniHS2 with Tez and one with Local FS only + HiveConf hiveConf1 = getTezConf(); + HiveConf hiveConf2 = new HiveConf(); + + setSDConfigs(hiveConf1); + setSDConfigs(hiveConf2); + + miniHS2server1 = new MiniHS2.Builder().withConf(hiveConf1).withMiniTez().build(); + miniHS2server2 = new MiniHS2.Builder().withConf(hiveConf2).cleanupLocalDirOnStartup(false).build(); + + Class.forName(MiniHS2.getJdbcDriverName()); + String instanceId1 = UUID.randomUUID().toString(); + miniHS2server1.start(getConfOverlay(instanceId1)); + miniHS2directUrl1 = + "jdbc:hive2://" + miniHS2server1.getHost() + ":" + miniHS2server1.getBinaryPort() + "/" + DB_NAME; + String instanceId2 = UUID.randomUUID().toString(); + miniHS2server2.start(getConfOverlay(instanceId2)); + miniHS2directUrl2 = + "jdbc:hive2://" + miniHS2server2.getHost() + ":" + miniHS2server2.getBinaryPort() + "/" + DB_NAME; + + String dataFileDir = hiveConf1.get("test.data.files").replace('\\', '/').replace("c:", ""); + kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + + setupDb(); + } + + /** + * SleepMsUDF. + */ + public static class SleepMsUDF extends UDF { + public Integer evaluate(int value, int ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + // No-op + } + return value; + } + } + + public static void setupDb() throws Exception { + Connection conDefault = DriverManager + .getConnection("jdbc:hive2://" + miniHS2server1.getHost() + ":" + miniHS2server1.getBinaryPort() + "/default", + System.getProperty("user.name"), "bar"); + Statement stmt = conDefault.createStatement(); + String tblName = DB_NAME + "." 
+ TABLE_NAME; stmt.execute("drop database if exists " + DB_NAME + " cascade"); stmt.execute("create database " + DB_NAME); stmt.execute("use " + DB_NAME); stmt.execute("create table " + tblName + " (int_col int, value string) "); stmt.execute("load data local inpath '" + kvDataFilePath.toString() + "' into table " + tblName); stmt.execute("grant select on table " + tblName + " to role public"); + stmt.close(); conDefault.close(); } + @AfterClass + public static void afterTest() throws Exception { if ((miniHS2server1 != null) && miniHS2server1.isStarted()) { try { miniHS2server1.stop(); } catch (Exception e) { LOG.warn("Error while shutting down HS2", e); } } if ((miniHS2server2 != null) && miniHS2server2.isStarted()) { try { miniHS2server2.stop(); } catch (Exception e) { LOG.warn("Error while shutting down HS2", e); } } if (zkServer != null) { zkServer.close(); zkServer = null; } MiniHS2.cleanupLocalDir(); } + private static HiveConf getTezConf() throws Exception { String confDir = "../../data/conf/tez/"; HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); HiveConf defaultConf = new HiveConf(); defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); defaultConf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); return defaultConf; } + private static void setSDConfigs(HiveConf conf) { conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY, true); conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, zkServer.getConnectString()); conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE, false); conf.setTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, 2, TimeUnit.SECONDS); conf.setTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, 100, TimeUnit.MILLISECONDS); conf.setIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES, 1); conf.setBoolVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE, true); } + private static Map getConfOverlay(final String instanceId) { Map confOverlay = new HashMap<>(); confOverlay.put("hive.server2.zookeeper.publish.configs", "true"); confOverlay.put(ZkRegistryBase.UNIQUE_IDENTIFIER, instanceId); return confOverlay; } + private static class ExceptionHolder { Throwable throwable; } + private void executeQueryAndKill(Connection con1, Connection con2, ExceptionHolder tExecuteHolder, + ExceptionHolder tKillHolder) throws SQLException, InterruptedException { final HiveStatement stmt = (HiveStatement) con1.createStatement(); final Statement stmt2 = con2.createStatement(); final StringBuffer stmtQueryId = new StringBuffer(); + // Thread executing the query Thread tExecute = new Thread(() -> { try { LOG.info("Executing waiting query."); // The test table has 500 rows and sleepMsUDF sleeps 10 ms per row, so the query runs long enough to be killed stmt.executeAsync( "select sleepMsUDF(t1.int_col, 10), t1.int_col, t2.int_col " + "from " + TABLE_NAME + " t1 join " + TABLE_NAME + " t2 on t1.int_col = t2.int_col"); stmtQueryId.append(stmt.getQueryId()); stmt.getUpdateCount(); } catch (SQLException e) { tExecuteHolder.throwable = e; } }); + tExecute.start(); + //
wait for the other thread to create the stmt handle int count = 0; while (count < 10) { try { Thread.sleep(2000); String queryId; if (stmtQueryId.length() != 0) { queryId = stmtQueryId.toString(); } else { count++; continue; } + LOG.info("Killing query: " + queryId); stmt2.execute("kill query '" + queryId + "'"); stmt2.close(); break; } catch (SQLException e) { LOG.warn("Exception while killing query", e); tKillHolder.throwable = e; break; } } + tExecute.join(); try { stmt.close(); con1.close(); con2.close(); } catch (Exception e) { LOG.warn("Exception while closing stmt and con", e); } } + @Test + public void testKillQueryWithSameServer() throws Exception { Connection con1 = DriverManager.getConnection(miniHS2directUrl1, System.getProperty("user.name"), "bar"); Connection con2 = DriverManager.getConnection(miniHS2directUrl1, System.getProperty("user.name"), "bar"); + Statement stmt = con1.createStatement(); stmt.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'"); stmt.close(); + ExceptionHolder tExecuteHolder = new ExceptionHolder(); ExceptionHolder tKillHolder = new ExceptionHolder(); + executeQueryAndKill(con1, con2, tExecuteHolder, tKillHolder); + assertNotNull("tExecute", tExecuteHolder.throwable); assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " + KillQueriesOperation.KILL_QUERY_MESSAGE, tExecuteHolder.throwable.getMessage()); assertNull("tCancel", tKillHolder.throwable); } + @Test + public void testKillQueryWithDifferentServer() throws Exception { Connection con1 = DriverManager.getConnection(miniHS2directUrl1, System.getProperty("user.name"), "bar"); Connection con2 = DriverManager.getConnection(miniHS2directUrl2, System.getProperty("user.name"), "bar"); + Statement stmt = con1.createStatement(); stmt.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'"); stmt.close(); + ExceptionHolder tExecuteHolder = new ExceptionHolder(); ExceptionHolder tKillHolder = new ExceptionHolder(); + executeQueryAndKill(con1, con2, tExecuteHolder, tKillHolder); + assertNotNull("tExecute", tExecuteHolder.throwable); assertEquals(HiveStatement.QUERY_CANCELLED_MESSAGE + " " + KillQueriesOperation.KILL_QUERY_MESSAGE, tExecuteHolder.throwable.getMessage()); assertNull("tCancel", tKillHolder.throwable); } + @Test + public void testKillQueryWithDifferentServerZKTurnedOff() throws Exception { Connection con1 = DriverManager.getConnection(miniHS2directUrl1, System.getProperty("user.name"), "bar"); Connection con2 = DriverManager.getConnection(miniHS2directUrl2, System.getProperty("user.name"), "bar"); + Statement stmt = con1.createStatement(); stmt.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'"); stmt.close(); + stmt = con2.createStatement(); stmt.execute("set hive.zookeeper.killquery.enable = false"); stmt.close(); + ExceptionHolder tExecuteHolder = new ExceptionHolder(); ExceptionHolder tKillHolder = new ExceptionHolder(); + executeQueryAndKill(con1, con2, tExecuteHolder, tKillHolder); + assertNull("tExecute", tExecuteHolder.throwable); assertNull("tCancel", tKillHolder.throwable); } + @Test + public void testKillQueryWithRandomId() throws Exception { Connection con1 = DriverManager.getConnection(miniHS2directUrl1, System.getProperty("user.name"), "bar"); ExceptionHolder tKillHolder = new ExceptionHolder(); + Statement stmt = con1.createStatement(); String queryId = "randomId123"; try { +
LOG.info("Killing query: " + queryId); + stmt.execute("kill query '" + queryId + "'"); + stmt.close(); + } catch (SQLException e) { + LOG.warn("Exception when kill query", e); + tKillHolder.throwable = e; + } + try { + con1.close(); + } catch (Exception e) { + LOG.warn("Exception when close stmt and con", e); + } + + assertNotNull("tCancel", tKillHolder.throwable); + assertTrue(tKillHolder.throwable.getMessage(), tKillHolder.throwable.getMessage().contains(REMOTE_ERROR_MESSAGE)); + } +} diff --git itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestMiniHS2StateWithNoZookeeper.java itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestMiniHS2StateWithNoZookeeper.java index 99e681e5b2..0df3058359 100644 --- itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestMiniHS2StateWithNoZookeeper.java +++ itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/TestMiniHS2StateWithNoZookeeper.java @@ -50,11 +50,14 @@ private static HiveConf hiveConf = null; @BeforeClass - public static void beforeTest() throws Exception { + public static void beforeTest() throws Exception { + MiniHS2.cleanupLocalDir(); hiveConf = new HiveConf(); hiveConf.setBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY, true); hiveConf.setIntVar(ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES, 0); hiveConf.setTimeVar(ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, 0, TimeUnit.MILLISECONDS); + // Disable killquery, this way only HS2 start will fail, not the SessionManager service + hiveConf.setBoolVar(ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE, false); miniHS2 = new MiniHS2(hiveConf); Map confOverlay = new HashMap(); try { diff --git itests/hive-unit/src/test/java/org/apache/hive/service/server/TestKillQueryZookeeperManager.java itests/hive-unit/src/test/java/org/apache/hive/service/server/TestKillQueryZookeeperManager.java new file mode 100644 index 0000000000..d9997a9c49 --- /dev/null +++ itests/hive-unit/src/test/java/org/apache/hive/service/server/TestKillQueryZookeeperManager.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hive.service.server; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.TestingServer; +import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Tests for {@link KillQueryZookeeperManager}. + */ +public class TestKillQueryZookeeperManager { + + private static final Logger LOG = LoggerFactory.getLogger(TestKillQueryZookeeperManager.class); + private static final String BARRIER_ROOT_PATH = "/killqueries"; + private static final String QUERYID = "QUERY1"; + private static final String SERVER1 = "localhost:1234"; + private static final String SERVER2 = "localhost:1235"; + private static final String USER = "user"; + private static final int TIMEOUT = 1000; + + TestingServer server; + + @Before + public void setupZookeeper() throws Exception { + server = new TestingServer(); + } + + @After + public void shutdown() { + if (server != null) { + CloseableUtils.closeQuietly(server); + } + } + + private CuratorFramework getClient() { + return CuratorFrameworkFactory.builder().connectString(server.getConnectString()).sessionTimeoutMs(TIMEOUT * 100) + .connectionTimeoutMs(TIMEOUT).retryPolicy(new RetryOneTime(1)).build(); + } + + @Test + public void testBarrierServerCrash() throws Exception { + try (CuratorFramework client = getClient()) { + client.start(); + client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH); + final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier = + new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH); + barrier.setBarrier(QUERYID, SERVER1, USER, true); + + final ExecutorService service = Executors.newSingleThreadExecutor(); + Future future = service.submit(() -> { + Thread.sleep(TIMEOUT / 2); + server.stop(); + return null; + }); + + barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS); + future.get(); + Assert.fail(); + } catch (KeeperException.ConnectionLossException expected) { + // expected + } + } + + @Test + public void testNoBarrier() throws Exception { + try (CuratorFramework client = getClient()) { + client.start(); + client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH); + IllegalStateException result = null; + final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier = + new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH); + try { + barrier.confirmProgress(SERVER1); + } catch (IllegalStateException e) { + result = e; + } + Assert.assertNotNull(result); + Assert.assertEquals("Barrier is not initialised", result.getMessage()); + } + } + + @Test + public void testNo() throws Exception { + try (CuratorFramework client = getClient()) { + client.start(); + client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH); + final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier = + new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH); + barrier.setBarrier(QUERYID, SERVER1, USER, true); + + ExecutorService service = 
Executors.newSingleThreadExecutor(); + service.submit(() -> { + Thread.sleep(TIMEOUT / 2); + barrier.confirmNo(SERVER2); + return null; + }); + + Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS)); + } + } + + @Test + public void testDone() throws Exception { + try (CuratorFramework client = getClient()) { + client.start(); + client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH); + final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier = + new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH); + barrier.setBarrier(QUERYID, SERVER1, USER, true); + + ExecutorService service = Executors.newSingleThreadExecutor(); + service.submit(() -> { + Thread.sleep(TIMEOUT / 2); + try { + barrier.confirmProgress(SERVER2); + Thread.sleep(TIMEOUT / 2); + barrier.confirmDone(SERVER2); + } catch (Exception e) { + LOG.error("Confirmation error", e); + } + return null; + }); + + Assert.assertTrue(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT, TimeUnit.MILLISECONDS)); + } + } + + @Test + public void testFailed() throws Exception { + try (CuratorFramework client = getClient()) { + client.start(); + client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH); + final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier = + new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH); + barrier.setBarrier(QUERYID, SERVER1, USER, true); + + ExecutorService service = Executors.newSingleThreadExecutor(); + service.submit(() -> { + Thread.sleep(TIMEOUT / 2); + barrier.confirmProgress(SERVER2); + Thread.sleep(TIMEOUT / 2); + barrier.confirmFailed(SERVER2); + return null; + }); + + Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS)); + } + } + + @Test + public void testConfirmTimeout() throws Exception { + try (CuratorFramework client = getClient()) { + client.start(); + client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH); + final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier = + new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH); + barrier.setBarrier(QUERYID, SERVER1, USER, true); + + Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS)); + } + } + + @Test + public void testKillTimeout() throws Exception { + try (CuratorFramework client = getClient()) { + client.start(); + client.create().creatingParentContainersIfNeeded().forPath(BARRIER_ROOT_PATH); + final KillQueryZookeeperManager.KillQueryZookeeperBarrier barrier = + new KillQueryZookeeperManager.KillQueryZookeeperBarrier(client, BARRIER_ROOT_PATH); + barrier.setBarrier(QUERYID, SERVER1, USER, true); + ExecutorService service = Executors.newSingleThreadExecutor(); + service.submit(() -> { + Thread.sleep(TIMEOUT / 2); + barrier.confirmProgress(SERVER2); + // server died + return null; + }); + Assert.assertFalse(barrier.waitOnBarrier(1, TIMEOUT, TIMEOUT * 2, TimeUnit.MILLISECONDS)); + } + } +} diff --git itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index 1b60a51ebd..0567c7264e 100644 --- itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -114,6 +114,10 @@ public Builder withMiniMR() { this.miniClusterType = MiniClusterType.MR; return this; } + public Builder withMiniTez() { + this.miniClusterType = 
MiniClusterType.TEZ; + return this; + } public Builder withMiniKdc(String serverPrincipal, String serverKeytab) { this.useMiniKdc = true; diff --git jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index db965e7a22..543bf8c327 100644 --- jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -63,6 +63,7 @@ public class HiveStatement implements java.sql.Statement { public static final Logger LOG = LoggerFactory.getLogger(HiveStatement.class.getName()); + public static final String QUERY_CANCELLED_MESSAGE = "Query was cancelled."; private static final int DEFAULT_FETCH_SIZE = HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.defaultIntVal; @@ -394,9 +395,9 @@ TGetOperationStatusResp waitForOperationToComplete() throws SQLException { // 01000 -> warning String errMsg = statusResp.getErrorMessage(); if (errMsg != null && !errMsg.isEmpty()) { - throw new SQLException("Query was cancelled. " + errMsg, "01000"); + throw new SQLException(QUERY_CANCELLED_MESSAGE + " " + errMsg, "01000"); } else { - throw new SQLException("Query was cancelled", "01000"); + throw new SQLException(QUERY_CANCELLED_MESSAGE, "01000"); } case TIMEDOUT_STATE: throw new SQLTimeoutException("Query timed out after " + queryTimeout + " seconds"); diff --git ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java index afde1a4762..26c7fb8b8f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java +++ ql/src/java/org/apache/hadoop/hive/ql/ddl/process/kill/KillQueriesOperation.java @@ -31,11 +31,13 @@ public KillQueriesOperation(DDLOperationContext context, KillQueriesDesc desc) { super(context, desc); } + public static final String KILL_QUERY_MESSAGE = "User invoked KILL QUERY"; + @Override public int execute() throws HiveException { SessionState sessionState = SessionState.get(); for (String queryId : desc.getQueryIds()) { - sessionState.getKillQuery().killQuery(queryId, "User invoked KILL QUERY", context.getDb().getConf()); + sessionState.getKillQuery().killQuery(queryId, KILL_QUERY_MESSAGE, context.getDb().getConf()); } LOG.info("kill query called ({})", desc.getQueryIds()); return 0; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 8becef1cd3..5b3fa8fed2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -842,8 +842,11 @@ private void addJarLRByClassName(String className, final Map clazz, final Map lrMap) throws IOException, LoginException { - final File jar = - new File(Utilities.jarFinderGetJar(clazz)); + String jarPath = Utilities.jarFinderGetJar(clazz); + if (jarPath == null) { + throw new IOException("Can't find jar for: " + clazz); + } + final File jar = new File(jarPath); final String localJarPath = jar.toURI().toURL().toExternalForm(); final LocalResource jarLr = createJarLocalResource(localJarPath); lrMap.put(DagUtils.getBaseName(jarLr), jarLr); diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 9e497545b5..9c7ee54ed2 100644 --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ 
service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -76,6 +76,7 @@ import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.rpc.thrift.TProtocolVersion; import org.apache.hive.service.server.KillQueryImpl; +import org.apache.hive.service.server.KillQueryZookeeperManager; import org.apache.hive.service.server.ThreadWithGarbageCleanup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -167,7 +168,11 @@ public void open(Map sessionConfMap) throws HiveSQLException { } catch (Exception e) { throw new HiveSQLException(e); } - sessionState.setKillQuery(new KillQueryImpl(operationManager)); + KillQueryZookeeperManager killQueryZookeeperManager = null; + if (sessionManager != null) { + killQueryZookeeperManager = sessionManager.getKillQueryZookeeperManager(); + } + sessionState.setKillQuery(new KillQueryImpl(operationManager, killQueryZookeeperManager)); SessionState.start(sessionState); try { sessionState.loadAuxJars(); diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java index 277519cba5..57031f4350 100644 --- service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -54,6 +54,7 @@ import org.apache.hive.service.rpc.thrift.TOpenSessionReq; import org.apache.hive.service.rpc.thrift.TProtocolVersion; import org.apache.hive.service.server.HiveServer2; +import org.apache.hive.service.server.KillQueryZookeeperManager; import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,6 +86,7 @@ private int ipAddressLimit; private int userIpAddressLimit; private final OperationManager operationManager = new OperationManager(); + private KillQueryZookeeperManager killQueryZookeeperManager; private ThreadPoolExecutor backgroundOperationPool; private boolean isOperationLogEnabled; private File operationLogRootDir; @@ -114,6 +116,12 @@ public synchronized void init(HiveConf hiveConf) { } createBackgroundOperationPool(); addService(operationManager); + if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY) && + !hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE) && + hiveConf.getBoolVar(ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE)) { + killQueryZookeeperManager = new KillQueryZookeeperManager(operationManager, hiveServer2); + addService(killQueryZookeeperManager); + } initSessionImplClassName(); Metrics metrics = MetricsFactory.getInstance(); if(metrics != null){ @@ -625,6 +633,10 @@ public OperationManager getOperationManager() { return operationManager; } + public KillQueryZookeeperManager getKillQueryZookeeperManager() { + return killQueryZookeeperManager; + } + private static ThreadLocal threadLocalIpAddress = new ThreadLocal(); public static void setIpAddress(String ipAddress) { diff --git service/src/java/org/apache/hive/service/server/HiveServer2.java service/src/java/org/apache/hive/service/server/HiveServer2.java index 181ea5d6d5..fe5ee710bf 100644 --- service/src/java/org/apache/hive/service/server/HiveServer2.java +++ service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -622,7 +622,7 @@ public boolean isDeregisteredWithZooKeeper() { return false; } - private String getServerInstanceURI() throws Exception { + public String getServerInstanceURI() throws Exception { if ((thriftCLIService == null) || 
(thriftCLIService.getServerIPAddress() == null)) { throw new Exception("Unable to get the server address; it hasn't been initialized yet."); } diff --git service/src/java/org/apache/hive/service/server/KillQueryImpl.java service/src/java/org/apache/hive/service/server/KillQueryImpl.java index 883e32bd2e..e15cd1f8a0 100644 --- service/src/java/org/apache/hive/service/server/KillQueryImpl.java +++ service/src/java/org/apache/hive/service/server/KillQueryImpl.java @@ -22,10 +22,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.ddl.process.kill.KillQueriesOperation; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; -import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject; import org.apache.hadoop.hive.ql.session.KillQuery; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; @@ -55,14 +55,19 @@ private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class); private final OperationManager operationManager; - private enum TagOrId {TAG, ID, UNKNOWN}; + private final KillQueryZookeeperManager killQueryZookeeperManager; - public KillQueryImpl(OperationManager operationManager) { + private enum TagOrId {TAG, ID, UNKNOWN} + + + public KillQueryImpl(OperationManager operationManager, KillQueryZookeeperManager killQueryZookeeperManager) { this.operationManager = operationManager; + this.killQueryZookeeperManager = killQueryZookeeperManager; } - public static Set getChildYarnJobs(Configuration conf, String tag) throws IOException, YarnException { - Set childYarnJobs = new HashSet(); + public static Set getChildYarnJobs(Configuration conf, String tag, String doAs, boolean doAsAdmin) + throws IOException, YarnException { + Set childYarnJobs = new HashSet<>(); GetApplicationsRequest gar = GetApplicationsRequest.newInstance(); gar.setScope(ApplicationsRequestScope.OWN); gar.setApplicationTags(Collections.singleton(tag)); @@ -70,10 +75,13 @@ public KillQueryImpl(OperationManager operationManager) { ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class); GetApplicationsResponse apps = proxy.getApplications(gar); List appsList = apps.getApplicationList(); - for(ApplicationReport appReport : appsList) { - if (isAdmin() || appReport.getApplicationTags().contains(QueryState.USERID_TAG + "=" + SessionState.get() - .getUserName())) { + for (ApplicationReport appReport : appsList) { + if (doAsAdmin) { childYarnJobs.add(appReport.getApplicationId()); + } else if (StringUtils.isNotBlank(doAs)) { + if (appReport.getApplicationTags().contains(QueryState.USERID_TAG + "=" + doAs)) { + childYarnJobs.add(appReport.getApplicationId()); + } } } @@ -86,13 +94,13 @@ public KillQueryImpl(OperationManager operationManager) { return childYarnJobs; } - public static void killChildYarnJobs(Configuration conf, String tag) { + public static void killChildYarnJobs(Configuration conf, String tag, String doAs, boolean doAsAdmin) { try { if (tag == null) { return; } LOG.info("Killing yarn jobs using query tag:" + tag); - Set childYarnJobs = getChildYarnJobs(conf, tag); + Set childYarnJobs = getChildYarnJobs(conf, tag, doAs, doAsAdmin); if (!childYarnJobs.isEmpty()) { YarnClient yarnClient = 
YarnClient.createYarnClient(); yarnClient.init(conf); @@ -102,7 +110,7 @@ public static void killChildYarnJobs(Configuration conf, String tag) { } } } catch (IOException | YarnException ye) { - LOG.warn("Exception occurred while killing child job({})", ye); + LOG.warn("Exception occurred while killing child job({})", tag, ye); } } @@ -110,76 +118,117 @@ private static boolean isAdmin() { boolean isAdmin = false; if (SessionState.get().getAuthorizerV2() != null) { try { - SessionState.get().getAuthorizerV2().checkPrivileges(HiveOperationType.KILL_QUERY, - new ArrayList(), new ArrayList(), - new HiveAuthzContext.Builder().build()); + SessionState.get().getAuthorizerV2() + .checkPrivileges(HiveOperationType.KILL_QUERY, new ArrayList<>(), + new ArrayList<>(), new HiveAuthzContext.Builder().build()); isAdmin = true; } catch (Exception e) { + LOG.warn("Error while checking privileges", e); } } return isAdmin; } - private boolean cancelOperation(Operation operation, boolean isAdmin, String errMsg) throws - HiveSQLException { - if (isAdmin || operation.getParentSession().getUserName().equals(SessionState.get() - .getAuthenticator().getUserName())) { + private boolean cancelOperation(Operation operation, String doAs, boolean doAsAdmin, String errMsg) + throws HiveSQLException { + if (doAsAdmin || (!StringUtils.isBlank(doAs) && operation.getParentSession().getUserName().equals(doAs))) { OperationHandle handle = operation.getHandle(); operationManager.cancelOperation(handle, errMsg); return true; - } else { - return false; } + return false; + } + + public boolean isLocalQuery(String queryIdOrTag) { + TagOrId tagOrId = TagOrId.UNKNOWN; + if (operationManager.getOperationByQueryId(queryIdOrTag) != null) { + tagOrId = TagOrId.ID; + } else if (!operationManager.getOperationsByQueryTag(queryIdOrTag).isEmpty()) { + tagOrId = TagOrId.TAG; + } + return tagOrId != TagOrId.UNKNOWN; } @Override public void killQuery(String queryIdOrTag, String errMsg, HiveConf conf) throws HiveException { - try { - TagOrId tagOrId = TagOrId.UNKNOWN; - Set operationsToKill = new HashSet(); - if (operationManager.getOperationByQueryId(queryIdOrTag) != null) { - operationsToKill.add(operationManager.getOperationByQueryId(queryIdOrTag)); - tagOrId = TagOrId.ID; - } else { - operationsToKill.addAll(operationManager.getOperationsByQueryTag(queryIdOrTag)); - if (!operationsToKill.isEmpty()) { - tagOrId = TagOrId.TAG; - } + killQuery(queryIdOrTag, errMsg, conf, false, SessionState.get().getUserName(), isAdmin()); + } + + public void killLocalQuery(String queryIdOrTag, HiveConf conf, String doAs, boolean doAsAdmin) + throws HiveException { + killQuery(queryIdOrTag, null, conf, true, doAs, doAsAdmin); + } + + private void killQuery(String queryIdOrTag, String errMsg, HiveConf conf, boolean onlyLocal, String doAs, + boolean doAsAdmin) throws HiveException { + errMsg = StringUtils.defaultString(errMsg, KillQueriesOperation.KILL_QUERY_MESSAGE); + TagOrId tagOrId = TagOrId.UNKNOWN; + Set operationsToKill = new HashSet<>(); + if (operationManager.getOperationByQueryId(queryIdOrTag) != null) { + operationsToKill.add(operationManager.getOperationByQueryId(queryIdOrTag)); + tagOrId = TagOrId.ID; + LOG.debug("Query found with id: {}", queryIdOrTag); + } else { + operationsToKill.addAll(operationManager.getOperationsByQueryTag(queryIdOrTag)); + if (!operationsToKill.isEmpty()) { + tagOrId = TagOrId.TAG; + LOG.debug("Query found with tag: {}", queryIdOrTag); } - if (operationsToKill.isEmpty()) { - LOG.info("Query not found: " + queryIdOrTag); + } + 
if (!operationsToKill.isEmpty()){ + killOperations(queryIdOrTag, errMsg, conf, tagOrId, operationsToKill, doAs, doAsAdmin); + } else { + LOG.debug("Query not found with tag/id: {}", queryIdOrTag); + if (!onlyLocal && killQueryZookeeperManager != null && + conf.getBoolVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_ENABLE)) { + try { + LOG.debug("Killing query with zookeeper coordination: " + queryIdOrTag); + killQueryZookeeperManager + .killQuery(queryIdOrTag, SessionState.get().getAuthenticator().getUserName(), isAdmin()); + } catch (IOException e) { + LOG.error("Kill query failed for queryId: " + queryIdOrTag, e); + throw new HiveException("Unable to kill query locally or on remote servers.", e); + } + } else { + LOG.warn("Unable to kill query with id {}", queryIdOrTag); } - boolean admin = isAdmin(); - switch(tagOrId) { - case ID: - Operation operation = operationsToKill.iterator().next(); - boolean canceled = cancelOperation(operation, admin, errMsg); - if (canceled) { - String queryTag = operation.getQueryTag(); - if (queryTag == null) { - queryTag = queryIdOrTag; - } - killChildYarnJobs(conf, queryTag); - } else { - // no privilege to cancel - throw new HiveSQLException("No privilege to kill query id"); - } - break; - case TAG: - int numCanceled = 0; - for (Operation operationToKill : operationsToKill) { - if (cancelOperation(operationToKill, admin, errMsg)) { - numCanceled++; - } + } + } + + private void killOperations(String queryIdOrTag, String errMsg, HiveConf conf, TagOrId tagOrId, + Set operationsToKill, String doAs, boolean doAsAdmin) throws HiveException { + try { + switch (tagOrId) { + case ID: + Operation operation = operationsToKill.iterator().next(); + boolean canceled = cancelOperation(operation, doAs, doAsAdmin, errMsg); + if (canceled) { + String queryTag = operation.getQueryTag(); + if (queryTag == null) { + queryTag = queryIdOrTag; } - killChildYarnJobs(conf, queryIdOrTag); - if (numCanceled == 0) { - throw new HiveSQLException("No privilege to kill query tag"); + killChildYarnJobs(conf, queryTag, doAs, doAsAdmin); + } else { + // no privilege to cancel + throw new HiveSQLException("No privilege to kill query id"); + } + break; + case TAG: + int numCanceled = 0; + for (Operation operationToKill : operationsToKill) { + if (cancelOperation(operationToKill, doAs, doAsAdmin, errMsg)) { + numCanceled++; } - break; - case UNKNOWN: - killChildYarnJobs(conf, queryIdOrTag); - break; + } + if (numCanceled == 0) { + throw new HiveSQLException("No privilege to kill query tag"); + } else { + killChildYarnJobs(conf, queryIdOrTag, doAs, doAsAdmin); + } + break; + case UNKNOWN: + default: + break; } } catch (HiveSQLException e) { LOG.error("Kill query failed for query " + queryIdOrTag, e); diff --git service/src/java/org/apache/hive/service/server/KillQueryZookeeperManager.java service/src/java/org/apache/hive/service/server/KillQueryZookeeperManager.java new file mode 100644 index 0000000000..396364bf79 --- /dev/null +++ service/src/java/org/apache/hive/service/server/KillQueryZookeeperManager.java @@ -0,0 +1,525 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.service.server; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.utils.PathUtils; +import org.apache.curator.utils.ZKPaths; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.registry.impl.ZookeeperUtils; +import org.apache.hive.service.AbstractService; +import org.apache.hive.service.ServiceException; +import org.apache.hive.service.cli.operation.OperationManager; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * Kill query coordination service. + * When service discovery is enabled, a local kill query can request a kill + * on every other HS2 server with the queryId or queryTag and wait for confirmation or denial. + * The communication is done through Zookeeper.
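+ * The requesting server posts a barrier node under the kill query namespace; every other server watches the namespace and either confirms progress and reports done or failed, or answers no when the query is unknown to it.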
+ */ +public class KillQueryZookeeperManager extends AbstractService { + + private static final Logger LOG = LoggerFactory.getLogger(KillQueryZookeeperManager.class); + private static final String SASL_LOGIN_CONTEXT_NAME = "KillQueryZooKeeperClient"; + public static final int MAX_WAIT_ON_CONFIRMATION_SECONDS = 30; + public static final int MAX_WAIT_ON_KILL_SECONDS = 180; + + private CuratorFramework zooKeeperClient; + private String zkPrincipal, zkKeytab, zkNameSpace; + private final KillQueryImpl localKillQueryImpl; + private final HiveServer2 hiveServer2; + private HiveConf conf; + + // Path cache to watch queries to kill + private PathChildrenCache killQueryListener = null; + + public KillQueryZookeeperManager(OperationManager operationManager, HiveServer2 hiveServer2) { super(KillQueryZookeeperManager.class.getSimpleName()); this.hiveServer2 = hiveServer2; localKillQueryImpl = new KillQueryImpl(operationManager, this); } + + @Override + public synchronized void init(HiveConf conf) { this.conf = conf; zkNameSpace = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE); Preconditions.checkArgument(!StringUtils.isBlank(zkNameSpace), HiveConf.ConfVars.HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE.varname + " cannot be null or empty"); this.zkPrincipal = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL); this.zkKeytab = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); this.zooKeeperClient = conf.getZKConfig().getNewZookeeperClient(getACLProviderForZKPath("/" + zkNameSpace)); this.zooKeeperClient.getConnectionStateListenable().addListener(new ZkConnectionStateListener()); + super.init(conf); } + + @Override + public synchronized void start() { super.start(); if (zooKeeperClient == null) { throw new ServiceException("Failed to start zookeeperClient in KillQueryZookeeperManager"); } try { ZookeeperUtils.setupZookeeperAuth(this.getHiveConf(), SASL_LOGIN_CONTEXT_NAME, zkPrincipal, zkKeytab); zooKeeperClient.start(); try { zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/" + zkNameSpace); if (ZookeeperUtils.isKerberosEnabled(conf)) { zooKeeperClient.setACL().withACL(createSecureAcls()).forPath("/" + zkNameSpace); } LOG.info("Created the root namespace: " + zkNameSpace + " on ZooKeeper"); } catch (KeeperException e) { if (e.code() != KeeperException.Code.NODEEXISTS) { LOG.error("Unable to create namespace: " + zkNameSpace + " on ZooKeeper", e); throw e; } } // Create a path cache and start to listen for every kill query request from other servers. killQueryListener = new PathChildrenCache(zooKeeperClient, "/" + zkNameSpace, false); killQueryListener.start(PathChildrenCache.StartMode.NORMAL); startListeningForQueries(); // Init closeable utils in case register is not called (see HIVE-13322) CloseableUtils.class.getName(); } catch (Exception e) { throw new RuntimeException("Failed to start zookeeperClient in KillQueryZookeeperManager", e); } LOG.info("KillQueryZookeeperManager service started."); } + + private ACLProvider getACLProviderForZKPath(String zkPath) { final boolean isSecure = ZookeeperUtils.isKerberosEnabled(conf); return new ACLProvider() { @Override public List getDefaultAcl() { // We always return something from getAclForPath so this should not happen.
+ LOG.warn("getDefaultAcl was called"); + return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE); + } + + @Override + public List getAclForPath(String path) { + if (!isSecure || path == null || !path.contains(zkPath)) { + // No security or the path is below the user path - full access. + return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE); + } + return createSecureAcls(); + } + }; + } + + private static List createSecureAcls() { + // Read all to the world + List nodeAcls = new ArrayList<>(ZooDefs.Ids.READ_ACL_UNSAFE); + // Create/Delete/Write/Admin to creator + nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL); + return nodeAcls; + } + + private void startListeningForQueries() { + PathChildrenCacheListener listener = (client, pathChildrenCacheEvent) -> { + if (pathChildrenCacheEvent.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) { + KillQueryZookeeperBarrier barrier = new KillQueryZookeeperBarrier(zooKeeperClient, "/" + zkNameSpace, + ZKPaths.getNodeFromPath(pathChildrenCacheEvent.getData().getPath())); + Optional data = barrier.getKillQueryData(); + if (!data.isPresent()) { + return; + } + KillQueryZookeeperData killQuery = data.get(); + LOG.debug("Kill query request with id {}", killQuery.getQueryId()); + if (getServerHost().equals(killQuery.getRequestingServer())) { + // The listener was called for the server who posted the request + return; + } + if (localKillQueryImpl.isLocalQuery(killQuery.getQueryId())) { + LOG.info("Killing query with id {}", killQuery.getQueryId()); + barrier.confirmProgress(getServerHost()); + try { + localKillQueryImpl + .killLocalQuery(killQuery.getQueryId(), conf, killQuery.getDoAs(), killQuery.isDoAsAdmin()); + barrier.confirmDone(getServerHost()); + } catch (Exception e) { + LOG.error("Unable to kill local query", e); + barrier.confirmFailed(getServerHost()); + } + } else { + LOG.debug("Confirm unknown kill query request with id {}", killQuery.getQueryId()); + barrier.confirmNo(getServerHost()); + } + } + }; + LOG.info("Start to listen for kill query requests."); + killQueryListener.getListenable().addListener(listener); + } + + @Override + public synchronized void stop() { + super.stop(); + LOG.info("Stopping KillQueryZookeeperManager service."); + CloseableUtils.closeQuietly(killQueryListener); + CloseableUtils.closeQuietly(zooKeeperClient); + } + + private List getAllServerUrls() { + List serverHosts = new ArrayList<>(); + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY) && !conf + .getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE)) { + String zooKeeperNamespace = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); + try { + serverHosts.addAll(zooKeeperClient.getChildren().forPath("/" + zooKeeperNamespace)); + } catch (Exception e) { + LOG.error("Unable the get available server hosts", e); + } + } + return serverHosts; + } + + private String getServerHost() { + if (hiveServer2 == null) { + return ""; + } + try { + return removeDelimiter(hiveServer2.getServerInstanceURI()); + } catch (Exception e) { + LOG.error("Unable to determine the server host", e); + return ""; + } + } + + // for debugging + private static class ZkConnectionStateListener implements ConnectionStateListener { + @Override + public void stateChanged(final CuratorFramework curatorFramework, final ConnectionState connectionState) { + LOG.info("Connection state change notification received. State: {}", connectionState); + } + } + + /** + * Post a kill query request on Zookeeper for the other HS2 instances. 
If service discovery is not enabled or + * there is no other server registered, it does nothing. Otherwise it posts the kill query request on Zookeeper + * and waits for the other instances to confirm the kill or deny it. + * + * @param queryIdOrTag queryId or tag to kill + * @param doAs user requesting the kill + * @param doAsAdmin admin user requesting the kill (with KILLQUERY privilege) + * @throws IOException If the kill query failed + */ + public void killQuery(String queryIdOrTag, String doAs, boolean doAsAdmin) throws IOException { + List serverHosts = getAllServerUrls(); + if (serverHosts.size() < 2) { + return; + } + KillQueryZookeeperBarrier barrier = new KillQueryZookeeperBarrier(zooKeeperClient, "/" + zkNameSpace); + boolean result; + try { + barrier.setBarrier(queryIdOrTag, hiveServer2.getServerInstanceURI(), doAs, doAsAdmin); + LOG.info("Created kill query barrier in path: {} for queryId: {}", barrier.getBarrierPath(), queryIdOrTag); + result = barrier.waitOnBarrier(serverHosts.size() - 1, MAX_WAIT_ON_CONFIRMATION_SECONDS, + MAX_WAIT_ON_KILL_SECONDS, TimeUnit.SECONDS); + + } catch (Exception e) { + LOG.error("Unable to create Barrier on Zookeeper for KillQuery", e); + throw new IOException(e); + } + if (!result) { + throw new IOException("Unable to kill query on remote servers"); + } + } + + /** + * Data to post to Zookeeper for a kill query request. The fields will be serialized with a ':' delimiter. + * In requestingServer every ':' will be removed. The other fields can not contain any ':'. + */ + public static class KillQueryZookeeperData { + private String queryId; + private String requestingServer; + private String doAs; + private boolean doAsAdmin; + + public KillQueryZookeeperData(String queryId, String requestingServer, String doAs, boolean doAsAdmin) { + if (!StringUtils.equals(queryId, removeDelimiter(queryId))) { + throw new IllegalArgumentException("QueryId can not contain any ':' character."); + } + this.queryId = queryId; + this.requestingServer = removeDelimiter(requestingServer); + if (!StringUtils.equals(doAs, removeDelimiter(doAs))) { + throw new IllegalArgumentException("doAs can not contain any ':' character."); + } + this.doAs = doAs; + this.doAsAdmin = doAsAdmin; + } + + public KillQueryZookeeperData(String data) { + if (data == null) { + return; + } + + String[] elem = data.split(":"); + queryId = elem[0]; + requestingServer = elem[1]; + doAs = elem[2]; + doAsAdmin = Boolean.parseBoolean(elem[3]); + } + + @Override + public String toString() { + return queryId + ":" + requestingServer + ":" + doAs + ":" + doAsAdmin; + } + + public String getQueryId() { + return queryId; + } + + public String getRequestingServer() { + return requestingServer; + } + + public String getDoAs() { + return doAs; + } + + public boolean isDoAsAdmin() { + return doAsAdmin; + } + } + + /** + * Zookeeper Barrier for the KillQuery Operation. + * It posts a kill query request on Zookeeper and waits until the given number of service instances respond. + * Implementation is based on org.apache.curator.framework.recipes.barriers.DistributedBarrier.
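+ * The request data is stored in the barrier node itself, while responses are created as child nodes named NO:, PROGRESS:, DONE: or FAILED: followed by the responding server id.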
+ */ + public static class KillQueryZookeeperBarrier { + private final CuratorFramework client; + private final String barrierPath; + private final Watcher watcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + client.postSafeNotify(KillQueryZookeeperBarrier.this); + } + }; + + /** + * @param client client + * @param barrierRootPath rootPath to put the barrier + */ + public KillQueryZookeeperBarrier(CuratorFramework client, String barrierRootPath) { + this(client, barrierRootPath, UUID.randomUUID().toString()); + } + + /** + * @param client client + * @param barrierRootPath rootPath to put the barrier + * @param barrierPath name of the barrier + */ + public KillQueryZookeeperBarrier(CuratorFramework client, String barrierRootPath, String barrierPath) { + this.client = client; + this.barrierPath = PathUtils.validatePath(barrierRootPath + "/" + barrierPath); + } + + public String getBarrierPath() { + return barrierPath; + } + + /** + * Utility to set the barrier node. + * + * @throws Exception errors + */ + public synchronized void setBarrier(String queryId, String requestingServer, String doAs, boolean doAsAdmin) + throws Exception { + try { + KillQueryZookeeperData data = new KillQueryZookeeperData(queryId, requestingServer, doAs, doAsAdmin); + client.create().creatingParentContainersIfNeeded() + .forPath(barrierPath, data.toString().getBytes(StandardCharsets.UTF_8)); + } catch (KeeperException.NodeExistsException e) { + throw new IllegalStateException("Barrier with this path already exists"); + } + } + + public synchronized Optional getKillQueryData() throws Exception { + if (client.checkExists().forPath(barrierPath) != null) { + byte[] data = client.getData().forPath(barrierPath); + return Optional.of(new KillQueryZookeeperData(new String(data, StandardCharsets.UTF_8))); + } + return Optional.empty(); + } + + /** + * Confirm not knowing the query with the queryId in the barrier. + * + * @param serverId The serverHost confirming the request + * @throws Exception If confirmation failed + */ + public synchronized void confirmNo(String serverId) throws Exception { + if (client.checkExists().forPath(barrierPath) != null) { + client.create().forPath(barrierPath + "/NO:" + serverId); + } else { + throw new IllegalStateException("Barrier is not initialised"); + } + } + + /** + * Confirm knowing the query with the queryId in the barrier and starting the kill query process. + * + * @param serverId The serverHost confirming the request + * @throws Exception If confirmation failed + */ + public synchronized void confirmProgress(String serverId) throws Exception { + if (client.checkExists().forPath(barrierPath) != null) { + client.create().forPath(barrierPath + "/PROGRESS:" + serverId); + } else { + throw new IllegalStateException("Barrier is not initialised"); + } + } + + /** + * Confirm killing the query with the queryId in the barrier. + * + * @param serverId The serverHost confirming the request + * @throws Exception If confirmation failed + */ + public synchronized void confirmDone(String serverId) throws Exception { + if (client.checkExists().forPath(barrierPath) != null) { + if (client.checkExists().forPath(barrierPath + "/PROGRESS:" + serverId) != null) { + client.delete().forPath(barrierPath + "/PROGRESS:" + serverId); + } + client.create().forPath(barrierPath + "/DONE:" + serverId); + } else { + throw new IllegalStateException("Barrier is not initialised"); + } + } + + /** + * Confirm failure of killing the query with the queryId in the barrier. 
+ * + * @param serverId The serverHost confirming the request + * @throws Exception If confirmation failed + */ + public synchronized void confirmFailed(String serverId) throws Exception { + if (client.checkExists().forPath(barrierPath) != null) { + if (client.checkExists().forPath(barrierPath + "/PROGRESS:" + serverId) != null) { + client.delete().forPath(barrierPath + "/PROGRESS:" + serverId); + } + client.create().forPath(barrierPath + "/FAILED:" + serverId); + } else { + throw new IllegalStateException("Barrier is not initialised"); + } + } + + /** + * Wait for every server to either confirm killing the query or confirm not knowing the query. + * + * @param confirmationCount number of confirmations to wait for + * @param maxWaitOnConfirmation confirmation waiting timeout for NO answers + * @param maxWaitOnKill timeout for waiting on the actual kill query operation + * @param unit time unit for timeouts + * @return true if the kill was confirmed, false on timeout or if everybody voted for NO + * @throws Exception If confirmation failed + */ + public synchronized boolean waitOnBarrier(int confirmationCount, long maxWaitOnConfirmation, long maxWaitOnKill, TimeUnit unit) throws Exception { long startMs = System.currentTimeMillis(); long startKill = -1; long maxWaitMs = TimeUnit.MILLISECONDS.convert(maxWaitOnConfirmation, unit); long maxWaitOnKillMs = TimeUnit.MILLISECONDS.convert(maxWaitOnKill, unit); + boolean progress = false; boolean result = false; while (true) { List children = client.getChildren().usingWatcher(watcher).forPath(barrierPath); boolean concluded = false; for (String child : children) { if (child.startsWith("DONE")) { result = true; concluded = true; break; } if (child.startsWith("FAILED")) { concluded = true; break; } if (child.startsWith("PROGRESS")) { progress = true; } } if (concluded) { break; } if (progress) { // Wait for the kill query to finish if (startKill < 0) { startKill = System.currentTimeMillis(); } long elapsed = System.currentTimeMillis() - startKill; long thisWaitMs = maxWaitOnKillMs - elapsed; if (thisWaitMs <= 0) { break; } wait(thisWaitMs); } else { if (children.size() == confirmationCount) { result = false; break; } // Wait for confirmation long elapsed = System.currentTimeMillis() - startMs; long thisWaitMs = maxWaitMs - elapsed; if (thisWaitMs <= 0) { break; } wait(thisWaitMs); } + } client.delete().deletingChildrenIfNeeded().forPath(barrierPath); return result; } } + + private static String removeDelimiter(String in) { if (in == null) { return null; } return in.replaceAll(":", ""); } +} diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java index 71d8651712..1e35795d63 100644 --- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java +++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ZooKeeperHiveHelper.java @@ -29,6 +29,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.nodes.PersistentNode; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.retry.RetryOneTime; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -337,6 +338,9 @@ public
CuratorFramework getNewZookeeperClient(ACLProvider zooKeeperAclProvider, } if (maxRetries > 0) { builder = builder.retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)); + } else { + // Retry policy is mandatory + builder = builder.retryPolicy(new RetryOneTime(1000)); } if (zooKeeperAclProvider != null) { builder = builder.aclProvider(zooKeeperAclProvider);
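
Note: below is a minimal, hypothetical sketch (not part of the patch) of how the KillQueryZookeeperBarrier recipe introduced above can be exercised end to end. The connect string, server ids, query id and namespace path are made-up values for illustration; it assumes a reachable ZooKeeper and the classes from this patch on the classpath. In a real deployment the confirm calls come from the remote HS2 instances via the PathChildrenCache listener, not from the requesting process.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.hive.service.server.KillQueryZookeeperManager.KillQueryZookeeperBarrier;
import java.util.concurrent.TimeUnit;

public class KillQueryBarrierSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical connect string; the tests above get one from a Curator TestingServer.
    CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryOneTime(1000));
    client.start();

    // The requesting server posts the kill request as a new barrier node under the namespace.
    KillQueryZookeeperBarrier barrier = new KillQueryZookeeperBarrier(client, "/killQueries");
    barrier.setBarrier("hive_20200101000000_q1", "hs2-a:10000", "user1", false);

    // A remote server that owns the query acknowledges and then reports the result;
    // servers that do not know the query would call confirmNo(...) instead.
    barrier.confirmProgress("hs2-b:10000");
    barrier.confirmDone("hs2-b:10000");

    // The requester blocks until a DONE/FAILED child appears, every other server answered NO,
    // or the confirmation/kill timeouts elapse; the barrier node is deleted afterwards.
    boolean killed = barrier.waitOnBarrier(1, 30, 180, TimeUnit.SECONDS);
    System.out.println("killed = " + killed);
    client.close();
  }
}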