diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 3d276323f5..34e8530de6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -19,8 +19,12 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 import org.apache.hive.common.util.Ref;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.impl.client.HttpClients;
 import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
 
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -44,6 +48,8 @@ import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
+import org.apache.hadoop.hive.ql.hooks.Entity;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -87,6 +93,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.CharStreams;
 
 /**
  *
@@ -721,6 +728,40 @@ public void shutdown() {
         + ((dagClient == null) ? " before submit" : ""));
     if (dagClient == null) return;
     closeDagClientOnCancellation(dagClient);
+    if (hasDruidInput()) {
+      cancelDruidQuery();
+    }
+  }
+
+  // Returns true if any input table of the query is backed by the Druid storage handler.
+  private boolean hasDruidInput() {
+    for (ReadEntity input : queryPlan.getInputs()) {
+      if (input.getTyp() == Entity.Type.TABLE && input.getTable().getStorageHandler() != null &&
+          "DruidStorageHandler".equals(input.getTable().getStorageHandler().getClass().getSimpleName())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // Best-effort cancellation: issues DELETE /druid/v2/<queryId> against the configured broker.
+  private void cancelDruidQuery() {
+    try {
+      String address = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
+      String queryId = queryPlan.getQueryId();
+      HttpDelete httpDelete = new HttpDelete(address + "/druid/v2/" + queryId);
+      LOG.debug("Trying to cancel druid query by executing " + httpDelete.getRequestLine());
+
+      HttpResponse response = HttpClients.createDefault().execute(httpDelete);
+      if (response.getStatusLine().getStatusCode() != 200) {
+        throw new HiveException("DELETE request failed, HTTP error code: " + response.getStatusLine().getStatusCode());
+      }
+
+      String reply = CharStreams.toString(new InputStreamReader(response.getEntity().getContent()));
+      LOG.debug("Druid query cancellation was executed successfully, the reply was:\n" + reply);
+    } catch (Exception e) {
+      LOG.warn("Error cancelling druid query", e);
+    }
   }
 
   /** DAG client that does dumb global sync on all the method calls;
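
Note: below is a minimal standalone sketch of the same cancellation call (DELETE /druid/v2/<queryId>, Druid's query-cancellation endpoint), for reference only. The class name, broker address, and query id are placeholders, not part of the patch. Unlike the patch, the sketch closes the HTTP client and response via try-with-resources, so the shutdown path does not leak connections.

import java.nio.charset.StandardCharsets;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class DruidQueryCanceller {

  // Issues DELETE <brokerAddress>/druid/v2/<queryId> and returns the broker's reply body.
  public static String cancel(String brokerAddress, String queryId) throws Exception {
    HttpDelete delete = new HttpDelete(brokerAddress + "/druid/v2/" + queryId);
    // try-with-resources closes both the client and the response, which the
    // patch's bare HttpClients.createDefault().execute(...) does not.
    try (CloseableHttpClient client = HttpClients.createDefault();
         CloseableHttpResponse response = client.execute(delete)) {
      int status = response.getStatusLine().getStatusCode();
      if (status != 200) {
        throw new IllegalStateException("DELETE request failed, HTTP error code: " + status);
      }
      return EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) throws Exception {
    // Hypothetical broker address and query id, for illustration only.
    System.out.println(cancel("http://localhost:8082", "some-query-id"));
  }
}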