diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
index c227a63..38f2c79 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
@@ -172,6 +172,86 @@ public void testTriggerTotalTasks() throws Exception {
   }

   @Test(timeout = 60000)
+  public void testTriggerCustomReadOps() throws Exception {
+    Expression expression = ExpressionFactory.fromString("HDFS_READ_OPS > 50");
+    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
+        " t2 on t1.under_col>=t2.under_col";
+    runQueryWithTrigger(query, getConfigs(), "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testTriggerCustomCreatedFiles() throws Exception {
+    List<String> configs = getConfigs();
+
+    Expression expression = ExpressionFactory.fromString("CREATED_FILES > 5");
+    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    String query = "create table testtab2 as select * from " + tableName;
+    runQueryWithTrigger(query, configs, "Query was cancelled");
+
+    runQueryWithTrigger("create table src3 (key int) partitioned by (value string)", null, null);
+
+    // partitioned insert
+    expression = ExpressionFactory.fromString("CREATED_FILES > 10");
+    trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    query = "insert overwrite table src3 partition (value) select sleep(under_col, 10), value from " + tableName +
+        " where under_col < 100";
+    runQueryWithTrigger(query, configs, "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testTriggerCustomCreatedDynamicPartitions() throws Exception {
+    runQueryWithTrigger("create table src2 (key int) partitioned by (value string)", null, null);
+    List<String> configs = getConfigs();
+
+    // query will get cancelled before creating 57 partitions
+    String query =
+        "insert overwrite table src2 partition (value) select * from " + tableName + " where under_col < 100";
+    Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 20");
+    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    runQueryWithTrigger(query, configs, "Query was cancelled");
+
+    // let it create 57 partitions without any triggers
+    query = "insert overwrite table src2 partition (value) select under_col, value from " + tableName +
+        " where under_col < 100";
+    setupTriggers(Lists.newArrayList());
+    runQueryWithTrigger(query, configs, null);
+
+    // query will try to add 64 more partitions to already existing 57 partitions but will get cancelled for violation
+    query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
+    expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 30");
+    trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    runQueryWithTrigger(query, configs, "Query was cancelled");
+
+    // let it create 64 more partitions (total 57 + 64 = 121) without any triggers
+    query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
+    setupTriggers(Lists.newArrayList());
+    runQueryWithTrigger(query, configs, null);
+
+    // re-run insert into but this time no new partitions will be created, so there will be no violation
+    query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
+    expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 10");
+    trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    runQueryWithTrigger(query, configs, null);
+  }
+
+  @Test(timeout = 60000)
+  public void testTriggerCustomNonExistent() throws Exception {
+    Expression expression = ExpressionFactory.fromString("OPEN_FILES > 50");
+    Trigger trigger = new ExecutionTrigger("non_existent", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    String query =
+        "select l.under_col, l.value from " + tableName + " l join " + tableName + " r on l.under_col>=r.under_col";
+    runQueryWithTrigger(query, null, null);
+  }
+
+  @Test(timeout = 60000)
   public void testMultipleTriggers1() throws Exception {
     Expression shuffleExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 1000000");
     Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, Trigger.Action.KILL_QUERY);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 93b967f..4364206 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -143,6 +143,8 @@
   protected transient long cntr = 1;
   protected transient long logEveryNRows = 0;
   protected transient int rowIndex = 0;
+  private transient Path destTablePath;
+  private transient boolean isInsertOverwrite;
   /**
    * Counters.
    */
@@ -173,6 +175,7 @@
     private boolean isMmTable;
     private Long txnId;
     private int stmtId;
+    String dpDir;

     public FSPaths(Path specPath, boolean isMmTable) {
       this.isMmTable = isMmTable;
@@ -516,7 +519,8 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
       serializer.initialize(unsetNestedColumnPaths(hconf), conf.getTableInfo().getProperties());
       outputClass = serializer.getSerializedClass();
-
+      destTablePath = conf.getDestPath();
+      isInsertOverwrite = conf.getInsertOverwrite();
       if (LOG.isInfoEnabled()) {
         LOG.info("Using serializer : " + serializer + " and formatter : " + hiveOutputFormat +
             (isCompressed ? " with compression" : ""));
@@ -740,6 +744,32 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
         autoDelete = fs.deleteOnExit(fsp.outPaths[filesIdx]);
       }

+      // There are 2 cases where we increment CREATED_DYNAMIC_PARTITIONS counters
+      // 1) Insert overwrite
+      // 2) Insert into table which creates new partitions
+      if (bDynParts && fsp.dpDir != null) {
+        Path destPartPath = new Path(destTablePath, fsp.dpDir);
+        Path dpStagingDir = fsp.outPaths[filesIdx].getParent();
+        try {
+          // for insert overwrite, dp directory gets atomically created in staging dir (which is typically inside
+          // destination table directory). If the dp dir in staging dir is not created yet, create it and increment the
+          // counter. Record writer will create the destination file inside the dp dir.
+          // If the dp dir already exists then we have accounted for that partition.
+          if (isInsertOverwrite && !fs.exists(dpStagingDir)) {
+            createStagingDpDir(fsp, filesIdx);
+          }
+          // for insert into case, check if dp dir already exists in the destination table, if exists then we are not
+          // creating new partition but adding to existing one. If dp dir does not exist, create one in staging dir and
+          // increment the counter.
+          if (!fs.exists(destPartPath) && !fs.exists(dpStagingDir)) {
+            createStagingDpDir(fsp, filesIdx);
+          }
+        } catch (IOException e) {
+          LOG.warn("Skipping to increment CREATED_DYNAMIC_PARTITIONS counter. destPartPath: {} dpStagingDir: {} " +
+              "Exception: {}", destPartPath, dpStagingDir, e.getMessage());
+        }
+      }
+
       Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
       // only create bucket files only if no dynamic partitions,
       // buckets of dynamic partitions will be created for each newly created partition
@@ -766,9 +796,10 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
         fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(), acidBucketNum,
             conf, fsp.outPaths[filesIdx], inspector, reporter, -1);
       }
+
       if (reporter != null) {
         reporter.incrCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP),
-            Operator.HIVECOUNTERCREATEDFILES, 1);
+            Operator.HIVE_COUNTER_CREATED_FILES, 1);
       }
     } catch (IOException e) {
@@ -776,6 +807,14 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
     }
   }

+  private void createStagingDpDir(final FSPaths fsp, final int filesIdx) throws IOException {
+    fs.mkdirs(fsp.outPaths[filesIdx].getParent());
+    if (reporter != null) {
+      reporter.incrCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP),
+          Operator.HIVE_COUNTER_CREATED_DYNAMIC_PARTITIONS, 1);
+    }
+  }
+
   /**
    * Report status to JT so that JT won't kill this task if closing takes too long
    * due to too many files to close and the NN is overloaded.
@@ -1030,6 +1069,9 @@ private FSPaths createNewPaths(String dirName) throws HiveException {
           + dirName + ", childSpec " + unionPath + ": tmpPath " + fsp2.getTmpPath()
           + ", task path " + fsp2.getTaskOutputTempPath());
     }
+    if (bDynParts) {
+      fsp2.dpDir = dirName;
+    }
     if(!conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
       createBucketFiles(fsp2);
       valToPaths.put(dirName, fsp2);
@@ -1103,6 +1145,8 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive

       if (fsp2 == null) {
         // check # of dp
+        // TODO: add an option to skip this if number of partitions checks is done by Triggers via
+        // CREATED_DYNAMIC_PARTITION counter
         if (valToPaths.size() > maxPartitions) {
           // we cannot proceed and need to tell the hive client that retries won't succeed either
           throw new HiveFatalException(
@@ -1148,6 +1192,7 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive
           createBucketForFileIdx(fsp2, 0);
           valToPaths.put(pathKey, fsp2);
         }
+
       }
       fp = fsp2;
     } else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 73ddf86..391666a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -71,8 +71,9 @@
   private static final long serialVersionUID = 1L;

-  public static final String HIVECOUNTERCREATEDFILES = "CREATED_FILES";
-  public static final String HIVECOUNTERFATAL = "FATAL_ERROR";
+  public static final String HIVE_COUNTER_CREATED_FILES = "CREATED_FILES";
+  public static final String HIVE_COUNTER_CREATED_DYNAMIC_PARTITIONS = "CREATED_DYNAMIC_PARTITIONS";
+  public static final String HIVE_COUNTER_FATAL = "FATAL_ERROR";
   public static final String CONTEXT_NAME_KEY = "__hive.context.name";

   private transient Configuration configuration;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 97df36e..88a75ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -63,7 +63,6 @@
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
-import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -208,7 +207,7 @@ public ExecDriver(MapredWork plan, JobConf job, boolean isSilent) throws HiveExc
   public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
     Counters.Counter cntr = ctrs.findCounter(
         HiveConf.getVar(job, HiveConf.ConfVars.HIVECOUNTERGROUP),
-        Operator.HIVECOUNTERFATAL);
+        Operator.HIVE_COUNTER_FATAL);
     return cntr != null && cntr.getValue() > 0;
   }

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 3c07197..2d2eafd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -209,7 +209,7 @@ public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
     }
     // check for number of created files
     Counters.Counter cntr = ctrs.findCounter(HiveConf.getVar(job, ConfVars.HIVECOUNTERGROUP),
-        Operator.HIVECOUNTERCREATEDFILES);
+        Operator.HIVE_COUNTER_CREATED_FILES);
     long numFiles = cntr != null ? cntr.getValue() : 0;
     long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
     if (numFiles > upperLimit) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 2ee8c93..0f5f708 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -352,7 +352,7 @@ private void printConfigInfo() throws IOException {
     Map<String, List<String>> counters = new HashMap<String, List<String>>();
     List<String> hiveCounters = new LinkedList<String>();
     counters.put(groupName, hiveCounters);
-    hiveCounters.add(Operator.HIVECOUNTERCREATEDFILES);
+    hiveCounters.add(Operator.HIVE_COUNTER_CREATED_FILES);
     // MapOperator is out of SparkWork, SparkMapRecordHandler use it to bridge
     // Spark transformation and Hive operators in SparkWork.
     for (MapOperator.Counter counter : MapOperator.Counter.values()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index df5c6aa..fa8cc22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7225,10 +7225,28 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc,
       SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx,
       RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas,
       Integer dest_type) throws SemanticException {
+    boolean isInsertOverwrite = false;
+    switch (dest_type) {
+      case QBMetaData.DEST_PARTITION:
+        //fall through
+      case QBMetaData.DEST_TABLE:
+        //INSERT [OVERWRITE] path
+        String destTableFullName = dest_tab.getCompleteName().replace('@', '.');
+        Map<String, ASTNode> iowMap = qb.getParseInfo().getInsertOverwriteTables();
+        if (iowMap.containsKey(destTableFullName)) {
+          isInsertOverwrite = true;
+        }
+        break;
+      case QBMetaData.DEST_DFS_FILE:
+        //CTAS path
+        break;
+      default:
+        throw new IllegalStateException("Unexpected dest_type=" + dest_tab);
+    }
     FileSinkDesc fileSinkDesc = new FileSinkDesc(queryTmpdir, table_desc,
         conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(),
         canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx,
-        dest_path, mmWriteId, isMmCtas);
+        dest_path, mmWriteId, isMmCtas, isInsertOverwrite);

     boolean isHiveServerQuery = SessionState.get().isHiveServerQuery();
     fileSinkDesc.setHiveServerQuery(isHiveServerQuery);
@@ -7247,24 +7265,6 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc,
       AcidUtils.Operation wt = updating(dest) ? AcidUtils.Operation.UPDATE :
          (deleting(dest) ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
       fileSinkDesc.setWriteType(wt);
-
-      switch (dest_type) {
-        case QBMetaData.DEST_PARTITION:
-          //fall through
-        case QBMetaData.DEST_TABLE:
-          //INSERT [OVERWRITE] path
-          String destTableFullName = dest_tab.getCompleteName().replace('@', '.');
-          Map<String, ASTNode> iowMap = qb.getParseInfo().getInsertOverwriteTables();
-          if (iowMap.containsKey(destTableFullName)) {
-            fileSinkDesc.setInsertOverwrite(true);
-          }
-          break;
-        case QBMetaData.DEST_DFS_FILE:
-          //CTAS path
-          break;
-        default:
-          throw new IllegalStateException("Unexpected dest_type=" + dest_tab);
-      }
       acidFileSinks.add(fileSinkDesc);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index ea8fc19..04a6421 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -119,7 +119,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
       final boolean compressed, final int destTableId, final boolean multiFileSpray,
       final boolean canBeMerged, final int numFiles, final int totalFiles,
       final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath,
-      Long mmWriteId, boolean isMmCtas) {
+      Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite) {

     this.dirName = dirName;
     this.tableInfo = tableInfo;
@@ -135,6 +135,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
     this.destPath = destPath;
     this.mmWriteId = mmWriteId;
     this.isMmCtas = isMmCtas;
+    this.isInsertOverwrite = isInsertOverwrite;
   }

   public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -156,7 +157,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
   public Object clone() throws CloneNotSupportedException {
     FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
         destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
-        partitionCols, dpCtx, destPath, mmWriteId, isMmCtas);
+        partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite);
     ret.setCompressCodec(compressCodec);
     ret.setCompressType(compressType);
     ret.setGatherStats(gatherStats);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java
new file mode 100644
index 0000000..ad1f7fc
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Custom counters with limits (this will only work if the execution engine exposes this counter)
+ */
+public class CustomCounterLimit implements CounterLimit {
+
+  private String counterName;
+  private long limit;
+
+  public CustomCounterLimit(final String counterName, final long limit) {
+    this.counterName = counterName;
+    this.limit = limit;
+  }
+
+  @Override
+  public String getName() {
+    return counterName;
+  }
+
+  @Override
+  public long getLimit() {
+    return limit;
+  }
+
+  @Override
+  public CounterLimit clone() {
+    return new CustomCounterLimit(counterName, limit);
+  }
+
+  @Override
+  public String toString() {
+    return "counter: " + counterName + " limit: " + limit;
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 31 * counterName.hashCode();
+    hash += 31 * limit;
+    return 31 * hash;
+  }
+
+  @Override
+  public boolean equals(final Object other) {
+    if (other == null || !(other instanceof CustomCounterLimit)) {
+      return false;
+    }
+
+    if (other == this) {
+      return true;
+    }
+
+    CustomCounterLimit otherVcl = (CustomCounterLimit) other;
+    return counterName.equalsIgnoreCase(otherVcl.counterName) && limit == otherVcl.limit;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
index f16125d..29f7c89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
@@ -95,8 +95,18 @@ public static Expression fromString(final String expression) {
         return createExpression(vertexCounterLimit);
       }
     }
-    // unable to create expression at this point, invalid expression
-    throw new IllegalArgumentException("Invalid expression! " + expression);
+
+    // if nothing matches, try creating a custom counter
+    try {
+      counterValue = getCounterValue(counterValueStr, null);
+      if (counterValue < 0) {
+        throw new IllegalArgumentException("Illegal value for counter limit. Expected a positive long value.");
+      }
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException("Invalid counter value: " + counterValueStr);
+    }
+    CustomCounterLimit customCounterLimit = new CustomCounterLimit(counterName, counterValue);
+    return createExpression(customCounterLimit);
   }

   private static long getCounterValue(final String counterValueStr, final Validator validator) throws
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 4938e2f..b45f53a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -286,7 +286,7 @@ private FileSinkOperator getFileSink(AcidUtils.Operation writeType,
       DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);
       //todo: does this need the finalDestination?
       desc = new FileSinkDesc(basePath, tableDesc, false, 1, false,
-          false, 1, 1, partCols, dpCtx, null, null, false);
+          false, 1, 1, partCols, dpCtx, null, null, false, false);
     } else {
       desc = new FileSinkDesc(basePath, tableDesc, false);
     }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java
index ce1dc6e..cd78545 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java
@@ -80,6 +80,12 @@ public void testSimpleQueryTrigger() {
     assertEquals("counter: TOTAL_TASKS limit: 10000", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(100000));
+
+    expression = ExpressionFactory.createExpression(new CustomCounterLimit("HDFS_WRITE_OPS", 10000));
+    trigger = new ExecutionTrigger("write_heavy", expression, Trigger.Action.KILL_QUERY);
+    assertEquals("counter: HDFS_WRITE_OPS limit: 10000", expression.getCounterLimit().toString());
+    assertFalse(trigger.apply(1000));
+    assertTrue(trigger.apply(100000));
   }

   @Test
@@ -166,6 +172,12 @@ public void testExpressionFromString() {
     assertEquals("counter: TOTAL_TASKS limit: 10000", expression.getCounterLimit().toString());
     assertEquals(expected, expression);
     assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" HDFS_WRITE_OPS > 10000");
+    expected = ExpressionFactory.createExpression(new CustomCounterLimit("HDFS_WRITE_OPS", 10000));
+    assertEquals("counter: HDFS_WRITE_OPS limit: 10000", expression.getCounterLimit().toString());
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
   }

   @Test
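
Reviewer note, not part of the patch: a minimal sketch of how the custom-counter expressions introduced above could be built and evaluated, using only classes touched in this diff. The package imports and the example class name are assumptions; wiring the trigger into a Tez session pool (what setupTriggers does in the tests) is omitted.

// Assumes Expression, ExpressionFactory, ExecutionTrigger and Trigger live in
// org.apache.hadoop.hive.ql.wm, as the test and ExpressionFactory paths above suggest.
import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
import org.apache.hadoop.hive.ql.wm.Expression;
import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
import org.apache.hadoop.hive.ql.wm.Trigger;

public class CustomCounterTriggerSketch {
  public static void main(String[] args) {
    // With this patch, any counter name published by the execution engine is accepted by
    // fromString(); a name that is never published simply never fires (testTriggerCustomNonExistent).
    Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 20");
    Trigger trigger = new ExecutionTrigger("high_dp_count", expression, Trigger.Action.KILL_QUERY);

    // apply() compares the counter value reported at runtime against the configured limit.
    System.out.println(trigger.apply(10));   // false - below the limit
    System.out.println(trigger.apply(100));  // true - violation, the KILL_QUERY action would be taken
  }
}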