diff --git a/data/conf/yarn-site.xml b/data/conf/yarn-site.xml
new file mode 100644
index 0000000000..9d95f1f016
--- /dev/null
+++ b/data/conf/yarn-site.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<configuration>
+  <property>
+    <name>yarn.nodemanager.aux-services</name>
+    <value>mapreduce_shuffle</value>
+  </property>
+  <property>
+    <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
+    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
+  </property>
+  <property>
+    <name>yarn.resourcemanager.address</name>
+    <value>127.0.0.1:8032</value>
+  </property>
+</configuration>
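
For reference, this config backs the YARN-side lookup that the kill-query path below relies on. A minimal sketch (the tag value is illustrative; the RM address matches the test config above) of listing applications by tag, using the same `ApplicationClientProtocol` calls as the patch:

```java
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ListAppsByTag {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();
    // Matches yarn.resourcemanager.address in the test config above.
    conf.set(YarnConfiguration.RM_ADDRESS, "127.0.0.1:8032");

    // Restrict the listing to applications owned by the caller that carry the tag.
    GetApplicationsRequest request = GetApplicationsRequest.newInstance();
    request.setScope(ApplicationsRequestScope.OWN);
    request.setApplicationTags(Collections.singleton("hive_20180101000000_example"));

    ApplicationClientProtocol rm = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
    for (ApplicationReport app : rm.getApplications(request).getApplicationList()) {
      System.out.println(app.getApplicationId());
    }
  }
}
```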
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 8f552b06ff..38721421bb 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -3036,6 +3036,32 @@ private void testInsertOverwrite(HiveStatement stmt) throws SQLException {
}
}
+ @Test
+ public void testGetQueryId() throws Exception {
+ HiveStatement stmt = (HiveStatement) con.createStatement();
+ stmt.executeAsync("create database query_id_test with dbproperties ('repl.source.for' = '1, 2, 3')");
+ String queryId = stmt.getQueryId();
+ assertFalse(queryId.isEmpty());
+ stmt.getUpdateCount();
+ stmt.execute("kill query '" + queryId + "'");
+
+ stmt.executeAsync("repl status query_id_test with ('hive.query.id' = 'hiveCustomTag')");
+ queryId = stmt.getQueryId();
+ assertFalse("hiveCustomTag".equals(queryId));
+ stmt.getUpdateCount();
+ stmt.execute("kill query 'hiveCustomTag' ");
+
+ stmt.executeAsync("select count(*) from " + dataTypeTableName);
+ queryId = stmt.getQueryId();
+ assertFalse("hiveCustomTag".equals(queryId));
+ assertFalse(queryId.isEmpty());
+ stmt.getUpdateCount();
+ stmt.execute("kill query '" + queryId + "'");
+ stmt.execute("drop database query_id_test");
+
+ stmt.close();
+ }
+
// Test that opening a JDBC connection to a non-existent database throws a HiveSQLException
@Test(expected = HiveSQLException.class)
public void testConnectInvalidDatabase() throws SQLException {
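
The test above exercises the full flow through JDBC; a condensed client-side sketch of the same pattern (the connection URL, credentials, and table name are placeholders):

```java
import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.hive.jdbc.HiveStatement;

public class KillQueryByIdExample {
  public static void main(String[] args) throws Exception {
    // Placeholder URL; point it at a running HiveServer2 instance.
    try (Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")) {
      HiveStatement stmt = (HiveStatement) con.createStatement();
      // Run the query asynchronously so the client can fetch its id while it runs.
      stmt.executeAsync("select count(*) from some_table");
      String queryId = stmt.getQueryId();

      // A second statement issues the kill, using the id returned above.
      try (HiveStatement killer = (HiveStatement) con.createStatement()) {
        killer.execute("kill query '" + queryId + "'");
      }
      stmt.close();
    }
  }
}
```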
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index b1a602c205..e57d565f5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.LineageState;
+import org.apache.hadoop.mapreduce.MRJobConfig;
/**
* The class to store query level info such as queryId. Multiple queries can run
@@ -54,6 +55,12 @@
*/
private long numModifiedRows = 0;
+ /**
+ * Holds the query tag. It is set by the query analyzer if a query id is passed as part of the query;
+ * otherwise it stores the auto-generated query id.
+ */
+ private String queryTag = null;
+
/**
* Private constructor, use QueryState.Builder instead.
* @param conf The query specific configuration object
@@ -112,6 +119,25 @@ public long getNumModifiedRows() {
public void setNumModifiedRows(long numModifiedRows) {
this.numModifiedRows = numModifiedRows;
}
+
+ public String getQueryTag() {
+ return queryTag;
+ }
+
+ public void setQueryTag(String queryTag) {
+ this.queryTag = queryTag;
+ }
+
+ public static void setMapReduceJobTag(HiveConf queryConf, String queryTag) {
+ String jobTag = queryConf.get(MRJobConfig.JOB_TAGS);
+ if (jobTag == null) {
+ jobTag = queryTag;
+ } else {
+ jobTag = jobTag.concat("," + queryTag);
+ }
+ queryConf.set(MRJobConfig.JOB_TAGS, jobTag);
+ }
+
/**
* Builder to instantiate the QueryState object.
*/
@@ -221,6 +247,8 @@ public QueryState build() {
if (generateNewQueryId) {
String queryId = QueryPlan.makeQueryId();
queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
+ setMapReduceJobTag(queryConf, queryId);
+
// FIXME: druid storage handler relies on query.id to maintain some staging directories
// expose queryid to session level
if (hiveConf != null) {
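
Note that setMapReduceJobTag appends to mapreduce.job.tags rather than overwriting it; a small sketch of the resulting value (the tag strings are illustrative):

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class JobTagDemo {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    conf.set(MRJobConfig.JOB_TAGS, "existing_tag");

    // Appends rather than replaces: tags already present are preserved.
    QueryState.setMapReduceJobTag(conf, "hive_20180101000000_example");

    // Prints "existing_tag,hive_20180101000000_example"
    System.out.println(conf.get(MRJobConfig.JOB_TAGS));
  }
}
```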
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 240208a645..11ef62c1c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -202,6 +203,10 @@ public int executeTask(HiveHistory hiveHistory) {
if (hiveHistory != null) {
hiveHistory.logPlanProgress(queryPlan);
}
+
+ if (conf != null) {
+ LOG.debug("Task getting executed using mapred tag : " + conf.get(MRJobConfig.JOB_TAGS));
+ }
int retval = execute(driverContext);
this.setDone();
if (hiveHistory != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index adaa3d37af..e4186c45a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.parse;
import org.antlr.runtime.tree.Tree;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@
import java.util.List;
import java.util.Map;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
@@ -228,20 +230,7 @@ private void initReplLoad(ASTNode ast) throws SemanticException {
tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
break;
case TOK_REPL_CONFIG:
- Map<String, String> replConfigs
- = DDLSemanticAnalyzer.getProps((ASTNode) childNode.getChild(0));
- if (null != replConfigs) {
- for (Map.Entry<String, String> config : replConfigs.entrySet()) {
- conf.set(config.getKey(), config.getValue());
- }
-
- // As hive conf is changed, need to get the Hive DB again with it.
- try {
- db = Hive.get(conf);
- } catch (HiveException e) {
- throw new SemanticException(e);
- }
- }
+ setConfigs((ASTNode) childNode.getChild(0));
break;
default:
throw new SemanticException("Unrecognized token in REPL LOAD statement");
@@ -360,6 +349,32 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
}
}
+ private void setConfigs(ASTNode node) throws SemanticException {
+ Map<String, String> replConfigs = DDLSemanticAnalyzer.getProps(node);
+ if (null != replConfigs) {
+ for (Map.Entry<String, String> config : replConfigs.entrySet()) {
+ String key = config.getKey();
+ // Don't set the query id in the conf; store it as the query tag instead.
+ if (key.equalsIgnoreCase(HIVEQUERYID.varname)) {
+ String queryTag = config.getValue();
+ if (!StringUtils.isEmpty(queryTag)) {
+ QueryState.setMapReduceJobTag(conf, queryTag);
+ }
+ queryState.setQueryTag(queryTag);
+ } else {
+ conf.set(key, config.getValue());
+ }
+ }
+
+ // As hive conf is changed, need to get the Hive DB again with it.
+ try {
+ db = Hive.get(conf);
+ } catch (HiveException e) {
+ throw new SemanticException(e);
+ }
+ }
+ }
+
// REPL STATUS
private void initReplStatus(ASTNode ast) throws SemanticException{
dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
@@ -371,20 +386,7 @@ private void initReplStatus(ASTNode ast) throws SemanticException{
tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
break;
case TOK_REPL_CONFIG:
- Map<String, String> replConfigs
- = DDLSemanticAnalyzer.getProps((ASTNode) childNode.getChild(0));
- if (null != replConfigs) {
- for (Map.Entry<String, String> config : replConfigs.entrySet()) {
- conf.set(config.getKey(), config.getValue());
- }
-
- // As hive conf is changed, need to get the Hive DB again with it.
- try {
- db = Hive.get(conf);
- } catch (HiveException e) {
- throw new SemanticException(e);
- }
- }
+ setConfigs((ASTNode) childNode.getChild(0));
break;
default:
throw new SemanticException("Unrecognized token in REPL STATUS statement");
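
With setConfigs in place, 'hive.query.id' in a WITH clause becomes a query tag rather than a conf entry. A sketch of how a client might use this (assumes `stmt` is a `HiveStatement` as in the JDBC example above; the database name and tag are illustrative):

```java
// 'hive.query.id' in the WITH clause is intercepted by setConfigs() and stored
// as the query tag instead of being written into the session conf.
stmt.executeAsync("repl status some_db with ('hive.query.id' = 'myReplTag')");
stmt.getUpdateCount();

// The custom tag can later be used in place of the internal query id,
// e.g. after a client restart when the original query id is unknown.
stmt.execute("kill query 'myReplTag'");
```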
diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 1ee07568d2..be2a08d535 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -413,4 +413,8 @@ protected void markOperationStartTime() {
protected void markOperationCompletedTime() {
operationComplete = System.currentTimeMillis();
}
+
+ public String getQueryTag() {
+ return queryState.getQueryTag();
+ }
}
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 5336034839..c8a6abdd0d 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -62,6 +62,7 @@
new ConcurrentHashMap<OperationHandle, Operation>();
private final ConcurrentHashMap<String, Operation> queryIdOperation =
new ConcurrentHashMap<String, Operation>();
+ private final ConcurrentHashMap<String, String> queryTagToIdMap = new ConcurrentHashMap<>();
//Following fields for displaying queries on WebUI
private Object webuiLock = new Object();
@@ -201,11 +202,29 @@ private void addOperation(Operation operation) {
}
}
+ public void updateQueryTag(String queryId, String queryTag) {
+ Operation operation = queryIdOperation.get(queryId);
+ if (operation != null) {
+ String queryIdTemp = queryTagToIdMap.get(queryTag);
+ if (queryIdTemp != null) {
+ throw new RuntimeException("tag " + queryTag + " is already applied for query " + queryIdTemp);
+ }
+ queryTagToIdMap.put(queryTag, queryId);
+ LOG.info("Query " + queryId + " is updated with tag " + queryTag);
+ return;
+ }
+ LOG.info("Query id is missing during query tag updation");
+ }
+
private Operation removeOperation(OperationHandle opHandle) {
Operation operation = handleToOperation.remove(opHandle);
String queryId = getQueryId(operation);
queryIdOperation.remove(queryId);
- LOG.info("Removed queryId: {} corresponding to operation: {}", queryId, opHandle);
+ String queryTag = operation.getQueryTag();
+ if (queryTag != null) {
+ queryTagToIdMap.remove(queryTag);
+ }
+ LOG.info("Removed queryId: {} corresponding to operation: {} with tag: {}", queryId, opHandle, queryTag);
if (operation instanceof SQLOperation) {
removeSafeQueryInfo(opHandle);
}
@@ -422,4 +441,12 @@ public QueryInfo getQueryInfo(String handle) {
public Operation getOperationByQueryId(String queryId) {
return queryIdOperation.get(queryId);
}
+
+ public Operation getOperationByQueryTag(String queryTag) {
+ String queryId = queryTagToIdMap.get(queryTag);
+ if (queryId != null) {
+ return getOperationByQueryId(queryId);
+ }
+ return null;
+ }
}
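
queryTagToIdMap gives kill-query a second lookup path. A minimal helper sketch of the id-then-tag fallback order this enables (the OperationManager instance is assumed to be available; KillQueryImpl below applies the same order inline):

```java
import org.apache.hive.service.cli.operation.Operation;
import org.apache.hive.service.cli.operation.OperationManager;

public final class OperationLookup {
  private OperationLookup() {}

  /**
   * Resolves an operation by query id first, then by query tag.
   * Returns null when neither matches.
   */
  public static Operation findOperation(OperationManager operationManager, String idOrTag) {
    Operation operation = operationManager.getOperationByQueryId(idOrTag);
    if (operation == null) {
      operation = operationManager.getOperationByQueryTag(idOrTag);
    }
    return operation;
  }
}
```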
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 9a07fa1760..36df57e40c 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -198,7 +198,9 @@ public void run() {
if (0 != response.getResponseCode()) {
throw toSQLException("Error while compiling statement", response);
}
-
+ if (queryState.getQueryTag() != null && queryState.getQueryId() != null) {
+ parentSession.updateQueryTag(queryState.getQueryId(), queryState.getQueryTag());
+ }
setHasResultSet(driver.hasResultSet());
} catch (HiveSQLException e) {
setState(OperationState.ERROR);
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index b4070ce20a..f74fe659d6 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -34,6 +34,7 @@
import org.apache.hive.service.cli.OperationHandle;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.operation.Operation;
public interface HiveSession extends HiveSessionBase {
@@ -200,6 +201,8 @@ OperationHandle getCrossReference(String primaryCatalog,
void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
+ void updateQueryTag(String queryId, String queryTag) throws HiveSQLException;
+
void closeOperation(OperationHandle opHandle) throws HiveSQLException;
TableSchema getResultSetMetadata(OperationHandle opHandle)
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index b9a8537f5b..34551cbe24 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -873,6 +873,11 @@ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
}
}
+ @Override
+ public void updateQueryTag(String queryId, String queryTag) throws HiveSQLException {
+ sessionManager.getOperationManager().updateQueryTag(queryId, queryTag);
+ }
+
@Override
public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
acquire(true, false);
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
index b39a7b1aa6..8fbf2079c8 100644
--- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
+++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
@@ -18,8 +18,20 @@
package org.apache.hive.service.server;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.KillQuery;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.OperationHandle;
import org.apache.hive.service.cli.operation.Operation;
@@ -27,6 +39,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
public class KillQueryImpl implements KillQuery {
private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class);
@@ -36,18 +54,86 @@ public KillQueryImpl(OperationManager operationManager) {
this.operationManager = operationManager;
}
+ public static Set<ApplicationId> getChildYarnJobs(Configuration conf, String tag) {
+ Set<ApplicationId> childYarnJobs = new HashSet<>();
+ GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
+ gar.setScope(ApplicationsRequestScope.OWN);
+ gar.setApplicationTags(Collections.singleton(tag));
+
+ try {
+ ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
+ GetApplicationsResponse apps = proxy.getApplications(gar);
+ List<ApplicationReport> appsList = apps.getApplicationList();
+ for (ApplicationReport appReport : appsList) {
+ childYarnJobs.add(appReport.getApplicationId());
+ }
+ } catch (YarnException | IOException ioe) {
+ throw new RuntimeException("Exception occurred while finding child jobs", ioe);
+ }
+
+ if (childYarnJobs.isEmpty()) {
+ LOG.info("No child applications found");
+ } else {
+ LOG.info("Found child YARN applications: " + StringUtils.join(childYarnJobs, ","));
+ }
+
+ return childYarnJobs;
+ }
+
+ public static void killChildYarnJobs(Configuration conf, String tag) {
+ try {
+ if (tag == null) {
+ return;
+ }
+ Set<ApplicationId> childYarnJobs = getChildYarnJobs(conf, tag);
+ if (!childYarnJobs.isEmpty()) {
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(conf);
+ yarnClient.start();
+ for (ApplicationId app : childYarnJobs) {
+ yarnClient.killApplication(app);
+ }
+ }
+ } catch (IOException | YarnException ye) {
+ throw new RuntimeException("Exception occurred while killing child job(s)", ye);
+ }
+ }
+
@Override
public void killQuery(String queryId, String errMsg) throws HiveException {
try {
+ String queryTag = null;
+
Operation operation = operationManager.getOperationByQueryId(queryId);
if (operation == null) {
- LOG.info("Query not found: " + queryId);
+ // Check if the user has passed a query tag to kill the operation. This can happen if the client
+ // application restarts and no longer has the original query id; the tag can then be used instead.
+ operation = operationManager.getOperationByQueryTag(queryId);
+ if (operation == null) {
+ LOG.info("Query not found: " + queryId);
+ }
} else {
+ // Normal flow: the query is tagged and the user kills it by query id.
+ queryTag = operation.getQueryTag();
+ }
+
+ if (queryTag == null) {
+ // Fall back to the query id as the tag: YARN jobs are tagged with the query id by default, so this
+ // still kills the YARN jobs when only the id is known, e.g. after a HiveServer2 restart where the
+ // in-memory operation is gone and the query id (or a custom tag) is all the caller has.
+ queryTag = queryId;
+ }
+
+ LOG.info("Killing yarn jobs for query id : " + queryId + " using tag :" + queryTag);
+ killChildYarnJobs(SessionState.getSessionConf(), queryTag);
+
+ if (operation != null) {
OperationHandle handle = operation.getHandle();
operationManager.cancelOperation(handle, errMsg);
}
} catch (HiveSQLException e) {
- throw new HiveException(e);
+ LOG.error("Kill query failed for query " + queryId, e);
+ throw new HiveException(e.getMessage(), e);
}
}
}
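
Since killChildYarnJobs is static, the YARN-side cleanup can also be driven directly; a minimal sketch (the configuration and tag are placeholders):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hive.service.server.KillQueryImpl;

public class YarnCleanupDemo {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // Kills all YARN applications owned by the caller that carry this tag.
    // A null tag is a no-op; a tag with no matching applications only logs.
    KillQueryImpl.killChildYarnJobs(conf, "hive_20180101000000_example");
  }
}
```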