diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index c9d75c0..323e019 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3373,6 +3373,22 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal Constants.LLAP_LOGGER_NAME_CONSOLE), "logger used for llap-daemons."), + // guardrail rules for testing. TODO: remove before submitting patch + HIVE_GUARDRAIL_COUNTER_FILE_BYTES_READ("hive.guardrail.counter.file.bytes.read", 0L, + "Only for testing"), + HIVE_GUARDRAIL_COUNTER_FILE_BYTES_WRITTEN("hive.guardrail.counter.file.bytes.written", 0L, + "Only for testing"), + HIVE_GUARDRAIL_COUNTER_HDFS_BYTES_READ("hive.guardrail.counter.hdfs.bytes.read", 0L, + "Only for testing"), + HIVE_GUARDRAIL_COUNTER_HDFS_BYTES_WRITTEN("hive.guardrail.counter.hdfs.bytes.written", 0L, + "Only for testing"), + HIVE_GUARDRAIL_COUNTER_SHUFFLE_BYTES("hive.guardrail.counter.shuffle.bytes", 0L, + "Only for testing"), + HIVE_GUARDRAIL_COUNTER_EXECUTION_TIME("hive.guardrail.counter.execution.time.ms", 0L, + "Only for testing"), + HIVE_GUARDRAIL_COUNTER_ELAPSED_TIME("hive.guardrail.counter.elapsed.time.ms", 0L, + "Only for testing"), + SPARK_USE_OP_STATS("hive.spark.use.op.stats", true, "Whether to use operator stats to determine reducer parallelism for Hive on Spark.\n" + "If this is false, Hive will use source table stats to determine reducer\n" + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 9183edf..200fb5e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.guardrail.Rule; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager.Heartbeater; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; @@ -129,6 +130,9 @@ // Identify whether the query involves an UPDATE, DELETE or MERGE private boolean isUpdateDeleteMerge; + // list of rules that can trigger an action + private List rules; + /** * This determines the prefix of the * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.Phase1Ctx#dest} @@ -822,6 +826,14 @@ public void setHiveLocks(List hiveLocks) { this.hiveLocks = hiveLocks; } + public List getRules() { + return rules; + } + + public void setRules(final List rules) { + this.rules = rules; + } + public HiveTxnManager getHiveTxnManager() { return hiveTxnManager; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 4e7c80f..42f62f5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -63,6 +63,13 @@ import org.apache.hadoop.hive.ql.exec.TaskResult; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.guardrail.CounterLimit; +import org.apache.hadoop.hive.ql.guardrail.Expression; +import org.apache.hadoop.hive.ql.guardrail.ExpressionFactory; +import org.apache.hadoop.hive.ql.guardrail.FileSystemCounterLimit; +import org.apache.hadoop.hive.ql.guardrail.Rule; +import org.apache.hadoop.hive.ql.guardrail.RuleFactory; +import org.apache.hadoop.hive.ql.guardrail.TimeCounterLimit; import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; @@ -1534,6 +1541,14 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp HiveTxnManager txnManager = SessionState.get().getTxnMgr(); ctx.setHiveTxnManager(txnManager); + List rules = populateRules(conf); + if (rules == null || rules.isEmpty()) { + LOG.info("No guardrail rules will be applied"); + } else { + LOG.info("Enabled guardrail rules: {}", rules); + ctx.setRules(rules); + } + if (requiresLock()) { // a checkpoint to see if the thread is interrupted or not before an expensive operation if (isInterrupted()) { @@ -1606,6 +1621,72 @@ else if(plan.getOperation() == HiveOperation.ROLLBACK) { } } + private List populateRules(final HiveConf conf) { + // FIXME: Hook this to metastore information about rules/triggers/actions + List rules = new ArrayList<>(); + long value = HiveConf.getLongVar(conf, ConfVars.HIVE_GUARDRAIL_COUNTER_FILE_BYTES_READ); + if (value > 0) { + CounterLimit counterLimit = new FileSystemCounterLimit("file", + FileSystemCounterLimit.FSCounter.BYTES_READ, value); + Expression expression = ExpressionFactory.createExpression(counterLimit); + rules.add(RuleFactory.createRule("violate_" + counterLimit.getName().toLowerCase(), + expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY)); + } + value = HiveConf.getLongVar(conf, ConfVars.HIVE_GUARDRAIL_COUNTER_FILE_BYTES_WRITTEN); + if (value > 0) { + CounterLimit counterLimit = new FileSystemCounterLimit("file", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, value); + Expression expression = ExpressionFactory.createExpression(counterLimit); + rules.add(RuleFactory.createRule("violate_" + counterLimit.getName().toLowerCase(), + expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY)); + } + + value = HiveConf.getLongVar(conf, ConfVars.HIVE_GUARDRAIL_COUNTER_HDFS_BYTES_READ); + if (value > 0) { + CounterLimit counterLimit = new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_READ, value); + Expression expression = ExpressionFactory.createExpression(counterLimit); + rules.add(RuleFactory.createRule("violate_" + counterLimit.getName().toLowerCase(), + expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY)); + } + value = HiveConf.getLongVar(conf, ConfVars.HIVE_GUARDRAIL_COUNTER_HDFS_BYTES_WRITTEN); + if (value > 0) { + CounterLimit counterLimit = new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, value); + Expression expression = ExpressionFactory.createExpression(counterLimit); + rules.add(RuleFactory.createRule("violate_" + counterLimit.getName().toLowerCase(), + expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY)); + } + + value = HiveConf.getLongVar(conf, ConfVars.HIVE_GUARDRAIL_COUNTER_SHUFFLE_BYTES); + if (value > 0) { + CounterLimit counterLimit = new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, value); + Expression expression = ExpressionFactory.createExpression(counterLimit); + rules.add(RuleFactory.createRule("violate_" + counterLimit.getName().toLowerCase(), + expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY)); + } + + value = HiveConf.getLongVar(conf, ConfVars.HIVE_GUARDRAIL_COUNTER_EXECUTION_TIME); + if (value > 0) { + // start time will be updated as soon as DAG runs + CounterLimit counterLimit = new TimeCounterLimit(TimeCounterLimit.TimeCounter.EXECUTION_TIME_MS, value); + Expression expression = ExpressionFactory.createExpression(counterLimit); + rules.add(RuleFactory.createRule("violate_" + counterLimit.toString().toLowerCase(), + expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY)); + } + + value = HiveConf.getLongVar(conf, ConfVars.HIVE_GUARDRAIL_COUNTER_ELAPSED_TIME); + if (value > 0) { + // start time will be updated as soon as query is submitted (before compilation) + CounterLimit counterLimit = new TimeCounterLimit(TimeCounterLimit.TimeCounter.ELAPSED_TIME_MS, value); + Expression expression = ExpressionFactory.createExpression(counterLimit); + rules.add(RuleFactory.createRule("violate_" + counterLimit.toString().toLowerCase(), + expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY)); + } + return rules; + } + private CommandProcessorResponse rollback(CommandProcessorResponse cpr) { //console.printError(cpr.toString()); try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 9e2846c..9d07b37 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -22,16 +22,24 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.common.log.InPlaceUpdate; +import org.apache.hadoop.hive.ql.guardrail.Expression; +import org.apache.hadoop.hive.ql.guardrail.Rule; +import org.apache.hadoop.hive.ql.guardrail.RuleViolationException; +import org.apache.hadoop.hive.ql.guardrail.TimeCounterLimit; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.common.log.ProgressMonitor; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hive.common.util.ShutdownHookManager; +import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.common.counters.DAGCounter; +import org.apache.tez.common.counters.FileSystemCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DAG; @@ -45,11 +53,12 @@ Licensed to the Apache Software Foundation (ASF) under one import java.io.IOException; import java.io.InterruptedIOException; import java.io.StringWriter; -import java.util.HashSet; +import java.util.EnumSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.stream.StreamSupport; import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; @@ -128,6 +137,7 @@ private boolean isProfilingEnabled() { public int monitorExecution() { boolean done = false; boolean success = false; + boolean ruleViolated = false; int failedCounter = 0; final StopWatch failureTimer = new StopWatch(); int rc = 0; @@ -152,7 +162,8 @@ public int monitorExecution() { context.checkHeartbeaterLockException(); } - status = dagClient.getDAGStatus(new HashSet(), checkInterval); + status = dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS), checkInterval); + checkRuleViolation(status.getDAGCounters(), dagClient, context.getRules()); vertexProgressMap = status.getVertexProgress(); DAGStatus.State state = status.getState(); @@ -169,12 +180,14 @@ public int monitorExecution() { case INITING: console.printInfo("Status: Initializing"); this.executionStartTime = System.currentTimeMillis(); + updateRuleStartTime(context.getRules()); break; case RUNNING: if (!running) { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n"); this.executionStartTime = System.currentTimeMillis(); + updateRuleStartTime(context.getRules()); running = true; // from running -> failed/succeeded, the AM breaks out of timeouts checkInterval = MAX_CHECK_INTERVAL; @@ -220,10 +233,16 @@ public int monitorExecution() { failureTimer.reset(); failureTimer.start(); } - if (isInterrupted + if (e instanceof RuleViolationException) { + // no retries + failedCounter = MAX_RETRY_FAILURES; + ruleViolated = true; + // no useful stacktrace from rule violation except for the message, so no rethrow + } + if (isInterrupted || ruleViolated || (++failedCounter >= MAX_RETRY_FAILURES && failureTimer.now(TimeUnit.MILLISECONDS) > MAX_RETRY_INTERVAL)) { try { - if (isInterrupted) { + if (isInterrupted || ruleViolated) { console.printInfo("Killing DAG..."); } else { console.printInfo(String.format("Killing DAG... after %d seconds", @@ -233,8 +252,9 @@ public int monitorExecution() { } catch (IOException | TezException tezException) { // best effort } - console - .printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e)); + if (!ruleViolated) { + console.printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e)); + } rc = 1; done = true; } else { @@ -244,8 +264,10 @@ public int monitorExecution() { if (done) { if (rc != 0 && status != null) { for (String diag : status.getDiagnostics()) { - console.printError(diag); - diagnostics.append(diag); + if (!ruleViolated) { + console.printError(diag); + diagnostics.append(diag); + } } } synchronized (shutdownList) { @@ -261,6 +283,54 @@ public int monitorExecution() { return rc; } + private void updateRuleStartTime(final List rules) { + if (rules != null) { + rules.stream() + .filter(r -> r.getExpression().getCounterLimit() instanceof TimeCounterLimit && + r.getExpression().getCounterLimit().getName().equals(TimeCounterLimit.TimeCounter.EXECUTION_TIME_MS.name())) + .findFirst() + .ifPresent(r -> ((TimeCounterLimit) r.getExpression().getCounterLimit()).reset()); + } + } + + private void checkRuleViolation(final TezCounters dagCounters, final DAGClient dagClient, final List rules) + throws RuleViolationException { + if (dagCounters == null || dagClient == null || rules == null) { + return; + } + + for (Rule rule : rules) { + String desiredCounter = rule.getExpression().getCounterLimit().getName(); + if (rule.getExpression().getCounterLimit() instanceof TimeCounterLimit) { + final long elapsed = ((TimeCounterLimit) rule.getExpression().getCounterLimit()).elapsed(TimeUnit.MILLISECONDS); + if (rule.apply(elapsed)) { + final long limit = rule.getExpression().getCounterLimit().getLimit(); + String msg = "Current value: " + elapsed + " Limit: " + limit + "\n"; + if (rule.getAction().equals(Rule.Action.KILL_QUERY)) { + // can only throw unchecked exception from lambda + throw new RuleViolationException(rule, msg); + } + } + } else { + StreamSupport.stream(dagCounters.spliterator(), false) + .filter(cg -> cg.getUnderlyingGroup().findCounter(desiredCounter, false) != null) + .filter(cg -> cg.findCounter(desiredCounter).getValue() > 0) + .findFirst() + .map(mg -> mg.findCounter(desiredCounter).getValue()) + .ifPresent(l -> { + if (rule.apply(l)) { + String msg = "Current value: " + l + " Limit: " + + rule.getExpression().getCounterLimit().getLimit() + "\n"; + if (rule.getAction().equals(Rule.Action.KILL_QUERY)) { + // can only throw unchecked exception from lambda + throw new RuleViolationException(rule, msg); + } + } + }); + } + } + } + private void printSummary(boolean success, Map progressMap) { if (isProfilingEnabled() && success && progressMap != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/CounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/CounterLimit.java new file mode 100644 index 0000000..3bef869 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/CounterLimit.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +/** + * Counter limit interface for defining counter name and its limit exceeding which action defined in trigger will get + * executed. + */ +public interface CounterLimit { + /** + * Get the name of the counter. + * + * @return name + */ + public String getName(); + + /** + * Get the threshold value for the counter + * + * @return limit + */ + public long getLimit(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Expression.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Expression.java new file mode 100644 index 0000000..bfd9d9a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Expression.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +/** + * Expression that is defined in triggers. + * Most expressions will get triggered only after exceeding a limit. As a result, only greater than (>) expression + * is supported. + */ +public interface Expression { + + public enum Predicate { + GREATER_THAN(">"); + + String symbol; + + Predicate(final String symbol) { + this.symbol = symbol; + } + + public String getSymbol() { + return symbol; + } + } + + public interface Builder { + public Builder greaterThan(CounterLimit counter); + + public Expression build(); + } + + /** + * Evaluate current value against this expression. Return true if expression evaluates to true (current > limit) + * else false otherwise + * + * @param current - current value against which expression will be evaluated + * @return + */ + public boolean evaluate(final long current); + + /** + * Return counter limit + * + * @return counter limit + */ + public CounterLimit getCounterLimit(); + + /** + * Return predicate defined in the expression. + * + * @return predicate + */ + public Predicate getPredicate(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/ExpressionFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/ExpressionFactory.java new file mode 100644 index 0000000..a38454b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/ExpressionFactory.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +/** + * Factory to create expressions + */ +public class ExpressionFactory { + + public static Expression createExpression(CounterLimit counterLimit) { + return new TriggerExpression(counterLimit, Expression.Predicate.GREATER_THAN); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/FileSystemCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/FileSystemCounterLimit.java new file mode 100644 index 0000000..66f16e9 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/FileSystemCounterLimit.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +/** + * File system specific counters with defined limits + */ +public class FileSystemCounterLimit implements CounterLimit { + + public enum FSCounter { + BYTES_READ, + BYTES_WRITTEN, + SHUFFLE_BYTES + } + + private String scheme; + private FSCounter fsCounter; + private long limit; + + public FileSystemCounterLimit(final String scheme, final FSCounter fsCounter, final long limit) { + this.scheme = scheme == null ? "" : scheme; + this.fsCounter = fsCounter; + this.limit = limit; + } + + @Override + public String getName() { + return scheme.isEmpty() ? fsCounter.name() : scheme.toUpperCase() + "_" + fsCounter.name(); + } + + @Override + public long getLimit() { + return limit; + } + + @Override + public String toString() { + return "counter: " + getName() + " limit: " + limit; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/QueryRule.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/QueryRule.java new file mode 100644 index 0000000..977cb25 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/QueryRule.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +/** + * Rule with query level scope that contains a name, trigger expression violating which defined action will be executed. + */ +public class QueryRule implements Rule { + private String name; + private Expression expression; + private Action action; + + public QueryRule(final String name, final Expression expression, final Action action) { + this.name = name; + this.expression = expression; + this.action = action; + } + + @Override + public String getName() { + return name; + } + + @Override + public Expression getExpression() { + return expression; + } + + @Override + public Action getAction() { + return action; + } + + @Override + public Scope getScope() { + return Scope.QUERY; + } + + @Override + public boolean apply(final long current) { + if (expression.evaluate(current)) { + switch (action) { + case KILL_QUERY: + return true; + case MOVE_TO_POOL: + // FIXME: act upon the Action + return true; + } + } + return false; + } + + @Override + public String toString() { + return "{ name: " + name + ", expression: " + expression + ", action: " + action + " }"; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Rule.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Rule.java new file mode 100644 index 0000000..63ba32d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Rule.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +/** + * Rule interface which gets mapped to CREATE RULE .. queries. A rule can have a name, trigger and action. + * Trigger is a simple expression which gets evaluated during the lifecycle of query/session and executes an action + * if the expression defined in trigger evaluates to true. + */ +public interface Rule { + + public enum Action { + KILL_QUERY(""), + MOVE_TO_POOL(""); + + String poolName; + + Action(final String poolName) { + this.poolName = poolName; + } + + public Action setPoolName(final String poolName) { + this.poolName = poolName; + return this; + } + + public String getPoolName() { + return poolName; + } + } + + public enum Scope { + QUERY, + SESSSION, + GLOBAL + } + + /** + * Based on current value, returns true if rule is applied else false. + * + * @param current - current value + * @return true if rule got applied false otherwise + */ + public boolean apply(long current); + + /** + * Get trigger expression + * + * @return expression + */ + public Expression getExpression(); + + /** + * Return the name of the rule + * + * @return rule name + */ + public String getName(); + + /** + * Return the action that will get executed when trigger expression evaluates to true + * + * @return action + */ + public Action getAction(); + + + /** + * Return the scope of this rule + * + * @return scope + */ + public Scope getScope(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/RuleFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/RuleFactory.java new file mode 100644 index 0000000..d5d4443 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/RuleFactory.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +/** + * Factory for creating rules. + */ +public class RuleFactory { + // FIXME: This should be from metastore columns (CREATE RULE ... -> Metastore -> here) + public static Rule createRule(final String name, final Expression expression, final Rule.Action action, + final Rule.Scope scope) { + switch (scope) { + case QUERY: + return new QueryRule(name, expression, action); + // TODO: handle scope and global rules + } + return null; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/RuleViolationException.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/RuleViolationException.java new file mode 100644 index 0000000..7655106 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/RuleViolationException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +/** + * When a rule is violated, this exception will be thrown. + */ +public class RuleViolationException extends RuntimeException { + + public RuleViolationException(Rule rule, String msg) { + super("Rule " + rule.toString() + " violated. " + msg); + } + + public RuleViolationException(final String ruleName, final String msg) { + super("Rule " + ruleName + " violated. " + msg); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/TimeCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/TimeCounterLimit.java new file mode 100644 index 0000000..00830e7 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/TimeCounterLimit.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.util.StopWatch; + +/** + * Time based counters with limits + */ +public class TimeCounterLimit implements CounterLimit, Timer { + public enum TimeCounter { + ELAPSED_TIME_MS, + EXECUTION_TIME_MS + } + + private StopWatch stopWatch; + private TimeCounter timeCounter; + private long limit; + + public TimeCounterLimit(final TimeCounter timeCounter, final long limit) { + this.stopWatch = new StopWatch(); + this.timeCounter = timeCounter; + this.limit = limit; + start(); + } + + @Override + public String getName() { + return timeCounter.name(); + } + + @Override + public long getLimit() { + return limit; + } + + @Override + public void start() { + this.stopWatch.start(); + } + + @Override + public void stop() { + this.stopWatch.stop(); + } + + @Override + public void reset() { + this.stopWatch.reset(); + } + + @Override + public long elapsed(final TimeUnit timeUnit) { + return this.stopWatch.now(timeUnit); + } + + @Override + public String toString() { + return "counter: " + timeCounter.name() + " limit: " + limit; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Timer.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Timer.java new file mode 100644 index 0000000..d662790 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Timer.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +import java.util.concurrent.TimeUnit; + +/** + * Stop watch timer interface + */ +public interface Timer { + /** + * Start the timer + */ + public void start(); + + /** + * Stop the timer + */ + public void stop(); + + /** + * Reset the timer + */ + public void reset(); + + /** + * Returned elapsed time requested timeunit + * + * @param timeUnit timeunit + * @return elapsed time + */ + public long elapsed(TimeUnit timeUnit); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/TriggerExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/TriggerExpression.java new file mode 100644 index 0000000..f58d73d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/TriggerExpression.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +/** + * Simple trigger expression for a rule. + */ +public class TriggerExpression implements Expression { + private CounterLimit counterLimit; + private Predicate predicate; + + public TriggerExpression(final CounterLimit counter, final Predicate predicate) { + this.counterLimit = counter; + this.predicate = predicate; + } + + @Override + public boolean evaluate(final long current) { + if (counterLimit.getLimit() > 0) { + if (predicate.equals(Predicate.GREATER_THAN)) { + return current > counterLimit.getLimit(); + } + } + return false; + } + + @Override + public CounterLimit getCounterLimit() { + return counterLimit; + } + + @Override + public Predicate getPredicate() { + return predicate; + } + + @Override + public String toString() { + return counterLimit.getName() + " " + predicate.getSymbol() + " " + counterLimit.getLimit(); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/guardrail/TestRule.java b/ql/src/test/org/apache/hadoop/hive/ql/guardrail/TestRule.java new file mode 100644 index 0000000..56534d1 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/guardrail/TestRule.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +/** + * + */ +public class TestRule { + @Test + public void testSimpleQueryRule() { + Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + Rule rule = RuleFactory.createRule("hdfs_read_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY); + assertEquals("counter: HDFS_BYTES_READ limit: 1024", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + rule = RuleFactory.createRule("hdfs_write_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY); + assertEquals("counter: HDFS_BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + rule = RuleFactory.createRule("local_read_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY); + assertEquals("counter: BYTES_READ limit: 1024", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + rule = RuleFactory.createRule("local_write_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY); + assertEquals("counter: BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024)); + rule = RuleFactory.createRule("shuffle_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY); + assertEquals("counter: SHUFFLE_BYTES limit: 1024", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(1025)); + + expression = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .EXECUTION_TIME_MS, 10000)); + rule = RuleFactory.createRule("slow_query", expression, Rule.Action.MOVE_TO_POOL.setPoolName("fake_pool"), + Rule.Scope.QUERY); + assertEquals("counter: EXECUTION_TIME_MS limit: 10000", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(100000)); + } +}