diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 8f552b06ff..38721421bb 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -3036,6 +3036,32 @@ private void testInsertOverwrite(HiveStatement stmt) throws SQLException {
     }
   }
 
+  @Test
+  public void testGetQueryId() throws Exception {
+    HiveStatement stmt = (HiveStatement) con.createStatement();
+    stmt.executeAsync("create database query_id_test with dbproperties ('repl.source.for' = '1, 2, 3')");
+    String queryId = stmt.getQueryId();
+    assertFalse(queryId.isEmpty());
+    stmt.getUpdateCount();
+    stmt.execute("kill query '" + queryId + "'");
+
+    stmt.executeAsync("repl status query_id_test with ('hive.query.id' = 'hiveCustomTag')");
+    queryId = stmt.getQueryId();
+    assertFalse("hiveCustomTag".equals(queryId));
+    stmt.getUpdateCount();
+    stmt.execute("kill query 'hiveCustomTag' ");
+
+    stmt.executeAsync("select count(*) from " + dataTypeTableName);
+    queryId = stmt.getQueryId();
+    assertFalse("hiveCustomTag".equals(queryId));
+    assertFalse(queryId.isEmpty());
+    stmt.getUpdateCount();
+    stmt.execute("kill query '" + queryId + "'");
+    stmt.execute("drop database query_id_test");
+
+    stmt.close();
+  }
+
   // Test that opening a JDBC connection to a non-existent database throws a HiveSQLException
   @Test(expected = HiveSQLException.class)
   public void testConnectInvalidDatabase() throws SQLException {
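The QueryState.java hunk below introduces the tag plumbing that the test above exercises: setMapReduceJobTag appends the query id (or a user-supplied tag) to mapreduce.job.tags as a comma-separated list. A minimal standalone sketch of that concatenation rule, with a plain map standing in for HiveConf (the class and field names here are illustrative, not Hive APIs):

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the tag-concatenation rule in QueryState.setMapReduceJobTag.
// The Map stands in for HiveConf; "mapreduce.job.tags" is MRJobConfig.JOB_TAGS.
public class JobTagSketch {
  private static final Map<String, String> conf = new HashMap<>();

  static void setMapReduceJobTag(String queryTag) {
    String jobTag = conf.get("mapreduce.job.tags");
    // The first tag is used verbatim; later tags are comma-appended.
    jobTag = (jobTag == null) ? queryTag : jobTag + "," + queryTag;
    conf.put("mapreduce.job.tags", jobTag);
  }

  public static void main(String[] args) {
    setMapReduceJobTag("hive_20180901_q1"); // auto-generated query id
    setMapReduceJobTag("hiveCustomTag");    // user-supplied tag, e.g. from REPL ... WITH
    System.out.println(conf.get("mapreduce.job.tags"));
    // prints: hive_20180901_q1,hiveCustomTag
  }
}
```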
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index b1a602c205..e57d565f5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.session.LineageState;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 
 /**
  * The class to store query level info such as queryId. Multiple queries can run
@@ -54,6 +55,12 @@
    */
   private long numModifiedRows = 0;
 
+  /**
+   * Holds the query tag string. It is set by the query analyzer when a query id is passed as part of the query.
+   * If none is passed, it stores the auto-generated query id.
+   */
+  private String queryTag = null;
+
   /**
    * Private constructor, use QueryState.Builder instead.
    * @param conf The query specific configuration object
@@ -112,6 +119,25 @@ public long getNumModifiedRows() {
   public void setNumModifiedRows(long numModifiedRows) {
     this.numModifiedRows = numModifiedRows;
   }
+
+  public String getQueryTag() {
+    return queryTag;
+  }
+
+  public void setQueryTag(String queryTag) {
+    this.queryTag = queryTag;
+  }
+
+  public static void setMapReduceJobTag(HiveConf queryConf, String queryTag) {
+    String jobTag = queryConf.get(MRJobConfig.JOB_TAGS);
+    if (jobTag == null) {
+      jobTag = queryTag;
+    } else {
+      jobTag = jobTag.concat("," + queryTag);
+    }
+    queryConf.set(MRJobConfig.JOB_TAGS, jobTag);
+  }
+
   /**
    * Builder to instantiate the QueryState object.
    */
@@ -221,6 +247,8 @@ public QueryState build() {
       if (generateNewQueryId) {
         String queryId = QueryPlan.makeQueryId();
         queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
+        setMapReduceJobTag(queryConf, queryId);
+
         // FIXME: druid storage handler relies on query.id to maintain some staging directories
         // expose queryid to session level
         if (hiveConf != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 240208a645..11ef62c1c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -202,6 +203,10 @@ public int executeTask(HiveHistory hiveHistory) {
       if (hiveHistory != null) {
         hiveHistory.logPlanProgress(queryPlan);
       }
+
+      if (conf != null) {
+        LOG.debug("Task is being executed with mapred tag: " + conf.get(MRJobConfig.JOB_TAGS));
+      }
       int retval = execute(driverContext);
       this.setDone();
       if (hiveHistory != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index adaa3d37af..e4186c45a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.antlr.runtime.tree.Tree;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
@@ -228,20 +230,7 @@ private void initReplLoad(ASTNode ast) throws SemanticException {
         tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
         break;
       case TOK_REPL_CONFIG:
-        Map<String, String> replConfigs
-            = DDLSemanticAnalyzer.getProps((ASTNode) childNode.getChild(0));
-        if (null != replConfigs) {
-          for (Map.Entry<String, String> config : replConfigs.entrySet()) {
-            conf.set(config.getKey(), config.getValue());
-          }
-
-          // As hive conf is changed, need to get the Hive DB again with it.
-          try {
-            db = Hive.get(conf);
-          } catch (HiveException e) {
-            throw new SemanticException(e);
-          }
-        }
+        setConfigs((ASTNode) childNode.getChild(0));
         break;
       default:
         throw new SemanticException("Unrecognized token in REPL LOAD statement");
@@ -360,6 +349,32 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
     }
   }
 
+  private void setConfigs(ASTNode node) throws SemanticException {
+    Map<String, String> replConfigs = DDLSemanticAnalyzer.getProps(node);
+    if (null != replConfigs) {
+      for (Map.Entry<String, String> config : replConfigs.entrySet()) {
+        String key = config.getKey();
+        // don't set the query id in the config
+        if (key.equalsIgnoreCase(HIVEQUERYID.varname)) {
+          String queryTag = config.getValue();
+          if (!StringUtils.isEmpty(queryTag)) {
+            QueryState.setMapReduceJobTag(conf, queryTag);
+          }
+          queryState.setQueryTag(queryTag);
+        } else {
+          conf.set(key, config.getValue());
+        }
+      }
+
+      // As hive conf is changed, need to get the Hive DB again with it.
+      try {
+        db = Hive.get(conf);
+      } catch (HiveException e) {
+        throw new SemanticException(e);
+      }
+    }
+  }
+
   // REPL STATUS
   private void initReplStatus(ASTNode ast) throws SemanticException{
     dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
@@ -371,20 +386,7 @@ private void initReplStatus(ASTNode ast) throws SemanticException{
         tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
         break;
       case TOK_REPL_CONFIG:
-        Map<String, String> replConfigs
-            = DDLSemanticAnalyzer.getProps((ASTNode) childNode.getChild(0));
-        if (null != replConfigs) {
-          for (Map.Entry<String, String> config : replConfigs.entrySet()) {
-            conf.set(config.getKey(), config.getValue());
-          }
-
-          // As hive conf is changed, need to get the Hive DB again with it.
-          try {
-            db = Hive.get(conf);
-          } catch (HiveException e) {
-            throw new SemanticException(e);
-          }
-        }
+        setConfigs((ASTNode) childNode.getChild(0));
         break;
       default:
         throw new SemanticException("Unrecognized token in REPL STATUS statement");
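To summarize the setConfigs change above: every WITH-clause entry still becomes a plain conf override, except hive.query.id, which is diverted into the query tag instead of being written to the conf. A self-contained sketch of that dispatch rule, with plain maps standing in for the Hive conf and the parsed WITH-clause properties:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the dispatch rule in ReplicationSemanticAnalyzer.setConfigs (illustrative only).
public class ReplConfigDispatchSketch {
  public static void main(String[] args) {
    Map<String, String> replConfigs = new LinkedHashMap<>();
    replConfigs.put("hive.exec.parallel", "true");     // ordinary override
    replConfigs.put("hive.query.id", "hiveCustomTag"); // becomes the tag

    Map<String, String> conf = new LinkedHashMap<>();
    String queryTag = null;
    for (Map.Entry<String, String> config : replConfigs.entrySet()) {
      if (config.getKey().equalsIgnoreCase("hive.query.id")) {
        // Diverted: tags the query (and its YARN jobs) instead of setting the conf key.
        queryTag = config.getValue();
      } else {
        conf.put(config.getKey(), config.getValue());
      }
    }
    System.out.println("conf=" + conf + ", queryTag=" + queryTag);
    // prints: conf={hive.exec.parallel=true}, queryTag=hiveCustomTag
  }
}
```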
query " + queryIdTemp); + } + queryTagToIdMap.put(queryTag, queryId); + LOG.info("Query " + queryId + " is updated with tag " + queryTag); + return; + } + LOG.info("Query id is missing during query tag updation"); + } + private Operation removeOperation(OperationHandle opHandle) { Operation operation = handleToOperation.remove(opHandle); String queryId = getQueryId(operation); queryIdOperation.remove(queryId); - LOG.info("Removed queryId: {} corresponding to operation: {}", queryId, opHandle); + String queryTag = operation.getQueryTag(); + if (queryTag != null) { + queryTagToIdMap.remove(queryTag); + } + LOG.info("Removed queryId: {} corresponding to operation: {} with tag: {}", queryId, opHandle, queryTag); if (operation instanceof SQLOperation) { removeSafeQueryInfo(opHandle); } @@ -422,4 +441,12 @@ public QueryInfo getQueryInfo(String handle) { public Operation getOperationByQueryId(String queryId) { return queryIdOperation.get(queryId); } + + public Operation getOperationByQueryTag(String queryTag) { + String queryId = queryTagToIdMap.get(queryTag); + if (queryId != null) { + return getOperationByQueryId(queryId); + } + return null; + } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 9a07fa1760..36df57e40c 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -198,7 +198,9 @@ public void run() { if (0 != response.getResponseCode()) { throw toSQLException("Error while compiling statement", response); } - + if (queryState.getQueryTag() != null && queryState.getQueryId() != null) { + parentSession.updateQueryTag(queryState.getQueryId(), queryState.getQueryTag()); + } setHasResultSet(driver.hasResultSet()); } catch (HiveSQLException e) { setState(OperationState.ERROR); diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java index b4070ce20a..f74fe659d6 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java @@ -34,6 +34,7 @@ import org.apache.hive.service.cli.OperationHandle; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.operation.Operation; public interface HiveSession extends HiveSessionBase { @@ -200,6 +201,8 @@ OperationHandle getCrossReference(String primaryCatalog, void cancelOperation(OperationHandle opHandle) throws HiveSQLException; + void updateQueryTag(String queryId, String queryTag) throws HiveSQLException; + void closeOperation(OperationHandle opHandle) throws HiveSQLException; TableSchema getResultSetMetadata(OperationHandle opHandle) diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index b9a8537f5b..34551cbe24 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -873,6 +873,11 @@ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { } } + @Override + public void updateQueryTag(String queryId, String queryTag) throws HiveSQLException { + sessionManager.getOperationManager().updateQueryTag(queryId, queryTag); + } + 
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
index b39a7b1aa6..8fbf2079c8 100644
--- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
+++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
@@ -18,8 +18,20 @@
 
 package org.apache.hive.service.server;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.KillQuery;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.OperationHandle;
 import org.apache.hive.service.cli.operation.Operation;
@@ -27,6 +39,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 public class KillQueryImpl implements KillQuery {
   private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class);
 
@@ -36,18 +54,86 @@ public KillQueryImpl(OperationManager operationManager) {
     this.operationManager = operationManager;
   }
 
+  public static Set<ApplicationId> getChildYarnJobs(Configuration conf, String tag) {
+    Set<ApplicationId> childYarnJobs = new HashSet<>();
+    GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
+    gar.setScope(ApplicationsRequestScope.OWN);
+    gar.setApplicationTags(Collections.singleton(tag));
+
+    try {
+      ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
+      GetApplicationsResponse apps = proxy.getApplications(gar);
+      List<ApplicationReport> appsList = apps.getApplicationList();
+      for (ApplicationReport appReport : appsList) {
+        childYarnJobs.add(appReport.getApplicationId());
+      }
+    } catch (YarnException | IOException ioe) {
+      throw new RuntimeException("Exception occurred while finding child jobs", ioe);
+    }
+
+    if (childYarnJobs.isEmpty()) {
+      LOG.info("No child applications found");
+    } else {
+      LOG.info("Found child YARN applications: " + StringUtils.join(childYarnJobs, ","));
+    }
+
+    return childYarnJobs;
+  }
+
+  public static void killChildYarnJobs(Configuration conf, String tag) {
+    try {
+      if (tag == null) {
+        return;
+      }
+      Set<ApplicationId> childYarnJobs = getChildYarnJobs(conf, tag);
+      if (!childYarnJobs.isEmpty()) {
+        YarnClient yarnClient = YarnClient.createYarnClient();
+        yarnClient.init(conf);
+        yarnClient.start();
+        for (ApplicationId app : childYarnJobs) {
+          yarnClient.killApplication(app);
+        }
+      }
+    } catch (IOException | YarnException ye) {
+      throw new RuntimeException("Exception occurred while killing child job(s)", ye);
+    }
+  }
+
   @Override
   public void killQuery(String queryId, String errMsg) throws HiveException {
     try {
+      String queryTag = null;
+
       Operation operation = operationManager.getOperationByQueryId(queryId);
       if (operation == null) {
-        LOG.info("Query not found: " + queryId);
+        // Check if the user has passed a query tag to kill the operation. This is possible if the application
+        // restarts and no longer has the proper query id. The tag can be used in that case to kill the query.
+        operation = operationManager.getOperationByQueryTag(queryId);
+        if (operation == null) {
+          LOG.info("Query not found: " + queryId);
+        }
       } else {
+        // This is the normal flow: the query is tagged and the user wants to kill it using the query id.
+        queryTag = operation.getQueryTag();
+      }
+
+      if (queryTag == null) {
+        // Use the query id as the tag if the user only wants to kill the YARN jobs after a HiveServer2 restart.
+        // The YARN jobs are tagged with the query id by default, so this covers the case where the application
+        // wants to kill the YARN jobs by tag after a restart; the query tag can be passed as the query id.
+        queryTag = queryId;
+      }
+
+      LOG.info("Killing YARN jobs for query id: " + queryId + " using tag: " + queryTag);
+      killChildYarnJobs(SessionState.getSessionConf(), queryTag);
+
+      if (operation != null) {
         OperationHandle handle = operation.getHandle();
         operationManager.cancelOperation(handle, errMsg);
       }
     } catch (HiveSQLException e) {
-      throw new HiveException(e);
+      LOG.error("Kill query failed for query " + queryId, e);
+      throw new HiveException(e.getMessage(), e);
     }
   }
 }
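Taken together, the changes enable the flow below, modeled on testGetQueryId at the top of this patch (a client-side sketch; the JDBC URL and database name are placeholders):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import org.apache.hive.jdbc.HiveStatement;

// Client-side sketch of kill-by-id and kill-by-tag; mirrors testGetQueryId above.
public class KillByTagSketch {
  public static void main(String[] args) throws Exception {
    try (Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")) {
      HiveStatement stmt = (HiveStatement) con.createStatement();

      // Tag the query via the WITH clause; the server registers 'hiveCustomTag'
      // in queryTagToIdMap instead of writing hive.query.id into the conf.
      stmt.executeAsync("repl status some_db with ('hive.query.id' = 'hiveCustomTag')");
      System.out.println("generated query id: " + stmt.getQueryId()); // differs from the tag

      // Kill by tag: KillQueryImpl resolves the tag to the operation and also
      // kills any YARN applications carrying it.
      try (Statement killer = con.createStatement()) {
        killer.execute("kill query 'hiveCustomTag'");
      }
      stmt.close();
    }
  }
}
```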