diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8a906ce..70052cb 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.conf;
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -3378,6 +3379,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         Constants.LLAP_LOGGER_NAME_CONSOLE),
         "logger used for llap-daemons."),
+    HIVE_RULE_VALIDATION_INTERVAL_MS("hive.rule.validation.interval.ms", "500ms",
+        new TimeValidator(TimeUnit.MILLISECONDS),
+        "Interval at which rules are validated while a query executes. Rules defined in the resource plan are\n" +
+        "validated for all SQL operations at this interval (default: 500ms), and the action defined in the\n" +
+        "violated rule is taken"),
+
     SPARK_USE_OP_STATS("hive.spark.use.op.stats", true,
         "Whether to use operator stats to determine reducer parallelism for Hive on Spark.\n" +
         "If this is false, Hive will use source table stats to determine reducer\n" +
@@ -3923,6 +3930,18 @@ public static long toSizeBytes(String value) {
     return new String[] {value.substring(0, i), value.substring(i)};
   }

+  private static Set<String> daysSet = ImmutableSet.of("d", "D", "day", "DAY", "days", "DAYS");
+  private static Set<String> hoursSet = ImmutableSet.of("h", "H", "hour", "HOUR", "hours", "HOURS");
+  private static Set<String> minutesSet = ImmutableSet.of("m", "M", "min", "MIN", "minute", "MINUTE",
+      "minutes", "MINUTES");
+  private static Set<String> secondsSet = ImmutableSet.of("s", "S", "sec", "SEC", "second", "SECOND",
+      "seconds", "SECONDS");
+  private static Set<String> millisSet = ImmutableSet.of("ms", "MS", "msec", "MSEC", "millisecond", "MILLISECOND",
+      "milliseconds", "MILLISECONDS");
+  private static Set<String> microsSet = ImmutableSet.of("us", "US", "usec", "USEC", "microsecond", "MICROSECOND",
+      "microseconds", "MICROSECONDS");
+  private static Set<String> nanosSet = ImmutableSet.of("ns", "NS", "nsec", "NSEC", "nanosecond", "NANOSECOND",
+      "nanoseconds", "NANOSECONDS");
   public static TimeUnit unitFor(String unit, TimeUnit defaultUnit) {
     unit = unit.trim().toLowerCase();
     if (unit.isEmpty() || unit.equals("l")) {
@@ -3930,19 +3949,19 @@ public static TimeUnit unitFor(String unit, TimeUnit defaultUnit) {
         throw new IllegalArgumentException("Time unit is not specified");
       }
       return defaultUnit;
-    } else if (unit.equals("d") || unit.startsWith("day")) {
+    } else if (daysSet.contains(unit)) {
       return TimeUnit.DAYS;
-    } else if (unit.equals("h") || unit.startsWith("hour")) {
+    } else if (hoursSet.contains(unit)) {
       return TimeUnit.HOURS;
-    } else if (unit.equals("m") || unit.startsWith("min")) {
+    } else if (minutesSet.contains(unit)) {
       return TimeUnit.MINUTES;
-    } else if (unit.equals("s") || unit.startsWith("sec")) {
+    } else if (secondsSet.contains(unit)) {
       return TimeUnit.SECONDS;
-    } else if (unit.equals("ms") || unit.startsWith("msec")) {
+    } else if (millisSet.contains(unit)) {
       return TimeUnit.MILLISECONDS;
-    } else if (unit.equals("us") || unit.startsWith("usec")) {
+    } else if (microsSet.contains(unit)) {
       return TimeUnit.MICROSECONDS;
-    } else if (unit.equals("ns") || unit.startsWith("nsec")) {
+    } else if (nanosSet.contains(unit)) {
       return TimeUnit.NANOSECONDS;
     }
     throw new
IllegalArgumentException("Invalid time unit " + unit);
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index bf600c2..2825a9a 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -198,6 +198,12 @@
       <version>${hadoop.version}</version>
       <classifier>tests</classifier>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 4a9af80..d94cb44 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -64,6 +64,7 @@
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.ObjectStore;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hive.common.util.ReflectionUtil;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.datanucleus.ClassLoaderResolver;
@@ -89,6 +90,19 @@
   private static String testUdfClassName =
       "org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd";

+  public static class SleepMsUDF extends UDF {
+
+    public Integer evaluate(final Integer value, final Integer ms) {
+      try {
+        Thread.sleep(ms);
+      } catch (InterruptedException e) {
+        // no-op
+      }
+
+      return value;
+    }
+  }
+
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
     MiniHS2.cleanupLocalDir();
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
index 28fa7a5..6df8c03 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
@@ -18,15 +18,12 @@
 package org.apache.hive.jdbc;

-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;

 import java.io.File;
-import java.lang.reflect.Field;
 import java.math.BigDecimal;
 import java.net.URL;
 import java.sql.Connection;
@@ -34,73 +31,56 @@
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
 import java.sql.Statement;
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;

-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import
org.apache.hadoop.mapred.RecordReader; - import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.FieldDesc; -import org.apache.hadoop.hive.llap.LlapRowRecordReader; +import org.apache.hadoop.hive.llap.LlapBaseInputFormat; +import org.apache.hadoop.hive.llap.LlapRowInputFormat; import org.apache.hadoop.hive.llap.Row; import org.apache.hadoop.hive.llap.Schema; +import org.apache.hadoop.hive.ql.wm.ExpressionFactory; +import org.apache.hadoop.hive.ql.wm.Rule; +import org.apache.hadoop.hive.ql.wm.RuleFactory; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; - +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; import org.apache.hive.jdbc.miniHS2.MiniHS2; import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; -import org.apache.hadoop.hive.llap.LlapBaseInputFormat; -import org.apache.hadoop.hive.llap.LlapRowInputFormat; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; -import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; - -import org.datanucleus.ClassLoaderResolver; -import org.datanucleus.NucleusContext; -import org.datanucleus.api.jdo.JDOPersistenceManagerFactory; -import org.datanucleus.AbstractNucleusContext; +import org.apache.hive.service.Service; +import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.cli.operation.Operation; +import org.apache.hive.service.cli.operation.SQLOperation; import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import com.google.common.collect.Lists; + public class TestJdbcWithMiniLlap { private static MiniHS2 miniHS2 = null; private static String dataFileDir; private static Path kvDataFilePath; private static Path dataTypesFilePath; - private static final String tmpDir = System.getProperty("test.tmp.dir"); + private static String ruleTestTable = "testtab1"; + private static String ruleQuery = "select t1.under_col, t2.under_col " + "from " + + ruleTestTable + " t1 cross join " + ruleTestTable + " t2 on t1.under_col = t2.under_col"; + private static String ruleQueryWithSleep = "select sleep(t1.under_col, 500), t1.under_col, t2.under_col " + "from " + + ruleTestTable + " t1 cross join " + ruleTestTable + " t2 on t1.under_col = t2.under_col"; private static HiveConf conf = null; private Connection hs2Conn = null; @@ -117,6 +97,8 @@ public static void beforeTest() throws Exception { conf = new HiveConf(); conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setTimeVar(ConfVars.HIVE_RULE_VALIDATION_INTERVAL_MS, 200, TimeUnit.MILLISECONDS); conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); @@ -438,6 +420,217 @@ public void testDataTypes() throws Exception { assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]); } + @Test(timeout = 60000) + public void testRuleElapsedTimeWithSmallerQueryTimeout() throws Exception { + int elapsedTimeoutMs = 6000; + int queryTimeoutSec = 3; // query timeout will take 
precedence, SQLTimeoutException will be thrown
+    Rule rule = RuleFactory.createRule("slow_query",
+        ExpressionFactory.fromString("ELAPSED_TIME > " + elapsedTimeoutMs),
+        Rule.Action.KILL_QUERY, Rule.Scope.QUERY);
+    runQueryWithRule(ruleQueryWithSleep, rule, "Query timed out after 3 seconds", queryTimeoutSec,
+        SQLTimeoutException.class);
+  }
+
+  @Test(timeout = 60000)
+  public void testRuleElapsedTimeWithHigherQueryTimeout() throws Exception {
+    int elapsedTimeoutMs = 4000; // elapsed timeout will take precedence, SQLException will be thrown
+    int queryTimeoutSec = 8;
+    Rule rule = RuleFactory.createRule("slow_query",
+        ExpressionFactory.fromString("ELAPSED_TIME > " + elapsedTimeoutMs),
+        Rule.Action.KILL_QUERY, Rule.Scope.QUERY);
+    runQueryWithRule(ruleQueryWithSleep, rule, "Query was cancelled", queryTimeoutSec, SQLException.class);
+  }
+
+  @Test(timeout = 60000)
+  public void testRuleSlowQueryElapsedTime() throws Exception {
+    Rule rule = RuleFactory.createRule("slow_query", ExpressionFactory.fromString("ELAPSED_TIME > 20000"),
+        Rule.Action.KILL_QUERY, Rule.Scope.QUERY);
+    runQueryWithRule(ruleQueryWithSleep, rule, "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testRuleSlowQueryExecutionTime() throws Exception {
+    Rule rule = RuleFactory.createRule("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 3000"),
+        Rule.Action.KILL_QUERY, Rule.Scope.QUERY);
+    runQueryWithRule(ruleQueryWithSleep, rule, rule.toString() + " violated");
+  }
+
+  @Test(timeout = 120000)
+  // FIXME: when all tests are run together, a shuffle fetcher "Connection Refused" error to the HTTP endpoint
+  // sometimes occurs and fails this test case. Running it separately does not fail. Try with ptest to validate?
+  public void testRuleHighShuffleBytes() throws Exception {
+    Rule rule = RuleFactory.createRule("big_shuffle", ExpressionFactory.fromString("SHUFFLE_BYTES > 512"),
+        Rule.Action.KILL_QUERY, Rule.Scope.QUERY);
+    runQueryWithRule(ruleQuery, rule, rule.toString() + " violated");
+  }
+
+  @Test(timeout = 60000)
+  public void testRuleHighBytesRead() throws Exception {
+    Rule rule = RuleFactory.createRule("big_read", ExpressionFactory.fromString("FILE_BYTES_READ > 100"),
+        Rule.Action.KILL_QUERY, Rule.Scope.QUERY);
+    runQueryWithRule(ruleQuery, rule, rule.toString() + " violated");
+  }
+
+  @Test(timeout = 60000)
+  public void testRuleHighBytesWrite() throws Exception {
+    Rule rule = RuleFactory.createRule("big_write", ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 512"),
+        Rule.Action.KILL_QUERY, Rule.Scope.QUERY);
+    runQueryWithRule(ruleQuery, rule, rule.toString() + " violated");
+  }
+
+  @Test(timeout = 120000)
+  // FIXME: when all tests are run together, a shuffle fetcher "Connection Refused" error to the HTTP endpoint
+  // sometimes occurs and fails this test case. Running it separately does not fail. Try with ptest to validate?
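+  // The test below starts two long-running queries on separate threads and swaps the rule set on every live
+  // SQLOperation in between, verifying that newly set rules replace the previously installed ones.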
+ public void testRuleElapsedTimeMultipleSet() throws Exception { + Rule firstRule = RuleFactory.createRule("slow_query", + ExpressionFactory.fromString("ELAPSED_TIME > 20000"), + Rule.Action.KILL_QUERY, Rule.Scope.QUERY); + List rules = Lists.newArrayList(firstRule); + + Connection firstCon = hs2Conn; + createTestTable(ruleTestTable); + createSleepUDF(); + final Statement firstStmt = firstCon.createStatement(); + final Throwable[] firstThrowable = new Throwable[1]; + Thread firstThread = new Thread(() -> { + try { + firstStmt.executeQuery(ruleQuery); + } catch (SQLException e) { + firstThrowable[0] = e; + } + }); + firstThread.start(); + + Thread.sleep(2000); + Collection services = miniHS2.getServices(); + for (Service service : services) { + if (service instanceof CLIService) { + Collection operations = ((CLIService) service).getSessionManager().getOperations(); + for (Operation operation : operations) { + if (operation instanceof SQLOperation) { + ((SQLOperation) operation).setRules(rules); + } + } + } + } + + Rule secondRule = RuleFactory.createRule("slow_query_updated", + ExpressionFactory.fromString("ELAPSED_TIME > 50000"), + Rule.Action.KILL_QUERY, Rule.Scope.QUERY); + rules = Lists.newArrayList(secondRule); + + Connection secondCon = hs2Conn; + createTestTable(ruleTestTable); + createSleepUDF(); + final Statement secondStmt = secondCon.createStatement(); + final Throwable[] secondThrowable = new Throwable[1]; + Thread secondThread = new Thread(() -> { + try { + secondStmt.executeQuery("select sleep(t1.under_col, 500), t1.under_col, t2.under_col " + "from " + + ruleTestTable + " t1 cross join " + ruleTestTable + " t2 on t1.under_col = t2.under_col"); + } catch (SQLException e) { + secondThrowable[0] = e; + } + }); + secondThread.start(); + + Thread.sleep(2000); + services = miniHS2.getServices(); + for (Service service : services) { + if (service instanceof CLIService) { + Collection operations = ((CLIService) service).getSessionManager().getOperations(); + for (Operation operation : operations) { + if (operation instanceof SQLOperation) { + ((SQLOperation) operation).setRules(rules); + } + } + } + } + + for (Service service : services) { + if (service instanceof CLIService) { + Collection operations = ((CLIService) service).getSessionManager().getOperations(); + for (Operation operation : operations) { + if (operation instanceof SQLOperation) { + if (!operation.isDone()) { + assertNotNull("Expecting rules to be not-null", ((SQLOperation) operation).getRules()); + assertTrue("Expecting " + secondRule + " to be contained in set of rules set for SQL operation", + ((SQLOperation) operation).getRules().contains(secondRule)); + } + } + } + } + } + + firstThread.join(); + secondThread.join(); + firstStmt.close(); + secondStmt.close(); + + assertEquals("Expected null throwable for first query", null, firstThrowable[0]); + assertNotNull("Expected non-null throwable for second query", secondThrowable[0]); + assertEquals(secondThrowable[0].getClass(), SQLException.class); + String expect = "Query was cancelled"; + assertTrue(expect + " is not contained in " + secondThrowable[0].getMessage(), + secondThrowable[0].getMessage().contains(expect)); + } + + private void createSleepUDF() throws SQLException { + String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName(); + Connection con = hs2Conn; + Statement stmt = con.createStatement(); + stmt.execute("create temporary function sleep as '" + udfName + "'"); + stmt.close(); + } + + private void runQueryWithRule(final String query, final Rule 
rule, final String expect, int queryTimeout, + final Class exceptionClass) + throws Exception { + + List rules = Lists.newArrayList(rule); + Connection con = hs2Conn; + createTestTable(ruleTestTable); + createSleepUDF(); + + final Statement selStmt = con.createStatement(); + final Throwable[] throwable = new Throwable[1]; + Thread queryThread = new Thread(() -> { + try { + selStmt.setQueryTimeout(queryTimeout); + selStmt.executeQuery(query); + } catch (SQLException e) { + throwable[0] = e; + } + }); + queryThread.start(); + + Thread.sleep(2000); + Collection services = miniHS2.getServices(); + for (Service service : services) { + if (service instanceof CLIService) { + Collection operations = ((CLIService) service).getSessionManager().getOperations(); + for (Operation operation : operations) { + if (operation instanceof SQLOperation) { + ((SQLOperation) operation).setRules(rules); + } + } + } + } + + queryThread.join(); + selStmt.close(); + + assertNotNull("Expected non-null throwable", throwable[0]); + assertEquals(exceptionClass, throwable[0].getClass()); + assertTrue(expect + " is not contained in " + throwable[0].getMessage(), + throwable[0].getMessage().contains(expect)); + } + + private void runQueryWithRule(final String query, final Rule rule, final String expect) + throws Exception { + runQueryWithRule(query, rule, expect, 0, SQLException.class); + } + private interface RowProcessor { void process(Row row); } diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index 06ddc22..636ea64 100644 --- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -553,4 +554,8 @@ public static void cleanupLocalDir() throws IOException { // Ignore. Safe if it does not exist. } } + + public Collection getServices() { + return hiveServer2.getServices(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index ed091ae..579dfdd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql; +import static java.util.stream.Collectors.toList; + import java.io.DataInput; import java.io.FileNotFoundException; import java.io.IOException; @@ -32,6 +34,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.antlr.runtime.TokenRewriteStream; import org.apache.hadoop.conf.Configuration; @@ -45,6 +48,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.wm.Rule; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager.Heartbeater; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; @@ -64,6 +68,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableList; + /** * Context for Semantic Analyzers. 
Usage: not reusable - construct a new one for * each query should call clear() at end of use to remove temporary folders @@ -136,6 +142,9 @@ // Identify whether the query involves an UPDATE, DELETE or MERGE private boolean isUpdateDeleteMerge; + // list of rules that can trigger an action + private List rules; + /** * This determines the prefix of the * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.Phase1Ctx#dest} @@ -145,6 +154,8 @@ */ private Map insertBranchToNamePrefix = new HashMap<>(); private Operation operation = Operation.OTHER; + private long ruleValidateIntervalMs = 0L; + public void setOperation(Operation operation) { this.operation = operation; } @@ -847,6 +858,23 @@ public void setHiveLocks(List hiveLocks) { this.hiveLocks = hiveLocks; } + public synchronized List getRules() { + return rules; + } + + public synchronized void setRules(final List rules) { + // make a clone during set as getRules() is more frequent than updates to rules + this.rules = rules.stream().map(Rule::clone).collect(toList()); + } + + public void setRuleValidateInterval(final long validateIntervalMs) { + this.ruleValidateIntervalMs = validateIntervalMs; + } + + public long getRuleValidateInterval() { + return ruleValidateIntervalMs; + } + public HiveTxnManager getHiveTxnManager() { return hiveTxnManager; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 4e7c80f..6a11eb4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hive.ql.exec.TaskResult; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.wm.Rule; import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; @@ -2447,4 +2448,25 @@ public void resetQueryState() { releaseResources(); this.queryState = getNewQueryState(queryState.getConf()); } + + + public void setRules(final List rules) { + if (ctx != null) { + ctx.setRules(rules); + } + } + + public List getRules() { + if (ctx != null) { + return ctx.getRules(); + } + + return null; + } + + public void setRuleValidationInterval(final long ruleValidationInterval) { + if (ctx != null) { + ctx.setRuleValidateInterval(ruleValidationInterval); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 9e2846c..2232343 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -26,6 +26,9 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.common.log.InPlaceUpdate; +import org.apache.hadoop.hive.ql.wm.Rule; +import org.apache.hadoop.hive.ql.wm.RuleViolationException; +import org.apache.hadoop.hive.ql.wm.TimeCounterLimit; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.common.log.ProgressMonitor; import org.apache.hadoop.hive.ql.plan.BaseWork; @@ -41,15 +44,18 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.tez.dag.api.client.Progress; import 
org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.util.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InterruptedIOException; import java.io.StringWriter; -import java.util.HashSet; +import java.util.EnumSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.stream.StreamSupport; import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; @@ -61,6 +67,7 @@ Licensed to the Apache Software Foundation (ASF) under one public class TezJobMonitor { static final String CLASS_NAME = TezJobMonitor.class.getName(); + private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); private static final int MIN_CHECK_INTERVAL = 200; private static final int MAX_CHECK_INTERVAL = 1000; private static final int MAX_RETRY_INTERVAL = 2500; @@ -128,6 +135,7 @@ private boolean isProfilingEnabled() { public int monitorExecution() { boolean done = false; boolean success = false; + boolean ruleViolated = false; int failedCounter = 0; final StopWatch failureTimer = new StopWatch(); int rc = 0; @@ -144,7 +152,8 @@ public int monitorExecution() { DAGStatus.State lastState = null; boolean running = false; - int checkInterval = MIN_CHECK_INTERVAL; + long checkInterval = context.getRuleValidateInterval() > 0 ? context.getRuleValidateInterval() : MIN_CHECK_INTERVAL; + LOG.info("Check interval: {} ms", checkInterval); while (true) { try { @@ -152,7 +161,14 @@ public int monitorExecution() { context.checkHeartbeaterLockException(); } - status = dagClient.getDAGStatus(new HashSet(), checkInterval); + status = dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS), checkInterval); + List rules = context.getRules(); + if (rules != null && !rules.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Validating rules: {}", rules); + } + validateRuleViolations(status.getDAGCounters(), dagClient, context.getRules()); + } vertexProgressMap = status.getVertexProgress(); DAGStatus.State state = status.getState(); @@ -220,10 +236,18 @@ public int monitorExecution() { failureTimer.reset(); failureTimer.start(); } - if (isInterrupted + if (e instanceof RuleViolationException) { + // no retries + failedCounter = MAX_RETRY_FAILURES; + ruleViolated = true; + console.printError(e.getMessage()); + diagnostics.append(e.getMessage()); + // no useful stacktrace from rule violation except for the message, so no rethrow + } + if (isInterrupted || ruleViolated || (++failedCounter >= MAX_RETRY_FAILURES && failureTimer.now(TimeUnit.MILLISECONDS) > MAX_RETRY_INTERVAL)) { try { - if (isInterrupted) { + if (isInterrupted || ruleViolated) { console.printInfo("Killing DAG..."); } else { console.printInfo(String.format("Killing DAG... after %d seconds", @@ -233,8 +257,10 @@ public int monitorExecution() { } catch (IOException | TezException tezException) { // best effort } - console - .printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e)); + // on rule violation, no useful stack trace is available + if (!ruleViolated) { + console.printError("Execution has failed. 
stack trace: " + ExceptionUtils.getStackTrace(e)); + } rc = 1; done = true; } else { @@ -244,8 +270,11 @@ public int monitorExecution() { if (done) { if (rc != 0 && status != null) { for (String diag : status.getDiagnostics()) { - console.printError(diag); - diagnostics.append(diag); + // on rule violation dag status will not have any useful diagnostics + if (!ruleViolated) { + console.printError(diag); + diagnostics.append(diag); + } } } synchronized (shutdownList) { @@ -261,6 +290,46 @@ public int monitorExecution() { return rc; } + private void validateRuleViolations(final TezCounters dagCounters, final DAGClient dagClient, final List rules) + throws RuleViolationException { + if (dagCounters == null || dagClient == null || rules == null) { + return; + } + + for (Rule rule : rules) { + String desiredCounter = rule.getExpression().getCounterLimit().getName(); + if (desiredCounter.equals(TimeCounterLimit.TimeCounter.EXECUTION_TIME.name())) { + if (executionStartTime > 0) { + final long elapsed = System.currentTimeMillis() - executionStartTime; + if (rule.apply(elapsed)) { + final long limit = rule.getExpression().getCounterLimit().getLimit(); + String msg = "Current value: " + elapsed + " Limit: " + limit + "\n"; + if (rule.getAction().equals(Rule.Action.KILL_QUERY)) { + // can only throw unchecked exception from lambda + throw new RuleViolationException(rule, msg); + } + } + } + } else { + StreamSupport.stream(dagCounters.spliterator(), false) + .filter(cg -> cg.getUnderlyingGroup().findCounter(desiredCounter, false) != null) + .filter(cg -> cg.findCounter(desiredCounter).getValue() > 0) + .findFirst() + .map(mg -> mg.findCounter(desiredCounter).getValue()) + .ifPresent(l -> { + if (rule.apply(l)) { + String msg = "Current value: " + l + " Limit: " + + rule.getExpression().getCounterLimit().getLimit() + "\n"; + if (rule.getAction().equals(Rule.Action.KILL_QUERY)) { + // can only throw unchecked exception from lambda + throw new RuleViolationException(rule, msg); + } + } + }); + } + } + } + private void printSummary(boolean success, Map progressMap) { if (isProfilingEnabled() && success && progressMap != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/CounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/CounterLimit.java new file mode 100644 index 0000000..0fc5eaa --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/CounterLimit.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +/** + * Counter limit interface for defining counter name and its limit exceeding which action defined in trigger will get + * executed. + */ +public interface CounterLimit { + /** + * Get the name of the counter. 
+   *
+   * @return name
+   */
+  String getName();
+
+  /**
+   * Get the threshold value for the counter.
+   *
+   * @return limit
+   */
+  long getLimit();
+
+  /**
+   * Return a cloned copy of this counter limit.
+   *
+   * @return cloned copy
+   */
+  CounterLimit clone();
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java
new file mode 100644
index 0000000..add933f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Expression that is defined in triggers.
+ * Most expressions will get triggered only after exceeding a limit. As a result, only the greater-than (>)
+ * predicate is supported.
+ */
+public interface Expression {
+
+  enum Predicate {
+    GREATER_THAN(">");
+
+    String symbol;
+
+    Predicate(final String symbol) {
+      this.symbol = symbol;
+    }
+
+    public String getSymbol() {
+      return symbol;
+    }
+  }
+
+  interface Builder {
+    Builder greaterThan(CounterLimit counter);
+
+    Expression build();
+  }
+
+  /**
+   * Evaluate the current value against this expression. Returns true if the expression holds
+   * (current > limit), false otherwise.
+   *
+   * @param current - current value against which expression will be evaluated
+   * @return true if the expression is violated, false otherwise
+   */
+  boolean evaluate(final long current);
+
+  /**
+   * Return the counter limit.
+   *
+   * @return counter limit
+   */
+  CounterLimit getCounterLimit();
+
+  /**
+   * Return the predicate defined in the expression.
+   *
+   * @return predicate
+   */
+  Predicate getPredicate();
+
+  /**
+   * Return a cloned copy of this expression.
+   *
+   * @return cloned copy
+   */
+  Expression clone();
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
new file mode 100644
index 0000000..0e9764b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.Validator;
+
+/**
+ * Factory to create expressions.
+ */
+public class ExpressionFactory {
+
+  public static Expression fromString(final String expression) {
+    if (expression == null || expression.isEmpty()) {
+      return null;
+    }
+
+    // TODO: Only the ">" predicate is supported right now; this has to be extended to an expression tree
+    // when multiple conditions are required.
+    Expression.Predicate[] predicates = Expression.Predicate.values();
+    boolean noMatch = Arrays.stream(predicates).noneMatch(e -> expression.contains(e.getSymbol()));
+    if (noMatch) {
+      throw new IllegalArgumentException("No valid predicates " + Arrays.toString(predicates) +
+        " found in expression (" + expression + ")");
+    }
+
+    Expression.Predicate matchingPredicate = Arrays.stream(predicates)
+      .filter(e -> expression.contains(e.getSymbol()))
+      .findFirst()
+      .orElse(null);
+    if (matchingPredicate != null) {
+      final String[] tokens = expression.split(matchingPredicate.getSymbol());
+      if (tokens.length != 2) {
+        throw new IllegalArgumentException("Invalid expression. Expression missing counter or limit.");
+      }
+
+      final String counterName = tokens[0].trim();
+      final String counterValueStr = tokens[1].trim();
+      if (counterName.isEmpty()) {
+        throw new IllegalArgumentException("Counter name cannot be empty!");
+      }
+
+      boolean matches = Arrays.stream(FileSystemCounterLimit.FSCounter.values())
+        .anyMatch(e -> counterName.toUpperCase().contains(e.name().toUpperCase()));
+      if (matches) {
+        long counterValue;
+        try {
+          counterValue = getCounterValue(counterValueStr, new Validator.SizeValidator());
+          if (counterValue < 0) {
+            throw new IllegalArgumentException("Illegal value for counter limit. Expected a non-negative long value.");
+          }
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Invalid counter value: " + counterValueStr);
+        }
+        // this is a file system counter; validate it and create the counter limit
+        FileSystemCounterLimit fsCounterLimit = FileSystemCounterLimit.fromName(counterName, counterValue);
+        return createExpression(fsCounterLimit);
+      }
+
+      // look for exact matches for time based counter limits
+      matches = Arrays.stream(TimeCounterLimit.TimeCounter.values())
+        .anyMatch(e -> counterName.equalsIgnoreCase(e.name()));
+      if (matches) {
+        long counterValue;
+        try {
+          counterValue = getCounterValue(counterValueStr, new Validator.TimeValidator(TimeUnit.MILLISECONDS));
+          if (counterValue < 0) {
+            throw new IllegalArgumentException("Illegal value for counter limit. Expected a non-negative long value.");
+          }
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Invalid counter value: " + counterValueStr);
+        }
+        TimeCounterLimit timeCounterLimit = new TimeCounterLimit(
+          TimeCounterLimit.TimeCounter.valueOf(counterName.toUpperCase()), counterValue);
+        return createExpression(timeCounterLimit);
+      }
+    }
+
+    // unable to create an expression at this point, so the expression is invalid
+    throw new IllegalArgumentException("Invalid expression!
" + expression); + } + + private static long getCounterValue(final String counterValueStr, final Validator validator) throws + NumberFormatException { + long counter; + try { + counter = Long.parseLong(counterValueStr); + } catch (NumberFormatException e) { + if (validator instanceof Validator.SizeValidator) { + counter = HiveConf.toSizeBytes(counterValueStr); + } else if (validator instanceof Validator.TimeValidator) { + counter = HiveConf.toTime(counterValueStr, TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS); + } else { + throw e; + } + } + return counter; + } + + public static Expression createExpression(CounterLimit counterLimit) { + return new TriggerExpression(counterLimit, Expression.Predicate.GREATER_THAN); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java new file mode 100644 index 0000000..d005e3e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * File system specific counters with defined limits + */ +public class FileSystemCounterLimit implements CounterLimit { + + private static final String SCHEME_PATTERN = "[a-zA-Z][a-zA-Z0-9-.+]*"; + private static final String SCHEME_COUNTER_SEPARATOR = "_"; + + public enum FSCounter { + BYTES_READ, + BYTES_WRITTEN, + SHUFFLE_BYTES + } + + private String scheme; + private FSCounter fsCounter; + private long limit; + + public FileSystemCounterLimit(final String scheme, final FSCounter fsCounter, final long limit) { + this.scheme = scheme == null || scheme.isEmpty() ? "" : scheme.toUpperCase(); + this.fsCounter = fsCounter; + this.limit = limit; + } + + public static FileSystemCounterLimit fromName(final String counterName, final long limit) { + for (FSCounter fsCounter : FSCounter.values()) { + if (counterName.toUpperCase().contains(fsCounter.name())) { + if (counterName.toUpperCase().equals(fsCounter.name())) { + return new FileSystemCounterLimit(null, FSCounter.valueOf(counterName), limit); + } + + // validate scheme + // Java's URI scheme pattern is + // "The scheme component of a URI, if defined, only contains characters in the alphanum category and in the + // string "-.+". A scheme always starts with an alpha character." 
+        Pattern pattern = Pattern.compile(SCHEME_PATTERN);
+        String[] tokens = counterName.toUpperCase().split(fsCounter.name());
+        if (tokens.length == 1) {
+          final String scheme;
+          // "HDFS_BYTES_READ" after tokenizing will become "HDFS_"; in such cases strip off the trailing '_'
+          if (tokens[0].contains(SCHEME_COUNTER_SEPARATOR) && tokens[0].endsWith(SCHEME_COUNTER_SEPARATOR)) {
+            scheme = tokens[0].substring(0, tokens[0].lastIndexOf(SCHEME_COUNTER_SEPARATOR));
+          } else {
+            throw new IllegalArgumentException("Missing separator '_' between scheme and counter name.");
+          }
+          Matcher matcher = pattern.matcher(scheme);
+          if (matcher.matches()) {
+            return new FileSystemCounterLimit(scheme, FSCounter.valueOf(fsCounter.name()), limit);
+          } else {
+            throw new IllegalArgumentException("Invalid URI scheme " + scheme);
+          }
+        }
+      }
+    }
+
+    throw new IllegalArgumentException("Invalid counter name specified " + counterName.toUpperCase());
+  }
+
+  @Override
+  public String getName() {
+    return scheme.isEmpty() ? fsCounter.name() : scheme.toUpperCase() + "_" + fsCounter.name();
+  }
+
+  @Override
+  public long getLimit() {
+    return limit;
+  }
+
+  @Override
+  public CounterLimit clone() {
+    return new FileSystemCounterLimit(scheme, fsCounter, limit);
+  }
+
+  @Override
+  public String toString() {
+    return "counter: " + getName() + " limit: " + limit;
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 31 * scheme.hashCode();
+    hash += 31 * fsCounter.hashCode();
+    hash += 31 * limit;
+    return 31 * hash;
+  }
+
+  @Override
+  public boolean equals(final Object other) {
+    if (other == null || !(other instanceof FileSystemCounterLimit)) {
+      return false;
+    }
+
+    if (other == this) {
+      return true;
+    }
+
+    return scheme.equals(((FileSystemCounterLimit) other).scheme) &&
+      fsCounter.equals(((FileSystemCounterLimit) other).fsCounter) &&
+      limit == ((FileSystemCounterLimit) other).limit;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/QueryRule.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/QueryRule.java
new file mode 100644
index 0000000..9fe24be
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/QueryRule.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.Objects;
+
+/**
+ * A rule with query-level scope. It has a name and a trigger expression; when the expression is violated,
+ * the defined action is executed.
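+ * For example, the tests in this patch create a rule named "slow_query" with the expression
+ * "ELAPSED_TIME > 20000" and the action KILL_QUERY, which kills any query running longer than 20 seconds.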
+ */
+public class QueryRule implements Rule {
+  private String name;
+  private Expression expression;
+  private Action action;
+
+  public QueryRule(final String name, final Expression expression, final Action action) {
+    this.name = name;
+    this.expression = expression;
+    this.action = action;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public Expression getExpression() {
+    return expression;
+  }
+
+  @Override
+  public Action getAction() {
+    return action;
+  }
+
+  @Override
+  public Scope getScope() {
+    return Scope.QUERY;
+  }
+
+  @Override
+  public Rule clone() {
+    return new QueryRule(name, expression.clone(), action);
+  }
+
+  @Override
+  public boolean apply(final long current) {
+    if (expression.evaluate(current)) {
+      switch (action) {
+        case KILL_QUERY:
+          return true;
+        case MOVE_TO_POOL:
+          // FIXME: act upon the Action
+          return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return "{ name: " + name + ", expression: " + expression + ", action: " + action + " }";
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = name == null ? 31 : 31 * name.hashCode();
+    hash += expression == null ? 31 * hash : 31 * hash * expression.hashCode();
+    hash += action == null ? 31 * hash : 31 * hash * action.hashCode();
+    return hash;
+  }
+
+  @Override
+  public boolean equals(final Object other) {
+    if (other == null || !(other instanceof QueryRule)) {
+      return false;
+    }
+
+    if (other == this) {
+      return true;
+    }
+
+    return Objects.equals(name, ((QueryRule) other).name) &&
+      Objects.equals(expression, ((QueryRule) other).expression) &&
+      Objects.equals(action, ((QueryRule) other).action);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Rule.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Rule.java
new file mode 100644
index 0000000..f3897fa
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Rule.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Rule interface, which maps to CREATE RULE ... queries. A rule has a name, a trigger, and an action.
+ * The trigger is a simple expression that is evaluated during the lifecycle of a query/session; if the
+ * expression evaluates to true, the action is executed.
+ */
+public interface Rule {
+
+  enum Action {
+    KILL_QUERY(""),
+    MOVE_TO_POOL("");
+
+    String poolName;
+
+    Action(final String poolName) {
+      this.poolName = poolName;
+    }
+
+    public Action setPoolName(final String poolName) {
+      this.poolName = poolName;
+      return this;
+    }
+
+    public String getPoolName() {
+      return poolName;
+    }
+  }
+
+  enum Scope {
+    QUERY,
+    SESSION,
+    GLOBAL
+  }
+
+  /**
+   * Based on the current value, returns true if the rule applies, else false.
+ * + * @param current - current value + * @return true if rule got applied false otherwise + */ + boolean apply(long current); + + /** + * Get trigger expression + * + * @return expression + */ + Expression getExpression(); + + /** + * Return the name of the rule + * + * @return rule name + */ + String getName(); + + /** + * Return the action that will get executed when trigger expression evaluates to true + * + * @return action + */ + Action getAction(); + + + /** + * Return the scope of this rule + * + * @return scope + */ + Scope getScope(); + + /** + * Return cloned copy of this rule + * + * @return clone copy + */ + Rule clone(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleFactory.java new file mode 100644 index 0000000..48c92ee --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleFactory.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +/** + * Factory for creating rules. + */ +public class RuleFactory { + // FIXME: This should be from metastore columns (CREATE RULE ... -> Metastore -> here) + public static Rule createRule(final String name, final Expression expression, final Rule.Action action, + final Rule.Scope scope) { + switch (scope) { + case QUERY: + return new QueryRule(name, expression, action); + // TODO: handle scope and global rules + } + return null; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleViolationException.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleViolationException.java new file mode 100644 index 0000000..5e02ec1 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleViolationException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +/** + * When a rule is violated, this exception will be thrown. 
+ */ +public class RuleViolationException extends RuntimeException { + + public RuleViolationException(Rule rule, String msg) { + super("Rule " + rule.toString() + " violated. " + msg); + } + + public RuleViolationException(final String ruleName, final String msg) { + super("Rule " + ruleName + " violated. " + msg); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java new file mode 100644 index 0000000..687fa5e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +/** + * Time based counters with limits + */ +public class TimeCounterLimit implements CounterLimit { + public enum TimeCounter { + ELAPSED_TIME, + EXECUTION_TIME + } + + private TimeCounter timeCounter; + private long limit; + + public TimeCounterLimit(final TimeCounter timeCounter, final long limit) { + this.timeCounter = timeCounter; + this.limit = limit; + } + + @Override + public String getName() { + return timeCounter.name(); + } + + @Override + public long getLimit() { + return limit; + } + + @Override + public CounterLimit clone() { + return new TimeCounterLimit(timeCounter, limit); + } + + @Override + public String toString() { + return "counter: " + timeCounter.name() + " limit: " + limit; + } + + @Override + public int hashCode() { + int hash = 31 * timeCounter.hashCode(); + hash += 31 * limit; + return 31 * hash; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof TimeCounterLimit)) { + return false; + } + + if (other == this) { + return true; + } + + return timeCounter.equals(((TimeCounterLimit) other).timeCounter) && limit == ((TimeCounterLimit) other).limit; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java new file mode 100644 index 0000000..065ab79 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.Objects; + +/** + * Simple trigger expression for a rule. + */ +public class TriggerExpression implements Expression { + private CounterLimit counterLimit; + private Predicate predicate; + + public TriggerExpression(final CounterLimit counter, final Predicate predicate) { + this.counterLimit = counter; + this.predicate = predicate; + } + + @Override + public boolean evaluate(final long current) { + if (counterLimit.getLimit() > 0) { + if (predicate.equals(Predicate.GREATER_THAN)) { + return current > counterLimit.getLimit(); + } + } + return false; + } + + @Override + public CounterLimit getCounterLimit() { + return counterLimit; + } + + @Override + public Predicate getPredicate() { + return predicate; + } + + @Override + public Expression clone() { + return new TriggerExpression(counterLimit.clone(), predicate); + } + + @Override + public String toString() { + return counterLimit.getName() + " " + predicate.getSymbol() + " " + counterLimit.getLimit(); + } + + @Override + public int hashCode() { + int hash = counterLimit == null ? 31 : 31 * counterLimit.hashCode(); + hash += predicate == null ? 31 * hash : 31 * hash * predicate.hashCode(); + return 31 * hash; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof TriggerExpression)) { + return false; + } + + if (other == this) { + return true; + } + + return Objects.equals(counterLimit, ((TriggerExpression) other).counterLimit) && + Objects.equals(predicate, ((TriggerExpression) other).predicate); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestRule.java b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestRule.java new file mode 100644 index 0000000..d121aa3 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestRule.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.wm; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * Unit tests for rules, trigger expressions and the factories that build them. + */ +public class TestRule { + @org.junit.Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testSimpleQueryRule() { + Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + Rule rule = RuleFactory.createRule("hdfs_read_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY); + assertEquals("counter: HDFS_BYTES_READ limit: 1024", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + rule = RuleFactory.createRule("hdfs_write_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY); + assertEquals("counter: HDFS_BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + rule = RuleFactory.createRule("local_read_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY); + assertEquals("counter: BYTES_READ limit: 1024", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + rule = RuleFactory.createRule("local_write_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY); + assertEquals("counter: BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024)); + rule = RuleFactory.createRule("shuffle_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY); + assertEquals("counter: SHUFFLE_BYTES limit: 1024", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(1025)); + + expression = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .EXECUTION_TIME, 10000)); + rule = RuleFactory.createRule("slow_query", expression, Rule.Action.MOVE_TO_POOL.setPoolName("fake_pool"), + Rule.Scope.QUERY); + assertEquals("counter: EXECUTION_TIME limit: 10000", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(100000)); + } + + @Test + public void testExpressionFromString() { + Expression expression = ExpressionFactory.fromString("BYTES_READ>1024"); + Expression expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + expression = ExpressionFactory.fromString("BYTES_READ > 1024"); + assertEquals(expected, expression); + + expression = ExpressionFactory.fromString(expected.toString()); + assertEquals(expected.toString(), 
expression.toString()); + + assertEquals(expected.hashCode(), expression.hashCode()); + expression = ExpressionFactory.fromString(" BYTES_READ > 1024 "); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString("BYTES_WRITTEN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" HDFS_BYTES_READ > 1024 "); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" HDFS_BYTES_WRITTEN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" S3A_BYTES_READ > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" S3A_BYTES_WRITTEN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" s3a_ByTeS_WRiTTeN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" EXECUTION_TIME > 300"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .EXECUTION_TIME, 300)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" ELAPSED_TIME > 300"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + } + + @Test + public void testSizeValidationInRule() { + Expression expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 100MB"); + Expression expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + 
FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 100 * 1024 * 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 1 gB"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024 * 1024 * 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 1 TB"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024L * 1024 * 1024 * 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 100 B"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 100)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 100bytes"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 100)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + } + + @Test + public void testIllegalSizeCounterValue1() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid size unit"); + ExpressionFactory.fromString(" SHUFFLE_BYTES > 300GiB"); + } + + @Test + public void testIllegalSizeCounterValue2() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid size unit"); + ExpressionFactory.fromString(" SHUFFLE_BYTES > 300 foo"); + } + + @Test + public void testTimeValidationInRule() { + Expression expression = ExpressionFactory.fromString(" elapsed_TIME > 300 s"); + Expression expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300 seconds"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300 sec"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300s"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300seconds"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300sec"); + expected = ExpressionFactory.createExpression(new 
TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300000000 microseconds"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 1DAY"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 24 * 60 * 60 * 1000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + } + + @Test + public void testIllegalTimeCounterValue1() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid time unit"); + ExpressionFactory.fromString(" elapsed_TIME > 300 light years"); + } + + @Test + public void testIllegalTimeCounterValue2() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid time unit"); + ExpressionFactory.fromString(" elapsed_TIME > 300secTOR"); + } + + @Test + public void testRuleClone() { + Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + Rule rule = RuleFactory.createRule("hdfs_read_heavy", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY); + Rule clonedRule = rule.clone(); + assertNotEquals(System.identityHashCode(rule), System.identityHashCode(clonedRule)); + assertNotEquals(System.identityHashCode(rule.getExpression()), System.identityHashCode(clonedRule.getExpression())); + assertNotEquals(System.identityHashCode(rule.getExpression().getCounterLimit()), + System.identityHashCode(clonedRule.getExpression().getCounterLimit())); + assertEquals(rule, clonedRule); + assertEquals(rule.hashCode(), clonedRule.hashCode()); + + expression = ExpressionFactory.fromString(" ELAPSED_TIME > 300"); + rule = RuleFactory.createRule("slow_query", expression, Rule.Action.KILL_QUERY, Rule.Scope.QUERY); + clonedRule = rule.clone(); + assertNotEquals(System.identityHashCode(rule), System.identityHashCode(clonedRule)); + assertNotEquals(System.identityHashCode(rule.getExpression()), System.identityHashCode(clonedRule.getExpression())); + assertNotEquals(System.identityHashCode(rule.getExpression().getCounterLimit()), + System.identityHashCode(clonedRule.getExpression().getCounterLimit())); + assertEquals(rule, clonedRule); + assertEquals(rule.hashCode(), clonedRule.hashCode()); + } + + @Test + public void testIllegalExpressionsUnsupportedPredicate() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("No valid predicates"); + ExpressionFactory.fromString("BYTES_READ < 1024"); + } + + @Test + public void testIllegalExpressionsMissingLimit() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid expression. Expression missing counter or limit."); + ExpressionFactory.fromString("BYTES_READ >"); + } + + @Test + public void testIllegalExpressionsMissingCounter() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Counter name cannot be empty!"); + ExpressionFactory.fromString("> 1024"); + } + + @Test + public void testIllegalExpressionsMultipleLimit() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid expression. 
Expression missing counter or limit."); + ExpressionFactory.fromString("BYTES_READ > 1024 > 1025"); + } + + @Test + public void testIllegalExpressionsMultipleCounters() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid expression. Expression missing counter or limit."); + ExpressionFactory.fromString("BYTES_READ > BYTES_READ > 1025"); + } + + @Test + public void testIllegalExpressionsInvalidLimitPost() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid size unit"); + ExpressionFactory.fromString("BYTES_READ > 1024aaaa"); + } + + @Test + public void testIllegalExpressionsInvalidLimitPre() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid counter value"); + ExpressionFactory.fromString("BYTES_READ > foo1024"); + } + + @Test + public void testIllegalExpressionsInvalidNegativeLimit() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Illegal value for counter limit. Expected a positive long value."); + ExpressionFactory.fromString("BYTES_READ > -1024"); + } + + @Test + public void testIllegalExpressionsInvalidCounter() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Missing separator '_' between scheme and counter name"); + ExpressionFactory.fromString("fooBYTES_READ > 1024"); + } + + @Test + public void testIllegalExpressionsInvalidCounterNoSeparator() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid expression!"); + ExpressionFactory.fromString("executiontimems > 1024"); + } + + @Test + public void testIllegalExpressionsInvalidCounterMultipleSeparator() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid URI scheme"); + ExpressionFactory.fromString("HDFS__BYTES_READ > 1024"); + } + + @Test + public void testIllegalExpressionsInvalidScheme() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid URI scheme"); + ExpressionFactory.fromString("HD/FS_BYTES_READ > 1024"); + } +} diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 773dd51..76fdd84 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -60,6 +60,8 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.wm.Rule; +import org.apache.hadoop.hive.ql.wm.TimeCounterLimit; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -98,8 +100,11 @@ private volatile MetricsScope currentSQLStateScope; private QueryInfo queryInfo; private long queryTimeout; - private ScheduledExecutorService timeoutExecutor; + private long elapsedTimeout; + private ScheduledExecutorService queryTimeoutExecutor; + private ScheduledExecutorService elapsedTimeoutExecutor; private final boolean runAsync; + private long ruleValidationIntervalMs; /** * A map to track query count running by each user @@ -120,6 +125,9 @@ public SQLOperation(HiveSession parentSession, String statement, Map 0) { - timeoutExecutor = new ScheduledThreadPoolExecutor(1); - Runnable timeoutTask = new Runnable() { - @Override - public void run() { - try { - String queryId = 
queryState.getQueryId(); - LOG.info("Query timed out after: " + queryTimeout - + " seconds. Cancelling the execution now: " + queryId); - SQLOperation.this.cancel(OperationState.TIMEDOUT); - } catch (HiveSQLException e) { - LOG.error("Error cancelling the query after timeout: " + queryTimeout + " seconds", e); - } finally { - // Stop - timeoutExecutor.shutdown(); - } - } - }; - timeoutExecutor.schedule(timeoutTask, queryTimeout, TimeUnit.SECONDS); + LOG.info("Using query timeout: {}", queryTimeout); + startQueryTimeoutMonitor(); } queryInfo.setQueryDisplay(driver.getQueryDisplay()); @@ -236,6 +226,48 @@ public void run() { } } + private void startQueryTimeoutMonitor() { + queryTimeoutExecutor = new ScheduledThreadPoolExecutor(1); + Runnable timeoutTask = () -> { + if (isDone()) { + LOG.info("{} operation isDone already. Query timeout not enforced.", getQueryInfo().getOperationId()); + } else { + try { + String queryId = queryState.getQueryId(); + LOG.info("Query timed out after: " + queryTimeout + + " seconds. Cancelling the execution now: " + queryId); + SQLOperation.this.cancel(OperationState.TIMEDOUT); + } catch (HiveSQLException e) { + LOG.error("Error cancelling the query after timeout: " + queryTimeout + " seconds", e); + } finally { + queryTimeoutExecutor.shutdown(); + } + } + }; + queryTimeoutExecutor.schedule(timeoutTask, queryTimeout, TimeUnit.SECONDS); + } + + private void startElapsedTimeoutMonitor(final Rule elapsedTimeRule) { + elapsedTimeoutExecutor = new ScheduledThreadPoolExecutor(1); + Runnable timeoutTask = () -> { + if (isDone()) { + LOG.info("{} operation isDone already. Elapsed timeout not enforced.", getQueryInfo().getOperationId()); + } else { + try { + String queryId = queryState.getQueryId(); + LOG.info("{} violated. Cancelling the execution now for {} after {} seconds", elapsedTimeRule, queryId, + elapsedTimeout); + SQLOperation.this.cancel(OperationState.CANCELED); + } catch (HiveSQLException e) { + LOG.error("Error cancelling the query after timeout: " + elapsedTimeout + " seconds", e); + } finally { + elapsedTimeoutExecutor.shutdown(); + } + } + }; + elapsedTimeoutExecutor.schedule(timeoutTask, elapsedTimeout, TimeUnit.SECONDS); + } + private void runQuery() throws HiveSQLException { try { OperationState opState = getStatus().getState(); @@ -309,7 +341,6 @@ public void runInternal() throws HiveSQLException { } } - private final class BackgroundWork implements Runnable { private final UserGroupInformation currentUGI; private final Hive parentHive; @@ -422,8 +453,11 @@ private synchronized void cleanup(OperationState state) throws HiveSQLException } // Shutdown the timeout thread if any, while closing this operation - if ((timeoutExecutor != null) && (state != OperationState.TIMEDOUT) && (state.isTerminal())) { - timeoutExecutor.shutdownNow(); + if ((queryTimeoutExecutor != null) && (state != OperationState.TIMEDOUT) && (state.isTerminal())) { + queryTimeoutExecutor.shutdownNow(); + } + if ((elapsedTimeoutExecutor != null) && (state != OperationState.CANCELED) && (state.isTerminal())) { + elapsedTimeoutExecutor.shutdownNow(); + } } @@ -710,4 +744,50 @@ public String getExecutionEngine() { return queryState.getConf().getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); } + public void setRules(final List<Rule> rules) { + // if we have a rule with limits on elapsed time, set that limit as query timeout (elapsed time includes end to end + // time of a query/operation) + if (rules != null && driver != null && !isDone()) { + String elapsedCounterName = 
TimeCounterLimit.TimeCounter.ELAPSED_TIME.name(); + rules.stream().filter(r -> r.getExpression().getCounterLimit().getName().equals(elapsedCounterName)) + .findFirst() + .ifPresent(r -> { + // FIXME: If time is specified in milliseconds this can result in rounding error 1500ms -> 1s. + long timeout = TimeUnit.MILLISECONDS.toSeconds(r.getExpression().getCounterLimit().getLimit()); + if (timeout > 0) { + // if an elapsed timeout is set already, shut down the executor that is already scheduled and create + // another timeout task with the new elapsed timeout + if (elapsedTimeout > 0 && elapsedTimeoutExecutor != null) { + elapsedTimeoutExecutor.shutdownNow(); + } + this.elapsedTimeout = timeout; + startElapsedTimeoutMonitor(r); + } + }); + + // for all other rules, delegate them to the driver + driver.setRules(rules); + driver.setRuleValidationInterval(ruleValidationIntervalMs); + } else { + if (rules == null) { + LOG.warn("Not setting null rules for operation {}", getQueryInfo().getOperationId()); + } + + if (driver == null) { + LOG.warn("Not setting rules as driver is null for operation {}", getQueryInfo().getOperationId()); + } + + if (isDone()) { + LOG.warn("Not setting rules as the operation {} is already in terminal state", getQueryInfo().getOperationId()); + } + } + } + + public List<Rule> getRules() { + if (driver != null) { + return driver.getRules(); + } + + return null; + } }
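
A minimal usage sketch of the rule APIs, pieced together from TestRule above: ExpressionFactory.fromString parses the "COUNTER > limit" syntax (normalizing size units such as MB and time units such as sec), RuleFactory.createRule binds the expression to an action and scope, and Rule.apply compares an observed counter value against the limit. The class name RuleUsageSketch and the counter value fed to apply() are illustrative only; it lives in the same package as the tests so the unqualified names resolve.

package org.apache.hadoop.hive.ql.wm;

public class RuleUsageSketch {
  public static void main(String[] args) {
    // "SHUFFLE_BYTES > 100MB" is normalized to a 104857600-byte limit
    // (see testSizeValidationInRule).
    Expression expression = ExpressionFactory.fromString("SHUFFLE_BYTES > 100MB");

    // Bind the expression to an action and a scope, as the tests do.
    Rule rule = RuleFactory.createRule("shuffle_heavy", expression,
        Rule.Action.KILL_QUERY, Rule.Scope.QUERY);

    // apply() returns true once the observed counter exceeds the limit.
    long currentShuffleBytes = 200L * 1024 * 1024; // hypothetical runtime value
    if (rule.apply(currentShuffleBytes)) {
      throw new RuleViolationException("shuffle_heavy",
          "current value: " + currentShuffleBytes);
    }
  }
}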
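The same trigger can be built without the factories. TriggerExpression.evaluate fires only for the GREATER_THAN predicate and only when the limit is positive, which is why "<" is rejected in testIllegalExpressionsUnsupportedPredicate. A sketch, assuming Predicate is resolvable from the wm package (if it is nested inside Expression, qualify it as Expression.Predicate; the patch does not show its declaration):

package org.apache.hadoop.hive.ql.wm;

public class TriggerExpressionSketch {
  public static void main(String[] args) {
    // Equivalent to what ExpressionFactory produces for "EXECUTION_TIME > 10000".
    CounterLimit limit =
        new TimeCounterLimit(TimeCounterLimit.TimeCounter.EXECUTION_TIME, 10000);
    Expression expression = new TriggerExpression(limit, Predicate.GREATER_THAN);

    System.out.println(expression.evaluate(9999));  // false: below the limit
    System.out.println(expression.evaluate(10001)); // true: limit exceeded
  }
}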
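startQueryTimeoutMonitor() and startElapsedTimeoutMonitor() in SQLOperation follow the same one-shot scheduled-cancellation pattern: schedule a single task on a single-thread ScheduledThreadPoolExecutor, skip the cancel if the operation already finished, and shut the executor down either way; cleanup() later calls shutdownNow() for terminal states. A self-contained sketch of that pattern; TimeoutMonitorSketch and its callbacks are stand-ins, not Hive classes:

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class TimeoutMonitorSketch {
  private final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);

  /** Schedules a one-shot task that cancels the work unless it finished first. */
  public void start(long timeoutSeconds, BooleanSupplier isDone, Runnable cancelAction) {
    Runnable timeoutTask = () -> {
      try {
        if (!isDone.getAsBoolean()) {
          cancelAction.run(); // e.g. SQLOperation.cancel(OperationState.TIMEDOUT)
        }
      } finally {
        executor.shutdown(); // one-shot: release the thread whether or not we cancelled
      }
    };
    executor.schedule(timeoutTask, timeoutSeconds, TimeUnit.SECONDS);
  }

  /** Mirror of the cleanup() path: stop the pending task once the operation is terminal. */
  public void stop() {
    executor.shutdownNow();
  }
}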