diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 24c5db0..d42eb81 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3378,6 +3378,22 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal Constants.LLAP_LOGGER_NAME_CONSOLE), "logger used for llap-daemons."), + // guardrail rules for testing. TODO: remove before submitting patch + HIVE_GUARDRAIL_COUNTER_FILE_BYTES_READ("hive.guardrail.counter.file.bytes.read", 0L, + "Only for testing"), + HIVE_GUARDRAIL_COUNTER_FILE_BYTES_WRITTEN("hive.guardrail.counter.file.bytes.written", 0L, + "Only for testing"), + HIVE_GUARDRAIL_COUNTER_HDFS_BYTES_READ("hive.guardrail.counter.hdfs.bytes.read", 0L, + "Only for testing"), + HIVE_GUARDRAIL_COUNTER_HDFS_BYTES_WRITTEN("hive.guardrail.counter.hdfs.bytes.written", 0L, + "Only for testing"), + HIVE_GUARDRAIL_COUNTER_SHUFFLE_BYTES("hive.guardrail.counter.shuffle.bytes", 0L, + "Only for testing"), + HIVE_GUARDRAIL_COUNTER_EXECUTION_TIME("hive.guardrail.counter.execution.time.ms", 0L, + "Only for testing"), + HIVE_GUARDRAIL_COUNTER_ELAPSED_TIME("hive.guardrail.counter.elapsed.time.ms", 0L, + "Only for testing"), + SPARK_USE_OP_STATS("hive.spark.use.op.stats", true, "Whether to use operator stats to determine reducer parallelism for Hive on Spark.\n" + "If this is false, Hive will use source table stats to determine reducer\n" + diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml index bf600c2..2c77da6 100644 --- a/itests/hive-unit/pom.xml +++ b/itests/hive-unit/pom.xml @@ -70,6 +70,13 @@ ${project.version} test-jar + + + io.netty + netty-all + ${netty.version} + test + org.apache.hive.hcatalog hive-hcatalog-core diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java 
index 28fa7a5..246ae4d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java @@ -94,6 +94,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.rules.ExpectedException; public class TestJdbcWithMiniLlap { private static MiniHS2 miniHS2 = null; @@ -105,6 +106,9 @@ private static HiveConf conf = null; private Connection hs2Conn = null; + @org.junit.Rule + public ExpectedException thrown = ExpectedException.none(); + @BeforeClass public static void beforeTest() throws Exception { Class.forName(MiniHS2.getJdbcDriverName()); @@ -233,6 +237,20 @@ public void testLlapInputFormatEndToEnd() throws Exception { } @Test(timeout = 60000) + public void testRuleViolationShuffleBytes() throws Exception { + createTestTable("testtab1"); + + String query = "set hive.guardrail.counter.shuffle.bytes=100"; + RowCollector rowCollector = new RowCollector(); + processQuery(query, 1, rowCollector); + + thrown.expect(SQLException.class); + thrown.expectMessage("Rule { name: violate_shuffle_bytes, expression: SHUFFLE_BYTES > 100"); + query = "select * from testtab1 order by under_col"; + processQuery(query, 1, rowCollector); + } + + @Test(timeout = 60000) public void testNonAsciiStrings() throws Exception { createTestTable("nonascii", "testtab_nonascii"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 9183edf..200fb5e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.guardrail.Rule; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import 
org.apache.hadoop.hive.ql.lockmgr.DbTxnManager.Heartbeater; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; @@ -129,6 +130,9 @@ // Identify whether the query involves an UPDATE, DELETE or MERGE private boolean isUpdateDeleteMerge; + // list of rules that can trigger an action + private List rules; + /** * This determines the prefix of the * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.Phase1Ctx#dest} @@ -822,6 +826,14 @@ public void setHiveLocks(List hiveLocks) { this.hiveLocks = hiveLocks; } + public List getRules() { + return rules; + } + + public void setRules(final List rules) { + this.rules = rules; + } + public HiveTxnManager getHiveTxnManager() { return hiveTxnManager; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 4e7c80f..42f62f5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -63,6 +63,13 @@ import org.apache.hadoop.hive.ql.exec.TaskResult; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.guardrail.CounterLimit; +import org.apache.hadoop.hive.ql.guardrail.Expression; +import org.apache.hadoop.hive.ql.guardrail.ExpressionFactory; +import org.apache.hadoop.hive.ql.guardrail.FileSystemCounterLimit; +import org.apache.hadoop.hive.ql.guardrail.Rule; +import org.apache.hadoop.hive.ql.guardrail.RuleFactory; +import org.apache.hadoop.hive.ql.guardrail.TimeCounterLimit; import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; @@ -1534,6 +1541,14 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp HiveTxnManager txnManager = SessionState.get().getTxnMgr(); ctx.setHiveTxnManager(txnManager); + List rules = populateRules(conf); + if (rules == null || rules.isEmpty()) { + 
LOG.info("No guardrail rules will be applied"); + } else { + LOG.info("Enabled guardrail rules: {}", rules); + ctx.setRules(rules); + } + if (requiresLock()) { // a checkpoint to see if the thread is interrupted or not before an expensive operation if (isInterrupted()) { @@ -1606,6 +1621,72 @@ else if(plan.getOperation() == HiveOperation.ROLLBACK) { } } + private List populateRules(final HiveConf conf) { + // FIXME: Hook this to metastore information about rules/triggers/actions + List rules = new ArrayList<>(); + long value = HiveConf.getLongVar(conf, ConfVars.HIVE_GUARDRAIL_COUNTER_FILE_BYTES_READ); + if (value > 0) { + CounterLimit counterLimit = new FileSystemCounterLimit("file", + FileSystemCounterLimit.FSCounter.BYTES_READ, value); + Expression expression = ExpressionFactory.createExpression(counterLimit); + rules.add(RuleFactory.createRule("violate_" + counterLimit.getName().toLowerCase(), + expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY)); + } + value = HiveConf.getLongVar(conf, ConfVars.HIVE_GUARDRAIL_COUNTER_FILE_BYTES_WRITTEN); + if (value > 0) { + CounterLimit counterLimit = new FileSystemCounterLimit("file", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, value); + Expression expression = ExpressionFactory.createExpression(counterLimit); + rules.add(RuleFactory.createRule("violate_" + counterLimit.getName().toLowerCase(), + expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY)); + } + + value = HiveConf.getLongVar(conf, ConfVars.HIVE_GUARDRAIL_COUNTER_HDFS_BYTES_READ); + if (value > 0) { + CounterLimit counterLimit = new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_READ, value); + Expression expression = ExpressionFactory.createExpression(counterLimit); + rules.add(RuleFactory.createRule("violate_" + counterLimit.getName().toLowerCase(), + expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY)); + } + value = HiveConf.getLongVar(conf, ConfVars.HIVE_GUARDRAIL_COUNTER_HDFS_BYTES_WRITTEN); + if (value > 0) { + 
CounterLimit counterLimit = new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, value); + Expression expression = ExpressionFactory.createExpression(counterLimit); + rules.add(RuleFactory.createRule("violate_" + counterLimit.getName().toLowerCase(), + expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY)); + } + + value = HiveConf.getLongVar(conf, ConfVars.HIVE_GUARDRAIL_COUNTER_SHUFFLE_BYTES); + if (value > 0) { + CounterLimit counterLimit = new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, value); + Expression expression = ExpressionFactory.createExpression(counterLimit); + rules.add(RuleFactory.createRule("violate_" + counterLimit.getName().toLowerCase(), + expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY)); + } + + value = HiveConf.getLongVar(conf, ConfVars.HIVE_GUARDRAIL_COUNTER_EXECUTION_TIME); + if (value > 0) { + // start time will be updated as soon as DAG runs + CounterLimit counterLimit = new TimeCounterLimit(TimeCounterLimit.TimeCounter.EXECUTION_TIME_MS, value); + Expression expression = ExpressionFactory.createExpression(counterLimit); + rules.add(RuleFactory.createRule("violate_" + counterLimit.toString().toLowerCase(), + expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY)); + } + + value = HiveConf.getLongVar(conf, ConfVars.HIVE_GUARDRAIL_COUNTER_ELAPSED_TIME); + if (value > 0) { + // start time will be updated as soon as query is submitted (before compilation) + CounterLimit counterLimit = new TimeCounterLimit(TimeCounterLimit.TimeCounter.ELAPSED_TIME_MS, value); + Expression expression = ExpressionFactory.createExpression(counterLimit); + rules.add(RuleFactory.createRule("violate_" + counterLimit.toString().toLowerCase(), + expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY)); + } + return rules; + } + private CommandProcessorResponse rollback(CommandProcessorResponse cpr) { //console.printError(cpr.toString()); try { diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 9e2846c..9aea75b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -22,16 +22,24 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.common.log.InPlaceUpdate; +import org.apache.hadoop.hive.ql.guardrail.Expression; +import org.apache.hadoop.hive.ql.guardrail.Rule; +import org.apache.hadoop.hive.ql.guardrail.RuleViolationException; +import org.apache.hadoop.hive.ql.guardrail.TimeCounterLimit; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.common.log.ProgressMonitor; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hive.common.util.ShutdownHookManager; +import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.common.counters.DAGCounter; +import org.apache.tez.common.counters.FileSystemCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DAG; @@ -45,11 +53,12 @@ Licensed to the Apache Software Foundation (ASF) under one import java.io.IOException; import java.io.InterruptedIOException; import java.io.StringWriter; -import java.util.HashSet; +import java.util.EnumSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import 
java.util.concurrent.TimeUnit; +import java.util.stream.StreamSupport; import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; @@ -128,6 +137,7 @@ private boolean isProfilingEnabled() { public int monitorExecution() { boolean done = false; boolean success = false; + boolean ruleViolated = false; int failedCounter = 0; final StopWatch failureTimer = new StopWatch(); int rc = 0; @@ -152,7 +162,8 @@ public int monitorExecution() { context.checkHeartbeaterLockException(); } - status = dagClient.getDAGStatus(new HashSet(), checkInterval); + status = dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS), checkInterval); + checkRuleViolation(status.getDAGCounters(), dagClient, context.getRules()); vertexProgressMap = status.getVertexProgress(); DAGStatus.State state = status.getState(); @@ -169,12 +180,14 @@ public int monitorExecution() { case INITING: console.printInfo("Status: Initializing"); this.executionStartTime = System.currentTimeMillis(); + updateRuleStartTime(context.getRules()); break; case RUNNING: if (!running) { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n"); this.executionStartTime = System.currentTimeMillis(); + updateRuleStartTime(context.getRules()); running = true; // from running -> failed/succeeded, the AM breaks out of timeouts checkInterval = MAX_CHECK_INTERVAL; @@ -220,10 +233,18 @@ public int monitorExecution() { failureTimer.reset(); failureTimer.start(); } - if (isInterrupted + if (e instanceof RuleViolationException) { + // no retries + failedCounter = MAX_RETRY_FAILURES; + ruleViolated = true; + console.printError(e.getMessage()); + diagnostics.append(e.getMessage()); + // no useful stacktrace from rule violation except for the message, so no rethrow + } + if (isInterrupted || ruleViolated || (++failedCounter >= MAX_RETRY_FAILURES && failureTimer.now(TimeUnit.MILLISECONDS) > MAX_RETRY_INTERVAL)) { try { - if 
(isInterrupted) { + if (isInterrupted || ruleViolated) { console.printInfo("Killing DAG..."); } else { console.printInfo(String.format("Killing DAG... after %d seconds", @@ -233,8 +254,10 @@ public int monitorExecution() { } catch (IOException | TezException tezException) { // best effort } - console - .printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e)); + // on rule violation, no useful stack trace is available + if (!ruleViolated) { + console.printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e)); + } rc = 1; done = true; } else { @@ -244,8 +267,11 @@ public int monitorExecution() { if (done) { if (rc != 0 && status != null) { for (String diag : status.getDiagnostics()) { - console.printError(diag); - diagnostics.append(diag); + // on rule violation dag status will not have any useful diagnostics + if (!ruleViolated) { + console.printError(diag); + diagnostics.append(diag); + } } } synchronized (shutdownList) { @@ -261,6 +287,54 @@ public int monitorExecution() { return rc; } + private void updateRuleStartTime(final List rules) { + if (rules != null) { + rules.stream() + .filter(r -> r.getExpression().getCounterLimit() instanceof TimeCounterLimit && + r.getExpression().getCounterLimit().getName().equals(TimeCounterLimit.TimeCounter.EXECUTION_TIME_MS.name())) + .findFirst() + .ifPresent(r -> ((TimeCounterLimit) r.getExpression().getCounterLimit()).reset()); + } + } + + private void checkRuleViolation(final TezCounters dagCounters, final DAGClient dagClient, final List rules) + throws RuleViolationException { + if (dagCounters == null || dagClient == null || rules == null) { + return; + } + + for (Rule rule : rules) { + String desiredCounter = rule.getExpression().getCounterLimit().getName(); + if (rule.getExpression().getCounterLimit() instanceof TimeCounterLimit) { + final long elapsed = ((TimeCounterLimit) rule.getExpression().getCounterLimit()).elapsed(TimeUnit.MILLISECONDS); + if 
(rule.apply(elapsed)) { + final long limit = rule.getExpression().getCounterLimit().getLimit(); + String msg = "Current value: " + elapsed + " Limit: " + limit + "\n"; + if (rule.getAction().equals(Rule.Action.KILL_QUERY)) { + // can only throw unchecked exception from lambda + throw new RuleViolationException(rule, msg); + } + } + } else { + StreamSupport.stream(dagCounters.spliterator(), false) + .filter(cg -> cg.getUnderlyingGroup().findCounter(desiredCounter, false) != null) + .filter(cg -> cg.findCounter(desiredCounter).getValue() > 0) + .findFirst() + .map(mg -> mg.findCounter(desiredCounter).getValue()) + .ifPresent(l -> { + if (rule.apply(l)) { + String msg = "Current value: " + l + " Limit: " + + rule.getExpression().getCounterLimit().getLimit() + "\n"; + if (rule.getAction().equals(Rule.Action.KILL_QUERY)) { + // can only throw unchecked exception from lambda + throw new RuleViolationException(rule, msg); + } + } + }); + } + } + } + private void printSummary(boolean success, Map progressMap) { if (isProfilingEnabled() && success && progressMap != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/CounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/CounterLimit.java new file mode 100644 index 0000000..3bef869 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/CounterLimit.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +/** + * Counter limit interface for defining counter name and its limit exceeding which action defined in trigger will get + * executed. + */ +public interface CounterLimit { + /** + * Get the name of the counter. + * + * @return name + */ + public String getName(); + + /** + * Get the threshold value for the counter + * + * @return limit + */ + public long getLimit(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Expression.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Expression.java new file mode 100644 index 0000000..bfd9d9a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Expression.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.guardrail; + +/** + * Expression that is defined in triggers. + * Most expressions will get triggered only after exceeding a limit. As a result, only greater than (>) expression + * is supported. + */ +public interface Expression { + + public enum Predicate { + GREATER_THAN(">"); + + String symbol; + + Predicate(final String symbol) { + this.symbol = symbol; + } + + public String getSymbol() { + return symbol; + } + } + + public interface Builder { + public Builder greaterThan(CounterLimit counter); + + public Expression build(); + } + + /** + * Evaluate current value against this expression. Return true if expression evaluates to true (current > limit) + * else false otherwise + * + * @param current - current value against which expression will be evaluated + * @return + */ + public boolean evaluate(final long current); + + /** + * Return counter limit + * + * @return counter limit + */ + public CounterLimit getCounterLimit(); + + /** + * Return predicate defined in the expression. + * + * @return predicate + */ + public Predicate getPredicate(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/ExpressionFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/ExpressionFactory.java new file mode 100644 index 0000000..36a95eb --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/ExpressionFactory.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +import java.util.Arrays; + +/** + * Factory to create expressions + */ +public class ExpressionFactory { + + public static Expression fromString(final String expression) { + if (expression == null || expression.isEmpty()) { + return null; + } + + // TODO: Only ">" predicate is supported right now, this has to be extended to support expression tree when + // multiple conditions are required. + Expression.Predicate[] predicates = Expression.Predicate.values(); + boolean noMatch = Arrays.stream(predicates).noneMatch(e -> expression.contains(e.getSymbol())); + if (noMatch) { + throw new IllegalArgumentException("No valid predicates " + Arrays.toString(predicates) + + " found in expression (" + expression + ")"); + } + + Expression.Predicate matchingPredicate = Arrays.stream(predicates) + .filter(e -> expression.contains(e.getSymbol())) + .findFirst() + .get(); + if (matchingPredicate != null) { + final String[] tokens = expression.split(matchingPredicate.getSymbol()); + if (tokens.length != 2) { + throw new IllegalArgumentException("Invalid expression. Expression missing counter or limit."); + } + + final String counterName = tokens[0].trim(); + if (counterName.isEmpty()) { + throw new IllegalArgumentException("Counter name cannot be empty!"); + } + + final long counterValue; + try { + counterValue = Long.parseLong(tokens[1].trim()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Illegal value for counter limit. Expected a long value. 
Got " + + tokens[1].trim()); + } + + if (counterValue < 0) { + throw new IllegalArgumentException("Illegal value for counter limit. Expected a positive long value. Got " + + tokens[1].trim()); + } + + boolean matches = Arrays.stream(FileSystemCounterLimit.FSCounter.values()) + .anyMatch(e -> counterName.toUpperCase().contains(e.name().toUpperCase())); + if (matches) { + // this is file system counter, valid and create counter + FileSystemCounterLimit fsCounterLimit = FileSystemCounterLimit.fromName(counterName, counterValue); + return createExpression(fsCounterLimit); + } + + // look for exact matches for time based counter limits + matches = Arrays.stream(TimeCounterLimit.TimeCounter.values()) + .anyMatch(e -> counterName.equalsIgnoreCase(e.name())); + if (matches) { + TimeCounterLimit timeCounterLimit = new TimeCounterLimit( + TimeCounterLimit.TimeCounter.valueOf(counterName.toUpperCase()), counterValue); + return createExpression(timeCounterLimit); + } + } + + // unable to create expression at this point, invalid expression + throw new IllegalArgumentException("Invalid expression! " + expression); + } + + public static Expression createExpression(CounterLimit counterLimit) { + return new TriggerExpression(counterLimit, Expression.Predicate.GREATER_THAN); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/FileSystemCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/FileSystemCounterLimit.java new file mode 100644 index 0000000..8e70f9f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/FileSystemCounterLimit.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * File system specific counters with defined limits + */ +public class FileSystemCounterLimit implements CounterLimit { + + private static final String SCHEME_PATTERN = "[a-zA-Z][a-zA-Z0-9-.+]*"; + private static final String SCHEME_COUNTER_SEPARATOR = "_"; + + public enum FSCounter { + BYTES_READ, + BYTES_WRITTEN, + SHUFFLE_BYTES + } + + private String scheme; + private FSCounter fsCounter; + private long limit; + + public FileSystemCounterLimit(final String scheme, final FSCounter fsCounter, final long limit) { + this.scheme = scheme == null ? "" : scheme.toUpperCase(); + this.fsCounter = fsCounter; + this.limit = limit; + } + + public static FileSystemCounterLimit fromName(final String counterName, final long limit) { + for (FSCounter fsCounter : FSCounter.values()) { + if (counterName.toUpperCase().contains(fsCounter.name())) { + if (counterName.toUpperCase().equals(fsCounter.name())) { + return new FileSystemCounterLimit(null, FSCounter.valueOf(counterName), limit); + } + + // validate scheme + // Java's URI scheme pattern is + // "The scheme component of a URI, if defined, only contains characters in the alphanum category and in the + // string "-.+". A scheme always starts with an alpha character." 
+ Pattern pattern = Pattern.compile(SCHEME_PATTERN); + String[] tokens = counterName.toUpperCase().split(fsCounter.name()); + if (tokens.length == 1) { + final String scheme; + // HDFS_BYTES_READ after tokenizing will become "HDFS_", in such cases strip off the last '_' + if (tokens[0].contains(SCHEME_COUNTER_SEPARATOR) && tokens[0].endsWith(SCHEME_COUNTER_SEPARATOR)) { + scheme = tokens[0].substring(0, tokens[0].lastIndexOf(SCHEME_COUNTER_SEPARATOR)); + } else { + throw new IllegalArgumentException("Missing separator '_' between scheme and counter name."); + } + Matcher matcher = pattern.matcher(scheme); + if (matcher.matches()) { + return new FileSystemCounterLimit(scheme, FSCounter.valueOf(fsCounter.name()), limit); + } else { + throw new IllegalArgumentException("Invalid URI scheme " + scheme); + } + } + } + } + + throw new IllegalArgumentException("Invalid counter name specified " + counterName.toUpperCase() + ""); + } + + @Override + public String getName() { + return scheme.isEmpty() ? 
fsCounter.name() : scheme.toUpperCase() + "_" + fsCounter.name(); + } + + @Override + public long getLimit() { + return limit; + } + + @Override + public String toString() { + return "counter: " + getName() + " limit: " + limit; + } + + @Override + public int hashCode() { + int hash = 31 * scheme.hashCode(); + hash += 31 * fsCounter.hashCode(); + hash += 31 * limit; + return 31 * hash; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof FileSystemCounterLimit)) { + return false; + } + + if (other == this) { + return true; + } + + return scheme.equals(((FileSystemCounterLimit) other).scheme) && fsCounter.equals(( + (FileSystemCounterLimit) other).fsCounter) && limit == ((FileSystemCounterLimit) other).limit; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/QueryRule.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/QueryRule.java new file mode 100644 index 0000000..c406835 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/QueryRule.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.guardrail; + +import java.util.Objects; + +/** + * Rule with query level scope that contains a name, trigger expression violating which defined action will be executed. + */ +public class QueryRule implements Rule { + private String name; + private Expression expression; + private Action action; + + public QueryRule(final String name, final Expression expression, final Action action) { + this.name = name; + this.expression = expression; + this.action = action; + } + + @Override + public String getName() { + return name; + } + + @Override + public Expression getExpression() { + return expression; + } + + @Override + public Action getAction() { + return action; + } + + @Override + public Scope getScope() { + return Scope.QUERY; + } + + @Override + public boolean apply(final long current) { + if (expression.evaluate(current)) { + switch (action) { + case KILL_QUERY: + return true; + case MOVE_TO_POOL: + // FIXME: act upon the Action + return true; + } + } + return false; + } + + @Override + public String toString() { + return "{ name: " + name + ", expression: " + expression + ", action: " + action + " }"; + } + + @Override + public int hashCode() { + int hash = name == null ? 31 : 31 * name.hashCode(); + hash += expression == null ? 31 * hash : 31 * hash * expression.hashCode(); + hash += action == null ? 
31 * hash : 31 * hash * action.hashCode(); + return hash; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof QueryRule)) { + return false; + } + + if (other == this) { + return true; + } + + return Objects.equals(name, ((QueryRule) other).name) && + Objects.equals(expression, ((QueryRule) other).expression) && + Objects.equals(action, ((QueryRule) other).action); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Rule.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Rule.java new file mode 100644 index 0000000..63ba32d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Rule.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +/** + * Rule interface which gets mapped to CREATE RULE .. queries. A rule can have a name, trigger and action. + * Trigger is a simple expression which gets evaluated during the lifecycle of query/session and executes an action + * if the expression defined in trigger evaluates to true. 
+ */ +public interface Rule { + + public enum Action { + KILL_QUERY(""), + MOVE_TO_POOL(""); + + String poolName; + + Action(final String poolName) { + this.poolName = poolName; + } + + public Action setPoolName(final String poolName) { + this.poolName = poolName; + return this; + } + + public String getPoolName() { + return poolName; + } + } + + public enum Scope { + QUERY, + SESSSION, + GLOBAL + } + + /** + * Based on current value, returns true if rule is applied else false. + * + * @param current - current value + * @return true if rule got applied false otherwise + */ + public boolean apply(long current); + + /** + * Get trigger expression + * + * @return expression + */ + public Expression getExpression(); + + /** + * Return the name of the rule + * + * @return rule name + */ + public String getName(); + + /** + * Return the action that will get executed when trigger expression evaluates to true + * + * @return action + */ + public Action getAction(); + + + /** + * Return the scope of this rule + * + * @return scope + */ + public Scope getScope(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/RuleFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/RuleFactory.java new file mode 100644 index 0000000..d5d4443 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/RuleFactory.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +/** + * Factory for creating rules. + */ +public class RuleFactory { + // FIXME: This should be from metastore columns (CREATE RULE ... -> Metastore -> here) + public static Rule createRule(final String name, final Expression expression, final Rule.Action action, + final Rule.Scope scope) { + switch (scope) { + case QUERY: + return new QueryRule(name, expression, action); + // TODO: handle scope and global rules + } + return null; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/RuleViolationException.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/RuleViolationException.java new file mode 100644 index 0000000..7655106 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/RuleViolationException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +/** + * When a rule is violated, this exception will be thrown. + */ +public class RuleViolationException extends RuntimeException { + + public RuleViolationException(Rule rule, String msg) { + super("Rule " + rule.toString() + " violated. " + msg); + } + + public RuleViolationException(final String ruleName, final String msg) { + super("Rule " + ruleName + " violated. " + msg); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/TimeCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/TimeCounterLimit.java new file mode 100644 index 0000000..35be081 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/TimeCounterLimit.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.guardrail; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.util.StopWatch; + +/** + * Time based counters with limits + */ +public class TimeCounterLimit implements CounterLimit, Timer { + public enum TimeCounter { + ELAPSED_TIME_MS, + EXECUTION_TIME_MS + } + + private StopWatch stopWatch; + private TimeCounter timeCounter; + private long limit; + + public TimeCounterLimit(final TimeCounter timeCounter, final long limit) { + this.stopWatch = new StopWatch(); + this.timeCounter = timeCounter; + this.limit = limit; + start(); + } + + @Override + public String getName() { + return timeCounter.name(); + } + + @Override + public long getLimit() { + return limit; + } + + @Override + public void start() { + this.stopWatch.start(); + } + + @Override + public void stop() { + this.stopWatch.stop(); + } + + @Override + public void reset() { + this.stopWatch.reset(); + } + + @Override + public long elapsed(final TimeUnit timeUnit) { + return this.stopWatch.now(timeUnit); + } + + @Override + public String toString() { + return "counter: " + timeCounter.name() + " limit: " + limit; + } + + @Override + public int hashCode() { + int hash = 31 * timeCounter.hashCode(); + hash += 31 * limit; + return 31 * hash; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof TimeCounterLimit)) { + return false; + } + + if (other == this) { + return true; + } + + return timeCounter.equals(((TimeCounterLimit) other).timeCounter) && limit == ((TimeCounterLimit) other).limit; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Timer.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Timer.java new file mode 100644 index 0000000..d662790 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/Timer.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.guardrail; + +import java.util.concurrent.TimeUnit; + +/** + * Stop watch timer interface + */ +public interface Timer { + /** + * Start the timer + */ + public void start(); + + /** + * Stop the timer + */ + public void stop(); + + /** + * Reset the timer + */ + public void reset(); + + /** + * Returned elapsed time requested timeunit + * + * @param timeUnit timeunit + * @return elapsed time + */ + public long elapsed(TimeUnit timeUnit); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/guardrail/TriggerExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/TriggerExpression.java new file mode 100644 index 0000000..06bb5af --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/guardrail/TriggerExpression.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.hadoop.hive.ql.guardrail;

import java.util.Objects;

/**
 * Simple trigger expression for a rule: a counter with a limit plus a predicate.
 *
 * A non-positive limit disables the expression. Only the GREATER_THAN predicate is supported so
 * far; any other (or null) predicate evaluates to false.
 */
public class TriggerExpression implements Expression {
  private final CounterLimit counterLimit;
  private final Predicate predicate;

  public TriggerExpression(final CounterLimit counter, final Predicate predicate) {
    this.counterLimit = counter;
    this.predicate = predicate;
  }

  @Override
  public boolean evaluate(final long current) {
    // enum constants are singletons: '==' is the idiomatic comparison and, unlike
    // predicate.equals(...), cannot throw NPE when predicate is null
    return counterLimit.getLimit() > 0
        && predicate == Predicate.GREATER_THAN
        && current > counterLimit.getLimit();
  }

  @Override
  public CounterLimit getCounterLimit() {
    return counterLimit;
  }

  @Override
  public Predicate getPredicate() {
    return predicate;
  }

  @Override
  public String toString() {
    return counterLimit.getName() + " " + predicate.getSymbol() + " " + counterLimit.getLimit();
  }

  @Override
  public int hashCode() {
    // standard null-safe combination, consistent with equals()
    return Objects.hash(counterLimit, predicate);
  }

  @Override
  public boolean equals(final Object other) {
    if (other == this) {
      return true;
    }

    if (!(other instanceof TriggerExpression)) {
      return false;
    }

    TriggerExpression that = (TriggerExpression) other;
    return Objects.equals(counterLimit, that.counterLimit) &&
        Objects.equals(predicate, that.predicate);
  }
}
package org.apache.hadoop.hive.ql.guardrail;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.Test;
import org.junit.rules.ExpectedException;

/**
 * Unit tests for guardrail rule creation ({@link RuleFactory}) and trigger-expression
 * parsing/validation ({@link ExpressionFactory}).
 */
public class TestRule {
  // fully qualified to avoid clashing with the guardrail Rule type under test
  @org.junit.Rule
  public ExpectedException thrown = ExpectedException.none();

  // Rule creation + apply() for each supported counter type (fs read/write, shuffle, exec time).
  @Test
  public void testSimpleQueryRule() {
    Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
        FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
    Rule rule = RuleFactory.createRule("hdfs_read_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY);
    assertEquals("counter: HDFS_BYTES_READ limit: 1024", expression.getCounterLimit().toString());
    assertFalse(rule.apply(1000));
    assertTrue(rule.apply(1025));

    expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
        FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
    rule = RuleFactory.createRule("hdfs_write_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY);
    assertEquals("counter: HDFS_BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString());
    assertFalse(rule.apply(1000));
    assertTrue(rule.apply(1025));

    // empty scheme means the scheme-less (local) counter
    expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
        FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
    rule = RuleFactory.createRule("local_read_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY);
    assertEquals("counter: BYTES_READ limit: 1024", expression.getCounterLimit().toString());
    assertFalse(rule.apply(1000));
    assertTrue(rule.apply(1025));

    expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
        FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
    rule = RuleFactory.createRule("local_write_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY);
    assertEquals("counter: BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString());
    assertFalse(rule.apply(1000));
    assertTrue(rule.apply(1025));

    expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
        FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024));
    rule = RuleFactory.createRule("shuffle_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY);
    assertEquals("counter: SHUFFLE_BYTES limit: 1024", expression.getCounterLimit().toString());
    assertFalse(rule.apply(1000));
    assertTrue(rule.apply(1025));

    // time-based counter with a MOVE_TO_POOL action
    expression = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
        .EXECUTION_TIME_MS, 10000));
    rule = RuleFactory.createRule("slow_query", expression, Rule.Action.MOVE_TO_POOL.setPoolName("fake_pool"),
        Rule.Scope.QUERY);
    assertEquals("counter: EXECUTION_TIME_MS limit: 10000", expression.getCounterLimit().toString());
    assertFalse(rule.apply(1000));
    assertTrue(rule.apply(100000));
  }

  // Round-trips valid expression strings (spacing, case-insensitivity, schemes, time counters)
  // through ExpressionFactory.fromString and checks equals/hashCode against a built expression.
  @Test
  public void testExpressionFromString() {
    Expression expression = ExpressionFactory.fromString("BYTES_READ>1024");
    Expression expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
        FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
    assertEquals(expected, expression);
    assertEquals(expected.hashCode(), expression.hashCode());
    expression = ExpressionFactory.fromString("BYTES_READ > 1024");
    assertEquals(expected, expression);

    // toString output must itself be parseable (round-trip)
    expression = ExpressionFactory.fromString(expected.toString());
    assertEquals(expected.toString(), expression.toString());

    assertEquals(expected.hashCode(), expression.hashCode());
    expression = ExpressionFactory.fromString(" BYTES_READ > 1024 ");
    assertEquals(expected, expression);
    assertEquals(expected.hashCode(), expression.hashCode());

    expression = ExpressionFactory.fromString("BYTES_WRITTEN > 1024");
    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
        FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
    assertEquals(expected, expression);
    assertEquals(expected.hashCode(), expression.hashCode());

    expression = ExpressionFactory.fromString(" HDFS_BYTES_READ > 1024 ");
    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
        FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
    assertEquals(expected, expression);
    assertEquals(expected.hashCode(), expression.hashCode());

    expression = ExpressionFactory.fromString(" HDFS_BYTES_WRITTEN > 1024");
    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
        FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
    assertEquals(expected, expression);
    assertEquals(expected.hashCode(), expression.hashCode());

    expression = ExpressionFactory.fromString(" S3A_BYTES_READ > 1024");
    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a",
        FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
    assertEquals(expected, expression);
    assertEquals(expected.hashCode(), expression.hashCode());

    expression = ExpressionFactory.fromString(" S3A_BYTES_WRITTEN > 1024");
    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a",
        FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
    assertEquals(expected, expression);
    assertEquals(expected.hashCode(), expression.hashCode());

    // counter names are parsed case-insensitively
    expression = ExpressionFactory.fromString(" s3a_ByTeS_WRiTTeN > 1024");
    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a",
        FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
    assertEquals(expected, expression);
    assertEquals(expected.hashCode(), expression.hashCode());

    expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 1024");
    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
        FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024));
    assertEquals(expected, expression);
    assertEquals(expected.hashCode(), expression.hashCode());

    expression = ExpressionFactory.fromString(" EXECUTION_TIME_MS > 300");
    expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
        .EXECUTION_TIME_MS, 300));
    assertEquals(expected, expression);
    assertEquals(expected.hashCode(), expression.hashCode());

    expression = ExpressionFactory.fromString(" ELAPSED_TIME_MS > 300");
    expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
        .ELAPSED_TIME_MS, 300));
    assertEquals(expected, expression);
    assertEquals(expected.hashCode(), expression.hashCode());

    expression = ExpressionFactory.fromString(" elapsed_TIME_ms > 300");
    expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
        .ELAPSED_TIME_MS, 300));
    assertEquals(expected, expression);
    assertEquals(expected.hashCode(), expression.hashCode());
  }

  // Only '>' is a valid predicate so far.
  @Test
  public void testIllegalExpressionsUnsupportedPredicate() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("No valid predicates");
    ExpressionFactory.fromString("BYTES_READ < 1024");
  }

  // Expression with a counter but no limit.
  @Test
  public void testIllegalExpressionsMissingLimit() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("Invalid expression. Expression missing counter or limit.");
    ExpressionFactory.fromString("BYTES_READ >");
  }

  // Expression with a limit but no counter.
  @Test
  public void testIllegalExpressionsMissingCounter() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("Counter name cannot be empty!");
    ExpressionFactory.fromString("> 1024");
  }

  // Only a single counter/limit pair is allowed per expression.
  @Test
  public void testIllegalExpressionsMultipleLimit() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("Invalid expression. Expression missing counter or limit.");
    ExpressionFactory.fromString("BYTES_READ > 1024 > 1025");
  }

  @Test
  public void testIllegalExpressionsMultipleCounters() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("Invalid expression. Expression missing counter or limit.");
    ExpressionFactory.fromString("BYTES_READ > BYTES_READ > 1025");
  }

  // Limit must parse as a long: trailing garbage rejected.
  @Test
  public void testIllegalExpressionsInvalidLimitPost() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("Illegal value for counter limit. Expected a long value.");
    ExpressionFactory.fromString("BYTES_READ > 1024aaaa");
  }

  // Limit must parse as a long: leading garbage rejected.
  @Test
  public void testIllegalExpressionsInvalidLimitPre() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("Illegal value for counter limit. Expected a long value.");
    ExpressionFactory.fromString("BYTES_READ > foo1024");
  }

  // Limits must be positive.
  @Test
  public void testIllegalExpressionsInvalidNegativeLimit() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("Illegal value for counter limit. Expected a positive long value.");
    ExpressionFactory.fromString("BYTES_READ > -1024");
  }

  // Scheme prefix must be separated from the counter name by '_'.
  @Test
  public void testIllegalExpressionsInvalidCounter() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("Missing separator '_' between scheme and counter name");
    ExpressionFactory.fromString("fooBYTES_READ > 1024");
  }

  // Unknown counter name with no separator at all.
  @Test
  public void testIllegalExpressionsInvalidCounterNoSeparator() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("Invalid expression!");
    ExpressionFactory.fromString("executiontimems > 1024");
  }

  // Double separator produces an invalid (empty-segment) scheme.
  @Test
  public void testIllegalExpressionsInvalidCounterMultipleSeparator() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("Invalid URI scheme");
    ExpressionFactory.fromString("HDFS__BYTES_READ > 1024");
  }

  // Scheme must be a valid URI scheme (no '/' etc.).
  @Test
  public void testIllegalExpressionsInvalidScheme() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("Invalid URI scheme");
    ExpressionFactory.fromString("HD/FS_BYTES_READ > 1024");
  }
}