diff --git a/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java b/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
index 27b8504..ad41a0d 100644
--- a/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
+++ b/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
@@ -263,7 +263,16 @@ int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
             if (ss.getIsVerbose()) {
               out.println(cmd);
             }
-
+            try {
+              CommandProcessor setProc = CommandProcessorFactory.get(new String[]{"set"}, (HiveConf) conf);
+              CommandProcessorResponse resSet = setProc.run("hive.client.type=" + HiveConf.ClientType.CLIDriver);
+              if (resSet.getResponseCode() != 0) {
+                ss.out.println("Query returned non-zero code: " + resSet.getResponseCode() +
+                    ", cause: " + resSet.getErrorMessage());
+              }
+            } catch (SQLException sqle) {
+              // ignore
+            }
             qp.setTryCount(tryCount);
             ret = qp.run(cmd).getResponseCode();
             if (ret != 0) {
diff --git a/cli/src/java/org/apache/hadoop/hive/cli/CliSessionState.java b/cli/src/java/org/apache/hadoop/hive/cli/CliSessionState.java
index f9d3beb..1189546 100644
--- a/cli/src/java/org/apache/hadoop/hive/cli/CliSessionState.java
+++ b/cli/src/java/org/apache/hadoop/hive/cli/CliSessionState.java
@@ -88,6 +88,7 @@ public void connect() throws TTransportException {
     client = new HiveClient(protocol);
     transport.open();
     remoteMode = true;
+    configureConnection();
   }
 
   public void setHost(String host) {
@@ -125,4 +126,12 @@ public HiveClient getClient() {
     return client;
   }
 
+  private void configureConnection() {
+    try {
+      client.execute("set hive.client.type = " + HiveConf.ClientType.CLIDriver);
+    } catch (Exception e) {
+      // do not block the work flow
+    }
+  }
+
 }
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 46f9f42..871916b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -186,6 +186,7 @@
     PREEXECHOOKS("hive.exec.pre.hooks", ""),
     POSTEXECHOOKS("hive.exec.post.hooks", ""),
     ONFAILUREHOOKS("hive.exec.failure.hooks", ""),
+    QUERYCLIENTTYPE("hive.client.type", ""),
     CLIENTSTATSPUBLISHERS("hive.client.stats.publishers", ""),
     EXECPARALLEL("hive.exec.parallel", false), // parallel query launching
     EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8),
@@ -551,6 +552,7 @@
     HIVELIMITOPTENABLE("hive.limit.optimize.enable", false),
     HIVELIMITOPTMAXFETCH("hive.limit.optimize.fetch.max", 50000),
     HIVELIMITPUSHDOWNMEMORYUSAGE("hive.limit.pushdown.memory.usage", -1f),
+    HIVELIMITTABLESCANPARTITION("hive.limit.query.max.table.partition", -1),
 
     HIVEHASHTABLETHRESHOLD("hive.hashtable.initialCapacity", 100000),
     HIVEHASHTABLELOADFACTOR("hive.hashtable.loadfactor", (float) 0.75),
@@ -1531,4 +1533,18 @@ private void setupRestrictList() {
     restrictList.add(ConfVars.HIVE_IN_TEST.varname);
     restrictList.add(ConfVars.HIVE_CONF_RESTRICTED_LIST.varname);
   }
+
+  public static enum ClientType {
+    CLIDriver("cli"),
+    JDBC("jdbc");
+
+    private final String type;
+    ClientType(String type) {
+      this.type = type;
+    }
+    @Override
+    public String toString() {
+      return this.type;
+    }
+  }
 }
diff --git a/jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveConnection.java
index d08e05b..4e90c8c 100644
--- a/jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveConnection.java
+++ b/jdbc/src/java/org/apache/hadoop/hive/jdbc/HiveConnection.java
@@ -132,6 +132,8 @@ private void configureConnection() throws SQLException {
     Statement stmt = createStatement();
     stmt.execute(
         "set hive.fetch.output.serde = org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
+    stmt.execute(
+        "set hive.client.type = " + HiveConf.ClientType.JDBC);
     stmt.close();
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
index 76f5a31..c1b88d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
@@ -22,6 +22,7 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -288,6 +289,24 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks,
     PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
         physicalContext, conf);
     physicalOptimizer.optimize();
+
+    // check partitions limit
+    if (HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITTABLESCANPARTITION) != -1) {
+      Iterator<ExecDriver> mrTasks = Utilities.getMRTasks(rootTasks).iterator();
+      while (mrTasks.hasNext()) {
+        ExecDriver mrTask = mrTasks.next();
+        Iterator<Map.Entry<String, Integer>> iter = getPartitionCounter(mrTask).entrySet().iterator();
+        while (iter.hasNext()) {
+          Map.Entry<String, Integer> entry = iter.next();
+          if (HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITTABLESCANPARTITION) < entry.getValue()) {
+            throw new SemanticException("Number of partitions for table " + entry.getKey() + "(= "
+                + entry.getValue() + ") is larger than "
+                + HiveConf.ConfVars.HIVELIMITTABLESCANPARTITION + "(= "
+                + HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITTABLESCANPARTITION) + ")");
+          }
+        }
+      }
+    }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index b569ed0..58381b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -21,8 +21,10 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -54,9 +56,11 @@
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.serde2.NullStructSerDe;
 
 /**
  * TaskCompiler is a the base class for classes that compile
@@ -91,6 +95,19 @@ public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks,
       getLeafTasks(task.getDependentTasks(), leaves);
     }
   }
+
+  /**
+   * Count the partitions for each table
+   */
+  protected Map<String, Integer> getPartitionCounter(ExecDriver mrTask) {
+    Map<String, Integer> partitionCounter = new HashMap<String, Integer>();
+    for (Map.Entry<String, ArrayList<String>> entry : mrTask.getWork().getMapWork().getPathToAliases().entrySet()) {
+      String path = entry.getKey();
+      PartitionDesc partDesc = mrTask.getWork().getMapWork().getPathToPartitionInfo().get(path);
+      if (partDesc.getSerdeClassName() != null && partDesc.getSerdeClassName().equals(NullStructSerDe.class.getName())) {
+        // skip metadata only query
+        continue;
+      }
+      for (String alias : entry.getValue()) {
+        if (partitionCounter.containsKey(alias)) {
+          partitionCounter.put(alias, partitionCounter.get(alias) + 1);
+        } else {
+          partitionCounter.put(alias, 1);
+        }
+      }
+    }
+    return partitionCounter;
+  }
 
   /*
    * Called to transform tasks into local tasks where possible/desirable
diff --git a/ql/src/test/queries/clientnegative/limit_partition.q b/ql/src/test/queries/clientnegative/limit_partition.q
new file mode 100644
index 0000000..8e8dbd6
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/limit_partition.q
@@ -0,0 +1,9 @@
+set hive.limit.query.max.table.partition=1;
+
+select * from srcpart limit 1;
+
+select distinct hr from srcpart;
+
+select * from srcpart where hr=11;
+
+
diff --git a/ql/src/test/results/clientnegative/limit_partition.q.out b/ql/src/test/results/clientnegative/limit_partition.q.out
new file mode 100644
index 0000000..bc7df7e
--- /dev/null
+++ b/ql/src/test/results/clientnegative/limit_partition.q.out
@@ -0,0 +1,36 @@
+PREHOOK: query: select * from srcpart limit 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select * from srcpart limit 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+238 val_238 2008-04-08 11
+PREHOOK: query: select distinct hr from srcpart
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select distinct hr from srcpart
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+11
+12
+FAILED: SemanticException Number of partitions for table default.srcpart(= 2) is larger than hive.limit.query.max.table.partition(= 1)
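A short usage sketch of the new hive.limit.query.max.table.partition setting, for reference alongside the clientnegative test above (not part of the patch). It assumes the standard srcpart test table with partitions ds=2008-04-08/2008-04-09 and hr=11/12; the error text is what the SemanticException added in MapReduceCompiler would be expected to produce, and the exact table label and counts may vary.

-- Cap how many partitions a single table may contribute to a map-reduce scan;
-- the default of -1 disables the check.
set hive.limit.query.max.table.partition=2;

-- Scans two partitions of srcpart (hr=11 under both ds values): not above the limit,
-- so compilation succeeds (the q.out above shows the same query failing only when the limit is 1).
select * from srcpart where hr=11;

-- Scans all four partitions, so compilation is expected to fail with something like:
--   FAILED: SemanticException Number of partitions for table default.srcpart(= 4) is larger than
--   hive.limit.query.max.table.partition(= 2)
select count(*) from srcpart;

-- Metadata-only plans (NullStructSerDe) are skipped by getPartitionCounter, so this still succeeds
-- even though it touches all four partitions, as the q.out shows for the limit=1 case.
select distinct hr from srcpart;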