Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1103989)
+++ conf/hive-default.xml (working copy)
@@ -1043,6 +1043,18 @@
+<property>
+  <name>hive.index.compact.query.max.size</name>
+  <value>10737418240</value>
+  <description>The maximum number of bytes that a query using the compact index can read. A negative value is equivalent to infinity.</description>
+</property>
+
+<property>
+  <name>hive.index.compact.query.max.entries</name>
+  <value>10000000</value>
+  <description>The maximum number of index entries to read during a query that uses the compact index. A negative value is equivalent to infinity.</description>
+</property>
+
<property>
  <name>hive.exim.uri.scheme.whitelist</name>
  <value>hdfs,pfile</value>
  <description>A comma separated list of acceptable URI schemes for import and export.</description>
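
Both new properties treat a negative value as "no limit". A minimal sketch of how that convention can be read and normalized, assuming an initialized HiveConf (the helper class and method names here are illustrative, not part of the patch):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class CompactIndexLimits {
      // Illustrative helper: reads the byte cap and maps "negative means
      // infinity" onto Long.MAX_VALUE, mirroring the normalization the
      // patch performs in HiveIndexResult and HiveIndexedInputFormat.
      public static long maxQueryBytes(HiveConf conf) {
        long max = HiveConf.getLongVar(conf,
            HiveConf.ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_SIZE);
        return max < 0 ? Long.MAX_VALUE : max;
      }
    }
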
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1103989)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -372,6 +372,8 @@
// Indexes
HIVEOPTINDEXFILTER_COMPACT_MINSIZE("hive.optimize.index.filter.compact.minsize", (long) 5 * 1024 * 1024 * 1024), // 5G
HIVEOPTINDEXFILTER_COMPACT_MAXSIZE("hive.optimize.index.filter.compact.maxsize", (long) -1), // infinity
+ HIVE_INDEX_COMPACT_QUERY_MAX_ENTRIES("hive.index.compact.query.max.entries", (long) 10000000), // 10M
+ HIVE_INDEX_COMPACT_QUERY_MAX_SIZE("hive.index.compact.query.max.size", (long) 10 * 1024 * 1024 * 1024), // 10G
// Statistics
HIVESTATSAUTOGATHER("hive.stats.autogather", true),
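
The explicit (long) cast on the first factor is load-bearing: without it, 10 * 1024 * 1024 * 1024 is evaluated in 32-bit int arithmetic and wraps to a negative number before the widening assignment. A standalone illustration (not part of the patch):

    public class CastDemo {
      public static void main(String[] args) {
        // int arithmetic wraps: prints -2147483648
        System.out.println(10 * 1024 * 1024 * 1024);
        // casting the first operand makes the whole product long: prints 10737418240
        System.out.println((long) 10 * 1024 * 1024 * 1024);
      }
    }
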
Index: ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java (revision 1103989)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java (working copy)
@@ -88,7 +88,7 @@
bytesRef[0] = new BytesRefWritable();
bytesRef[1] = new BytesRefWritable();
- ignoreHdfsLoc = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INDEX_IGNORE_HDFS_LOC);
+ ignoreHdfsLoc = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INDEX_IGNORE_HDFS_LOC);
if (indexFile != null) {
Path indexFilePath = new Path(indexFile);
@@ -104,12 +104,22 @@
paths.add(indexFilePath);
}
+ long maxEntriesToLoad = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_ENTRIES);
+ if (maxEntriesToLoad < 0) {
+ maxEntriesToLoad = Long.MAX_VALUE;
+ }
+
+ long lineCounter = 0;
for (Path indexFinalPath : paths) {
FSDataInputStream ifile = fs.open(indexFinalPath);
LineReader lr = new LineReader(ifile, conf);
try {
Text line = new Text();
while (lr.readLine(line) > 0) {
+ if (++lineCounter > maxEntriesToLoad) {
+ throw new HiveException("Number of compact index entries loaded during the query exceeded the maximum of " + maxEntriesToLoad
+ + " set in " + HiveConf.ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_ENTRIES.varname);
+ }
add(line);
}
}
@@ -140,7 +150,7 @@
+ line.toString());
}
String bucketFileName = new String(bytes, 0, firstEnd);
-
+
if (ignoreHdfsLoc) {
Path tmpPath = new Path(bucketFileName);
bucketFileName = tmpPath.toUri().getPath();
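
The entries cap is checked while the index file is still being streamed through the LineReader, so an oversized lookup fails before the whole index result is buffered in memory. A hedged usage sketch (conf is assumed to be an initialized HiveConf; the value is arbitrary):

    // Tighten the cap for one session: any compact-index query that would
    // load more than 1,000 index entries now fails fast with a HiveException
    // instead of materializing the entire index result.
    conf.setLong(HiveConf.ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_ENTRIES.varname, 1000L);
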
Index: ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java (revision 1103989)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java (working copy)
@@ -26,6 +26,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -129,6 +131,13 @@
ArrayList<HiveInputSplit> newSplits = new ArrayList<HiveInputSplit>(
numSplits);
+
+ long maxInputSize = HiveConf.getLongVar(job, ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_SIZE);
+ if (maxInputSize < 0) {
+ maxInputSize = Long.MAX_VALUE;
+ }
+
+ long sumSplitLengths = 0;
for (HiveInputSplit split : splits) {
l4j.info("split start : " + split.getStart());
l4j.info("split end : " + (split.getStart() + split.getLength()));
@@ -140,13 +149,19 @@
if (split.inputFormatClassName().contains("RCFile")
|| split.inputFormatClassName().contains("SequenceFile")) {
if (split.getStart() > SequenceFile.SYNC_INTERVAL) {
- newSplit = new HiveInputSplit(new FileSplit(split.getPath(), split
- .getStart()
- - SequenceFile.SYNC_INTERVAL, split.getLength()
- + SequenceFile.SYNC_INTERVAL, split.getLocations()), split
- .inputFormatClassName());
+ newSplit = new HiveInputSplit(new FileSplit(split.getPath(),
+ split.getStart() - SequenceFile.SYNC_INTERVAL,
+ split.getLength() + SequenceFile.SYNC_INTERVAL,
+ split.getLocations()),
+ split.inputFormatClassName());
}
}
+ sumSplitLengths += newSplit.getLength();
+ if (sumSplitLengths > maxInputSize) {
+ throw new IOException(
+ "Size of data to read during a compact-index-based query exceeded the maximum of "
+ + maxInputSize + " set in " + ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_SIZE.varname);
+ }
newSplits.add(newSplit);
}
} catch (HiveException e) {
@@ -156,7 +171,7 @@
}
InputSplit retA[] = newSplits.toArray((new FileSplit[newSplits.size()]));
l4j.info("Number of input splits: " + splits.length + " new input splits: "
- + retA.length);
+ + retA.length + ", sum of split lengths: " + sumSplitLengths);
return retA;
}
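
The size cap follows the same fail-fast pattern: the running sum is tested inside the loop, so the query is rejected as soon as the limit is crossed rather than after every split has been sized. Since a negative value is normalized to Long.MAX_VALUE, the check can also be switched off outright; a sketch, assuming a JobConf named job:

    // Disable the size cap entirely (negative means "no limit"):
    job.setLong(HiveConf.ConfVars.HIVE_INDEX_COMPACT_QUERY_MAX_SIZE.varname, -1L);
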
}