diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
index 4eda17b..903cf06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
@@ -88,7 +88,7 @@ public class HiveIndexResult {
 
     bytesRef[0] = new BytesRefWritable();
     bytesRef[1] = new BytesRefWritable();
-    ignoreHdfsLoc = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INDEX_IGNORE_HDFS_LOC);
+    ignoreHdfsLoc = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INDEX_IGNORE_HDFS_LOC);
 
     if (indexFile != null) {
       Path indexFilePath = new Path(indexFile);
@@ -104,12 +104,28 @@ public class HiveIndexResult {
         paths.add(indexFilePath);
       }
 
+      long lineCounter = 0;
+      long maxEntriesToLoad = Long.MAX_VALUE;
+      final String MAX_ENTRIES_PARAM_NAME = "hive.index.compact.query.max.entries";
+      String maxEntriesToLoadStr = job.get(MAX_ENTRIES_PARAM_NAME);
+      if (maxEntriesToLoadStr != null) {
+        try {
+          maxEntriesToLoad = Long.parseLong(maxEntriesToLoadStr);
+        } catch (NumberFormatException e) {
+          throw new HiveException(MAX_ENTRIES_PARAM_NAME + " must be an integer", e);
+        }
+      }
+
       for (Path indexFinalPath : paths) {
         FSDataInputStream ifile = fs.open(indexFinalPath);
         LineReader lr = new LineReader(ifile, conf);
         try {
           Text line = new Text();
           while (lr.readLine(line) > 0) {
+            if (++lineCounter > maxEntriesToLoad) {
+              throw new HiveException("Number of index entries loaded during the query exceeded the maximum of " + maxEntriesToLoad
+                + " set in " + MAX_ENTRIES_PARAM_NAME);
+            }
             add(line);
           }
         }
@@ -140,7 +156,7 @@ public class HiveIndexResult {
           + line.toString());
     }
     String bucketFileName = new String(bytes, 0, firstEnd);
-    
+
     if (ignoreHdfsLoc) {
       Path tmpPath = new Path(bucketFileName);
       bucketFileName = tmpPath.toUri().getPath();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
index a3d74e2..e01e27d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
@@ -129,6 +129,19 @@ public class HiveIndexedInputFormat extends HiveInputFormat {
 
     ArrayList<HiveInputSplit> newSplits = new ArrayList<HiveInputSplit>(
         numSplits);
+
+    long maxInputSize = Long.MAX_VALUE;
+    final String MAX_SIZE_PARAM_NAME = "hive.index.compact.query.max.size";
+    String maxInputSizeStr = job.get(MAX_SIZE_PARAM_NAME);
+    if (maxInputSizeStr != null) {
+      try {
+        maxInputSize = Long.parseLong(maxInputSizeStr);
+      } catch (NumberFormatException e) {
+        throw new IOException(MAX_SIZE_PARAM_NAME + " must be an integer", e);
+      }
+    }
+
+    long sumSplitLengths = 0;
     for (HiveInputSplit split : splits) {
       l4j.info("split start : " + split.getStart());
       l4j.info("split end : " + (split.getStart() + split.getLength()));
@@ -140,13 +153,19 @@ public class HiveIndexedInputFormat extends HiveInputFormat {
           if (split.inputFormatClassName().contains("RCFile")
               || split.inputFormatClassName().contains("SequenceFile")) {
             if (split.getStart() > SequenceFile.SYNC_INTERVAL) {
-              newSplit = new HiveInputSplit(new FileSplit(split.getPath(), split
-                  .getStart()
-                  - SequenceFile.SYNC_INTERVAL, split.getLength()
-                  + SequenceFile.SYNC_INTERVAL, split.getLocations()), split
-                  .inputFormatClassName());
+              newSplit = new HiveInputSplit(new FileSplit(split.getPath(),
+                  split.getStart() - SequenceFile.SYNC_INTERVAL,
+                  split.getLength() + SequenceFile.SYNC_INTERVAL,
+                  split.getLocations()),
+                  split.inputFormatClassName());
             }
           }
+          sumSplitLengths += newSplit.getLength();
+          if (sumSplitLengths > maxInputSize) {
+            throw new RuntimeException(
+                "Size of data to read during an index-based query exceeded the maximum of "
+                + maxInputSize + " set in " + MAX_SIZE_PARAM_NAME);
+          }
           newSplits.add(newSplit);
         }
       } catch (HiveException e) {
@@ -156,7 +175,7 @@ public class HiveIndexedInputFormat extends HiveInputFormat {
     }
     InputSplit retA[] = newSplits.toArray((new FileSplit[newSplits.size()]));
     l4j.info("Number of input splits: " + splits.length + " new input splits: "
-        + retA.length);
+        + retA.length + ", sum of split lengths: " + sumSplitLengths);
     return retA;
   }
 }
diff --git a/ql/src/test/queries/clientnegative/index_compact_entry_limit.q b/ql/src/test/queries/clientnegative/index_compact_entry_limit.q
new file mode 100644
index 0000000..eb49c22
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/index_compact_entry_limit.q
@@ -0,0 +1,11 @@
+drop index src_index on src;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+INSERT OVERWRITE DIRECTORY "/tmp/index_result" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key<1000;
+SET hive.index.compact.file=/tmp/index_result;
+SET hive.input.format=org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat;
+SET hive.index.compact.query.max.entries=5;
+SELECT key, value FROM src WHERE key=100 ORDER BY key;
diff --git a/ql/src/test/queries/clientnegative/index_compact_size_limit.q b/ql/src/test/queries/clientnegative/index_compact_size_limit.q
new file mode 100644
index 0000000..25e2486
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/index_compact_size_limit.q
@@ -0,0 +1,11 @@
+drop index src_index on src;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+INSERT OVERWRITE DIRECTORY "/tmp/index_result" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key<1000;
+SET hive.index.compact.file=/tmp/index_result;
+SET hive.input.format=org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat;
+SET hive.index.compact.query.max.size=1024;
+SELECT key, value FROM src WHERE key=100 ORDER BY key;
diff --git a/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out b/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
new file mode 100644
index 0000000..fcb2673
--- /dev/null
+++ b/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
@@ -0,0 +1,35 @@
+PREHOOK: query: drop index src_index on src
+PREHOOK: type: DROPINDEX
+POSTHOOK: query: drop index src_index on src
+POSTHOOK: type: DROPINDEX
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+POSTHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+POSTHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+POSTHOOK: query: ALTER INDEX src_index ON src REBUILD
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@default__src_src_index__
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_result" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key<1000
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Output: /tmp/index_result
+POSTHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_result" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key<1000
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@default__src_src_index__
+POSTHOOK: Output: /tmp/index_result
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: SELECT key, value FROM src WHERE key=100 ORDER BY key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/wgaluba/hive_2011-05-09_20-23-26_023_6902661313242990836/-mr-10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
diff --git a/ql/src/test/results/clientnegative/index_compact_size_limit.q.out b/ql/src/test/results/clientnegative/index_compact_size_limit.q.out
new file mode 100644
index 0000000..fcb2673
--- /dev/null
+++ b/ql/src/test/results/clientnegative/index_compact_size_limit.q.out
@@ -0,0 +1,35 @@
+PREHOOK: query: drop index src_index on src
+PREHOOK: type: DROPINDEX
+POSTHOOK: query: drop index src_index on src
+POSTHOOK: type: DROPINDEX
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+POSTHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+POSTHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+POSTHOOK: query: ALTER INDEX src_index ON src REBUILD
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@default__src_src_index__
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_result" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key<1000
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Output: /tmp/index_result
+POSTHOOK: query: INSERT OVERWRITE DIRECTORY "/tmp/index_result" SELECT `_bucketname` , `_offsets` FROM default__src_src_index__ WHERE key<1000
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@default__src_src_index__
+POSTHOOK: Output: /tmp/index_result
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: SELECT key, value FROM src WHERE key=100 ORDER BY key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/tmp/wgaluba/hive_2011-05-09_20-23-26_023_6902661313242990836/-mr-10000
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask