diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 850b2d558e..ad892bb260 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -3005,6 +3005,28 @@ private void testInsertOverwrite(HiveStatement stmt) throws SQLException {
     }
   }
 
+  @Test
+  public void testGetQueryId() throws Exception {
+    HiveStatement stmt = (HiveStatement) con.createStatement();
+    stmt.executeAsync("alter database default set dbproperties ('repl.source.for' = '1, 2, 3')");
+    String queryId = stmt.getQueryId();
+    assertFalse(queryId.isEmpty());
+    stmt.execute("kill query '" + queryId + "'");
+
+    stmt.executeAsync("repl status default with ('hive.query.id' = 'hiveCustomTag')");
+    queryId = stmt.getQueryId();
+    assertEquals("hiveCustomTag", queryId);
+    stmt.execute("kill query '" + queryId + "'");
+
+    stmt.executeAsync("select count(*) from " + dataTypeTableName);
+    queryId = stmt.getQueryId();
+    assertFalse("hiveCustomTag".equals(queryId));
+    assertFalse(queryId.isEmpty());
+    stmt.execute("kill query '" + queryId + "'");
+
+    stmt.close();
+  }
+
   // Test that opening a JDBC connection to a non-existent database throws a HiveSQLException
   @Test(expected = HiveSQLException.class)
   public void testConnectInvalidDatabase() throws SQLException {
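For reference, the client-side flow the new test exercises looks roughly like the sketch below when run outside the test harness; the connection URL, credentials, class name, and table name are placeholders, not part of this patch:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

import org.apache.hive.jdbc.HiveStatement;

public class KillQueryExample {
  public static void main(String[] args) throws Exception {
    // Placeholder URL and credentials; adjust for a real HiveServer2 instance.
    try (Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "")) {
      HiveStatement stmt = (HiveStatement) con.createStatement();
      stmt.executeAsync("select count(*) from some_table"); // placeholder table
      // Returns the auto-generated id, or the user-supplied 'hive.query.id' if one was given.
      String queryId = stmt.getQueryId();
      // Any statement, typically on an admin connection, can kill the query by id.
      try (Statement killer = con.createStatement()) {
        killer.execute("kill query '" + queryId + "'");
      }
      stmt.close();
    }
  }
}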
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index b1a602c205..a0ef1de157 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -54,6 +54,12 @@
    */
   private long numModifiedRows = 0;
 
+  /**
+   * Holds the query id string. It is set by the query analyzer if a query id is passed as part of the query.
+   * If none is passed, it stores the auto-generated query id.
+   */
+  private String queryId = null;
+
   /**
    * Private constructor, use QueryState.Builder instead.
    * @param conf The query specific configuration object
@@ -63,7 +69,11 @@ private QueryState(HiveConf conf) {
   }
 
   public String getQueryId() {
-    return (queryConf.getVar(HiveConf.ConfVars.HIVEQUERYID));
+    return ((queryId == null) ? queryConf.getVar(HiveConf.ConfVars.HIVEQUERYID) : queryId);
+  }
+
+  public void setQueryId(String queryId) {
+    this.queryId = queryId;
   }
 
   public String getQueryString() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 240208a645..868d64b7a4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +48,8 @@
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
+
 /**
  * Task implementation.
 **/
@@ -202,6 +205,11 @@ public int executeTask(HiveHistory hiveHistory) {
       if (hiveHistory != null) {
         hiveHistory.logPlanProgress(queryPlan);
       }
+
+      String queryId = conf.getVar(HIVEQUERYID);
+      if (!org.apache.commons.lang.StringUtils.isEmpty(queryId)) {
+        conf.set(MRJobConfig.JOB_TAGS, queryId);
+      }
       int retval = execute(driverContext);
       this.setDone();
       if (hiveHistory != null) {
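The tag set via MRJobConfig.JOB_TAGS ("mapreduce.job.tags") is read at job-submission time and stored as YARN application tags, which is what makes the child applications discoverable by the kill path further below. A minimal sketch of what Task#executeTask now leaves in the conf; the query id value and class name are illustrative:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class JobTagSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Illustrative query id; at runtime this is the auto-generated or user-supplied id.
    conf.setVar(HiveConf.ConfVars.HIVEQUERYID, "hive_20180918_example");
    // Mirrors the new code in Task#executeTask, run just before execute(driverContext):
    conf.set(MRJobConfig.JOB_TAGS, conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
    // Any MapReduce job submitted with this conf carries the tag on its YARN application.
    System.out.println(conf.get(MRJobConfig.JOB_TAGS)); // hive_20180918_example
  }
}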
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 5aeae166c0..2a66272828 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -60,6 +60,7 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
@@ -251,6 +252,11 @@ private void initReplLoad(ASTNode ast) throws SemanticException {
         conf.set(config.getKey(), config.getValue());
       }
 
+      String queryId = replConfigs.get(HIVEQUERYID.varname);
+      if (!StringUtils.isEmpty(queryId)) {
+        queryState.setQueryId(queryId);
+      }
+
       // As hive conf is changed, need to get the Hive DB again with it.
       try {
         db = Hive.get(conf);
@@ -654,6 +660,11 @@ private void initReplStatus(ASTNode ast) throws SemanticException{
         conf.set(config.getKey(), config.getValue());
       }
 
+      String queryId = replConfigs.get(HIVEQUERYID.varname);
+      if (!StringUtils.isEmpty(queryId)) {
+        queryState.setQueryId(queryId);
+      }
+
       // As hive conf is changed, need to get the Hive DB again with it.
       try {
         db = Hive.get(conf);
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 5336034839..57fd4b1773 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -201,6 +201,12 @@ private void addOperation(Operation operation) {
     }
   }
 
+  public void updateQueryId(Operation operation, String queryId) {
+    queryIdOperation.remove(getQueryId(operation));
+    queryIdOperation.put(queryId, operation);
+    operation.getParentSession().getHiveConf().setVar(ConfVars.HIVEQUERYID, queryId);
+  }
+
   private Operation removeOperation(OperationHandle opHandle) {
     Operation operation = handleToOperation.remove(opHandle);
     String queryId = getQueryId(operation);
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 9a07fa1760..e075111d4e 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -258,6 +258,7 @@ public void runInternal() throws HiveSQLException {
         HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_ASYNC_COMPILE);
     if (!asyncPrepare) {
       prepare(queryState);
+      getParentSession().updateQueryId(this, queryState.getQueryId());
     }
     if (!runAsync) {
       runQuery();
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index b4070ce20a..96bce30f8f 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -34,6 +34,7 @@
 import org.apache.hive.service.cli.OperationHandle;
 import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.operation.Operation;
 
 public interface HiveSession extends HiveSessionBase {
 
@@ -200,6 +201,8 @@ OperationHandle getCrossReference(String primaryCatalog,
 
   void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
 
+  void updateQueryId(Operation operation, String queryId) throws HiveSQLException;
+
   void closeOperation(OperationHandle opHandle) throws HiveSQLException;
 
   TableSchema getResultSetMetadata(OperationHandle opHandle)
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index b9a8537f5b..5907531db4 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -873,6 +873,11 @@ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
     }
   }
 
+  @Override
+  public void updateQueryId(Operation operation, String queryId) throws HiveSQLException {
+    sessionManager.getOperationManager().updateQueryId(operation, queryId);
+  }
+
   @Override
   public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
     acquire(true, false);
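A note on why the updateQueryId plumbing above is needed: an operation is registered in OperationManager under the auto-generated query id when it is submitted, but a user-supplied 'hive.query.id' only becomes known once the statement has been compiled, so the queryIdOperation map has to be re-keyed after prepare(). A toy model of that re-keying, with a plain map and String values standing in for the real field and Operation objects:

import java.util.HashMap;
import java.util.Map;

public class RekeySketch {
  public static void main(String[] args) {
    // Toy stand-in for OperationManager.queryIdOperation.
    Map<String, String> queryIdOperation = new HashMap<>();
    queryIdOperation.put("hive_20180918_autogen", "operation-1"); // registered at submit time
    // After prepare(), the analyzer may report a user-supplied id, so the
    // entry is re-keyed the way updateQueryId does:
    String op = queryIdOperation.remove("hive_20180918_autogen");
    queryIdOperation.put("hiveCustomTag", op);
    System.out.println(queryIdOperation); // {hiveCustomTag=operation-1}
  }
}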
b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
@@ -18,8 +18,20 @@
 
 package org.apache.hive.service.server;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.KillQuery;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.OperationHandle;
 import org.apache.hive.service.cli.operation.Operation;
@@ -27,6 +39,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 public class KillQueryImpl implements KillQuery {
   private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class);
 
@@ -36,6 +54,48 @@ public KillQueryImpl(OperationManager operationManager) {
     this.operationManager = operationManager;
   }
 
+  public static Set<ApplicationId> getChildYarnJobs(Configuration conf, String tag) {
+    Set<ApplicationId> childYarnJobs = new HashSet<>();
+    GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
+    gar.setScope(ApplicationsRequestScope.OWN);
+    gar.setApplicationTags(Collections.singleton(tag));
+
+    try {
+      ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
+      GetApplicationsResponse apps = proxy.getApplications(gar);
+      List<ApplicationReport> appsList = apps.getApplicationList();
+      for (ApplicationReport appReport : appsList) {
+        childYarnJobs.add(appReport.getApplicationId());
+      }
+    } catch (YarnException | IOException ioe) {
+      throw new RuntimeException("Exception occurred while finding child jobs", ioe);
+    }
+
+    if (childYarnJobs.isEmpty()) {
+      LOG.info("No child applications found");
+    } else {
+      LOG.info("Found child YARN applications: " + StringUtils.join(childYarnJobs, ","));
+    }
+
+    return childYarnJobs;
+  }
+
+  public static void killChildYarnJobs(Configuration conf, String tag) {
+    try {
+      Set<ApplicationId> childYarnJobs = getChildYarnJobs(conf, tag);
+      if (!childYarnJobs.isEmpty()) {
+        YarnClient yarnClient = YarnClient.createYarnClient();
+        yarnClient.init(conf);
+        yarnClient.start();
+        for (ApplicationId app : childYarnJobs) {
+          yarnClient.killApplication(app);
+        }
+      }
+    } catch (IOException | YarnException ye) {
+      throw new RuntimeException("Exception occurred while killing child job(s)", ye);
+    }
+  }
+
   @Override
   public void killQuery(String queryId, String errMsg) throws HiveException {
     try {
@@ -46,6 +106,8 @@ public void killQuery(String queryId, String errMsg) throws HiveException {
         OperationHandle handle = operation.getHandle();
         operationManager.cancelOperation(handle, errMsg);
       }
+
+      killChildYarnJobs(SessionState.getSessionConf(), queryId);
     } catch (HiveSQLException e) {
       throw new HiveException(e);
     }
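Both helpers are public static, so the tag-based lookup and kill can also be exercised directly; a minimal sketch, assuming the Configuration resolves the cluster's ResourceManager (the class name and query id below are illustrative):

import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hive.service.server.KillQueryImpl;

public class KillByTagSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(); // must point at the cluster's ResourceManager
    String queryId = "hive_20180918_example";  // illustrative tag
    // List the YARN applications tagged with the query id...
    Set<ApplicationId> apps = KillQueryImpl.getChildYarnJobs(conf, queryId);
    System.out.println("Tagged applications: " + apps);
    // ...and kill them, mirroring what killQuery() now does after cancelling
    // the HiveServer2 operation.
    KillQueryImpl.killChildYarnJobs(conf, queryId);
  }
}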