Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml	(revision 1183507)
+++ conf/hive-default.xml	(working copy)
@@ -1109,6 +1109,12 @@
+<property>
+  <name>hive.index.compact.binary.search</name>
+  <value>false</value>
+  <description>Whether or not to use a binary search to find the entries in an index table that match the filter, where possible</description>
+</property>
+
 <property>
   <name>hive.exim.uri.scheme.whitelist</name>
   <value>hdfs,pfile</value>
   <description>A comma separated list of acceptable URI schemes for import and export.</description>
 </property>

Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1183507)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -389,6 +389,7 @@
     HIVEOPTINDEXFILTER_COMPACT_MAXSIZE("hive.optimize.index.filter.compact.maxsize", (long) -1), // infinity
     HIVE_INDEX_COMPACT_QUERY_MAX_ENTRIES("hive.index.compact.query.max.entries", (long) 10000000), // 10M
     HIVE_INDEX_COMPACT_QUERY_MAX_SIZE("hive.index.compact.query.max.size", (long) 10 * 1024 * 1024 * 1024), // 10G
+    HIVE_INDEX_COMPACT_BINARY_SEARCH("hive.index.compact.binary.search", false),
 
     // Statistics
     HIVESTATSAUTOGATHER("hive.stats.autogather", true),

Index: ql/src/test/results/clientpositive/index_compact_binary_search.q.out
===================================================================
--- ql/src/test/results/clientpositive/index_compact_binary_search.q.out	(revision 0)
+++ ql/src/test/results/clientpositive/index_compact_binary_search.q.out	(revision 0)
@@ -0,0 +1,159 @@
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+POSTHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+POSTHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: ALTERINDEX_REBUILD
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+POSTHOOK: query: ALTER INDEX src_index ON src REBUILD
+POSTHOOK: type: ALTERINDEX_REBUILD
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@default__src_src_index__
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: SELECT * FROM src WHERE key = '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-33-25_362_9037436441485749853/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key < '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-33-36_737_915702574002966361/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key <= '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-33-45_460_2820974923545239100/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key > '8'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-33-53_484_7730503182025797843/-mr-10000
+86 val_86
+98 val_98
+82 val_82
+92 val_92
+83 val_83
+84 val_84
+96 val_96
+95 val_95
+98 val_98
+85 val_85
+87 val_87
+90 val_90
+95 val_95
+80 val_80
+90 val_90
+83 val_83
+9 val_9
+97 val_97
+84 val_84
+90 val_90
+97 val_97
+PREHOOK: query: SELECT * FROM src WHERE key >= '9'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-34-01_644_2645522069983623952/-mr-10000
+98 val_98
+92 val_92
+96 val_96
+95 val_95
+98 val_98
+90 val_90
+95 val_95
+90 val_90
+9 val_9
+97 val_97
+90 val_90
+97 val_97
+PREHOOK: query: DROP INDEX src_index ON src
+PREHOOK: type: DROPINDEX
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: ALTERINDEX_REBUILD
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+PREHOOK: query: SELECT * FROM src WHERE key = '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-34-16_817_2468472390975224229/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key < '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-34-25_022_2579452550595602420/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key <= '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-34-33_138_3044294309369249675/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key > '8'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-34-41_270_9170656333370391768/-mr-10000
+86 val_86
+98 val_98
+82 val_82
+92 val_92
+83 val_83
+84 val_84
+96 val_96
+95 val_95
+98 val_98
+85 val_85
+87 val_87
+90 val_90
+95 val_95
+80 val_80
+90 val_90
+83 val_83
+9 val_9
+97 val_97
+84 val_84
+90 val_90
+97 val_97
+PREHOOK: query: SELECT * FROM src WHERE key >= '9'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-34-49_478_2699176147331878951/-mr-10000
+98 val_98
+92 val_92
+96 val_96
+95 val_95
+98 val_98
+90 val_90
+95 val_95
+90 val_90
+9 val_9
+97 val_97
+90 val_90
+97 val_97
+PREHOOK: query: DROP INDEX src_index ON src
+PREHOOK: type: DROPINDEX

Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java	(revision 0)
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java	(revision 0)
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+import org.apache.hadoop.hive.ql.io.HiveSortedInputFormat;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+
+public class VerifyHiveSortedInputFormatUsedHook implements ExecuteWithHookContext {
+
+  public void run(HookContext hookContext) {
+    if (hookContext.getHookType().equals(HookType.POST_EXEC_HOOK)) {
+
+      // Go through the root tasks, and verify the input format of the map reduce task(s) is
+      // HiveSortedInputFormat
+      ArrayList<Task<? extends Serializable>> rootTasks =
+          hookContext.getQueryPlan().getRootTasks();
+      for (Task<? extends Serializable> rootTask : rootTasks) {
+        if (rootTask.getWork() instanceof MapredWork) {
+          Assert.assertEquals("HiveSortedInputFormat was not used for the root map reduce task.",
+              ((MapredWork)rootTask.getWork()).getInputformat(),
+              HiveSortedInputFormat.class.getName());
+        }
+      }
+    }
+  }
+}

Index: ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java	(revision 0)
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java	(revision 0)
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.mockito.InOrder;
+
+/**
+ * TestHiveBinarySearchRecordReader.
+ */
+public class TestHiveBinarySearchRecordReader extends TestCase {
+
+  private RCFileRecordReader rcfReader;
+  private JobConf conf;
+  private TestHiveInputSplit hiveSplit;
+  private HiveBinarySearchRecordReader hbsReader;
+  private IOContext ioContext;
+
+  private static class TestHiveInputSplit extends HiveInputSplit {
+
+    @Override
+    public long getStart() {
+      return 0;
+    }
+
+    @Override
+    public long getLength() {
+      return 100;
+    }
+
+    @Override
+    public Path getPath() {
+      return new Path("/");
+    }
+  }
+
+  private void resetIOContext() {
+    ioContext = IOContext.get();
+    ioContext.setUseSorted(false);
+    ioContext.setIsBinarySearching(false);
+    ioContext.setEndBinarySearch(false);
+    ioContext.setComparison(null);
+    ioContext.setGenericUDFClassName(null);
+  }
+
+  private void init() throws IOException {
+    resetIOContext();
+    rcfReader = mock(RCFileRecordReader.class);
+    when(rcfReader.next((LongWritable) anyObject(),
+        (BytesRefArrayWritable) anyObject())).thenReturn(true);
+    // Since the start is 0 and the length is 100, the first call to sync should be with the
+    // value 50, so return that for getPos()
+    when(rcfReader.getPos()).thenReturn(50L);
+    conf = new JobConf();
+    hiveSplit = new TestHiveInputSplit();
+    hbsReader = new HiveBinarySearchRecordReader(rcfReader, conf);
+    hbsReader.initIOContext(hiveSplit, conf, Class.class, rcfReader);
+  }
+
+  private boolean executeDoNext(HiveBinarySearchRecordReader hbsReader) throws IOException {
+    return hbsReader.doNext(hbsReader.createKey(), hbsReader.createValue());
+  }
+
+  public void testNonLinearGreaterThan() throws Exception {
+    init();
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(50);
+
+    ioContext.setComparison(1);
+    when(rcfReader.getPos()).thenReturn(25L);
+
+    // By setting the comparison to greater, the search should use the block [0, 50]
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(25);
+  }
+
+  public void testNonLinearLessThan() throws Exception {
+    init();
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(50);
+
+    ioContext.setComparison(-1);
+    when(rcfReader.getPos()).thenReturn(75L);
+
+    // By setting the comparison to less, the search should use the block [50, 100]
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(75);
+  }
+
+  public void testNonLinearEqualTo() throws Exception {
+    init();
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(50);
+
+    ioContext.setComparison(0);
+    when(rcfReader.getPos()).thenReturn(25L);
+
+    // By setting the comparison to equal, the search should use the block [0, 50]
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(25);
+  }
+
+  public void testHitLastBlock() throws Exception {
+    init();
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(50);
+
+    ioContext.setComparison(-1);
+    when(rcfReader.getPos()).thenReturn(100L);
+
+    // When sync is called it will return 100, the value signaling the end of the file; this
+    // should result in a call to sync to the beginning of the block it was searching,
+    // [50, 100], and the reader should continue normally
+    Assert.assertTrue(executeDoNext(hbsReader));
+    InOrder inOrder = inOrder(rcfReader);
+    inOrder.verify(rcfReader).sync(75);
+    inOrder.verify(rcfReader).sync(50);
+    Assert.assertFalse(ioContext.isBinarySearching());
+  }
+
+  public void testHitSamePositionTwice() throws Exception {
+    init();
+    Assert.assertTrue(executeDoNext(hbsReader));
+    verify(rcfReader).sync(50);
+
+    ioContext.setComparison(1);
+
+    // When getPos is called it should return the same value, signaling the end of the search;
+    // the search should then continue linearly, syncing to the beginning of the block [0, 50]
+    Assert.assertTrue(executeDoNext(hbsReader));
+    InOrder inOrder = inOrder(rcfReader);
+    inOrder.verify(rcfReader).sync(25);
+    inOrder.verify(rcfReader).sync(0);
+    Assert.assertFalse(ioContext.isBinarySearching());
+  }
+
+  public void testResetRange() throws Exception {
+    init();
+    InOrder inOrder = inOrder(rcfReader);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    inOrder.verify(rcfReader).sync(50);
+
+    ioContext.setComparison(-1);
+    when(rcfReader.getPos()).thenReturn(75L);
+
+    Assert.assertTrue(executeDoNext(hbsReader));
+    inOrder.verify(rcfReader).sync(75);
+
+    ioContext.setEndBinarySearch(true);
+
+    // This should make the search linear, sync to the beginning of the block being searched
+    // [50, 100], set the comparison to null, and unset the flag that ends the binary search
+    Assert.assertTrue(executeDoNext(hbsReader));
+    inOrder.verify(rcfReader).sync(50);
+    Assert.assertFalse(ioContext.isBinarySearching());
+    Assert.assertFalse(ioContext.shouldEndBinarySearch());
+  }
+
+  public void testEqualOpClass() throws Exception {
+    init();
+    ioContext.setGenericUDFClassName(GenericUDFOPEqual.class.getName());
+    Assert.assertTrue(ioContext.isBinarySearching());
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setIsBinarySearching(false);
+    ioContext.setComparison(-1);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(0);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(1);
+    Assert.assertFalse(executeDoNext(hbsReader));
+  }
+
+  public void testLessThanOpClass() throws Exception {
+    init();
+    ioContext.setGenericUDFClassName(GenericUDFOPLessThan.class.getName());
+    Assert.assertTrue(ioContext.isBinarySearching());
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(-1);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(0);
+    Assert.assertFalse(executeDoNext(hbsReader));
+    ioContext.setComparison(1);
+    Assert.assertFalse(executeDoNext(hbsReader));
+  }
+
+  public void testLessThanOrEqualOpClass() throws Exception {
+    init();
+    ioContext.setGenericUDFClassName(GenericUDFOPEqualOrLessThan.class.getName());
+    Assert.assertTrue(ioContext.isBinarySearching());
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(-1);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(0);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(1);
+    Assert.assertFalse(executeDoNext(hbsReader));
+  }
+
+  public void testGreaterThanOpClass() throws Exception {
+    init();
+    ioContext.setGenericUDFClassName(GenericUDFOPGreaterThan.class.getName());
+    Assert.assertTrue(ioContext.isBinarySearching());
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setIsBinarySearching(false);
+    ioContext.setComparison(-1);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(0);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(1);
+    Assert.assertTrue(executeDoNext(hbsReader));
+  }
+
+  public void testGreaterThanOrEqualOpClass() throws Exception {
+    init();
+    ioContext.setGenericUDFClassName(GenericUDFOPEqualOrGreaterThan.class.getName());
+    Assert.assertTrue(ioContext.isBinarySearching());
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setIsBinarySearching(false);
+    ioContext.setComparison(-1);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(0);
+    Assert.assertTrue(executeDoNext(hbsReader));
+    ioContext.setComparison(1);
+    Assert.assertTrue(executeDoNext(hbsReader));
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestHiveBinarySearchRecordReader().testNonLinearGreaterThan();
+    new TestHiveBinarySearchRecordReader().testNonLinearLessThan();
+    new TestHiveBinarySearchRecordReader().testNonLinearEqualTo();
+    new TestHiveBinarySearchRecordReader().testHitLastBlock();
+    new TestHiveBinarySearchRecordReader().testHitSamePositionTwice();
+    new TestHiveBinarySearchRecordReader().testResetRange();
+    new TestHiveBinarySearchRecordReader().testEqualOpClass();
+    new TestHiveBinarySearchRecordReader().testLessThanOpClass();
+    new TestHiveBinarySearchRecordReader().testLessThanOrEqualOpClass();
+    new TestHiveBinarySearchRecordReader().testGreaterThanOpClass();
+    new TestHiveBinarySearchRecordReader().testGreaterThanOrEqualOpClass();
+  }
+}

Index: ql/src/test/queries/clientpositive/index_compact_binary_search.q
===================================================================
--- ql/src/test/queries/clientpositive/index_compact_binary_search.q	(revision 0)
+++ ql/src/test/queries/clientpositive/index_compact_binary_search.q	(revision 0)
@@ -0,0 +1,45 @@
+SET hive.default.fileformat=TextFile;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.optimize.index.filter=true;
+SET hive.optimize.index.filter.compact.minsize=1;
+SET hive.index.compact.binary.search=true;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
+
+SET hive.default.fileformat=RCFILE;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java	(revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java	(working copy)
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCase;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFWhen;
 import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
@@ -81,12 +82,12 @@
     void evaluate() throws HiveException {
       obj = eval.evaluate(rowObject);
     }
-  
+
     public Object get() throws HiveException {
       return obj;
     }
   }
-  
+
   public ExprNodeGenericFuncEvaluator(ExprNodeGenericFuncDesc expr) {
     this.expr = expr;
     children = new ExprNodeEvaluator[expr.getChildExprs().size()];
@@ -152,4 +153,27 @@
     return genericUDF.evaluate(deferredChildren);
   }
 
+  /**
+   * If the genericUDF is a base comparison, returns an integer based on the result of comparing
+   * the two sides of the UDF, like the compareTo method in Comparable.
+   *
+   * If the genericUDF is not a base comparison, or there is an error executing the comparison,
+   * returns null.
+   * @param row the row to compare against the filter constant
+   * @return a Comparable-style int, or null
+   * @throws HiveException
+   */
+  public Integer compare(Object row) throws HiveException {
+    if (!(genericUDF instanceof GenericUDFBaseCompare)) {
+      return null;
+    }
+
+    rowObject = row;
+    if (isEager) {
+      for (int i = 0; i < deferredChildren.length; i++) {
+        ((EagerExprObject) deferredChildren[i]).evaluate();
+      }
+    }
+    return ((GenericUDFBaseCompare)genericUDF).compare(deferredChildren);
+  }
 }
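A minimal sketch of how a caller might consume the new compare(Object row) contract, assuming only what this diff shows; the class and method names below are hypothetical, not part of the patch. A null result means the predicate is not a GenericUDFBaseCompare (or the comparison failed), in which case the caller must fall back to evaluate(), which is what FilterOperator does in the next diff.

```java
import org.apache.hadoop.hive.ql.exec.ExprNodeGenericFuncEvaluator;
import org.apache.hadoop.hive.ql.metadata.HiveException;

// Hypothetical caller; illustrates the Comparable-style contract of compare(Object row).
public class CompareContractSketch {
  public static String describe(ExprNodeGenericFuncEvaluator eval, Object row)
      throws HiveException {
    Integer cmp = eval.compare(row);
    if (cmp == null) {
      // Not a GenericUDFBaseCompare, or the comparison failed: fall back to evaluate().
      return "fall back to evaluate()";
    } else if (cmp < 0) {
      return "row key sorts before the filter constant";
    } else if (cmp == 0) {
      return "row key equals the filter constant";
    } else {
      return "row key sorts after the filter constant";
    }
  }
}
```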
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java	(revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java	(working copy)
@@ -22,6 +22,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -49,6 +50,8 @@
   private transient ExprNodeEvaluator conditionEvaluator;
   private transient PrimitiveObjectInspector conditionInspector;
   private transient int consecutiveFails;
+  private transient int consecutiveSearches;
+  private transient IOContext ioContext;
   transient int heartbeatInterval;
 
   public FilterOperator() {
@@ -56,6 +59,7 @@
     filtered_count = new LongWritable();
     passed_count = new LongWritable();
     consecutiveFails = 0;
+    consecutiveSearches = 0;
   }
 
   @Override
@@ -67,6 +71,7 @@
       statsMap.put(Counter.FILTERED, filtered_count);
       statsMap.put(Counter.PASSED, passed_count);
       conditionInspector = null;
+      ioContext = IOContext.get();
     } catch (Throwable e) {
       throw new HiveException(e);
     }
@@ -80,7 +85,43 @@
         conditionInspector = (PrimitiveObjectInspector) conditionEvaluator
             .initialize(rowInspector);
       }
+
+      // If the input is sorted, and we are executing a search based on the arguments to this
+      // filter, set the comparison in the IOContext along with the type of the UDF
+      if (conf.isSortedFilter() && ioContext.useSorted()) {
+        if (!(conditionEvaluator instanceof ExprNodeGenericFuncEvaluator)) {
+          LOG.error("Attempted to use the fact that the data is sorted when the " +
+              "conditionEvaluator is not of type ExprNodeGenericFuncEvaluator");
+          ioContext.setUseSorted(false);
+          return;
+        } else {
+          ioContext.setComparison(((ExprNodeGenericFuncEvaluator)conditionEvaluator).compare(row));
+        }
+
+        if (ioContext.getGenericUDFClassName() == null) {
+          ioContext.setGenericUDFClassName(
+              ((ExprNodeGenericFuncEvaluator)conditionEvaluator).genericUDF.getClass().getName());
+        }
+
+        // If we are currently searching the data for a place to begin, do not return data yet
+        if (ioContext.isBinarySearching()) {
+          consecutiveSearches++;
+          // In case we're searching through an especially large set of data, send a heartbeat
+          // in order to avoid timeout
+          if (((consecutiveSearches % heartbeatInterval) == 0) && (reporter != null)) {
+            reporter.progress();
+          }
+          return;
+        }
+      }
+
       Object condition = conditionEvaluator.evaluate(row);
+
+      // If we are currently performing a binary search on the input, don't forward the results
+      if (ioContext.isBinarySearching()) {
+        return;
+      }
+
       Boolean ret = (Boolean) conditionInspector
           .getPrimitiveJavaObject(condition);
       if (Boolean.TRUE.equals(ret)) {

Index: ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java	(revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java	(working copy)
@@ -68,6 +68,8 @@
   private org.apache.hadoop.hive.ql.plan.ExprNodeDesc predicate;
   private boolean isSamplingPred;
   private transient sampleDesc sampleDescr;
+  // Is this a filter that should perform a comparison for sorted searches
+  private boolean isSortedFilter;
 
   public FilterDesc() {
   }
@@ -116,4 +118,12 @@
     this.sampleDescr = sampleDescr;
   }
 
+  public boolean isSortedFilter() {
+    return isSortedFilter;
+  }
+
+  public void setSortedFilter(boolean isSortedFilter) {
+    this.isSortedFilter = isSortedFilter;
+  }
+
 }
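The comments in processOp above describe a handshake: FilterOperator publishes a three-way comparison through the thread-local IOContext, and the sorted-input record reader narrows its search range accordingly. Below is a condensed sketch of that narrowing step, consistent with the unit tests earlier in this patch; the class and method names are hypothetical, and getComparison() is assumed as the getter paired with setComparison().

```java
import org.apache.hadoop.hive.ql.io.IOContext;

// Hypothetical sketch of the range-halving step a sorted-input reader performs
// between calls to FilterOperator, under the IOContext handshake this patch adds.
public class BinarySearchLoopSketch {
  private long start = 0;
  private long end = 100; // split boundaries, matching the unit tests above

  public long nextSyncTarget(IOContext ctx) {
    Integer cmp = ctx.getComparison(); // assumed getter paired with setComparison()
    if (cmp == null || ctx.shouldEndBinarySearch()) {
      // Give up on the binary search: scan linearly from the start of the
      // block currently being searched, as testResetRange expects.
      ctx.setIsBinarySearching(false);
      ctx.setEndBinarySearch(false);
      return start;
    }
    long mid = (start + end) / 2;
    if (cmp < 0) {
      start = mid; // row key < constant: the target lies in the later half
    } else {
      end = mid;   // row key >= constant: the target lies in the earlier half
    }
    return (start + end) / 2; // sync to the midpoint of the narrowed range
  }
}
```

Tracing this against testNonLinearGreaterThan: after the initial sync to 50, a comparison of 1 halves the range to [0, 50] and the next sync target is 25, which is exactly what the test verifies.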

Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java	(revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java	(working copy)
@@ -43,12 +43,12 @@
 import org.apache.hadoop.hive.serde2.columnar.LazyDecompressionCallback;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.VersionMismatchException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
@@ -61,7 +61,7 @@
  * RCFiles, short of Record Columnar File, are flat files
  * consisting of binary key/value pairs, which shares much similarity with
  * SequenceFile.
- * 
+ *
  * RCFile stores columns of a table in a record columnar way. It first
  * partitions rows horizontally into row splits. and then it vertically
  * partitions each row split in a columnar way. RCFile first stores the meta
@@ -75,7 +75,7 @@
  * RCFile provides {@link Writer}, {@link Reader} and classes for
  * writing, reading respectively.
  * </p>
- * 
+ *
  * <p>
  * RCFile stores columns of a table in a record columnar way. It first
  * partitions rows horizontally into row splits. and then it vertically
@@ -83,21 +83,21 @@
  * data of a row split, as the key part of a record, and all the data of a row
  * split as the value part.
  * </p>
- * 
+ *
  * <p>
  * RCFile compresses values in a more fine-grained manner then record level
 * compression. However, It currently does not support compress the key part
 * yet. The actual compression algorithm used to compress key and/or values can
 * be specified by using the appropriate {@link CompressionCodec}.
 * </p>
- * 
+ *
 * <p>
 * The {@link Reader} is used to read and explain the bytes of RCFile.
 * </p>
- * 
+ *
 * RCFile Formats
 [remaining lines of this hunk: whitespace-only trims inside the "RCFile Format" table markup of the class javadoc]
 */
public class RCFile {
@@ -177,7 +177,7 @@
   /**
    * KeyBuffer is the key of each record in RCFile. Its on-disk layout is as
    * below:
-   * 
+   *
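The op-class tests earlier in this patch (testEqualOpClass through testGreaterThanOrEqualOpClass) encode when the post-search linear scan over sorted input may stop early. A hedged helper making those stop conditions explicit; the class and method are illustrative only, not part of the patch.

```java
// Hypothetical helper mirroring the stop conditions the op-class unit tests encode:
// once the binary search has fallen back to a linear scan over sorted input, the
// reader can stop as soon as no later row can satisfy the comparison.
public class SortedScanStopSketch {
  public static boolean keepReading(String genericUDFClassName, int cmp) {
    if (genericUDFClassName.endsWith("GenericUDFOPEqual")) {
      return cmp <= 0;  // stop once the row key passes the constant
    }
    if (genericUDFClassName.endsWith("GenericUDFOPLessThan")) {
      return cmp < 0;   // only rows strictly below the constant qualify
    }
    if (genericUDFClassName.endsWith("GenericUDFOPEqualOrLessThan")) {
      return cmp <= 0;
    }
    // GenericUDFOPGreaterThan / GenericUDFOPEqualOrGreaterThan: every row from the
    // search position onward may qualify, so read to the end of the split.
    return true;
  }
}
```

For example, keepReading(GenericUDFOPEqual.class.getName(), 1) returns false, matching the final assertion in testEqualOpClass, while both greater-than variants keep reading for any comparison result.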