diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ce391fd..9bdd58e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.conf;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -3396,6 +3397,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         Constants.LLAP_LOGGER_NAME_CONSOLE),
       "logger used for llap-daemons."),
 
+    HIVE_RULE_VALIDATION_INTERVAL_MS("hive.rule.validation.interval.ms", "500ms",
+        new TimeValidator(TimeUnit.MILLISECONDS),
+        "Interval for validating rules during execution of a query. Rules defined in the resource plan are\n" +
+        "validated for all SQL operations after every such interval, and the corresponding action defined\n" +
+        "in the rule is taken."),
+
     SPARK_USE_OP_STATS("hive.spark.use.op.stats", true,
         "Whether to use operator stats to determine reducer parallelism for Hive on Spark.\n" +
         "If this is false, Hive will use source table stats to determine reducer\n" +
@@ -3943,6 +3950,18 @@ public static long toSizeBytes(String value) {
     return new String[] {value.substring(0, i), value.substring(i)};
   }
 
+  private static Set<String> daysSet = ImmutableSet.of("d", "D", "day", "DAY", "days", "DAYS");
+  private static Set<String> hoursSet = ImmutableSet.of("h", "H", "hour", "HOUR", "hours", "HOURS");
+  private static Set<String> minutesSet = ImmutableSet.of("m", "M", "min", "MIN", "minute", "MINUTE",
+      "minutes", "MINUTES");
+  private static Set<String> secondsSet = ImmutableSet.of("s", "S", "sec", "SEC", "second", "SECOND",
+      "seconds", "SECONDS");
+  private static Set<String> millisSet = ImmutableSet.of("ms", "MS", "msec", "MSEC", "millisecond", "MILLISECOND",
+      "milliseconds", "MILLISECONDS");
+  private static Set<String> microsSet = ImmutableSet.of("us", "US", "usec", "USEC", "microsecond", "MICROSECOND",
+      "microseconds", "MICROSECONDS");
+  private static Set<String> nanosSet = ImmutableSet.of("ns", "NS", "nsec", "NSEC", "nanosecond", "NANOSECOND",
+      "nanoseconds", "NANOSECONDS");
   public static TimeUnit unitFor(String unit, TimeUnit defaultUnit) {
     unit = unit.trim().toLowerCase();
     if (unit.isEmpty() || unit.equals("l")) {
@@ -3950,19 +3969,19 @@ public static TimeUnit unitFor(String unit, TimeUnit defaultUnit) {
       throw new IllegalArgumentException("Time unit is not specified");
     }
     return defaultUnit;
-  } else if (unit.equals("d") || unit.startsWith("day")) {
+  } else if (daysSet.contains(unit)) {
     return TimeUnit.DAYS;
-  } else if (unit.equals("h") || unit.startsWith("hour")) {
+  } else if (hoursSet.contains(unit)) {
     return TimeUnit.HOURS;
-  } else if (unit.equals("m") || unit.startsWith("min")) {
+  } else if (minutesSet.contains(unit)) {
     return TimeUnit.MINUTES;
-  } else if (unit.equals("s") || unit.startsWith("sec")) {
+  } else if (secondsSet.contains(unit)) {
     return TimeUnit.SECONDS;
-  } else if (unit.equals("ms") || unit.startsWith("msec")) {
+  } else if (millisSet.contains(unit)) {
     return TimeUnit.MILLISECONDS;
-  } else if (unit.equals("us") || unit.startsWith("usec")) {
+  } else if (microsSet.contains(unit)) {
     return TimeUnit.MICROSECONDS;
-  } else if (unit.equals("ns") || unit.startsWith("nsec")) {
+  } else if (nanosSet.contains(unit)) {
     return TimeUnit.NANOSECONDS;
   }
   throw new IllegalArgumentException("Invalid time unit " + unit);
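A note on the unitFor() change above: the lookup sets make unit-suffix matching exact, where the old startsWith() checks accepted arbitrary trailing characters. A minimal sketch of the resulting behavior (only HiveConf.unitFor() from this hunk is assumed; the snippet is illustrative and not part of the patch):

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;

public class UnitForSketch {
  public static void main(String[] args) {
    // Input is trimmed and lower-cased before lookup, so any case is accepted:
    System.out.println(HiveConf.unitFor("MINUTES", null)); // MINUTES
    System.out.println(HiveConf.unitFor("ms", null));      // MILLISECONDS
    // Previously "minz".startsWith("min") silently mapped to MINUTES;
    // the exact-match sets now reject it:
    try {
      HiveConf.unitFor("minz", null);
    } catch (IllegalArgumentException expected) {
      System.out.println(expected.getMessage()); // Invalid time unit minz
    }
  }
}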
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index eeb6e58..1440983 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -204,6 +204,12 @@
       <version>${hadoop.version}</version>
       <classifier>tests</classifier>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
index 91d0377..923b764 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
@@ -18,16 +18,14 @@
 
 package org.apache.hive.jdbc;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.File;
-import java.lang.reflect.Field;
 import java.math.BigDecimal;
 import java.net.URL;
 import java.sql.Connection;
@@ -35,73 +33,51 @@
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
 import java.sql.Statement;
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.FieldDesc;
-import org.apache.hadoop.hive.llap.LlapRowRecordReader;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.llap.LlapRowInputFormat;
 import org.apache.hadoop.hive.llap.Row;
 import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
+import org.apache.hadoop.hive.ql.wm.QueryRule;
+import org.apache.hadoop.hive.ql.wm.Rule;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
-import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
-import org.apache.hadoop.hive.llap.LlapRowInputFormat;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-
-import org.datanucleus.ClassLoaderResolver;
-import org.datanucleus.NucleusContext;
-import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
-import org.datanucleus.AbstractNucleusContext;
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 public class TestJdbcWithMiniLlap {
   private static MiniHS2 miniHS2 = null;
   private static String dataFileDir;
   private static Path kvDataFilePath;
   private static Path dataTypesFilePath;
-  private static final String tmpDir = System.getProperty("test.tmp.dir");
+  private static String ruleTestTable = "testtab1";
+  private static String ruleQuery = "select t1.under_col, t2.under_col " + "from "
+      + ruleTestTable + " t1 cross join " + ruleTestTable + " t2 on t1.under_col = t2.under_col";
+  private static String ruleQueryWithSleep = "select sleep(t1.under_col, 500), t1.under_col, t2.under_col " + "from "
+      + ruleTestTable + " t1 cross join " + ruleTestTable + " t2 on t1.under_col = t2.under_col";
 
   private static HiveConf conf = null;
   private Connection hs2Conn = null;
@@ -119,6 +95,7 @@ public static void beforeTest() throws Exception {
     conf = new HiveConf();
     conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
     conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    conf.setTimeVar(ConfVars.HIVE_RULE_VALIDATION_INTERVAL_MS, 200, TimeUnit.MILLISECONDS);
 
     conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
         + "/tez-site.xml"));
@@ -440,6 +417,162 @@ public void testDataTypes() throws Exception {
     assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
   }
 
+  //@Test(timeout = 60000)
+  public void testRuleElapsedTimeWithSmallerQueryTimeout() throws Exception {
+    int elapsedTimeoutMs = 6000;
+    int queryTimeoutSec = 3; // query timeout will take precedence, SQLTimeoutException will be thrown
+    Rule rule = new QueryRule("slow_query",
+        ExpressionFactory.fromString("ELAPSED_TIME > " + elapsedTimeoutMs),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQueryWithSleep, rule, "Query timed out after 3 seconds", queryTimeoutSec,
+        SQLTimeoutException.class);
+  }
+
+  //@Test(timeout = 60000)
+  public void testRuleElapsedTimeWithHigherQueryTimeout() throws Exception {
+    int elapsedTimeoutMs = 4000; // elapsed timeout will take precedence, SQLException will be thrown
+    int queryTimeoutSec = 8;
+    Rule rule = new QueryRule("slow_query",
+        ExpressionFactory.fromString("ELAPSED_TIME > " + elapsedTimeoutMs),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQueryWithSleep, rule, "Query was cancelled", queryTimeoutSec, SQLException.class);
+  }
+
+  //@Test(timeout = 60000)
+  public void testRuleSlowQueryElapsedTime() throws Exception {
+    Rule rule = new QueryRule("slow_query", ExpressionFactory.fromString("ELAPSED_TIME > 20000"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQueryWithSleep, rule, "Query was cancelled");
+  }
+
+  //@Test(timeout = 60000)
+  public void testRuleSlowQueryExecutionTime() throws Exception {
+    Rule rule = new QueryRule("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 3000"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQueryWithSleep, rule, rule.toString() + " violated");
+  }
+
+  //@Test(timeout = 120000)
+  public void testRuleHighShuffleBytes() throws Exception {
+    Rule rule = new QueryRule("big_shuffle", ExpressionFactory.fromString("SHUFFLE_BYTES > 512"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery, rule, rule.toString() + " violated");
+  }
+
+  //@Test(timeout = 60000)
+  public void testRuleHighBytesRead() throws Exception {
+    Rule rule = new QueryRule("big_read", ExpressionFactory.fromString("FILE_BYTES_READ > 100"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery, rule, rule.toString() + " violated");
+  }
+
+  //@Test(timeout = 60000)
+  public void testRuleHighBytesWrite() throws Exception {
+    Rule rule = new QueryRule("big_write", ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 512"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery, rule, rule.toString() + " violated");
+  }
+
+  //@Test(timeout = 120000)
+  public void testRuleElapsedTimeMultipleSet() throws Exception {
+    Rule firstRule = new QueryRule("slow_query",
+        ExpressionFactory.fromString("ELAPSED_TIME > 20000"),
+        Rule.Action.KILL_QUERY);
+    List<Rule> rules = Lists.newArrayList(firstRule);
+    // TODO: these rules should be done via create statements after integration
+
+    Connection firstCon = hs2Conn;
+    createTestTable(ruleTestTable);
+    createSleepUDF();
+    final Statement firstStmt = firstCon.createStatement();
+    final Throwable[] firstThrowable = new Throwable[1];
+    Thread firstThread = new Thread(() -> {
+      try {
+        firstStmt.executeQuery(ruleQuery);
+      } catch (SQLException e) {
+        firstThrowable[0] = e;
+      }
+    });
+    firstThread.start();
+
+    Rule secondRule = new QueryRule("slow_query_updated",
+        ExpressionFactory.fromString("ELAPSED_TIME > 50000"),
+        Rule.Action.KILL_QUERY);
+    rules = Lists.newArrayList(secondRule);
+    // TODO: these rules should be done via create statements after integration
+
+    Connection secondCon = hs2Conn;
+    createTestTable(ruleTestTable);
+    createSleepUDF();
+    final Statement secondStmt = secondCon.createStatement();
+    final Throwable[] secondThrowable = new Throwable[1];
+    Thread secondThread = new Thread(() -> {
+      try {
+        secondStmt.executeQuery("select sleep(t1.under_col, 500), t1.under_col, t2.under_col " + "from "
+            + ruleTestTable + " t1 cross join " + ruleTestTable + " t2 on t1.under_col = t2.under_col");
+      } catch (SQLException e) {
+        secondThrowable[0] = e;
+      }
+    });
+    secondThread.start();
+
+    firstThread.join();
+    secondThread.join();
+    firstStmt.close();
+    secondStmt.close();
+
+    assertNull("Expected null throwable for first query", firstThrowable[0]);
+    assertNotNull("Expected non-null throwable for second query", secondThrowable[0]);
+    assertEquals(SQLException.class, secondThrowable[0].getClass());
+    String expect = "Query was cancelled";
+    assertTrue(expect + " is not contained in " + secondThrowable[0].getMessage(),
+        secondThrowable[0].getMessage().contains(expect));
+  }
+
+  private void createSleepUDF() throws SQLException {
+    String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName();
+    Connection con = hs2Conn;
+    Statement stmt = con.createStatement();
+    stmt.execute("create temporary function sleep as '" + udfName + "'");
+    stmt.close();
+  }
+
+  private void runQueryWithRule(final String query, final Rule rule, final String expect, int queryTimeout,
+      final Class<? extends Exception> exceptionClass) throws Exception {
+
+    List<Rule> rules = Lists.newArrayList(rule);
+    // TODO: these rules should be done via create statements after integration
+    Connection con = hs2Conn;
+    createTestTable(ruleTestTable);
+    createSleepUDF();
+
+    final Statement selStmt = con.createStatement();
+    final Throwable[] throwable = new Throwable[1];
+    Thread queryThread = new Thread(() -> {
+      try {
+        selStmt.setQueryTimeout(queryTimeout);
+        selStmt.executeQuery(query);
+      } catch (SQLException e) {
+        throwable[0] = e;
+      }
+    });
+    queryThread.start();
+
+    queryThread.join();
+    selStmt.close();
+
+    assertNotNull("Expected non-null throwable", throwable[0]);
+    assertEquals(exceptionClass, throwable[0].getClass());
+    assertTrue(expect + " is not contained in " + throwable[0].getMessage(),
+        throwable[0].getMessage().contains(expect));
+  }
+
+  private void runQueryWithRule(final String query, final Rule rule, final String expect)
+      throws Exception {
+    runQueryWithRule(query, rule, expect, 0, SQLException.class);
+  }
+
   private interface RowProcessor {
     void process(Row row);
   }
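For reference, the rule lifecycle these tests drive end to end, reduced to a sketch (all names are from this patch; Rule.apply() is assumed, per its use in WorkloadManager further down, to return true when the supplied counter value violates the expression):

Rule rule = new QueryRule("big_shuffle",
    ExpressionFactory.fromString("SHUFFLE_BYTES > 512"),
    Rule.Action.KILL_QUERY);
// The validator thread periodically feeds the latest counter snapshot to the rule:
String counterName = rule.getExpression().getCounterLimit().getName(); // "SHUFFLE_BYTES"
if (rule.apply(1024L)) { // 1024 > 512 -> violated
  // Rule.Action.KILL_QUERY -> KillQuery.killQuery(queryId) is invoked by the listener
}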
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
index 0415169..9f3ec38 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
@@ -392,8 +392,12 @@ public void onRemoval(RemovalNotification arg) {
     this.hostProxies = cb.build();
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
     this.token = token;
-    String tokenUser = getTokenUser(token);
-    this.tokenUser = tokenUser;
+    if (token != null) {
+      String tokenUser = getTokenUser(token);
+      this.tokenUser = tokenUser;
+    } else {
+      this.tokenUser = null;
+    }
     this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
         connectionTimeoutMs, retrySleepMs, TimeUnit.MILLISECONDS);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index cedb486..197f89d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -59,6 +59,7 @@
 import org.apache.hadoop.hive.ql.parse.QB;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.wm.RuleContext;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
@@ -145,10 +146,20 @@
    */
   private Map insertBranchToNamePrefix = new HashMap<>();
   private Operation operation = Operation.OTHER;
+  private RuleContext ruleContext;
 
   public void setOperation(Operation operation) {
     this.operation = operation;
   }
 
+  public RuleContext getRuleContext() {
+    return ruleContext;
+  }
+
+  public void setRuleContext(final RuleContext ruleContext) {
+    this.ruleContext = ruleContext;
+  }
+
   /**
    * These ops require special handling in various places
    * (note that Insert into Acid table is in OTHER category)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index f01edf8..626435f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -114,6 +114,7 @@
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.wm.RuleContext;
 import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.mapred.ClusterStatus;
@@ -484,6 +485,9 @@ private int compile(String command, boolean resetTaskIds, boolean deferClose) {
 
     String queryId = queryState.getQueryId();
 
+    if (ctx != null) {
+      setRuleContext();
+    }
     //save some info for webUI for use after plan is freed
     this.queryDisplay.setQueryStr(queryStr);
     this.queryDisplay.setQueryId(queryId);
@@ -528,6 +532,7 @@ public void run() {
     }
     if (ctx == null) {
       ctx = new Context(conf);
+      setRuleContext();
     }
 
     ctx.setTryCount(getTryCount());
@@ -712,6 +717,11 @@ public void run() {
     }
   }
 
+  private void setRuleContext() {
+    RuleContext ruleContext = new RuleContext(queryInfo);
+    ctx.setRuleContext(ruleContext);
+  }
+
   private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException {
     boolean shouldOpenImplicitTxn = !ctx.isExplainPlan();
     //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index d978a25..eaa39bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -92,39 +92,42 @@ protected int getExecutorCount(boolean allowUpdate) {
   }
 
   @Override
-  public void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
+  public void updateSessionsAsync(double totalMaxAlloc, List<TezSessionState> sessionsToUpdate) {
     // Do not make a remote call unless we have no information at all.
     int totalCount = getExecutorCount(false);
     int totalToDistribute = (int)Math.round(totalCount * totalMaxAlloc);
     double lastDelta = 0;
     for (int i = 0; i < sessionsToUpdate.size(); ++i) {
-      WmTezSession session = sessionsToUpdate.get(i);
-      int intAlloc = -1;
-      if (i + 1 == sessionsToUpdate.size()) {
-        intAlloc = totalToDistribute;
-        // We rely on the caller to supply a reasonable total; we could log a warning
-        // if this doesn't match the allocation of the last session beyond some threshold.
-      } else {
-        // This ensures we don't create skew, e.g. with 8 ducks and 5 queries with simple rounding
-        // we'd produce 2-2-2-2-0 as we round 1.6; whereas adding the last delta to the next query
-        // we'd round 1.6-1.2-1.8-1.4-2.0 and thus give out 2-1-2-1-2, as intended.
-        // Note that fractions don't have to all be the same like in this example.
-        double fraction = session.getClusterFraction();
-        double allocation = fraction * totalCount + lastDelta;
-        double roundedAlloc = Math.round(allocation);
-        lastDelta = allocation - roundedAlloc;
-        if (roundedAlloc < 0) {
-          roundedAlloc = 0; // Can this happen? Delta cannot exceed 0.5.
-        }
-        intAlloc = (int)roundedAlloc;
-      }
-      // Make sure we don't give out more than allowed due to double/rounding artifacts.
-      if (intAlloc > totalToDistribute) {
-        intAlloc = totalToDistribute;
-      }
-      totalToDistribute -= intAlloc;
-      // This will only send update if it's necessary.
-      updateSessionAsync(session, intAlloc);
+      TezSessionState sessionState = sessionsToUpdate.get(i);
+      if (sessionState instanceof WmTezSession) {
+        WmTezSession session = (WmTezSession) sessionState;
+        int intAlloc = -1;
+        if (i + 1 == sessionsToUpdate.size()) {
+          intAlloc = totalToDistribute;
+          // We rely on the caller to supply a reasonable total; we could log a warning
+          // if this doesn't match the allocation of the last session beyond some threshold.
+        } else {
+          // This ensures we don't create skew, e.g. with 8 ducks and 5 queries with simple rounding
+          // we'd produce 2-2-2-2-0 as we round 1.6; whereas adding the last delta to the next query
+          // we'd round 1.6-1.2-1.8-1.4-2.0 and thus give out 2-1-2-1-2, as intended.
+          // Note that fractions don't have to all be the same like in this example.
+          double fraction = session.getClusterFraction();
+          double allocation = fraction * totalCount + lastDelta;
+          double roundedAlloc = Math.round(allocation);
+          lastDelta = allocation - roundedAlloc;
+          if (roundedAlloc < 0) {
+            roundedAlloc = 0; // Can this happen? Delta cannot exceed 0.5.
+          }
+          intAlloc = (int) roundedAlloc;
+        }
+        // Make sure we don't give out more than allowed due to double/rounding artifacts.
+        if (intAlloc > totalToDistribute) {
+          intAlloc = totalToDistribute;
+        }
+        totalToDistribute -= intAlloc;
+        // This will only send update if it's necessary.
+        updateSessionAsync(session, intAlloc);
+      }
     }
   }
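The skew comment in updateSessionsAsync() compresses a worked example; spelled out as standalone arithmetic (plain Java, independent of the patch), the running-delta rounding hands 8 executors to five equal sessions as 2-1-2-1-2 rather than the naive 2-2-2-2-0:

public class RoundingSketch {
  public static void main(String[] args) {
    double totalCount = 8;          // executors ("ducks")
    int totalToDistribute = 8;
    double[] fractions = {0.2, 0.2, 0.2, 0.2, 0.2}; // five equal sessions, raw share 1.6 each
    double lastDelta = 0;
    for (int i = 0; i < fractions.length; ++i) {
      int intAlloc;
      if (i + 1 == fractions.length) {
        intAlloc = totalToDistribute;                 // last session takes the remainder
      } else {
        double allocation = fractions[i] * totalCount + lastDelta; // 1.6, 1.2, 1.8, 1.4
        double rounded = Math.round(allocation);
        lastDelta = allocation - rounded;             // carry the rounding error forward
        intAlloc = (int) rounded;
      }
      totalToDistribute -= intAlloc;
      System.out.print(intAlloc + " ");               // prints: 2 1 2 1 2
    }
  }
}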
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
index a326db3..162217d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
@@ -28,5 +28,5 @@
    * avoid various artifacts, esp. with small numbers and double weirdness.
    * @param sessions Sessions to update based on their allocation fraction.
    */
-  void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessions);
+  void updateSessionsAsync(double totalMaxAlloc, List<TezSessionState> sessions);
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RestrictedConfigChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RestrictedConfigChecker.java
index f6b1c1d..5b2d0a4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RestrictedConfigChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RestrictedConfigChecker.java
@@ -22,6 +22,8 @@
 
 import java.util.HashMap;
 import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -30,7 +32,7 @@
 class RestrictedConfigChecker {
   private final static Logger LOG = LoggerFactory.getLogger(RestrictedConfigChecker.class);
-  private final List<ConfVars> restrictedHiveConf = new ArrayList<>();
+  private final List<String> restrictedHiveConf = new ArrayList<>();
   private final List<String> restrictedNonHiveConf = new ArrayList<>();
   private final HiveConf initConf;
 
@@ -45,7 +47,8 @@
       confName = confName.toLowerCase();
       ConfVars cv = confVars.get(confName);
       if (cv != null) {
-        restrictedHiveConf.add(cv);
+        // store the enum constant name; ConfVars.valueOf() in validate() expects name(), not varname
+        restrictedHiveConf.add(cv.name());
       } else {
         LOG.warn("A restricted config " + confName + " is not recognized as a Hive setting.");
         restrictedNonHiveConf.add(confName);
       }
     }
   }
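A note on the ConfVars round-trip between the constructor above and the validate() hunk below, since the two name spaces are easy to confuse: ConfVars.valueOf() resolves the enum constant name, not the configuration key, so it is the constant name that must be stored:

ConfVars cv = ConfVars.HIVE_SERVER2_ENABLE_DOAS;
String name = cv.name();  // "HIVE_SERVER2_ENABLE_DOAS" -> ConfVars.valueOf(name) succeeds
String key = cv.varname;  // "hive.server2.enable.doAs" -> ConfVars.valueOf(key) throws IllegalArgumentException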
@@ -53,13 +55,14 @@
     }
   }
 
-  public void validate(HiveConf conf) throws HiveException {
-    for (ConfVars var : restrictedHiveConf) {
-      String userValue = HiveConf.getVarWithoutType(conf, var),
-          serverValue = HiveConf.getVarWithoutType(initConf, var);
+  public void validate(Configuration conf) throws HiveException {
+    for (String var : restrictedHiveConf) {
+      ConfVars cv = ConfVars.valueOf(var);
+      String userValue = HiveConf.getVarWithoutType(conf, cv),
+          serverValue = HiveConf.getVarWithoutType(initConf, cv);
       // Note: with some trickery, we could add logic for each type in ConfVars; for now the
       // potential spurious mismatches (e.g. 0 and 0.0 for float) should be easy to work around.
-      validateRestrictedConfigValues(var.varname, userValue, serverValue);
+      validateRestrictedConfigValues(cv.varname, userValue, serverValue);
     }
     for (String var : restrictedNonHiveConf) {
       String userValue = conf.get(var), serverValue = initConf.get(var);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 363443d..da5a228 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -57,8 +57,10 @@ import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.session.KillQuery;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.wm.RuleContext;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -112,6 +114,8 @@
   private final Set localizedResources = new HashSet();
   private boolean doAsEnabled;
   private boolean isLegacyLlapMode;
+  private RuleContext ruleContext;
+  private Set<String> desiredCounters;
 
   /**
    * Constructor. We do not automatically connect, because we only want to
@@ -751,4 +755,32 @@ public void destroy() throws Exception {
     // By default, TezSessionPoolManager handles this for both pool and non-pool session.
     TezSessionPoolManager.getInstance().destroy(this);
   }
+
+  public RuleContext getRuleContext() {
+    return ruleContext;
+  }
+
+  public void setRuleContext(final RuleContext ruleContext) {
+    this.ruleContext = ruleContext;
+  }
+
+  public KillQuery getKillQuery() {
+    // Note: SessionState is thread-local; a caller on a different thread than the session's
+    // owner (e.g. the rule validator) must have the KillQuery captured for it beforehand.
+    return SessionState.get().getKillQuery();
+  }
+
+  public void setDesiredCounters(final Set<String> desiredCounters) {
+    this.desiredCounters = desiredCounters;
+  }
+
+  public void mergeDesiredCounters(final Set<String> moreCounters) {
+    if (this.desiredCounters == null) {
+      this.desiredCounters = moreCounters;
+    } else {
+      desiredCounters.addAll(moreCounters);
+    }
+  }
+
+  public Set<String> getDesiredCounters() {
+    return desiredCounters;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 28d91cc..e625792 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -31,6 +29,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
@@ -43,9 +42,11 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -59,6 +60,10 @@
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.wm.JobCounterListener;
+import org.apache.hadoop.hive.ql.wm.MetastoreGlobalRulesFetcher;
+import org.apache.hadoop.hive.ql.wm.Rule;
+import org.apache.hadoop.hive.ql.wm.RuleFetcher;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -84,7 +89,6 @@
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.json.JSONObject;
-import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -152,12 +156,25 @@ public int execute(DriverContext driverContext) {
       if (WorkloadManager.isInUse(ss.getConf())) {
         // TODO: in future, we may also pass getUserIpAddress.
         // Note: for now this will just block to wait for a session based on parallelism.
-        session = WorkloadManager.getInstance().getSession(session, ss.getUserName(), conf);
+        session = WorkloadManager.getInstance().getSession(session, ss.getUserName(), ctx);
       } else {
         session = TezSessionPoolManager.getInstance().getSession(
-            session, conf, false, getWork().getLlapMode());
+          session, conf, false, getWork().getLlapMode());
       }
+
+      // if WM enabled, rules are bound to pools. There can be rules that have scope outside of LLAP.
+      RuleFetcher globalRuleFetcher = new MetastoreGlobalRulesFetcher();
+      List<Rule> globalRules = globalRuleFetcher.fetch();
+      if (globalRules != null) {
+        Set<String> desiredCounters = globalRules.stream().map(r -> r.getExpression().getCounterLimit().getName())
+            .collect(Collectors.toSet());
+        session.mergeDesiredCounters(desiredCounters);
+      }
+
+      JobCounterListener jcl = new JobCounterListenerImpl(session.getDesiredCounters());
+      ctx.getRuleContext().setJobCounterListener(jcl);
       ss.setTezSession(session);
+      session.setRuleContext(ctx.getRuleContext());
       try {
         // jobConf will hold all the configuration for hadoop, tez, and hive
         JobConf jobConf = utils.createConfiguration(conf);
@@ -219,7 +236,7 @@ public int execute(DriverContext driverContext) {
       }
 
       // finally monitor will print progress until the job is done
-      TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(),dagClient, conf, dag, ctx);
+      TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(), dagClient, conf, dag, ctx);
       rc = monitor.monitorExecution();
 
       if (rc != 0) {
@@ -781,4 +798,29 @@ public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts>
       }
     }
   }
+
+
+  static class JobCounterListenerImpl implements JobCounterListener {
+    private final Set<String> counters;
+    // written by the monitor thread, read by the rule-validator thread
+    private volatile Map<String, Long> currentCounters;
+
+    public JobCounterListenerImpl(final Set<String> subscribedCounters) {
+      this.counters = subscribedCounters;
+    }
+
+    @Override
+    public Set<String> getSubscribedCounters() {
+      return counters;
+    }
+
+    @Override
+    public void updateCounters(final Map<String, Long> counters) {
+      this.currentCounters = counters;
+    }
+
+    @Override
+    public Map<String, Long> getCounters() {
+      return currentCounters;
+    }
+  }
 }
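To make the counter wiring in TezTask.execute() concrete, a sketch with a hypothetical pair of global rules (MetastoreGlobalRulesFetcher.fetch() is still a stub, so the rules here are invented for illustration):

List<Rule> globalRules = Arrays.asList(
    new QueryRule("slow", ExpressionFactory.fromString("ELAPSED_TIME > 30000"), Rule.Action.KILL_QUERY),
    new QueryRule("fat", ExpressionFactory.fromString("FILE_BYTES_READ > 1000000"), Rule.Action.KILL_QUERY));
Set<String> desiredCounters = globalRules.stream()
    .map(r -> r.getExpression().getCounterLimit().getName())
    .collect(Collectors.toSet()); // {"ELAPSED_TIME", "FILE_BYTES_READ"}
session.mergeDesiredCounters(desiredCounters); // union with pool-level rule counters
ctx.getRuleContext().setJobCounterListener(new JobCounterListenerImpl(session.getDesiredCounters()));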
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 3f62127..d4b9d6c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
 
@@ -26,11 +30,19 @@
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.wm.JobCounterListener;
+import org.apache.hadoop.hive.ql.wm.MetastorePoolRulesFetcher;
+import org.apache.hadoop.hive.ql.wm.Rule;
+import org.apache.hadoop.hive.ql.wm.RuleActionListener;
+import org.apache.hadoop.hive.ql.wm.RuleContext;
+import org.apache.hadoop.hive.ql.wm.RuleFetcher;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -42,6 +54,7 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /** Workload management entry point for HS2. */
@@ -60,17 +73,42 @@
   // TODO: it's not clear that we need to track this - unlike PoolManager we don't have non-pool
   //       sessions, so the pool itself could internally track the sessions it gave out, since
   //       calling close on an unopened session is probably harmless.
-  private final IdentityHashMap<TezSessionPoolSession, Boolean> openSessions =
+  private final IdentityHashMap<TezSessionState, Boolean> openSessions =
       new IdentityHashMap<>();
-  /** Sessions given out (i.e. between get... and return... calls), separated by Hive pool. */
-  private final ReentrantReadWriteLock poolsLock = new ReentrantReadWriteLock();
   private final HashMap<String, PoolState> pools = new HashMap<>();
   private final int amRegistryTimeoutMs;
-
-  private static class PoolState {
+  // rules are usually updated less frequently.
+  // sessions can be updated much more frequently (as and when a new DAG/query is submitted).
+  // any updates to rules or sessions will have to be validated to make sure the rules are not violated.
+  // lock to avoid simultaneous updates to rules and sessions during rule validation
+  private final ReentrantReadWriteLock sessionsPoolsLock = new ReentrantReadWriteLock();
+  private RuleFetcher ruleFetcher;
+
+  public static class PoolState {
     // Add stuff here as WM is implemented.
     private final Object lock = new Object();
     private final List<WmTezSession> sessions = new ArrayList<>();
+    private List<Rule> rules;
+
+    public PoolState(final String poolName) {
+      updateCounterNames();
+    }
+
+    private void updateCounterNames() {
+      if (rules != null && sessions != null) {
+        Set<String> counters = rules.stream().map(r -> r.getExpression().getCounterLimit().getName())
+            .collect(Collectors.toSet());
+        sessions.forEach(s -> s.setDesiredCounters(counters));
+      }
+    }
+
+    public void setRules(final List<Rule> rules) {
+      this.rules = rules;
+      updateCounterNames(); // push the (possibly changed) counter set to sessions already in the pool
+    }
+
+    public List<Rule> getRules() {
+      return rules;
+    }
   }
 
   // TODO: this is temporary before HiveServerEnvironment is merged.
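A sketch of PoolState in use (rules are built inline because the create-statement integration is still TODO; note that setRules() re-propagates the counter names to sessions already in the pool):

WorkloadManager.PoolState pool = new WorkloadManager.PoolState("llap");
pool.setRules(Lists.newArrayList(
    new QueryRule("slow_pool_query", ExpressionFactory.fromString("EXECUTION_TIME > 10000"),
        Rule.Action.KILL_QUERY)));
// Every WmTezSession in the pool now carries {"EXECUTION_TIME"} as its desired counters,
// which TezTask later merges with the global-rule counters before submitting the DAG.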
@@ -113,8 +151,8 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf) {
       QueryAllocationManager qam, Token amsToken) {
     this.yarnQueue = yarnQueue;
     this.conf = conf;
+    this.ruleFetcher = new MetastorePoolRulesFetcher();
     initializeHivePools();
-
     this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar(
         conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS);
     sessions = new TezSessionPool<>(conf, numSessions, true);
@@ -125,21 +163,116 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf) {
     for (int i = 0; i < numSessions; i++) {
       sessions.addInitialSession(createSession());
     }
+    long ruleValidationIntervalMs = HiveConf.getTimeVar(conf, ConfVars.HIVE_RULE_VALIDATION_INTERVAL_MS,
+        TimeUnit.MILLISECONDS);
+    final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RuleValidator").build());
+    RuleViolationListener ruleViolationListener = new RuleViolationListener();
+    RuleValidatorCallable ruleValidatorCallable = new RuleValidatorCallable(openSessions, pools, sessionsPoolsLock,
+        ruleViolationListener);
+    // scheduleWithFixedDelay, not schedule(): validation must keep running at the configured
+    // interval for the lifetime of HS2, not fire once.
+    scheduledExecutorService.scheduleWithFixedDelay(ruleValidatorCallable, ruleValidationIntervalMs,
+        ruleValidationIntervalMs, TimeUnit.MILLISECONDS);
+  }
+
+
+  private static class RuleValidatorCallable implements Runnable {
+    private final Map<TezSessionState, Boolean> openSessions;
+    private final Map<String, PoolState> pools;
+    private final ReentrantReadWriteLock sessionsPoolsLock;
+    private final RuleViolationListener ruleViolationListener;
+
+    public RuleValidatorCallable(final Map<TezSessionState, Boolean> openSessions,
+        final Map<String, PoolState> pools, final ReentrantReadWriteLock sessionsPoolsLock,
+        final RuleViolationListener ruleViolationListener) {
+      this.openSessions = openSessions;
+      this.pools = pools;
+      this.sessionsPoolsLock = sessionsPoolsLock;
+      this.ruleViolationListener = ruleViolationListener;
+    }
+
+    @Override
+    public void run() {
+      Map<TezSessionState, Rule> violatedSessions = new HashMap<>();
+      sessionsPoolsLock.readLock().lock();
+      try {
+        if (openSessions != null) {
+          openSessions.forEach((s, b) -> {
+            RuleContext ruleContext = s.getRuleContext();
+            if (ruleContext == null || ruleContext.getJobCounterListener() == null) {
+              return; // no query running in this session yet
+            }
+            Map<String, Long> currentCounters = ruleContext.getJobCounterListener().getCounters();
+            if (currentCounters == null) {
+              return; // no counter snapshot published yet
+            }
+            if (pools != null) {
+              pools.forEach((p, ps) -> {
+                List<Rule> rules = ps.getRules();
+                if (rules != null) {
+                  rules.forEach(r -> {
+                    String desiredCounter = r.getExpression().getCounterLimit().getName();
+                    Long current = currentCounters.get(desiredCounter);
+                    if (current != null && r.apply(current)) {
+                      violatedSessions.put(s, r);
+                    }
+                  });
+                }
+              });
+            }
+          });
+        }
+      } finally {
+        sessionsPoolsLock.readLock().unlock();
+      }
+
+      if (ruleViolationListener != null) {
+        ruleViolationListener.applyAction(violatedSessions);
+      }
+    }
+  }
+
+  private static class RuleViolationListener implements RuleActionListener {
+    @Override
+    public void applyAction(final Map<TezSessionState, Rule> queriesViolated) {
+      if (queriesViolated != null) {
+        queriesViolated.forEach((s, r) -> {
+          if (r.getAction().equals(Rule.Action.KILL_QUERY)) {
+            try {
+              s.getKillQuery().killQuery(s.getRuleContext().getQueryId());
+            } catch (HiveException e) {
+              LOG.warn("Unable to kill query {} for rule {} violation", s.getRuleContext().getQueryId(), r);
+            }
+          } else {
+            // TODO: add support for moving to different pools
+          }
+        });
+      }
+    }
+  }
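For reference, the ScheduledExecutorService distinction that the scheduling call above hinges on (plain java.util.concurrent):

// Runs the task exactly once, 500ms from now:
executor.schedule(task, 500, TimeUnit.MILLISECONDS);
// Runs the task repeatedly, with a 500ms gap between the end of one run and the start of the next:
executor.scheduleWithFixedDelay(task, 500, 500, TimeUnit.MILLISECONDS);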
   private void initializeHivePools() {
     // TODO: real implementation
-    poolsLock.writeLock().lock();
+    sessionsPoolsLock.writeLock().lock();
+    try {
+      String poolName = "llap";
+      PoolState poolState = new PoolState(poolName);
+      poolState.setRules(ruleFetcher.fetch());
+      pools.put(poolName, poolState);
+    } finally {
+      sessionsPoolsLock.writeLock().unlock();
+    }
+  }
+
+  @VisibleForTesting
+  public void updateHivePools() {
+    sessionsPoolsLock.writeLock().lock();
     try {
-      pools.put("llap", new PoolState());
+      // TODO: update any changes to the rules in PoolState
+      String poolName = "llap";
+      PoolState poolState = new PoolState(poolName);
+      poolState.setRules(ruleFetcher.fetch());
+      pools.put(poolName, poolState);
     } finally {
-      poolsLock.writeLock().unlock();
+      sessionsPoolsLock.writeLock().unlock();
     }
   }
 
   public TezSessionState getSession(
-      TezSessionState session, String userName, HiveConf conf) throws Exception {
-    validateConfig(conf);
+      TezSessionState session, String userName, Context ctx) throws Exception {
+    validateConfig(ctx.getConf());
     String poolName = mapSessionToPoolName(userName);
     // TODO: do query parallelism enforcement here based on the policies and pools.
     while (true) {
@@ -170,11 +303,11 @@ protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception {
 
   private void redistributePoolAllocations(
       String poolName, WmTezSession sessionToAdd, WmTezSession sessionToRemove) {
-    List<WmTezSession> sessionsToUpdate = null;
+    List<TezSessionState> sessionsToUpdate = null;
     double totalAlloc = 0;
     assert sessionToAdd == null || poolName.equals(sessionToAdd.getPoolName());
     assert sessionToRemove == null || poolName.equals(sessionToRemove.getPoolName());
-    poolsLock.readLock().lock();
+    sessionsPoolsLock.readLock().lock();
     try {
       PoolState pool = pools.get(poolName);
       synchronized (pool.lock) {
@@ -192,7 +325,7 @@ private void redistributePoolAllocations(
         sessionsToUpdate = new ArrayList<>(pool.sessions);
       }
     } finally {
-      poolsLock.readLock().unlock();
+      sessionsPoolsLock.readLock().unlock();
     }
     allocationManager.updateSessionsAsync(totalAlloc, sessionsToUpdate);
   }
@@ -219,11 +352,13 @@ private WmTezSession checkSessionForReuse(TezSessionState session) throws Except
     return null;
   }
 
-  private double updatePoolAllocations(List<WmTezSession> sessions) {
+  private double updatePoolAllocations(List<TezSessionState> sessions) {
     // TODO: real implementation involving in-the-pool policy interface, etc.
     double allocation = 1.0 / sessions.size();
-    for (WmTezSession session : sessions) {
-      session.setClusterFraction(allocation);
+    for (TezSessionState session : sessions) {
+      if (session instanceof WmTezSession) {
+        ((WmTezSession) session).setClusterFraction(allocation);
+      }
     }
     return 1.0;
   }
@@ -233,13 +368,13 @@ private String mapSessionToPoolName(String userName) {
     return "llap";
   }
 
-  private void validateConfig(HiveConf conf) throws HiveException {
+  private void validateConfig(Configuration conf) throws HiveException {
     String queueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
     if ((queueName != null) && !queueName.isEmpty()) {
       LOG.warn("Ignoring " + TezConfiguration.TEZ_QUEUE_NAME + "=" + queueName);
       conf.set(TezConfiguration.TEZ_QUEUE_NAME, yarnQueue);
     }
-    if (conf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
       // Should this also just be ignored? Throw for now, doAs unlike queue is often set by admin.
       throw new HiveException(ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname + " is not supported");
     }
@@ -257,13 +392,16 @@ public void start() throws Exception {
   }
 
   public void stop() throws Exception {
-    List<TezSessionState> sessionsToClose = null;
-    synchronized (openSessions) {
-      sessionsToClose = new ArrayList<TezSessionState>(openSessions.keySet());
-    }
-    for (TezSessionState sessionState : sessionsToClose) {
-      sessionState.close(false);
+    sessionsPoolsLock.writeLock().lock();
+    try {
+      // Close a drained copy: removing entries while iterating openSessions.keySet()
+      // would throw ConcurrentModificationException.
+      List<TezSessionState> sessionsToClose = new ArrayList<>(openSessions.keySet());
+      openSessions.clear();
+      for (TezSessionState sessionState : sessionsToClose) {
+        sessionState.close(false);
+      }
+    } finally {
+      sessionsPoolsLock.writeLock().unlock();
     }
+
     if (expirationTracker != null) {
       expirationTracker.stop();
     }
@@ -316,16 +454,22 @@ private WmTezSession ensureOwnedSession(TezSessionState oldSession) {
 
   /** Called by TezSessionPoolSession when opened. */
   @Override
   public void registerOpenSession(TezSessionPoolSession session) {
-    synchronized (openSessions) {
+    sessionsPoolsLock.writeLock().lock();
+    try {
       openSessions.put(session, true);
+    } finally {
+      sessionsPoolsLock.writeLock().unlock();
     }
   }
 
   /** Called by TezSessionPoolSession when closed. */
   @Override
   public void unregisterOpenSession(TezSessionPoolSession session) {
-    synchronized (openSessions) {
+    sessionsPoolsLock.writeLock().lock();
+    try {
       openSessions.remove(session);
+    } finally {
+      sessionsPoolsLock.writeLock().unlock();
     }
   }
 
@@ -366,4 +510,12 @@ public void destroy(TezSessionState session) throws Exception {
   protected final HiveConf getConf() {
     return conf;
   }
+
+  public RuleFetcher getRuleFetcher() {
+    return ruleFetcher;
+  }
+
+  public void setRuleFetcher(final RuleFetcher ruleFetcher) {
+    this.ruleFetcher = ruleFetcher;
+  }
 }
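Since the patch replaces synchronized (openSessions) with sessionsPoolsLock throughout WorkloadManager, a quick reminder of the read/write lock contract this relies on (plain java.util.concurrent):

ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// Any number of readers may hold the read lock at once (rule validation, allocation snapshots):
lock.readLock().lock();
try { /* read openSessions/pools */ } finally { lock.readLock().unlock(); }
// The write lock is exclusive (register/unregister sessions, replace pool rules):
lock.writeLock().lock();
try { /* mutate openSessions/pools */ } finally { lock.writeLock().unlock(); }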
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
index 0de9de5..3a9125f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -26,11 +26,13 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
 import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.ql.wm.JobCounterListener;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.common.log.ProgressMonitor;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.wm.TimeCounterLimit;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
@@ -41,15 +43,20 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.tez.dag.api.client.Progress;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.StringWriter;
-import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
 
 import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
@@ -61,6 +68,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 public class TezJobMonitor {
 
   static final String CLASS_NAME = TezJobMonitor.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   private static final int MIN_CHECK_INTERVAL = 200;
   private static final int MAX_CHECK_INTERVAL = 1000;
   private static final int MAX_RETRY_INTERVAL = 2500;
@@ -146,7 +154,7 @@ public int monitorExecution() {
     DAGStatus.State lastState = null;
     boolean running = false;
-    int checkInterval = MIN_CHECK_INTERVAL;
+    long checkInterval = MIN_CHECK_INTERVAL;
 
     while (true) {
       try {
@@ -154,7 +162,11 @@ public int monitorExecution() {
           context.checkHeartbeaterLockException();
         }
 
-        status = dagClient.getDAGStatus(new HashSet<StatusGetOpts>(), checkInterval);
+        status = dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS), checkInterval);
+        JobCounterListener jobCounterListener =
+            context.getRuleContext() == null ? null : context.getRuleContext().getJobCounterListener();
+        if (jobCounterListener != null) {
+          publishCounterUpdates(status.getDAGCounters(), jobCounterListener);
+        }
         vertexProgressMap = status.getVertexProgress();
         DAGStatus.State state = status.getState();
@@ -235,8 +247,7 @@ public int monitorExecution() {
           } catch (IOException | TezException tezException) {
             // best effort
           }
-          console
-              .printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e));
+          console.printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e));
           rc = 1;
           done = true;
         } else {
@@ -263,6 +274,36 @@ public int monitorExecution() {
     return rc;
   }
 
+  private void publishCounterUpdates(final TezCounters dagCounters, final JobCounterListener jobCounterListener) {
+    if (dagCounters != null && jobCounterListener != null && jobCounterListener.getSubscribedCounters() != null) {
+      Set<String> subscribedCounters = jobCounterListener.getSubscribedCounters();
+      Map<String, Long> updatedCounters = new HashMap<>();
+      StreamSupport.stream(dagCounters.spliterator(), false)
+          .forEach(cg -> cg.getUnderlyingGroup().forEach(
+              cgb -> {
+                String counterName = cgb.getName();
+                if (subscribedCounters.contains(counterName)) {
+                  updatedCounters.put(counterName, cgb.getValue());
+                }
+              }
+          ));
+
+      if (subscribedCounters.contains(TimeCounterLimit.TimeCounter.ELAPSED_TIME.name())) {
+        updatedCounters.put(TimeCounterLimit.TimeCounter.ELAPSED_TIME.name(),
+            context.getRuleContext().getQueryInfo().getElapsedTime());
+      }
+
+      if (subscribedCounters.contains(TimeCounterLimit.TimeCounter.EXECUTION_TIME.name()) && executionStartTime > 0) {
+        updatedCounters.put(TimeCounterLimit.TimeCounter.EXECUTION_TIME.name(),
+            System.currentTimeMillis() - executionStartTime);
+      }
+
+      if (!updatedCounters.isEmpty()) {
+        jobCounterListener.updateCounters(updatedCounters);
+      }
+    }
+  }
+
   private void printSummary(boolean success, Map<String, Progress> progressMap) {
     if (isProfilingEnabled() && success && progressMap != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/CounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/CounterLimit.java
new file mode 100644
index 0000000..0fc5eaa
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/CounterLimit.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Counter limit: a counter name and a threshold value. When the counter exceeds the threshold,
+ * the action defined in the rule is executed.
+ */
+public interface CounterLimit {
+  /**
+   * Get the name of the counter.
+   *
+   * @return name
+   */
+  String getName();
+
+  /**
+   * Get the threshold value for the counter.
+   *
+   * @return limit
+   */
+  long getLimit();
+
+  /**
+   * Return a cloned copy of this counter limit.
+   *
+   * @return cloned copy
+   */
+  CounterLimit clone();
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java
new file mode 100644
index 0000000..add933f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Expression that is defined in rules. Expressions fire only after a limit is exceeded,
+ * so only the greater-than (>) predicate is supported.
+ */
+public interface Expression {
+
+  enum Predicate {
+    GREATER_THAN(">");
+
+    String symbol;
+
+    Predicate(final String symbol) {
+      this.symbol = symbol;
+    }
+
+    public String getSymbol() {
+      return symbol;
+    }
+  }
+
+  interface Builder {
+    Builder greaterThan(CounterLimit counter);
+
+    Expression build();
+  }
+
+  /**
+   * Evaluate the current value against this expression. Returns true if the expression
+   * holds (current > limit), false otherwise.
+   *
+   * @param current - current value against which the expression will be evaluated
+   * @return true if the limit is exceeded, false otherwise
+   */
+  boolean evaluate(final long current);
+
+  /**
+   * Return the counter limit.
+   *
+   * @return counter limit
+   */
+  CounterLimit getCounterLimit();
+
+  /**
+   * Return the predicate defined in the expression.
+   *
+   * @return predicate
+   */
+  Predicate getPredicate();
+
+  /**
+   * Return a cloned copy of this expression.
+   *
+   * @return cloned copy
+   */
+  Expression clone();
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
new file mode 100644
index 0000000..f19bf82
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.Validator;
+
+/**
+ * Factory to create expressions.
+ */
+public class ExpressionFactory {
+
+  public static Expression fromString(final String expression) {
+    if (expression == null || expression.isEmpty()) {
+      return null;
+    }
+
+    // TODO: Only ">" predicate is supported right now; this has to be extended to support an
+    // expression tree when multiple conditions are required. HIVE-17622
+
+    String[] tokens = expression.split(Expression.Predicate.GREATER_THAN.getSymbol());
+    if (tokens.length != 2) {
+      throw new IllegalArgumentException("Invalid predicate in expression");
+    }
+
+    final String counterName = tokens[0].trim();
+    final String counterValueStr = tokens[1].trim();
+    if (counterName.isEmpty()) {
+      throw new IllegalArgumentException("Counter name cannot be empty!");
+    }
+
+    long counterValue;
+    for (FileSystemCounterLimit.FSCounter fsCounter : FileSystemCounterLimit.FSCounter.values()) {
+      if (counterName.toUpperCase().endsWith(fsCounter.name())) {
+        try {
+          counterValue = getCounterValue(counterValueStr, new Validator.SizeValidator());
+          if (counterValue < 0) {
+            throw new IllegalArgumentException("Illegal value for counter limit. Expected a positive long value.");
+          }
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Invalid counter value: " + counterValueStr);
+        }
+        // this is a file system counter; valid, so create the counter limit
+        FileSystemCounterLimit fsCounterLimit = FileSystemCounterLimit.fromName(counterName, counterValue);
+        return createExpression(fsCounterLimit);
+      }
+    }
+
+    for (TimeCounterLimit.TimeCounter timeCounter : TimeCounterLimit.TimeCounter.values()) {
+      if (counterName.equalsIgnoreCase(timeCounter.name())) {
+        try {
+          counterValue = getCounterValue(counterValueStr, new Validator.TimeValidator(TimeUnit.MILLISECONDS));
+          if (counterValue < 0) {
+            throw new IllegalArgumentException("Illegal value for counter limit. Expected a positive long value.");
+          }
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Invalid counter value: " + counterValueStr);
+        }
+        TimeCounterLimit timeCounterLimit = new TimeCounterLimit(
+            TimeCounterLimit.TimeCounter.valueOf(counterName.toUpperCase()), counterValue);
+        return createExpression(timeCounterLimit);
+      }
+    }
+
+    // unable to create an expression at this point: invalid expression
+    throw new IllegalArgumentException("Invalid expression! " + expression);
+  }
+
+  private static long getCounterValue(final String counterValueStr, final Validator validator) throws
+      NumberFormatException {
+    long counter;
+    try {
+      counter = Long.parseLong(counterValueStr);
+    } catch (NumberFormatException e) {
+      if (validator instanceof Validator.SizeValidator) {
+        counter = HiveConf.toSizeBytes(counterValueStr);
+      } else if (validator instanceof Validator.TimeValidator) {
+        counter = HiveConf.toTime(counterValueStr, TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS);
+      } else {
+        throw e;
+      }
+    }
+    return counter;
+  }
+
+  static Expression createExpression(CounterLimit counterLimit) {
+    return new TriggerExpression(counterLimit, Expression.Predicate.GREATER_THAN);
+  }
+}
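Examples of what fromString() accepts under the parsing above: values may be plain longs or carry size/time suffixes, resolved through the validator paths shown (the expressions are illustrative):

Expression e1 = ExpressionFactory.fromString("SHUFFLE_BYTES > 100mb");  // SizeValidator path -> 104857600 bytes
Expression e2 = ExpressionFactory.fromString("ELAPSED_TIME > 30s");     // TimeValidator path -> 30000 ms
Expression e3 = ExpressionFactory.fromString("HDFS_BYTES_READ > 1024"); // scheme-qualified FS counter
e1.getCounterLimit().getName(); // "SHUFFLE_BYTES"
e1.getPredicate();              // Predicate.GREATER_THAN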
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java
new file mode 100644
index 0000000..45d6bf8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * File system specific counters with defined limits.
+ */
+public class FileSystemCounterLimit implements CounterLimit {
+
+  public enum FSCounter {
+    BYTES_READ,
+    BYTES_WRITTEN,
+    SHUFFLE_BYTES
+  }
+
+  private String scheme;
+  private FSCounter fsCounter;
+  private long limit;
+
+  public FileSystemCounterLimit(final String scheme, final FSCounter fsCounter, final long limit) {
+    this.scheme = scheme == null || scheme.isEmpty() ? "" : scheme.toUpperCase();
+    this.fsCounter = fsCounter;
+    this.limit = limit;
+  }
"" : scheme.toUpperCase(); + this.fsCounter = fsCounter; + this.limit = limit; + } + + public static FileSystemCounterLimit fromName(final String counterName, final long limit) { + String counterNameStr = counterName.toUpperCase(); + for (FSCounter fsCounter : FSCounter.values()) { + if (counterNameStr.endsWith(fsCounter.name())) { + int startIdx = counterNameStr.indexOf(fsCounter.name()); + if (startIdx == 0) { // exact match + return new FileSystemCounterLimit(null, FSCounter.valueOf(counterName), limit); + } else { + String scheme = counterNameStr.substring(0, startIdx - 1); + // schema/counter name validation will be done in grammar as part of HIVE-17622 + return new FileSystemCounterLimit(scheme, FSCounter.valueOf(fsCounter.name()), limit); + } + } + } + + throw new IllegalArgumentException("Invalid counter name specified " + counterName.toUpperCase() + ""); + } + + @Override + public String getName() { + return scheme.isEmpty() ? fsCounter.name() : scheme.toUpperCase() + "_" + fsCounter.name(); + } + + @Override + public long getLimit() { + return limit; + } + + @Override + public CounterLimit clone() { + return new FileSystemCounterLimit(scheme, fsCounter, limit); + } + + @Override + public String toString() { + return "counter: " + getName() + " limit: " + limit; + } + + @Override + public int hashCode() { + int hash = 31 * scheme.hashCode(); + hash += 31 * fsCounter.hashCode(); + hash += 31 * limit; + return 31 * hash; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof FileSystemCounterLimit)) { + return false; + } + + if (other == this) { + return true; + } + + return scheme.equals(((FileSystemCounterLimit) other).scheme) && fsCounter.equals(( + (FileSystemCounterLimit) other).fsCounter) && limit == ((FileSystemCounterLimit) other).limit; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/JobCounterListener.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/JobCounterListener.java new file mode 100644 index 0000000..9d82243 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/JobCounterListener.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/JobCounterListener.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/JobCounterListener.java new file mode 100644 index 0000000..9d82243 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/JobCounterListener.java @@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Listener that tracks the counters a query's rules subscribe to and exposes their most recent values.
+ */
+public interface JobCounterListener {
+  Set<String> getSubscribedCounters();
+
+  void updateCounters(Map<String, Long> counters);
+
+  Map<String, Long> getCounters();
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalRulesFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalRulesFetcher.java new file mode 100644 index 0000000..b4975fd --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalRulesFetcher.java @@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.List;
+
+/**
+ * Fetches global (non-LLAP) rules from the metastore.
+ */
+public class MetastoreGlobalRulesFetcher implements RuleFetcher {
+  @Override
+  public List<Rule> fetch() {
+    // TODO: implement after integration
+    return null;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastorePoolRulesFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastorePoolRulesFetcher.java new file mode 100644 index 0000000..02886a3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastorePoolRulesFetcher.java @@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.List;
+
+/**
+ * Fetches pool-specific rules from the metastore.
+ */
+public class MetastorePoolRulesFetcher implements RuleFetcher {
+  @Override
+  public List<Rule> fetch() {
+    // TODO: implement after integration.
+    return null;
+  }
+}
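JobCounterListener above is only an interface; a minimal in-memory implementation may clarify the intended flow. This is a sketch, not part of the patch: the class name is invented, it assumes the String-name/Long-value counter types shown above, and it assumes the same org.apache.hadoop.hive.ql.wm package.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative sketch, not part of the patch.
public class InMemoryCounterListener implements JobCounterListener {
  private final Set<String> subscribed;
  private final Map<String, Long> current = new HashMap<>();

  public InMemoryCounterListener(final Set<String> subscribed) {
    this.subscribed = subscribed;
  }

  @Override
  public Set<String> getSubscribedCounters() {
    return subscribed;
  }

  @Override
  public void updateCounters(final Map<String, Long> counters) {
    // Keep only the counters that rules actually subscribe to.
    for (Map.Entry<String, Long> e : counters.entrySet()) {
      if (subscribed.contains(e.getKey())) {
        current.put(e.getKey(), e.getValue());
      }
    }
  }

  @Override
  public Map<String, Long> getCounters() {
    return current;
  }
}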
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/QueryRule.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/QueryRule.java new file mode 100644 index 0000000..8a0e554 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/QueryRule.java @@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.Objects;
+
+/**
+ * Rule with query-level scope. A rule has a name and a trigger expression; when the expression is
+ * violated, the action defined in the rule is executed.
+ */
+public class QueryRule implements Rule {
+  private String name;
+  private Expression expression;
+  private Action action;
+
+  public QueryRule(final String name, final Expression expression, final Action action) {
+    this.name = name;
+    this.expression = expression;
+    this.action = action;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public Expression getExpression() {
+    return expression;
+  }
+
+  @Override
+  public Action getAction() {
+    return action;
+  }
+
+  @Override
+  public Rule clone() {
+    return new QueryRule(name, expression.clone(), action);
+  }
+
+  @Override
+  public boolean apply(final long current) {
+    if (expression.evaluate(current)) {
+      switch (action) {
+        case KILL_QUERY:
+          return true;
+        case MOVE_TO_POOL:
+          // FIXME: act upon the Action
+          return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return "{ name: " + name + ", expression: " + expression + ", action: " + action + " }";
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = name == null ? 31 : 31 * name.hashCode();
+    hash += expression == null ? 31 * hash : 31 * hash * expression.hashCode();
+    hash += action == null ? 31 * hash : 31 * hash * action.hashCode();
+    return hash;
+  }
+
+  @Override
+  public boolean equals(final Object other) {
+    if (other == null || !(other instanceof QueryRule)) {
+      return false;
+    }
+
+    if (other == this) {
+      return true;
+    }
+
+    QueryRule otherQR = (QueryRule) other;
+    return Objects.equals(name, otherQR.name) &&
+      Objects.equals(expression, otherQR.expression) &&
+      Objects.equals(action, otherQR.action);
+  }
+}
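A quick usage sketch for QueryRule, mirroring what TestRule later in this patch exercises; the demo class and values are invented, and the same-package assumption applies.

// Illustrative sketch, not part of the patch.
public class QueryRuleDemo {
  public static void main(String[] args) {
    Expression expr = ExpressionFactory.fromString("HDFS_BYTES_READ > 1024");
    Rule rule = new QueryRule("hdfs_read_heavy", expr, Rule.Action.KILL_QUERY);
    System.out.println(rule.apply(512));  // false: limit not exceeded
    System.out.println(rule.apply(2048)); // true: 2048 > 1024, so KILL_QUERY should fire
  }
}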
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Rule.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Rule.java new file mode 100644 index 0000000..7e95e13 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Rule.java @@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Rule interface, which maps to CREATE RULE .. statements. A rule has a name, a trigger and an action.
+ * The trigger is a simple expression that is evaluated during the lifecycle of a query; the action
+ * defined in the rule is executed when the trigger expression evaluates to true.
+ */
+public interface Rule {
+
+  enum Action {
+    KILL_QUERY(""),
+    MOVE_TO_POOL("");
+
+    String poolName;
+
+    Action(final String poolName) {
+      this.poolName = poolName;
+    }
+
+    public Action setPoolName(final String poolName) {
+      this.poolName = poolName;
+      return this;
+    }
+
+    public String getPoolName() {
+      return poolName;
+    }
+  }
+
+  /**
+   * Based on the current counter value, returns true if the rule applies, false otherwise.
+   *
+   * @param current - current counter value
+   * @return true if the rule was applied, false otherwise
+   */
+  boolean apply(long current);
+
+  /**
+   * Get the trigger expression.
+   *
+   * @return expression
+   */
+  Expression getExpression();
+
+  /**
+   * Return the name of the rule.
+   *
+   * @return rule name
+   */
+  String getName();
+
+  /**
+   * Return the action that will be executed when the trigger expression evaluates to true.
+   *
+   * @return action
+   */
+  Action getAction();
+
+  /**
+   * Return a cloned copy of this rule.
+   *
+   * @return cloned copy
+   */
+  Rule clone();
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleActionListener.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleActionListener.java new file mode 100644 index 0000000..e5a15c6 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleActionListener.java @@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
+
+/**
+ * Listener that is notified of rule violations by queries and performs the actions defined in the
+ * violated rules.
+ */
+public interface RuleActionListener {
+  /**
+   * Applies the action defined in the rule for each of the specified queries.
+   *
+   * @param queriesViolated - violating queries mapped to the rule each one violated
+   */
+  void applyAction(Map queriesViolated);
+}
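One caveat with the Action enum above: enum constants are singletons in Java, so setPoolName mutates state shared by every rule that uses MOVE_TO_POOL. A small demonstration (demo class and pool names invented):

// Illustrative sketch, not part of the patch.
public class ActionSingletonDemo {
  public static void main(String[] args) {
    Rule.Action a = Rule.Action.MOVE_TO_POOL.setPoolName("etl_pool");
    Rule.Action b = Rule.Action.MOVE_TO_POOL.setPoolName("bi_pool");
    // a and b are the same instance, so the second call clobbers the first pool name.
    System.out.println(a == b);          // true
    System.out.println(a.getPoolName()); // bi_pool
  }
}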
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleContext.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleContext.java new file mode 100644 index 0000000..5d62f89 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleContext.java @@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import org.apache.hadoop.hive.ql.QueryInfo;
+
+/**
+ * Context information required for rule evaluation.
+ */
+public class RuleContext {
+  private JobCounterListener jobCounterListener;
+  private QueryInfo queryInfo;
+
+  public RuleContext(final QueryInfo queryInfo) {
+    this.queryInfo = queryInfo;
+  }
+
+  public void setJobCounterListener(final JobCounterListener jobCounterListener) {
+    this.jobCounterListener = jobCounterListener;
+  }
+
+  public JobCounterListener getJobCounterListener() {
+    return jobCounterListener;
+  }
+
+  public QueryInfo getQueryInfo() {
+    return queryInfo;
+  }
+
+  public void setQueryInfo(final QueryInfo queryInfo) {
+    this.queryInfo = queryInfo;
+  }
+
+  public String getQueryId() {
+    return queryInfo.getQueryDisplay().getQueryId();
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleFetcher.java new file mode 100644 index 0000000..94a8a2d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleFetcher.java @@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.List;
+
+/**
+ * Interface for fetching rules.
+ */
+public interface RuleFetcher {
+  List<Rule> fetch();
+}
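Until the metastore-backed fetchers above are implemented (both currently return null), a hardcoded fetcher sketch illustrates the contract; rule names and limits are invented, and the same-package assumption applies.

import java.util.Arrays;
import java.util.List;

// Illustrative sketch, not part of the patch.
public class StaticRuleFetcher implements RuleFetcher {
  @Override
  public List<Rule> fetch() {
    // Return a fixed rule set instead of reading a resource plan from the metastore.
    return Arrays.asList(
      new QueryRule("hdfs_read_heavy",
        ExpressionFactory.fromString("HDFS_BYTES_READ > 1GB"), Rule.Action.KILL_QUERY),
      new QueryRule("slow_query",
        ExpressionFactory.fromString("ELAPSED_TIME > 300s"), Rule.Action.KILL_QUERY));
  }
}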
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java new file mode 100644 index 0000000..687fa5e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java @@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Time-based counters with limits.
+ */
+public class TimeCounterLimit implements CounterLimit {
+  public enum TimeCounter {
+    ELAPSED_TIME,
+    EXECUTION_TIME
+  }
+
+  private TimeCounter timeCounter;
+  private long limit;
+
+  public TimeCounterLimit(final TimeCounter timeCounter, final long limit) {
+    this.timeCounter = timeCounter;
+    this.limit = limit;
+  }
+
+  @Override
+  public String getName() {
+    return timeCounter.name();
+  }
+
+  @Override
+  public long getLimit() {
+    return limit;
+  }
+
+  @Override
+  public CounterLimit clone() {
+    return new TimeCounterLimit(timeCounter, limit);
+  }
+
+  @Override
+  public String toString() {
+    return "counter: " + timeCounter.name() + " limit: " + limit;
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 31 * timeCounter.hashCode();
+    hash += 31 * limit;
+    return 31 * hash;
+  }
+
+  @Override
+  public boolean equals(final Object other) {
+    if (other == null || !(other instanceof TimeCounterLimit)) {
+      return false;
+    }
+
+    if (other == this) {
+      return true;
+    }
+
+    return timeCounter.equals(((TimeCounterLimit) other).timeCounter) && limit == ((TimeCounterLimit) other).limit;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java new file mode 100644 index 0000000..065ab79 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java @@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.Objects;
+
+/**
+ * Simple trigger expression for a rule.
+ */
+public class TriggerExpression implements Expression {
+  private CounterLimit counterLimit;
+  private Predicate predicate;
+
+  public TriggerExpression(final CounterLimit counter, final Predicate predicate) {
+    this.counterLimit = counter;
+    this.predicate = predicate;
+  }
+
+  @Override
+  public boolean evaluate(final long current) {
+    if (counterLimit.getLimit() > 0) {
+      if (predicate.equals(Predicate.GREATER_THAN)) {
+        return current > counterLimit.getLimit();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public CounterLimit getCounterLimit() {
+    return counterLimit;
+  }
+
+  @Override
+  public Predicate getPredicate() {
+    return predicate;
+  }
+
+  @Override
+  public Expression clone() {
+    return new TriggerExpression(counterLimit.clone(), predicate);
+  }
+
+  @Override
+  public String toString() {
+    return counterLimit.getName() + " " + predicate.getSymbol() + " " + counterLimit.getLimit();
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = counterLimit == null ?
31 : 31 * counterLimit.hashCode(); + hash += predicate == null ? 31 * hash : 31 * hash * predicate.hashCode(); + return 31 * hash; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof TriggerExpression)) { + return false; + } + + if (other == this) { + return true; + } + + return Objects.equals(counterLimit, ((TriggerExpression) other).counterLimit) && + Objects.equals(predicate, ((TriggerExpression) other).predicate); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestGuaranteedTaskAllocator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestGuaranteedTaskAllocator.java index 5d1a3b6..5a125a0 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestGuaranteedTaskAllocator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestGuaranteedTaskAllocator.java @@ -94,7 +94,7 @@ public void testAllocations() { private void testAllocation(int ducks, double total, double[] in, int[] out) { MockCommunicator comm = new MockCommunicator(); GuaranteedTasksAllocatorForTest qam = new GuaranteedTasksAllocatorForTest(comm); - List sessionsToUpdate = new ArrayList<>(); + List sessionsToUpdate = new ArrayList<>(); comm.messages.clear(); for (int i = 0; i < in.length; ++i) { addSession(in[i], sessionsToUpdate, i); @@ -111,7 +111,7 @@ private void testAllocation(int ducks, double total, double[] in, int[] out) { private void testEqualAllocation(int ducks, int sessions, double total) { MockCommunicator comm = new MockCommunicator(); GuaranteedTasksAllocatorForTest qam = new GuaranteedTasksAllocatorForTest(comm); - List sessionsToUpdate = new ArrayList<>(); + List sessionsToUpdate = new ArrayList<>(); comm.messages.clear(); double fraction = total / sessions; for (int i = 0; i < sessions; ++i) { @@ -142,7 +142,7 @@ private void testEqualAllocation(int ducks, int sessions, double total) { return results; } - private void addSession(double alloc, List sessionsToUpdate, int i) { + private void addSession(double alloc, List sessionsToUpdate, int i) { SampleTezSessionState session = new SampleTezSessionState("" + i, null, null); session.setClusterFraction(alloc); sessionsToUpdate.add(session); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 7adf895..c3d50c8 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.Context; import org.apache.tez.dag.api.TezConfiguration; import java.util.List; @@ -30,7 +31,7 @@ import org.junit.Test; public class TestWorkloadManager { - private static class MockQam implements QueryAllocationManager { + public static class MockQam implements QueryAllocationManager { boolean isCalled = false; @Override @@ -42,7 +43,7 @@ public void stop() { } @Override - public void updateSessionsAsync(double totalMaxAlloc, List sessions) { + public void updateSessionsAsync(double totalMaxAlloc, List sessions) { isCalled = true; } @@ -52,9 +53,9 @@ void assertWasCalled() { } } - private static class WorkloadManagerForTest extends WorkloadManager { + public static class WorkloadManagerForTest extends WorkloadManager { - WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions, + public WorkloadManagerForTest(String yarnQueue, 
HiveConf conf, int numSessions, QueryAllocationManager qam) { super(yarnQueue, conf, numSessions, qam, null); } @@ -73,39 +74,41 @@ protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception { @Test(timeout = 10000) public void testReuse() throws Exception { HiveConf conf = createConf(); + Context ctx = new Context(conf); MockQam qam = new MockQam(); WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); wm.start(); TezSessionState nonPool = mock(TezSessionState.class); when(nonPool.getConf()).thenReturn(conf); doNothing().when(nonPool).close(anyBoolean()); - TezSessionState session = wm.getSession(nonPool, null, conf); + TezSessionState session = wm.getSession(nonPool, null, ctx); verify(nonPool).close(anyBoolean()); assertNotSame(nonPool, session); session.returnToSessionManager(); TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class); when(diffPool.getConf()).thenReturn(conf); doNothing().when(diffPool).returnToSessionManager(); - session = wm.getSession(diffPool, null, conf); + session = wm.getSession(diffPool, null, ctx); verify(diffPool).returnToSessionManager(); assertNotSame(diffPool, session); - TezSessionState session2 = wm.getSession(session, null, conf); + TezSessionState session2 = wm.getSession(session, null, ctx); assertSame(session, session2); } @Test(timeout = 10000) public void testQueueName() throws Exception { HiveConf conf = createConf(); + Context ctx = new Context(conf); MockQam qam = new MockQam(); WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); wm.start(); // The queue should be ignored. conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2"); - TezSessionState session = wm.getSession(null, null, conf); + TezSessionState session = wm.getSession(null, null, ctx); assertEquals("test", session.getQueueName()); assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME)); session.setQueueName("test2"); - session = wm.getSession(session, null, conf); + session = wm.getSession(session, null, ctx); assertEquals("test", session.getQueueName()); } @@ -117,10 +120,11 @@ public void testQueueName() throws Exception { public void testReopen() throws Exception { // We should always get a different object, and cluster fraction should be propagated. HiveConf conf = createConf(); + Context ctx = new Context(conf); MockQam qam = new MockQam(); WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); wm.start(); - WmTezSession session = (WmTezSession) wm.getSession(null, null, conf); + WmTezSession session = (WmTezSession) wm.getSession(null, null, ctx); assertEquals(1.0, session.getClusterFraction(), EPSILON); qam.assertWasCalled(); WmTezSession session2 = (WmTezSession) session.reopen(conf, null); @@ -134,13 +138,14 @@ public void testReopen() throws Exception { public void testDestroyAndReturn() throws Exception { // Session should not be lost; however the fraction should be discarded. 
HiveConf conf = createConf(); + Context ctx = new Context(conf); MockQam qam = new MockQam(); WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam); wm.start(); - WmTezSession session = (WmTezSession) wm.getSession(null, null, conf); + WmTezSession session = (WmTezSession) wm.getSession(null, null, ctx); assertEquals(1.0, session.getClusterFraction(), EPSILON); qam.assertWasCalled(); - WmTezSession session2 = (WmTezSession) wm.getSession(null, null, conf); + WmTezSession session2 = (WmTezSession) wm.getSession(null, null, ctx); assertEquals(0.5, session.getClusterFraction(), EPSILON); assertEquals(0.5, session2.getClusterFraction(), EPSILON); qam.assertWasCalled(); @@ -151,7 +156,7 @@ public void testDestroyAndReturn() throws Exception { qam.assertWasCalled(); // We never lose pool session, so we should still be able to get. - session = (WmTezSession) wm.getSession(null, null, conf); + session = (WmTezSession) wm.getSession(null, null, ctx); session.returnToSessionManager(); assertEquals(1.0, session2.getClusterFraction(), EPSILON); assertEquals(0.0, session.getClusterFraction(), EPSILON); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestRule.java b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestRule.java new file mode 100644 index 0000000..011b018 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestRule.java @@ -0,0 +1,348 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
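The size and time suffixes exercised in TestRule below are resolved by getCounterValue in ExpressionFactory, which delegates to HiveConf.toSizeBytes and HiveConf.toTime. A small sketch of the resulting conversions (demo class invented; the printed values match the assertions that follow):

// Illustrative sketch, not part of the patch.
public class UnitParsingDemo {
  public static void main(String[] args) {
    // Size suffix: "100MB" goes through HiveConf.toSizeBytes -> 100 * 1024 * 1024 bytes.
    Expression size = ExpressionFactory.fromString("SHUFFLE_BYTES > 100MB");
    System.out.println(size.getCounterLimit().getLimit()); // 104857600
    // Time suffix: "300 s" goes through HiveConf.toTime -> milliseconds.
    Expression time = ExpressionFactory.fromString("ELAPSED_TIME > 300 s");
    System.out.println(time.getCounterLimit().getLimit()); // 300000
  }
}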
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Tests for rules, trigger expressions and the expression factory.
+ */
+public class TestRule {
+  @org.junit.Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testSimpleQueryRule() {
+    Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
+      FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
+    Rule rule = new QueryRule("hdfs_read_heavy", expression, Rule.Action.KILL_QUERY);
+    assertEquals("counter: HDFS_BYTES_READ limit: 1024", expression.getCounterLimit().toString());
+    assertFalse(rule.apply(1000));
+    assertTrue(rule.apply(1025));
+
+    expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
+      FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
+    rule = new QueryRule("hdfs_write_heavy", expression, Rule.Action.KILL_QUERY);
+    assertEquals("counter: HDFS_BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString());
+    assertFalse(rule.apply(1000));
+    assertTrue(rule.apply(1025));
+
+    expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
+      FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
+    rule = new QueryRule("local_read_heavy", expression, Rule.Action.KILL_QUERY);
+    assertEquals("counter: BYTES_READ limit: 1024", expression.getCounterLimit().toString());
+    assertFalse(rule.apply(1000));
+    assertTrue(rule.apply(1025));
+
+    expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
+      FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
+    rule = new QueryRule("local_write_heavy", expression, Rule.Action.KILL_QUERY);
+    assertEquals("counter: BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString());
+    assertFalse(rule.apply(1000));
+    assertTrue(rule.apply(1025));
+
+    expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
+      FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024));
+    rule = new QueryRule("shuffle_heavy", expression, Rule.Action.KILL_QUERY);
+    assertEquals("counter: SHUFFLE_BYTES limit: 1024", expression.getCounterLimit().toString());
+    assertFalse(rule.apply(1000));
+    assertTrue(rule.apply(1025));
+
+    expression = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
+      .EXECUTION_TIME, 10000));
+    rule = new QueryRule("slow_query", expression, Rule.Action.MOVE_TO_POOL.setPoolName("fake_pool"));
+    assertEquals("counter: EXECUTION_TIME limit: 10000", expression.getCounterLimit().toString());
+    assertFalse(rule.apply(1000));
+    assertTrue(rule.apply(100000));
+  }
+
+  @Test
+  public void testExpressionFromString() {
+    Expression expression = ExpressionFactory.fromString("BYTES_READ>1024");
+    Expression expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
+      FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+    expression = ExpressionFactory.fromString("BYTES_READ > 1024");
+    assertEquals(expected, expression);
+
+    expression = ExpressionFactory.fromString(expected.toString());
+    assertEquals(expected.toString(), expression.toString());
+
+    assertEquals(expected.hashCode(), expression.hashCode());
+    expression = ExpressionFactory.fromString(" BYTES_READ > 1024 ");
+    assertEquals(expected,
expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString("BYTES_WRITTEN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" HDFS_BYTES_READ > 1024 "); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" HDFS_BYTES_WRITTEN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" S3A_BYTES_READ > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" S3A_BYTES_WRITTEN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" s3a_ByTeS_WRiTTeN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" EXECUTION_TIME > 300"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .EXECUTION_TIME, 300)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" ELAPSED_TIME > 300"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + } + + @Test + public void testSizeValidationInRule() { + Expression expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 100MB"); + Expression expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 100 * 1024 * 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = 
ExpressionFactory.fromString(" SHUFFLE_BYTES > 1 gB"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024 * 1024 * 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 1 TB"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024L * 1024 * 1024 * 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 100 B"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 100)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 100bytes"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 100)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + } + + @Test + public void testIllegalSizeCounterValue1() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid size unit"); + ExpressionFactory.fromString(" SHUFFLE_BYTES > 300GiB"); + } + + @Test + public void testIllegalSizeCounterValue2() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid size unit"); + ExpressionFactory.fromString(" SHUFFLE_BYTES > 300 foo"); + } + + @Test + public void testTimeValidationInRule() { + Expression expression = ExpressionFactory.fromString(" elapsed_TIME > 300 s"); + Expression expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300 seconds"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300 sec"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300s"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300seconds"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300sec"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression 
= ExpressionFactory.fromString(" elapsed_TIME > 300000000 microseconds"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 1DAY"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 24 * 60 * 60 * 1000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + } + + @Test + public void testIllegalTimeCounterValue1() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid time unit"); + ExpressionFactory.fromString(" elapsed_TIME > 300 light years"); + } + + @Test + public void testIllegalTimeCounterValue2() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid time unit"); + ExpressionFactory.fromString(" elapsed_TIME > 300secTOR"); + } + + @Test + public void testRuleClone() { + Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + Rule rule = new QueryRule("hdfs_read_heavy", expression, Rule.Action.KILL_QUERY); + Rule clonedRule = rule.clone(); + assertNotEquals(System.identityHashCode(rule), System.identityHashCode(clonedRule)); + assertNotEquals(System.identityHashCode(rule.getExpression()), System.identityHashCode(clonedRule.getExpression())); + assertNotEquals(System.identityHashCode(rule.getExpression().getCounterLimit()), + System.identityHashCode(clonedRule.getExpression().getCounterLimit())); + assertEquals(rule, clonedRule); + assertEquals(rule.hashCode(), clonedRule.hashCode()); + + expression = ExpressionFactory.fromString(" ELAPSED_TIME > 300"); + rule = new QueryRule("slow_query", expression, Rule.Action.KILL_QUERY); + clonedRule = rule.clone(); + assertNotEquals(System.identityHashCode(rule), System.identityHashCode(clonedRule)); + assertNotEquals(System.identityHashCode(rule.getExpression()), System.identityHashCode(clonedRule.getExpression())); + assertNotEquals(System.identityHashCode(rule.getExpression().getCounterLimit()), + System.identityHashCode(clonedRule.getExpression().getCounterLimit())); + assertEquals(rule, clonedRule); + assertEquals(rule.hashCode(), clonedRule.hashCode()); + } + + @Test + public void testIllegalExpressionsUnsupportedPredicate() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid predicate in expression"); + ExpressionFactory.fromString("BYTES_READ < 1024"); + } + + @Test + public void testIllegalExpressionsMissingLimit() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid predicate in expression"); + ExpressionFactory.fromString("BYTES_READ >"); + } + + @Test + public void testIllegalExpressionsMissingCounter() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Counter name cannot be empty!"); + ExpressionFactory.fromString("> 1024"); + } + + @Test + public void testIllegalExpressionsMultipleLimit() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid predicate in expression"); + ExpressionFactory.fromString("BYTES_READ > 1024 > 1025"); + } + + @Test + public void testIllegalExpressionsMultipleCounters() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid predicate in expression"); + 
ExpressionFactory.fromString("BYTES_READ > BYTES_READ > 1025"); + } + + @Test + public void testIllegalExpressionsInvalidLimitPost() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid size unit"); + ExpressionFactory.fromString("BYTES_READ > 1024aaaa"); + } + + @Test + public void testIllegalExpressionsInvalidLimitPre() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid counter value"); + ExpressionFactory.fromString("BYTES_READ > foo1024"); + } + + @Test + public void testIllegalExpressionsInvalidNegativeLimit() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Illegal value for counter limit. Expected a positive long value."); + ExpressionFactory.fromString("BYTES_READ > -1024"); + } +}
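To close, an end-to-end sketch of how the pieces in this patch fit together; the class, values and query flow are invented, and clone-per-query is an assumption suggested by the clone() contract above rather than something the patch wires up.

// Illustrative sketch, not part of the patch.
public class RuleRoundTrip {
  public static void main(String[] args) {
    Rule rule = new QueryRule("shuffle_heavy",
      ExpressionFactory.fromString("SHUFFLE_BYTES > 1GB"), Rule.Action.KILL_QUERY);
    // Presumably each query evaluates against its own copy of the rule.
    Rule perQuery = rule.clone();
    long observedShuffleBytes = 2L * 1024 * 1024 * 1024; // pretend counter snapshot
    if (perQuery.apply(observedShuffleBytes)) {
      System.out.println("violated " + perQuery + "; the query would be killed");
    }
  }
}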