diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
index c227a63..b229abe 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
@@ -172,6 +172,124 @@ public void testTriggerTotalTasks() throws Exception {
   }
 
   @Test(timeout = 60000)
+  public void testTriggerCustomReadOps() throws Exception {
+    Expression expression = ExpressionFactory.fromString("HDFS_READ_OPS > 50");
+    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
+        " t2 on t1.under_col>=t2.under_col";
+    runQueryWithTrigger(query, getConfigs(), "Query was cancelled");
+  }
+
+  @Test(timeout = 240000)
+  public void testTriggerCustomCreatedFiles() throws Exception {
+    List<String> configs = getConfigs();
+
+    Expression expression = ExpressionFactory.fromString("CREATED_FILES > 5");
+    Trigger trigger = new ExecutionTrigger("high_created_files", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    String query = "create table testtab2 as select * from " + tableName;
+    runQueryWithTrigger(query, configs, "Query was cancelled");
+
+    runQueryWithTrigger("create table src3 (key int) partitioned by (value string)", null, null);
+
+    // partitioned insert
+    expression = ExpressionFactory.fromString("CREATED_FILES > 10");
+    trigger = new ExecutionTrigger("high_created_files", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    query = "insert overwrite table src3 partition (value) select sleep(under_col, 10), value from " + tableName +
+        " where under_col < 100";
+    runQueryWithTrigger(query, configs, "Query was cancelled");
+  }
+
+  @Test(timeout = 240000)
+  public void testTriggerCustomCreatedDynamicPartitions() throws Exception {
+    runQueryWithTrigger("drop table src2", null, null);
+    runQueryWithTrigger("create table src2 (key int) partitioned by (value string)", null, null);
+    List<String> configs = getConfigs();
+
+    // query will get cancelled before creating 57 partitions
+    String query =
+        "insert overwrite table src2 partition (value) select * from " + tableName + " where under_col < 100";
+    Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 20");
+    Trigger trigger = new ExecutionTrigger("high_partitions", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    runQueryWithTrigger(query, configs, "Query was cancelled");
+
+    // let it create 57 partitions without any triggers
+    query = "insert overwrite table src2 partition (value) select under_col, value from " + tableName +
+        " where under_col < 100";
+    setupTriggers(Lists.newArrayList());
+    runQueryWithTrigger(query, configs, null);
+
+    // query will try to add 64 more partitions to the already existing 57 partitions but will get cancelled for violation
+    query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
+    expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 30");
+    trigger = new ExecutionTrigger("high_partitions", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    runQueryWithTrigger(query, configs,
"Query was cancelled"); + + // let it create 64 more partitions (total 57 + 64 = 121) without any triggers + query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200"; + setupTriggers(Lists.newArrayList()); + runQueryWithTrigger(query, configs, null); + + // re-run insert into but this time no new partitions will be created, so there will be no violation + query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200"; + expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 10"); + trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY); + setupTriggers(Lists.newArrayList(trigger)); + runQueryWithTrigger(query, configs, null); + } + + @Test(timeout = 120000) + public void testTriggerCustomCreatedDynamicPartitionsMultiInsert() throws Exception { + runQueryWithTrigger("drop table src2", null, null); + runQueryWithTrigger("drop table src3", null, null); + runQueryWithTrigger("create table src2 (key int) partitioned by (value string)", null, null); + runQueryWithTrigger("create table src3 (key int) partitioned by (value string)", null, null); + List configs = getConfigs(); + + String query = + "from " + tableName + + " insert overwrite table src2 partition (value) select * where under_col < 100 " + + " insert overwrite table src3 partition (value) select * where under_col >= 100 and under_col < 200"; + Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 70"); + Trigger trigger = new ExecutionTrigger("high_partitions", expression, Trigger.Action.KILL_QUERY); + setupTriggers(Lists.newArrayList(trigger)); + runQueryWithTrigger(query, configs, "Query was cancelled"); + } + + @Test(timeout = 120000) + public void testTriggerCustomCreatedDynamicPartitionsUnionAll() throws Exception { + runQueryWithTrigger("drop table src2", null, null); + runQueryWithTrigger("create table src2 (key int) partitioned by (value string)", null, null); + List configs = getConfigs(); + + // query will get cancelled before creating 57 partitions + String query = + "insert overwrite table src2 partition (value) " + + "select temps.* from (" + + "select * from " + tableName + " where under_col < 100 " + + "union all " + + "select * from " + tableName + " where under_col >= 100 and under_col < 200) temps"; + Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 70"); + Trigger trigger = new ExecutionTrigger("high_partitions", expression, Trigger.Action.KILL_QUERY); + setupTriggers(Lists.newArrayList(trigger)); + runQueryWithTrigger(query, configs, "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testTriggerCustomNonExistent() throws Exception { + Expression expression = ExpressionFactory.fromString("OPEN_FILES > 50"); + Trigger trigger = new ExecutionTrigger("non_existent", expression, Trigger.Action.KILL_QUERY); + setupTriggers(Lists.newArrayList(trigger)); + String query = + "select l.under_col, l.value from " + tableName + " l join " + tableName + " r on l.under_col>=r.under_col"; + runQueryWithTrigger(query, null, null); + } + + @Test(timeout = 60000) public void testMultipleTriggers1() throws Exception { Expression shuffleExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 1000000"); Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, Trigger.Action.KILL_QUERY); diff --git a/itests/src/test/resources/testconfiguration.properties 
b/itests/src/test/resources/testconfiguration.properties
index a081638..5c11720 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -465,6 +465,8 @@ minillap.query.files=acid_bucket_pruning.q,\
   multi_count_distinct_null.q
 
 minillaplocal.query.files=\
+  dp_counter_non_mm.q,\
+  dp_counter_mm.q,\
   acid_no_buckets.q, \
   acid_globallimit.q,\
   acid_vectorization_missing_cols.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 93b967f..2331498 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -143,6 +143,9 @@
   protected transient long cntr = 1;
   protected transient long logEveryNRows = 0;
   protected transient int rowIndex = 0;
+  private transient Path destTablePath;
+  private transient boolean isInsertOverwrite;
+  private transient String counterGroup;
   /**
    * Counters.
    */
@@ -173,6 +176,7 @@
     private boolean isMmTable;
     private Long txnId;
     private int stmtId;
+    String dpDir;
 
     public FSPaths(Path specPath, boolean isMmTable) {
       this.isMmTable = isMmTable;
@@ -516,7 +520,9 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
       serializer.initialize(unsetNestedColumnPaths(hconf), conf.getTableInfo().getProperties());
       outputClass = serializer.getSerializedClass();
-
+      destTablePath = conf.getDestPath();
+      isInsertOverwrite = conf.getInsertOverwrite();
+      counterGroup = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP);
       if (LOG.isInfoEnabled()) {
         LOG.info("Using serializer : " + serializer + " and formatter : " + hiveOutputFormat +
             (isCompressed ? " with compression" : ""));
@@ -740,6 +746,8 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
         autoDelete = fs.deleteOnExit(fsp.outPaths[filesIdx]);
       }
 
+      updateDPCounters(fsp, filesIdx);
+
       Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
       // only create bucket files only if no dynamic partitions,
       // buckets of dynamic partitions will be created for each newly created partition
@@ -766,9 +774,9 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
         fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(), acidBucketNum,
             conf, fsp.outPaths[filesIdx], inspector, reporter, -1);
       }
+
       if (reporter != null) {
-        reporter.incrCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP),
-            Operator.HIVECOUNTERCREATEDFILES, 1);
+        reporter.incrCounter(counterGroup, Operator.HIVE_COUNTER_CREATED_FILES, 1);
       }
     } catch (IOException e) {
@@ -776,6 +784,66 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
     }
   }
 
+  private void updateDPCounters(final FSPaths fsp, final int filesIdx) {
+    // There are 2 cases where we increment the CREATED_DYNAMIC_PARTITIONS counter:
+    // 1) Insert overwrite (all partitions are newly created)
+    // 2) Insert into a table that creates new partitions (some partitions are new)
+
+    if (bDynParts && destTablePath != null && fsp.dpDir != null) {
+      Path destPartPath = new Path(destTablePath, fsp.dpDir);
+      // For MM tables, the directory structure is
+      // <table-dir>/<partition-dir>/<delta-dir>
+
+      // For non-MM tables, the directory structure is
+      // <table-dir>/<staging-dir>/<partition-dir>
+
+      // For UNION ALL inserts into non-MM tables, each branch of the union creates one more subdirectory at the end
+      // <table-dir>/<staging-dir>/<partition-dir>/<union-dir>
+
+      // for non-MM tables, the final destination partition directory is created during the move task via rename
+      // for MM tables, the final destination partition directory is created by the tasks themselves
+      try {
+        if (conf.isMmTable()) {
+          createDpDir(destPartPath);
+        } else {
+          // outPaths[filesIdx] will be
+          // non-union case: <table-dir>/<staging-dir>/<partition-dir>/<taskid>
+          // union case: <table-dir>/<staging-dir>/<partition-dir>/<union-dir>/<taskid>
+          Path dpStagingDir = fsp.outPaths[filesIdx].getParent();
+          if (isUnionDp) {
+            dpStagingDir = dpStagingDir.getParent();
+          }
+          if (isInsertOverwrite) {
+            createDpDir(dpStagingDir);
+          } else {
+            createDpDirCheckSrc(dpStagingDir, destPartPath);
+          }
+        }
+      } catch (IOException e) {
+        LOG.warn("Skipping increment of CREATED_DYNAMIC_PARTITIONS counter. Exception: {}", e.getMessage());
+      }
+    }
+  }
+
+  private void createDpDirCheckSrc(final Path dpStagingPath, final Path dpFinalPath) throws IOException {
+    if (!fs.exists(dpStagingPath) && !fs.exists(dpFinalPath)) {
+      fs.mkdirs(dpStagingPath);
+      // the move task will create the final dp path
+      if (reporter != null) {
+        reporter.incrCounter(counterGroup, Operator.HIVE_COUNTER_CREATED_DYNAMIC_PARTITIONS, 1);
+      }
+    }
+  }
+
+  private void createDpDir(final Path dpPath) throws IOException {
+    if (!fs.exists(dpPath)) {
+      fs.mkdirs(dpPath);
+      if (reporter != null) {
+        reporter.incrCounter(counterGroup, Operator.HIVE_COUNTER_CREATED_DYNAMIC_PARTITIONS, 1);
+      }
+    }
+  }
+
   /**
    * Report status to JT so that JT won't kill this task if closing takes too long
    * due to too many files to close and the NN is overloaded.
@@ -1030,6 +1098,9 @@ private FSPaths createNewPaths(String dirName) throws HiveException { + dirName + ", childSpec " + unionPath + ": tmpPath " + fsp2.getTmpPath() + ", task path " + fsp2.getTaskOutputTempPath()); } + if (bDynParts) { + fsp2.dpDir = dirName; + } if(!conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) { createBucketFiles(fsp2); valToPaths.put(dirName, fsp2); @@ -1103,6 +1174,8 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive if (fsp2 == null) { // check # of dp + // TODO: add an option to skip this if number of partitions checks is done by Triggers via + // CREATED_DYNAMIC_PARTITION counter if (valToPaths.size() > maxPartitions) { // we cannot proceed and need to tell the hive client that retries won't succeed either throw new HiveFatalException( @@ -1148,6 +1221,7 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive createBucketForFileIdx(fsp2, 0); valToPaths.put(pathKey, fsp2); } + } fp = fsp2; } else { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 73ddf86..391666a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -71,8 +71,9 @@ private static final long serialVersionUID = 1L; - public static final String HIVECOUNTERCREATEDFILES = "CREATED_FILES"; - public static final String HIVECOUNTERFATAL = "FATAL_ERROR"; + public static final String HIVE_COUNTER_CREATED_FILES = "CREATED_FILES"; + public static final String HIVE_COUNTER_CREATED_DYNAMIC_PARTITIONS = "CREATED_DYNAMIC_PARTITIONS"; + public static final String HIVE_COUNTER_FATAL = "FATAL_ERROR"; public static final String CONTEXT_NAME_KEY = "__hive.context.name"; private transient Configuration configuration; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 97df36e..88a75ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -63,7 +63,6 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; -import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveKey; @@ -208,7 +207,7 @@ public ExecDriver(MapredWork plan, JobConf job, boolean isSilent) throws HiveExc public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) { Counters.Counter cntr = ctrs.findCounter( HiveConf.getVar(job, HiveConf.ConfVars.HIVECOUNTERGROUP), - Operator.HIVECOUNTERFATAL); + Operator.HIVE_COUNTER_FATAL); return cntr != null && cntr.getValue() > 0; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java index 3c07197..2d2eafd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java @@ -209,7 +209,7 @@ public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) { } // check for number of created files Counters.Counter cntr = ctrs.findCounter(HiveConf.getVar(job, ConfVars.HIVECOUNTERGROUP), - Operator.HIVECOUNTERCREATEDFILES); + 
Operator.HIVE_COUNTER_CREATED_FILES); long numFiles = cntr != null ? cntr.getValue() : 0; long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES); if (numFiles > upperLimit) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 2ee8c93..0f5f708 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -352,7 +352,7 @@ private void printConfigInfo() throws IOException { Map> counters = new HashMap>(); List hiveCounters = new LinkedList(); counters.put(groupName, hiveCounters); - hiveCounters.add(Operator.HIVECOUNTERCREATEDFILES); + hiveCounters.add(Operator.HIVE_COUNTER_CREATED_FILES); // MapOperator is out of SparkWork, SparkMapRecordHandler use it to bridge // Spark transformation and Hive operators in SparkWork. for (MapOperator.Counter counter : MapOperator.Counter.values()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 5ccb69a..0a9e6fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7225,10 +7225,29 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc, SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx, RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas, Integer dest_type) throws SemanticException { + boolean isInsertOverwrite = false; + switch (dest_type) { + case QBMetaData.DEST_PARTITION: + //fall through + case QBMetaData.DEST_TABLE: + //INSERT [OVERWRITE] path + String destTableFullName = dest_tab.getCompleteName().replace('@', '.'); + Map iowMap = qb.getParseInfo().getInsertOverwriteTables(); + if (iowMap.containsKey(destTableFullName)) { + isInsertOverwrite = true; + } + break; + case QBMetaData.DEST_LOCAL_FILE: + case QBMetaData.DEST_DFS_FILE: + //CTAS path or insert into file/directory + break; + default: + throw new IllegalStateException("Unexpected dest_type=" + dest_tab); + } FileSinkDesc fileSinkDesc = new FileSinkDesc(queryTmpdir, table_desc, conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(), canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx, - dest_path, mmWriteId, isMmCtas); + dest_path, mmWriteId, isMmCtas, isInsertOverwrite); boolean isHiveServerQuery = SessionState.get().isHiveServerQuery(); fileSinkDesc.setHiveServerQuery(isHiveServerQuery); @@ -7247,24 +7266,6 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc, AcidUtils.Operation wt = updating(dest) ? AcidUtils.Operation.UPDATE : (deleting(dest) ? 
AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT); fileSinkDesc.setWriteType(wt); - - switch (dest_type) { - case QBMetaData.DEST_PARTITION: - //fall through - case QBMetaData.DEST_TABLE: - //INSERT [OVERWRITE] path - String destTableFullName = dest_tab.getCompleteName().replace('@', '.'); - Map iowMap = qb.getParseInfo().getInsertOverwriteTables(); - if (iowMap.containsKey(destTableFullName)) { - fileSinkDesc.setInsertOverwrite(true); - } - break; - case QBMetaData.DEST_DFS_FILE: - //CTAS path - break; - default: - throw new IllegalStateException("Unexpected dest_type=" + dest_tab); - } acidFileSinks.add(fileSinkDesc); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index ea8fc19..04a6421 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -119,7 +119,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, final boolean compressed, final int destTableId, final boolean multiFileSpray, final boolean canBeMerged, final int numFiles, final int totalFiles, final ArrayList partitionCols, final DynamicPartitionCtx dpCtx, Path destPath, - Long mmWriteId, boolean isMmCtas) { + Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite) { this.dirName = dirName; this.tableInfo = tableInfo; @@ -135,6 +135,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, this.destPath = destPath; this.mmWriteId = mmWriteId; this.isMmCtas = isMmCtas; + this.isInsertOverwrite = isInsertOverwrite; } public FileSinkDesc(final Path dirName, final TableDesc tableInfo, @@ -156,7 +157,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, public Object clone() throws CloneNotSupportedException { FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed, destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles, - partitionCols, dpCtx, destPath, mmWriteId, isMmCtas); + partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite); ret.setCompressCodec(compressCodec); ret.setCompressType(compressType); ret.setGatherStats(gatherStats); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java new file mode 100644 index 0000000..ad1f7fc --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Custom counter with a limit (this will only work if the execution engine exposes the counter).
+ */
+public class CustomCounterLimit implements CounterLimit {
+
+  private String counterName;
+  private long limit;
+
+  public CustomCounterLimit(final String counterName, final long limit) {
+    this.counterName = counterName;
+    this.limit = limit;
+  }
+
+  @Override
+  public String getName() {
+    return counterName;
+  }
+
+  @Override
+  public long getLimit() {
+    return limit;
+  }
+
+  @Override
+  public CounterLimit clone() {
+    return new CustomCounterLimit(counterName, limit);
+  }
+
+  @Override
+  public String toString() {
+    return "counter: " + counterName + " limit: " + limit;
+  }
+
+  @Override
+  public int hashCode() {
+    // use the lower-cased name so that hashCode stays consistent with the case-insensitive equals below
+    int hash = 31 * counterName.toLowerCase().hashCode();
+    hash += 31 * limit;
+    return 31 * hash;
+  }
+
+  @Override
+  public boolean equals(final Object other) {
+    if (other == null || !(other instanceof CustomCounterLimit)) {
+      return false;
+    }
+
+    if (other == this) {
+      return true;
+    }
+
+    CustomCounterLimit otherCcl = (CustomCounterLimit) other;
+    return counterName.equalsIgnoreCase(otherCcl.counterName) && limit == otherCcl.limit;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
index f16125d..29f7c89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
@@ -95,8 +95,18 @@ public static Expression fromString(final String expression) {
         return createExpression(vertexCounterLimit);
       }
     }
-    // unable to create expression at this point, invalid expression
-    throw new IllegalArgumentException("Invalid expression! " + expression);
+
+    // if nothing matches, try creating a custom counter
+    try {
+      counterValue = getCounterValue(counterValueStr, null);
+      if (counterValue < 0) {
+        throw new IllegalArgumentException("Illegal value for counter limit. Expected a positive long value.");
+      }
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException("Invalid counter value: " + counterValueStr);
+    }
+    CustomCounterLimit customCounterLimit = new CustomCounterLimit(counterName, counterValue);
+    return createExpression(customCounterLimit);
   }
 
   private static long getCounterValue(final String counterValueStr, final Validator validator) throws
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 4938e2f..b45f53a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -286,7 +286,7 @@ private FileSinkOperator getFileSink(AcidUtils.Operation writeType,
       DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);
       //todo: does this need the finalDestination?
       desc = new FileSinkDesc(basePath, tableDesc, false, 1, false,
desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, - false, 1, 1, partCols, dpCtx, null, null, false); + false, 1, 1, partCols, dpCtx, null, null, false, false); } else { desc = new FileSinkDesc(basePath, tableDesc, false); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java index ce1dc6e..cd78545 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java @@ -80,6 +80,12 @@ public void testSimpleQueryTrigger() { assertEquals("counter: TOTAL_TASKS limit: 10000", expression.getCounterLimit().toString()); assertFalse(trigger.apply(1000)); assertTrue(trigger.apply(100000)); + + expression = ExpressionFactory.createExpression(new CustomCounterLimit("HDFS_WRITE_OPS",10000)); + trigger = new ExecutionTrigger("write_heavy", expression, Trigger.Action.KILL_QUERY); + assertEquals("counter: HDFS_WRITE_OPS limit: 10000", expression.getCounterLimit().toString()); + assertFalse(trigger.apply(1000)); + assertTrue(trigger.apply(100000)); } @Test @@ -166,6 +172,12 @@ public void testExpressionFromString() { assertEquals("counter: TOTAL_TASKS limit: 10000", expression.getCounterLimit().toString()); assertEquals(expected, expression); assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" HDFS_WRITE_OPS > 10000"); + expected = ExpressionFactory.createExpression(new CustomCounterLimit("HDFS_WRITE_OPS",10000)); + assertEquals("counter: HDFS_WRITE_OPS limit: 10000", expression.getCounterLimit().toString()); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); } @Test diff --git a/ql/src/test/queries/clientpositive/dp_counter_mm.q b/ql/src/test/queries/clientpositive/dp_counter_mm.q new file mode 100644 index 0000000..4f3b100 --- /dev/null +++ b/ql/src/test/queries/clientpositive/dp_counter_mm.q @@ -0,0 +1,52 @@ +set hive.exec.dynamic.partition.mode=nonstrict; +set hive.exec.max.dynamic.partitions.pernode=200; +set hive.exec.max.dynamic.partitions=200; +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; + +drop table src2; +create table src2 (key int) partitioned by (value string) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only"); + +-- regular insert overwrite + insert into + +SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecTezSummaryPrinter; +insert overwrite table src2 partition (value) select * from src where key < 100; +insert into table src2 partition (value) select * from src where key < 200; + +drop table src2; +create table src2 (key int) partitioned by (value string) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only"); + +insert overwrite table src2 partition (value) select * from src where key < 200; +insert into table src2 partition (value) select * from src where key < 300; + +-- multi insert overwrite + insert into + +drop table src2; +drop table src3; +create table src2 (key int) partitioned by (value string) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only"); +create table src3 (key int) partitioned by (value string) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only"); + +from src +insert overwrite table src2 partition (value) select * where key < 100 +insert overwrite table src3 partition 
(value) select * where key >= 100 and key < 200; + +from src +insert into table src2 partition (value) select * where key < 100 +insert into table src3 partition (value) select * where key >= 100 and key < 300; + +-- union all insert overwrite + insert into + +drop table src2; +create table src2 (key int) partitioned by (value string) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only"); + +insert overwrite table src2 partition (value) +select temps.* from ( + select * from src where key < 100 + union all + select * from src where key >= 100 and key < 200) temps; + +insert into table src2 partition (value) +select temps.* from ( + select * from src where key < 100 + union all + select * from src where key >= 100 and key < 300) temps; diff --git a/ql/src/test/queries/clientpositive/dp_counter_non_mm.q b/ql/src/test/queries/clientpositive/dp_counter_non_mm.q new file mode 100644 index 0000000..fc4660e --- /dev/null +++ b/ql/src/test/queries/clientpositive/dp_counter_non_mm.q @@ -0,0 +1,50 @@ +set hive.exec.dynamic.partition.mode=nonstrict; +set hive.exec.max.dynamic.partitions.pernode=200; +set hive.exec.max.dynamic.partitions=200; + +drop table src2; +create table src2 (key int) partitioned by (value string); + +-- regular insert overwrite + insert into + +SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecTezSummaryPrinter; +insert overwrite table src2 partition (value) select * from src where key < 100; +insert into table src2 partition (value) select * from src where key < 200; + +drop table src2; +create table src2 (key int) partitioned by (value string); + +insert overwrite table src2 partition (value) select * from src where key < 200; +insert into table src2 partition (value) select * from src where key < 300; + +-- multi insert overwrite + insert into + +drop table src2; +drop table src3; +create table src2 (key int) partitioned by (value string); +create table src3 (key int) partitioned by (value string); + +from src +insert overwrite table src2 partition (value) select * where key < 100 +insert overwrite table src3 partition (value) select * where key >= 100 and key < 200; + +from src +insert into table src2 partition (value) select * where key < 100 +insert into table src3 partition (value) select * where key >= 100 and key < 300; + +-- union all insert overwrite + insert into + +drop table src2; +create table src2 (key int) partitioned by (value string); + +insert overwrite table src2 partition (value) +select temps.* from ( + select * from src where key < 100 + union all + select * from src where key >= 100 and key < 200) temps; + +insert into table src2 partition (value) +select temps.* from ( + select * from src where key < 100 + union all + select * from src where key >= 100 and key < 300) temps; diff --git a/ql/src/test/results/clientpositive/llap/dp_counter_mm.q.out b/ql/src/test/results/clientpositive/llap/dp_counter_mm.q.out new file mode 100644 index 0000000..662c02a --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/dp_counter_mm.q.out @@ -0,0 +1,148 @@ +PREHOOK: query: drop table src2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table src2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table src2 (key int) partitioned by (value string) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@src2 +POSTHOOK: query: create table src2 (key int) partitioned by (value string) 
stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@src2 +PREHOOK: query: insert overwrite table src2 partition (value) select * from src where key < 100 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 +Stage-1 FILE SYSTEM COUNTERS: +Stage-1 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 57 + CREATED_FILES: 57 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_OUT_1_default.src2: 84 +PREHOOK: query: insert into table src2 partition (value) select * from src where key < 200 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 +Stage-1 FILE SYSTEM COUNTERS: +Stage-1 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 64 + CREATED_FILES: 121 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_OUT_1_default.src2: 189 +PREHOOK: query: drop table src2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@src2 +PREHOOK: Output: default@src2 +PREHOOK: query: create table src2 (key int) partitioned by (value string) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@src2 +PREHOOK: query: insert overwrite table src2 partition (value) select * from src where key < 200 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 +Stage-1 FILE SYSTEM COUNTERS: +Stage-1 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 121 + CREATED_FILES: 121 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_OUT_1_default.src2: 189 +PREHOOK: query: insert into table src2 partition (value) select * from src where key < 300 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 +Stage-1 FILE SYSTEM COUNTERS: +Stage-1 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 63 + CREATED_FILES: 184 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_OUT_1_default.src2: 292 +PREHOOK: query: drop table src2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@src2 +PREHOOK: Output: default@src2 +PREHOOK: query: drop table src3 +PREHOOK: type: DROPTABLE +PREHOOK: query: create table src2 (key int) partitioned by (value string) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@src2 +PREHOOK: query: create table src3 (key int) partitioned by (value string) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@src3 +PREHOOK: query: from src +insert overwrite table src2 partition (value) select * where key < 100 +insert overwrite table src3 partition (value) select * where key >= 100 and key < 200 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 +PREHOOK: Output: default@src3 +Stage-2 FILE SYSTEM COUNTERS: +Stage-2 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 121 + CREATED_FILES: 121 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_OUT_1_default.src2: 84 + RECORDS_OUT_2_default.src3: 105 +PREHOOK: query: from src +insert into table src2 partition (value) select * where key < 100 +insert into table src3 partition (value) select * where key >= 100 and key < 300 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 +PREHOOK: Output: 
default@src3 +Stage-2 FILE SYSTEM COUNTERS: +Stage-2 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 63 + CREATED_FILES: 184 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_OUT_1_default.src2: 84 + RECORDS_OUT_2_default.src3: 208 +PREHOOK: query: drop table src2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@src2 +PREHOOK: Output: default@src2 +PREHOOK: query: create table src2 (key int) partitioned by (value string) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@src2 +PREHOOK: query: insert overwrite table src2 partition (value) +select temps.* from ( + select * from src where key < 100 + union all + select * from src where key >= 100 and key < 200) temps +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 +Stage-1 FILE SYSTEM COUNTERS: +Stage-1 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 121 + CREATED_FILES: 121 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_IN_Map_3: 500 + RECORDS_OUT_1_default.src2: 189 +PREHOOK: query: insert into table src2 partition (value) +select temps.* from ( + select * from src where key < 100 + union all + select * from src where key >= 100 and key < 300) temps +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 +Stage-1 FILE SYSTEM COUNTERS: +Stage-1 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 63 + CREATED_FILES: 184 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_IN_Map_3: 500 + RECORDS_OUT_1_default.src2: 292 diff --git a/ql/src/test/results/clientpositive/llap/dp_counter_non_mm.q.out b/ql/src/test/results/clientpositive/llap/dp_counter_non_mm.q.out new file mode 100644 index 0000000..85f9b9d --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/dp_counter_non_mm.q.out @@ -0,0 +1,148 @@ +PREHOOK: query: drop table src2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table src2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table src2 (key int) partitioned by (value string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@src2 +POSTHOOK: query: create table src2 (key int) partitioned by (value string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@src2 +PREHOOK: query: insert overwrite table src2 partition (value) select * from src where key < 100 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 +Stage-1 FILE SYSTEM COUNTERS: +Stage-1 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 57 + CREATED_FILES: 57 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_OUT_1_default.src2: 84 +PREHOOK: query: insert into table src2 partition (value) select * from src where key < 200 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 +Stage-1 FILE SYSTEM COUNTERS: +Stage-1 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 64 + CREATED_FILES: 121 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_OUT_1_default.src2: 189 +PREHOOK: query: drop table src2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@src2 +PREHOOK: Output: default@src2 +PREHOOK: query: create table src2 (key int) partitioned by (value string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@src2 +PREHOOK: query: insert overwrite table src2 partition (value) select * from src where key < 200 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 
+Stage-1 FILE SYSTEM COUNTERS: +Stage-1 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 121 + CREATED_FILES: 121 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_OUT_1_default.src2: 189 +PREHOOK: query: insert into table src2 partition (value) select * from src where key < 300 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 +Stage-1 FILE SYSTEM COUNTERS: +Stage-1 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 63 + CREATED_FILES: 184 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_OUT_1_default.src2: 292 +PREHOOK: query: drop table src2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@src2 +PREHOOK: Output: default@src2 +PREHOOK: query: drop table src3 +PREHOOK: type: DROPTABLE +PREHOOK: query: create table src2 (key int) partitioned by (value string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@src2 +PREHOOK: query: create table src3 (key int) partitioned by (value string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@src3 +PREHOOK: query: from src +insert overwrite table src2 partition (value) select * where key < 100 +insert overwrite table src3 partition (value) select * where key >= 100 and key < 200 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 +PREHOOK: Output: default@src3 +Stage-2 FILE SYSTEM COUNTERS: +Stage-2 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 121 + CREATED_FILES: 121 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_OUT_1_default.src2: 84 + RECORDS_OUT_2_default.src3: 105 +PREHOOK: query: from src +insert into table src2 partition (value) select * where key < 100 +insert into table src3 partition (value) select * where key >= 100 and key < 300 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 +PREHOOK: Output: default@src3 +Stage-2 FILE SYSTEM COUNTERS: +Stage-2 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 63 + CREATED_FILES: 184 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_OUT_1_default.src2: 84 + RECORDS_OUT_2_default.src3: 208 +PREHOOK: query: drop table src2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@src2 +PREHOOK: Output: default@src2 +PREHOOK: query: create table src2 (key int) partitioned by (value string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@src2 +PREHOOK: query: insert overwrite table src2 partition (value) +select temps.* from ( + select * from src where key < 100 + union all + select * from src where key >= 100 and key < 200) temps +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 +Stage-1 FILE SYSTEM COUNTERS: +Stage-1 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 121 + CREATED_FILES: 121 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_IN_Map_3: 500 + RECORDS_OUT_1_default.src2: 189 +PREHOOK: query: insert into table src2 partition (value) +select temps.* from ( + select * from src where key < 100 + union all + select * from src where key >= 100 and key < 300) temps +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@src2 +Stage-1 FILE SYSTEM COUNTERS: +Stage-1 HIVE COUNTERS: + CREATED_DYNAMIC_PARTITIONS: 63 + CREATED_FILES: 184 + DESERIALIZE_ERRORS: 0 + RECORDS_IN_Map_1: 500 + RECORDS_IN_Map_3: 500 + RECORDS_OUT_1_default.src2: 292
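
For reference, a minimal usage sketch (not part of the patch) of the custom-counter trigger expressions introduced above; it mirrors the TestTrigger additions, assumes the org.apache.hadoop.hive.ql.wm classes from this change are on the classpath, and the class name CustomCounterTriggerSketch is purely illustrative:

    import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
    import org.apache.hadoop.hive.ql.wm.Expression;
    import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
    import org.apache.hadoop.hive.ql.wm.Trigger;

    public class CustomCounterTriggerSketch {
      public static void main(String[] args) {
        // Any counter name published by the execution engine can now be used in a trigger
        // expression; a name that is never published simply never fires
        // (see testTriggerCustomNonExistent above).
        Expression expr = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 20");
        Trigger trigger = new ExecutionTrigger("high_partitions", expr, Trigger.Action.KILL_QUERY);

        // apply() reports whether the observed counter value violates the configured limit.
        System.out.println(trigger.apply(10L));  // false: below the limit
        System.out.println(trigger.apply(21L));  // true: limit exceeded, action is KILL_QUERY
      }
    }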