diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 98f472981b..d2e9514dda 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -110,7 +110,7 @@ private static Connection hs2Conn = null;
   // This method should be called by sub-classes in a @BeforeClass initializer
-  public static void beforeTest(HiveConf inputConf) throws Exception {
+  public static MiniHS2 beforeTest(HiveConf inputConf) throws Exception {
     conf = inputConf;
     Class.forName(MiniHS2.getJdbcDriverName());
     miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
@@ -120,6 +120,7 @@ public static void beforeTest(HiveConf inputConf) throws Exception {
     Map<String, String> confOverlay = new HashMap<String, String>();
     miniHS2.start(confOverlay);
     miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
+    return miniHS2;
   }
 
   static HiveConf defaultConf() throws Exception {
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 8f552b06ff..0410346955 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -3036,6 +3036,29 @@ private void testInsertOverwrite(HiveStatement stmt) throws SQLException {
     }
   }
 
+  @Test
+  public void testGetQueryId() throws Exception {
+    HiveStatement stmt = (HiveStatement) con.createStatement();
+    stmt.executeAsync("create database query_id_test with dbproperties ('repl.source.for' = '1, 2, 3')");
+    String queryId = stmt.getQueryId();
+    assertFalse(queryId.isEmpty());
+    stmt.getUpdateCount();
+
+    stmt.executeAsync("repl status query_id_test with ('hive.query.id' = 'hiveCustomTag')");
+    queryId = stmt.getQueryId();
+    assertFalse("hiveCustomTag".equals(queryId));
+    stmt.getUpdateCount();
+
+    stmt.executeAsync("select count(*) from " + dataTypeTableName);
+    queryId = stmt.getQueryId();
+    assertFalse("hiveCustomTag".equals(queryId));
+    assertFalse(queryId.isEmpty());
+    stmt.getUpdateCount();
+    stmt.execute("drop database query_id_test");
+
+    stmt.close();
+  }
+
   // Test that opening a JDBC connection to a non-existent database throws a HiveSQLException
   @Test(expected = HiveSQLException.class)
   public void testConnectInvalidDatabase() throws SQLException {
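The test above drives the new HiveStatement#getQueryId() API end to end. For reference, a minimal client-side sketch of the same pattern; the connection URL, credentials, and table name are assumptions for illustration, not part of this patch:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import org.apache.hive.jdbc.HiveStatement;

    public class GetQueryIdExample {
      public static void main(String[] args) throws Exception {
        Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
        HiveStatement stmt = (HiveStatement) conn.createStatement();
        stmt.executeAsync("select count(*) from some_table");  // hypothetical table
        String queryId = stmt.getQueryId();                    // the id is available while the query runs
        System.out.println("Running query id: " + queryId);
        stmt.getUpdateCount();                                 // blocks until the async statement finishes
        stmt.close();
        conn.close();
      }
    }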
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 2139709dfb..7b7cc98f42 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -1477,71 +1477,6 @@ public Integer evaluate(final Integer value, final Integer ms) {
     }
   }
 
-  /**
-   * Test CLI kill command of a query that is running.
-   * We spawn 2 threads - one running the query and
-   * the other attempting to cancel.
-   * We're using a dummy udf to simulate a query,
-   * that runs for a sufficiently long time.
-   * @throws Exception
-   */
-  @Test
-  public void testKillQuery() throws Exception {
-    Connection con = conTestDb;
-    Connection con2 = getConnection(testDbName);
-
-    String udfName = SleepMsUDF.class.getName();
-    Statement stmt1 = con.createStatement();
-    final Statement stmt2 = con2.createStatement();
-    stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
-    stmt1.close();
-    final Statement stmt = con.createStatement();
-    final ExceptionHolder tExecuteHolder = new ExceptionHolder();
-    final ExceptionHolder tKillHolder = new ExceptionHolder();
-
-    // Thread executing the query
-    Thread tExecute = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          System.out.println("Executing query: ");
-          // The test table has 500 rows, so total query time should be ~ 500*500ms
-          stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " +
-              "from " + tableName + " t1 join " + tableName + " t2 on t1.int_col = t2.int_col");
-          fail("Expecting SQLException");
-        } catch (SQLException e) {
-          tExecuteHolder.throwable = e;
-        }
-      }
-    });
-    // Thread killing the query
-    Thread tKill = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          Thread.sleep(2000);
-          String queryId = ((HiveStatement) stmt).getQueryId();
-          System.out.println("Killing query: " + queryId);
-
-          stmt2.execute("kill query '" + queryId + "'");
-          stmt2.close();
-        } catch (Exception e) {
-          tKillHolder.throwable = e;
-        }
-      }
-    });
-
-    tExecute.start();
-    tKill.start();
-    tExecute.join();
-    tKill.join();
-    stmt.close();
-    con2.close();
-
-    assertNotNull("tExecute", tExecuteHolder.throwable);
-    assertNull("tCancel", tKillHolder.throwable);
-  }
-
   private static class ExceptionHolder {
     Throwable throwable;
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index c02980b6f5..65090d6e08 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -30,21 +30,71 @@ import org.junit.BeforeClass;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.junit.AfterClass;
+
+import org.junit.Test;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Connection;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+
+import org.junit.Assert;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hive.service.cli.HiveSQLException;
 
 /**
  * TestJdbcWithMiniLlap for Arrow format
  */
 public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
-
+  private static MiniHS2 miniHS2 = null;
+  private static final String tableName = "testjdbcminihs2tbl";
+  private static String dataFileDir;
+  private static final String tableComment = "Simple table";
+  private static final String testDbName = "testjdbcminihs2";
+
+  private static class ExceptionHolder {
+    Throwable throwable;
+  }
 
   @BeforeClass
   public static void beforeTest() throws Exception {
     HiveConf conf = defaultConf();
     conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
-    BaseJdbcWithMiniLlap.beforeTest(conf);
+    MiniHS2.cleanupLocalDir();
+    miniHS2 = BaseJdbcWithMiniLlap.beforeTest(conf);
+    dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+
+    Connection conDefault = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(),
+        System.getProperty("user.name"), "bar");
+    Statement stmt = conDefault.createStatement();
+    stmt.execute("drop database if exists " + testDbName + " cascade");
+    stmt.execute("create database " + testDbName);
+    stmt.close();
+    conDefault.close();
+  }
+
+  @AfterClass
+  public static void afterTest() {
+    if (miniHS2 != null && miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
   }
 
   @Override
@@ -230,5 +280,92 @@ public void testDataTypes() throws Exception {
     assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
   }
 
+  public static class SleepMsUDF extends UDF {
+    public Integer evaluate(int value, int ms) {
+      try {
+        Thread.sleep(ms);
+      } catch (InterruptedException e) {
+        // No-op
+      }
+      return value;
+    }
+  }
+
+  /**
+   * Tests the CLI kill command on a running query.
+   * We spawn two threads: one runs the query and
+   * the other attempts to kill it. A dummy UDF is
+   * used to simulate a query that runs for a
+   * sufficiently long time.
+   * @throws Exception
+   */
+  @Test
+  public void testKillQuery() throws Exception {
+    Connection con = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
+        System.getProperty("user.name"), "bar");
+    Connection con2 = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
+        System.getProperty("user.name"), "bar");
+
+    String udfName = SleepMsUDF.class.getName();
+    Statement stmt1 = con.createStatement();
+    final Statement stmt2 = con2.createStatement();
+    Path dataFilePath = new Path(dataFileDir, "kv1.txt");
+
+    String tblName = testDbName + "." + tableName;
+
+    stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
+    stmt1.execute("create table " + tblName + " (int_col int, value string) ");
+    stmt1.execute("load data local inpath '" + dataFilePath.toString() + "' into table " + tblName);
+
+    stmt1.close();
+    final Statement stmt = con.createStatement();
+    final ExceptionHolder tExecuteHolder = new ExceptionHolder();
+    final ExceptionHolder tKillHolder = new ExceptionHolder();
+
+    // Thread executing the query
+    Thread tExecute = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          System.out.println("Executing query: ");
+          stmt.execute("set hive.llap.execution.mode = none");
+
+          // The test table has 500 rows, so with a 100 ms sleep per row the query runs ~ 500*100ms
+          stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " +
+              "from " + tableName + " t1 join " + tableName + " t2 on t1.int_col = t2.int_col");
+        } catch (SQLException e) {
+          tExecuteHolder.throwable = e;
+        }
+      }
+    });
+    // Thread killing the query
+    Thread tKill = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(3000);
+          String queryId = ((HiveStatement) stmt).getQueryId();
+          System.out.println("Killing query: " + queryId);
+          stmt2.execute("kill query '" + queryId + "'");
+          stmt2.close();
+        } catch (Exception e) {
+          tKillHolder.throwable = e;
+        }
+      }
+    });
+
+    tExecute.start();
+    tKill.start();
+    tExecute.join();
+    tKill.join();
+    stmt.close();
+    con2.close();
+    con.close();
+
+    assertNotNull("tExecute", tExecuteHolder.throwable);
+    assertNull("tCancel", tKillHolder.throwable);
+  }
+
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index b1a602c205..e57d565f5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.session.LineageState;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 
 /**
  * The class to store query level info such as queryId. Multiple queries can run
@@ -54,6 +55,12 @@
    */
   private long numModifiedRows = 0;
 
+  /**
+   * Holds the tag for the query. It is set by the query analyzer if a query id is passed as part
+   * of the query; if it is not passed, the auto-generated query id is stored here.
+   */
+  private String queryTag = null;
+
   /**
    * Private constructor, use QueryState.Builder instead.
    * @param conf The query specific configuration object
@@ -112,6 +119,25 @@ public long getNumModifiedRows() {
   public void setNumModifiedRows(long numModifiedRows) {
     this.numModifiedRows = numModifiedRows;
   }
+
+  public String getQueryTag() {
+    return queryTag;
+  }
+
+  public void setQueryTag(String queryTag) {
+    this.queryTag = queryTag;
+  }
+
+  public static void setMapReduceJobTag(HiveConf queryConf, String queryTag) {
+    String jobTag = queryConf.get(MRJobConfig.JOB_TAGS);
+    if (jobTag == null) {
+      jobTag = queryTag;
+    } else {
+      jobTag = jobTag.concat("," + queryTag);
+    }
+    queryConf.set(MRJobConfig.JOB_TAGS, jobTag);
+  }
+
   /**
    * Builder to instantiate the QueryState object.
   */
@@ -221,6 +247,8 @@ public QueryState build() {
       if (generateNewQueryId) {
         String queryId = QueryPlan.makeQueryId();
         queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
+        setMapReduceJobTag(queryConf, queryId);
+
         // FIXME: druid storage handler relies on query.id to maintain some staging directories
         // expose queryid to session level
         if (hiveConf != null) {
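Note that setMapReduceJobTag() appends the query id (or a user-supplied tag) to mapreduce.job.tags rather than overwriting it, so tags added by other layers survive. A small sketch of the intended semantics; the tag values here are made up:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.QueryState;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class JobTagSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        QueryState.setMapReduceJobTag(conf, "hive_20180702_q1");  // no prior tags: the tag stands alone
        System.out.println(conf.get(MRJobConfig.JOB_TAGS));       // prints: hive_20180702_q1

        conf.set(MRJobConfig.JOB_TAGS, "wmPoolTag");              // simulate a pre-existing tag
        QueryState.setMapReduceJobTag(conf, "hive_20180702_q2");
        System.out.println(conf.get(MRJobConfig.JOB_TAGS));       // prints: wmPoolTag,hive_20180702_q2
      }
    }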
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index accd7f1324..467f7280ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -3296,7 +3296,7 @@ private int abortTxns(Hive db, AbortTxnsDesc desc) throws HiveException {
   private int killQuery(Hive db, KillQueryDesc desc) throws HiveException {
     SessionState sessionState = SessionState.get();
     for (String queryId : desc.getQueryIds()) {
-      sessionState.getKillQuery().killQuery(queryId, "User invoked KILL QUERY");
+      sessionState.getKillQuery().killQuery(queryId, "User invoked KILL QUERY", db.getConf());
     }
     LOG.info("kill query called ({})", desc.getQueryIds());
     return 0;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 240208a645..11ef62c1c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -202,6 +203,10 @@ public int executeTask(HiveHistory hiveHistory) {
       if (hiveHistory != null) {
         hiveHistory.logPlanProgress(queryPlan);
       }
+
+      if (conf != null) {
+        LOG.debug("Executing task with MapReduce job tags: " + conf.get(MRJobConfig.JOB_TAGS));
+      }
       int retval = execute(driverContext);
       this.setDone();
       if (hiveHistory != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
index 50d234deaa..f357775c86 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
@@ -42,7 +42,8 @@ public void applyAction(final Map<TezSessionState, Trigger> queriesViolated) {
           KillQuery killQuery = sessionState.getKillQuery();
           // if kill query is null then session might have been released to pool or closed already
           if (killQuery != null) {
-            sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg());
+            sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg(),
+                sessionState.getConf());
           }
         } catch (HiveException e) {
           LOG.warn("Unable to kill query {} for trigger violation");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 7137a17aae..5326e3590f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -438,7 +438,7 @@ private void scheduleWork(WmThreadSyncWork context) {
       WmEvent wmEvent = new WmEvent(WmEvent.EventType.KILL);
       LOG.info("Invoking KillQuery for " + queryId + ": " + reason);
       try {
-        kq.killQuery(queryId, reason);
+        kq.killQuery(queryId, reason, toKill.getConf());
         addKillQueryResult(toKill, true);
         killCtx.killSessionFuture.set(true);
         wmEvent.endEvent(toKill);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index adaa3d37af..e4186c45a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.antlr.runtime.tree.Tree;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
@@ -228,20 +230,7 @@ private void initReplLoad(ASTNode ast) throws SemanticException {
         tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
         break;
       case TOK_REPL_CONFIG:
-        Map<String, String> replConfigs
-            = DDLSemanticAnalyzer.getProps((ASTNode) childNode.getChild(0));
-        if (null != replConfigs) {
-          for (Map.Entry<String, String> config : replConfigs.entrySet()) {
-            conf.set(config.getKey(), config.getValue());
-          }
-
-          // As hive conf is changed, need to get the Hive DB again with it.
-          try {
-            db = Hive.get(conf);
-          } catch (HiveException e) {
-            throw new SemanticException(e);
-          }
-        }
+        setConfigs((ASTNode) childNode.getChild(0));
         break;
       default:
         throw new SemanticException("Unrecognized token in REPL LOAD statement");
@@ -360,6 +349,32 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
     }
   }
 
+  private void setConfigs(ASTNode node) throws SemanticException {
+    Map<String, String> replConfigs = DDLSemanticAnalyzer.getProps(node);
+    if (null != replConfigs) {
+      for (Map.Entry<String, String> config : replConfigs.entrySet()) {
+        String key = config.getKey();
+        // Don't set the query id in the config; the passed value is used as the query tag instead.
+        if (key.equalsIgnoreCase(HIVEQUERYID.varname)) {
+          String queryTag = config.getValue();
+          if (!StringUtils.isEmpty(queryTag)) {
+            QueryState.setMapReduceJobTag(conf, queryTag);
+          }
+          queryState.setQueryTag(queryTag);
+        } else {
+          conf.set(key, config.getValue());
+        }
+      }
+
+      // As the hive conf has changed, get the Hive DB again with the updated conf.
+      try {
+        db = Hive.get(conf);
+      } catch (HiveException e) {
+        throw new SemanticException(e);
+      }
+    }
+  }
+
   // REPL STATUS
   private void initReplStatus(ASTNode ast) throws SemanticException{
     dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
@@ -371,20 +386,7 @@ private void initReplStatus(ASTNode ast) throws SemanticException{
         tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
         break;
       case TOK_REPL_CONFIG:
-        Map<String, String> replConfigs
-            = DDLSemanticAnalyzer.getProps((ASTNode) childNode.getChild(0));
-        if (null != replConfigs) {
-          for (Map.Entry<String, String> config : replConfigs.entrySet()) {
-            conf.set(config.getKey(), config.getValue());
-          }
-
-          // As hive conf is changed, need to get the Hive DB again with it.
-          try {
-            db = Hive.get(conf);
-          } catch (HiveException e) {
-            throw new SemanticException(e);
-          }
-        }
+        setConfigs((ASTNode) childNode.getChild(0));
         break;
       default:
         throw new SemanticException("Unrecognized token in REPL STATUS statement");
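With this analyzer change, a REPL command can carry a caller-chosen tag through the 'hive.query.id' key, and KILL QUERY accepts that tag in place of a query id (see KillQueryImpl below). A hedged end-to-end sketch over JDBC; the URL, credentials, database, and tag are made up:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;
    import org.apache.hive.jdbc.HiveStatement;

    public class ReplTagKillSketch {
      public static void main(String[] args) throws Exception {
        Connection c1 = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
        HiveStatement s1 = (HiveStatement) c1.createStatement();
        // The supplied value becomes the query tag (and a YARN job tag); the real query id stays auto-generated.
        s1.executeAsync("repl load repl_db with ('hive.query.id' = 'replTag1')");

        try (Connection c2 = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
             Statement s2 = c2.createStatement()) {
          s2.execute("kill query 'replTag1'");  // kill by tag instead of by query id
        }
        s1.close();
        c1.close();
      }
    }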
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
index 2e183dce6f..01dc7e2cd7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
@@ -18,8 +18,9 @@
 package org.apache.hadoop.hive.ql.session;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 public interface KillQuery {
-  void killQuery(String queryId, String errMsg) throws HiveException;
+  void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException;
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
index b62f22c2d0..eac2936719 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
@@ -18,11 +18,12 @@
 package org.apache.hadoop.hive.ql.session;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 public class NullKillQuery implements KillQuery {
   @Override
-  public void killQuery(String queryId, String errMsg) throws HiveException {
+  public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException {
     // Do nothing
   }
 }
diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 1ee07568d2..be2a08d535 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -413,4 +413,8 @@ protected void markOperationStartTime() {
   protected void markOperationCompletedTime() {
     operationComplete = System.currentTimeMillis();
   }
+
+  public String getQueryTag() {
+    return queryState.getQueryTag();
+  }
 }
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 5336034839..8db6a297df 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -62,6 +62,7 @@
       new ConcurrentHashMap<OperationHandle, Operation>();
   private final ConcurrentHashMap<String, Operation> queryIdOperation =
       new ConcurrentHashMap<String, Operation>();
+  private final ConcurrentHashMap<String, String> queryTagToIdMap = new ConcurrentHashMap<>();
 
   //Following fields for displaying queries on WebUI
   private Object webuiLock = new Object();
@@ -201,11 +202,32 @@ private void addOperation(Operation operation) {
     }
   }
 
+  public void updateQueryTag(String queryId, String queryTag) {
+    Operation operation = queryIdOperation.get(queryId);
+    if (operation != null) {
+      String queryIdTemp = queryTagToIdMap.get(queryTag);
+      if (queryIdTemp != null) {
+        throw new RuntimeException("tag " + queryTag + " is already applied for query " + queryIdTemp);
+      }
+      queryTagToIdMap.put(queryTag, queryId);
+      LOG.info("Query " + queryId + " is updated with tag " + queryTag);
+      return;
+    }
+    LOG.info("Query id is missing during query tag update");
+  }
+
   private Operation removeOperation(OperationHandle opHandle) {
     Operation operation = handleToOperation.remove(opHandle);
+    if (operation == null) {
+      throw new RuntimeException("Operation does not exist: " + opHandle);
+    }
     String queryId = getQueryId(operation);
     queryIdOperation.remove(queryId);
-    LOG.info("Removed queryId: {} corresponding to operation: {}", queryId, opHandle);
+    String queryTag = operation.getQueryTag();
+    if (queryTag != null) {
+      queryTagToIdMap.remove(queryTag);
+    }
+    LOG.info("Removed queryId: {} corresponding to operation: {} with tag: {}", queryId, opHandle, queryTag);
     if (operation instanceof SQLOperation) {
       removeSafeQueryInfo(opHandle);
     }
@@ -285,9 +307,6 @@ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
   public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
     LOG.info("Closing operation: " + opHandle);
     Operation operation = removeOperation(opHandle);
-    if (operation == null) {
-      throw new HiveSQLException("Operation does not exist: " + opHandle);
-    }
     Metrics metrics = MetricsFactory.getInstance();
     if (metrics != null) {
       try {
@@ -422,4 +441,12 @@ public QueryInfo getQueryInfo(String handle) {
   public Operation getOperationByQueryId(String queryId) {
     return queryIdOperation.get(queryId);
   }
+
+  public Operation getOperationByQueryTag(String queryTag) {
+    String queryId = queryTagToIdMap.get(queryTag);
+    if (queryId != null) {
+      return getOperationByQueryId(queryId);
+    }
+    return null;
+  }
 }
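For orientation, the new OperationManager methods give a tag this lifecycle: updateQueryTag() registers tag -> query id (rejecting duplicates), getOperationByQueryTag() resolves it, and removeOperation() drops the mapping when the operation goes away. A minimal sketch; the manager instance, query id, and tag value are assumptions:

    import org.apache.hive.service.cli.operation.Operation;
    import org.apache.hive.service.cli.operation.OperationManager;

    public class QueryTagLifecycleSketch {
      static Operation resolveByTag(OperationManager mgr, String queryId) {
        mgr.updateQueryTag(queryId, "etlTag");        // registers etlTag -> queryId; a duplicate tag throws
        return mgr.getOperationByQueryTag("etlTag");  // resolves the tag back to the live operation
        // removeOperation(...) later drops the etlTag mapping along with the operation.
      }
    }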
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 9a07fa1760..36df57e40c 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -198,7 +198,9 @@ public void run() {
       if (0 != response.getResponseCode()) {
        throw toSQLException("Error while compiling statement", response);
       }
-
+      if (queryState.getQueryTag() != null && queryState.getQueryId() != null) {
+        parentSession.updateQueryTag(queryState.getQueryId(), queryState.getQueryTag());
+      }
       setHasResultSet(driver.hasResultSet());
     } catch (HiveSQLException e) {
       setState(OperationState.ERROR);
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index b4070ce20a..cce9c22387 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -200,6 +200,8 @@ OperationHandle getCrossReference(String primaryCatalog,
 
   void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
 
+  void updateQueryTag(String queryId, String queryTag) throws HiveSQLException;
+
   void closeOperation(OperationHandle opHandle) throws HiveSQLException;
 
   TableSchema getResultSetMetadata(OperationHandle opHandle)
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index b9a8537f5b..e5cdc7bb7c 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -873,6 +873,11 @@ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
     }
   }
 
+  @Override
+  public void updateQueryTag(String queryId, String queryTag) throws HiveSQLException {
+    sessionManager.getOperationManager().updateQueryTag(queryId, queryTag);
+  }
+
   @Override
   public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
     acquire(true, false);
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
index b39a7b1aa6..490a04da67 100644
--- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
+++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
@@ -18,8 +18,20 @@
 package org.apache.hive.service.server;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.KillQuery;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.OperationHandle;
 import org.apache.hive.service.cli.operation.Operation;
@@ -27,6 +39,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 public class KillQueryImpl implements KillQuery {
   private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class);
 
@@ -36,18 +54,82 @@ public KillQueryImpl(OperationManager operationManager) {
     this.operationManager = operationManager;
   }
 
+  public static Set<ApplicationId> getChildYarnJobs(Configuration conf, String tag) throws IOException, YarnException {
+    Set<ApplicationId> childYarnJobs = new HashSet<>();
+    GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
+    gar.setScope(ApplicationsRequestScope.OWN);
+    gar.setApplicationTags(Collections.singleton(tag));
+
+    ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
+    GetApplicationsResponse apps = proxy.getApplications(gar);
+    List<ApplicationReport> appsList = apps.getApplicationList();
+    for (ApplicationReport appReport : appsList) {
+      childYarnJobs.add(appReport.getApplicationId());
+    }
+
+    if (childYarnJobs.isEmpty()) {
+      LOG.info("No child applications found");
+    } else {
+      LOG.info("Found child YARN applications: " + StringUtils.join(childYarnJobs, ","));
+    }
+
+    return childYarnJobs;
+  }
+
+  public static void killChildYarnJobs(Configuration conf, String tag) {
+    try {
+      if (tag == null) {
+        return;
+      }
+      Set<ApplicationId> childYarnJobs = getChildYarnJobs(conf, tag);
+      if (!childYarnJobs.isEmpty()) {
+        YarnClient yarnClient = YarnClient.createYarnClient();
+        yarnClient.init(conf);
+        yarnClient.start();
+        for (ApplicationId app : childYarnJobs) {
+          yarnClient.killApplication(app);
+        }
+      }
+    } catch (IOException | YarnException ye) {
+      throw new RuntimeException("Exception occurred while killing child job(s)", ye);
+    }
+  }
+
   @Override
-  public void killQuery(String queryId, String errMsg) throws HiveException {
+  public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException {
     try {
+      String queryTag = null;
+
       Operation operation = operationManager.getOperationByQueryId(queryId);
       if (operation == null) {
-        LOG.info("Query not found: " + queryId);
+        // Check whether the user has passed a query tag instead of a query id. This can happen when
+        // the application restarts and no longer knows the proper query id; the tag can then be used
+        // to kill the query.
+        operation = operationManager.getOperationByQueryTag(queryId);
+        if (operation == null) {
+          LOG.info("Query not found: " + queryId);
+        }
       } else {
+        // This is the normal flow: the query is tagged and the user kills it by query id.
+        queryTag = operation.getQueryTag();
+      }
+
+      if (queryTag == null) {
+        // Fall back to the query id as the tag. YARN jobs are tagged with the query id by default,
+        // so this covers the case where, after a HiveServer2 restart, only the YARN jobs of a query
+        // need to be killed: the query id itself can be passed in as the tag.
+        queryTag = queryId;
+      }
+
+      LOG.info("Killing yarn jobs for query id: " + queryId + " using tag: " + queryTag);
+      killChildYarnJobs(conf, queryTag);
+
+      if (operation != null) {
         OperationHandle handle = operation.getHandle();
         operationManager.cancelOperation(handle, errMsg);
       }
     } catch (HiveSQLException e) {
-      throw new HiveException(e);
+      LOG.error("Kill query failed for query " + queryId, e);
+      throw new HiveException(e.getMessage(), e);
     }
   }
 }
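One behavior worth calling out: when HiveServer2 no longer tracks an operation (for example after a restart), killQuery() falls back to using the supplied id as the YARN application tag, so the query's child jobs are still cleaned up. A sketch of that path; the query id and method names below are hypothetical:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hive.service.cli.operation.OperationManager;
    import org.apache.hive.service.server.KillQueryImpl;

    public class KillAfterRestartSketch {
      static void killOrphaned(OperationManager operationManager, HiveConf conf) throws HiveException {
        // "hive_20180702_q3" is unknown to this HS2 instance (it restarted), so both the id and tag
        // lookups miss, queryTag falls back to the id itself, and only the YARN applications tagged
        // with it are killed.
        new KillQueryImpl(operationManager).killQuery("hive_20180702_q3", "User invoked KILL QUERY", conf);
      }
    }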