Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 1183507) +++ conf/hive-default.xml (working copy) @@ -1109,6 +1109,12 @@ + hive.index.compact.binary.search + true + Whether or not to use a binary search to find the entries in an index table that match the filter, where possible + + + hive.exim.uri.scheme.whitelist hdfs,pfile A comma separated list of acceptable URI schemes for import and export. Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1183507) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -389,6 +389,7 @@ HIVEOPTINDEXFILTER_COMPACT_MAXSIZE("hive.optimize.index.filter.compact.maxsize", (long) -1), // infinity HIVE_INDEX_COMPACT_QUERY_MAX_ENTRIES("hive.index.compact.query.max.entries", (long) 10000000), // 10M HIVE_INDEX_COMPACT_QUERY_MAX_SIZE("hive.index.compact.query.max.size", (long) 10 * 1024 * 1024 * 1024), // 10G + HIVE_INDEX_COMPACT_BINARY_SEARCH("hive.index.compact.binary.search", true), // Statistics HIVESTATSAUTOGATHER("hive.stats.autogather", true), Index: ql/src/test/results/clientpositive/index_compact_binary_search.q.out =================================================================== --- ql/src/test/results/clientpositive/index_compact_binary_search.q.out (revision 0) +++ ql/src/test/results/clientpositive/index_compact_binary_search.q.out (revision 0) @@ -0,0 +1,309 @@ +PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD +PREHOOK: type: CREATEINDEX +POSTHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD +POSTHOOK: type: CREATEINDEX +PREHOOK: query: ALTER INDEX src_index ON src REBUILD +PREHOOK: type: ALTERINDEX_REBUILD +PREHOOK: Input: default@src +PREHOOK: Output: default@default__src_src_index__ +POSTHOOK: query: ALTER INDEX src_index ON src REBUILD +POSTHOOK: type: ALTERINDEX_REBUILD +POSTHOOK: Input: default@src +POSTHOOK: Output: default@default__src_src_index__ +POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ] +POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ] +POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: SELECT * FROM src WHERE key = '0' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-42-04_761_7731970290214222034/-mr-10000 +0 val_0 +0 val_0 +0 val_0 +PREHOOK: query: SELECT * FROM src WHERE key < '1' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-42-13_023_7431452930107692937/-mr-10000 +0 val_0 +0 val_0 +0 val_0 +PREHOOK: query: SELECT * FROM src WHERE key <= '0' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-42-21_566_7819743051409628641/-mr-10000 +0 val_0 +0 val_0 +0 val_0 
+PREHOOK: query: SELECT * FROM src WHERE key > '8' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-42-29_920_4735543588304747926/-mr-10000 +86 val_86 +98 val_98 +82 val_82 +92 val_92 +83 val_83 +84 val_84 +96 val_96 +95 val_95 +98 val_98 +85 val_85 +87 val_87 +90 val_90 +95 val_95 +80 val_80 +90 val_90 +83 val_83 +9 val_9 +97 val_97 +84 val_84 +90 val_90 +97 val_97 +PREHOOK: query: SELECT * FROM src WHERE key >= '9' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-42-38_292_5766598221099547271/-mr-10000 +98 val_98 +92 val_92 +96 val_96 +95 val_95 +98 val_98 +90 val_90 +95 val_95 +90 val_90 +9 val_9 +97 val_97 +90 val_90 +97 val_97 +PREHOOK: query: DROP INDEX src_index ON src +PREHOOK: type: DROPINDEX +PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD +PREHOOK: type: CREATEINDEX +PREHOOK: query: ALTER INDEX src_index ON src REBUILD +PREHOOK: type: ALTERINDEX_REBUILD +PREHOOK: Input: default@src +PREHOOK: Output: default@default__src_src_index__ +PREHOOK: query: SELECT * FROM src WHERE key = '0' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-42-53_913_1248080488549564593/-mr-10000 +0 val_0 +0 val_0 +0 val_0 +PREHOOK: query: SELECT * FROM src WHERE key < '1' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-43-02_670_5728156015596318760/-mr-10000 +0 val_0 +0 val_0 +0 val_0 +PREHOOK: query: SELECT * FROM src WHERE key <= '0' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-43-11_110_3777660166632364512/-mr-10000 +0 val_0 +0 val_0 +0 val_0 +PREHOOK: query: SELECT * FROM src WHERE key > '8' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-43-19_422_3815132344110447879/-mr-10000 +86 val_86 +98 val_98 +82 val_82 +92 val_92 +83 val_83 +84 val_84 +96 val_96 +95 val_95 +98 val_98 +85 val_85 +87 val_87 +90 val_90 +95 val_95 +80 val_80 +90 val_90 +83 val_83 +9 val_9 +97 val_97 +84 val_84 +90 val_90 +97 val_97 +PREHOOK: query: SELECT * FROM src WHERE key >= '9' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-43-27_745_1810121018618848191/-mr-10000 +98 val_98 +92 val_92 +96 val_96 +95 val_95 +98 val_98 +90 val_90 +95 val_95 +90 val_90 +9 val_9 +97 val_97 +90 val_90 +97 val_97 +PREHOOK: query: DROP INDEX src_index ON src +PREHOOK: type: DROPINDEX +PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD +PREHOOK: type: CREATEINDEX +PREHOOK: query: ALTER INDEX src_index ON src REBUILD +PREHOOK: type: ALTERINDEX_REBUILD +PREHOOK: Input: default@src 
+PREHOOK: Output: default@default__src_src_index__ +PREHOOK: query: SELECT * FROM src WHERE key = '0' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-43-44_507_2899250930696267103/-mr-10000 +0 val_0 +0 val_0 +0 val_0 +PREHOOK: query: SELECT * FROM src WHERE key < '1' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-43-54_286_8396514171386303619/-mr-10000 +0 val_0 +0 val_0 +0 val_0 +PREHOOK: query: SELECT * FROM src WHERE key <= '0' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-44-02_734_6190184343757298705/-mr-10000 +0 val_0 +0 val_0 +0 val_0 +PREHOOK: query: SELECT * FROM src WHERE key > '8' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-44-11_148_2585326215606833937/-mr-10000 +86 val_86 +98 val_98 +82 val_82 +92 val_92 +83 val_83 +84 val_84 +96 val_96 +95 val_95 +98 val_98 +85 val_85 +87 val_87 +90 val_90 +95 val_95 +80 val_80 +90 val_90 +83 val_83 +9 val_9 +97 val_97 +84 val_84 +90 val_90 +97 val_97 +PREHOOK: query: SELECT * FROM src WHERE key >= '9' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-44-19_864_141000770293215626/-mr-10000 +98 val_98 +92 val_92 +96 val_96 +95 val_95 +98 val_98 +90 val_90 +95 val_95 +90 val_90 +9 val_9 +97 val_97 +90 val_90 +97 val_97 +PREHOOK: query: DROP INDEX src_index ON src +PREHOOK: type: DROPINDEX +PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD +PREHOOK: type: CREATEINDEX +PREHOOK: query: ALTER INDEX src_index ON src REBUILD +PREHOOK: type: ALTERINDEX_REBUILD +PREHOOK: Input: default@src +PREHOOK: Output: default@default__src_src_index__ +PREHOOK: query: SELECT * FROM src WHERE key = '0' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-44-35_742_3458890365894267611/-mr-10000 +0 val_0 +0 val_0 +0 val_0 +PREHOOK: query: SELECT * FROM src WHERE key < '1' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-44-52_643_5976094484766190620/-mr-10000 +0 val_0 +0 val_0 +0 val_0 +PREHOOK: query: SELECT * FROM src WHERE key <= '0' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-45-01_866_7032915076645672295/-mr-10000 +0 val_0 +0 val_0 +0 val_0 +PREHOOK: query: SELECT * FROM src WHERE key > '8' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: 
file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-45-10_335_2912899588229390594/-mr-10000 +86 val_86 +98 val_98 +82 val_82 +92 val_92 +83 val_83 +84 val_84 +96 val_96 +95 val_95 +98 val_98 +85 val_85 +87 val_87 +90 val_90 +95 val_95 +80 val_80 +90 val_90 +83 val_83 +9 val_9 +97 val_97 +84 val_84 +90 val_90 +97 val_97 +PREHOOK: query: SELECT * FROM src WHERE key >= '9' +PREHOOK: type: QUERY +PREHOOK: Input: default@default__src_src_index__ +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-01_16-45-18_601_5637752154471014245/-mr-10000 +98 val_98 +92 val_92 +96 val_96 +95 val_95 +98 val_98 +90 val_90 +95 val_95 +90 val_90 +9 val_9 +97 val_97 +90 val_90 +97 val_97 +PREHOOK: query: DROP INDEX src_index ON src +PREHOOK: type: DROPINDEX Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java (revision 0) +++ ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java (revision 0) @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.hooks; + +import java.io.Serializable; +import java.util.ArrayList; + +import junit.framework.Assert; + +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.hooks.HookContext.HookType; +import org.apache.hadoop.hive.ql.plan.MapredWork; + +public class VerifyHiveSortedInputFormatUsedHook implements ExecuteWithHookContext { + + public void run(HookContext hookContext) { + if (hookContext.getHookType().equals(HookType.POST_EXEC_HOOK)) { + + // Go through the root tasks, and verify the input format of the map reduce task(s) is + // HiveSortedInputFormat + ArrayList> rootTasks = + hookContext.getQueryPlan().getRootTasks(); + for (Task rootTask : rootTasks) { + if (rootTask.getWork() instanceof MapredWork) { + Assert.assertTrue("The root map reduce task's input was not marked as sorted.", + ((MapredWork)rootTask.getWork()).isInputFormatSorted()); + } + } + } + } +} Index: ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java (revision 0) +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java (revision 0) @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.mockito.InOrder; + +/** + * TestHiveBinarySearchRecordReader. 
+ * + */ +public class TestHiveBinarySearchRecordReader extends TestCase { + + private RCFileRecordReader rcfReader; + private JobConf conf; + private TestHiveInputSplit hiveSplit; + private HiveBinarySearchRecordReader hbsReader; + private IOContext ioContext; + + private static class TestHiveInputSplit extends HiveInputSplit { + + @Override + public long getStart() { + return 0; + } + + @Override + public long getLength() { + return 100; + } + + @Override + public Path getPath() { + return new Path("/"); + } + } + + private static class TestHiveRecordReader + extends HiveBinarySearchRecordReader { + + public TestHiveRecordReader(RecordReader recordReader, JobConf conf) throws IOException { + super(recordReader, conf); + } + + @Override + public K createKey() { + return null; + } + + @Override + public V createValue() { + return null; + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public boolean doNext(K key, V value) throws IOException { + return doSortedNext(key, value); + } + + @Override + public void doClose() throws IOException { + + } + + } + + private void resetIOContext() { + ioContext = IOContext.get(); + ioContext.setUseSorted(false); + ioContext.setIsBinarySearching(false); + ioContext.setEndBinarySearch(false); + ioContext.setComparison(null); + ioContext.setGenericUDFClassName(null); + } + + private void init() throws IOException { + resetIOContext(); + rcfReader = mock(RCFileRecordReader.class); + when(rcfReader.next((LongWritable)anyObject(), + (BytesRefArrayWritable )anyObject())).thenReturn(true); + // Since the start is 0, and the length is 100, the first call to sync should be with the value + // 50 so return that for getPos() + when(rcfReader.getPos()).thenReturn(50L); + conf = new JobConf(); + hiveSplit = new TestHiveInputSplit(); + hbsReader = new TestHiveRecordReader(rcfReader, conf); + hbsReader.initIOContext(hiveSplit, conf, Class.class, rcfReader); + } + + private boolean executeDoNext(HiveBinarySearchRecordReader hbsReader) throws IOException { + return hbsReader.doNext(hbsReader.createKey(), hbsReader.createValue()); + } + + public void testNonLinearGreaterThan() throws Exception { + init(); + Assert.assertTrue(executeDoNext(hbsReader)); + verify(rcfReader).sync(50); + + ioContext.setComparison(1); + when(rcfReader.getPos()).thenReturn(25L); + + // By setting the comparison to greater, the search should use the block [0, 50] + Assert.assertTrue(executeDoNext(hbsReader)); + verify(rcfReader).sync(25); + } + + public void testNonLinearLessThan() throws Exception { + init(); + Assert.assertTrue(executeDoNext(hbsReader)); + verify(rcfReader).sync(50); + + ioContext.setComparison(-1); + when(rcfReader.getPos()).thenReturn(75L); + + // By setting the comparison to less, the search should use the block [50, 100] + Assert.assertTrue(executeDoNext(hbsReader)); + verify(rcfReader).sync(75); + } + + public void testNonLinearEqualTo() throws Exception { + init(); + Assert.assertTrue(executeDoNext(hbsReader)); + verify(rcfReader).sync(50); + + ioContext.setComparison(0); + when(rcfReader.getPos()).thenReturn(25L); + + // By setting the comparison to equal, the search should use the block [0, 50] + Assert.assertTrue(executeDoNext(hbsReader)); + verify(rcfReader).sync(25); + } + + public void testHitLastBlock() throws Exception { + init(); + Assert.assertTrue(executeDoNext(hbsReader)); + verify(rcfReader).sync(50); + + ioContext.setComparison(-1); + when(rcfReader.getPos()).thenReturn(100L); + + // When sync is called it will 
return 100, the value signaling the end of the file, this should + // result in a call to sync to the beginning of the block it was searching [50, 100], and it + // should continue normally + Assert.assertTrue(executeDoNext(hbsReader)); + InOrder inOrder = inOrder(rcfReader); + inOrder.verify(rcfReader).sync(75); + inOrder.verify(rcfReader).sync(50); + Assert.assertFalse(ioContext.isBinarySearching()); + } + + public void testHitSamePositionTwice() throws Exception { + init(); + Assert.assertTrue(executeDoNext(hbsReader)); + verify(rcfReader).sync(50); + + ioContext.setComparison(1); + + // When getPos is called it should return the same value, signaling the end of the search, so + // the search should continue linearly and it should sync to the beginning of the block [0, 50] + Assert.assertTrue(executeDoNext(hbsReader)); + InOrder inOrder = inOrder(rcfReader); + inOrder.verify(rcfReader).sync(25); + inOrder.verify(rcfReader).sync(0); + Assert.assertFalse(ioContext.isBinarySearching()); + } + + public void testResetRange() throws Exception { + init(); + InOrder inOrder = inOrder(rcfReader); + Assert.assertTrue(executeDoNext(hbsReader)); + inOrder.verify(rcfReader).sync(50); + + ioContext.setComparison(-1); + when(rcfReader.getPos()).thenReturn(75L); + + Assert.assertTrue(executeDoNext(hbsReader)); + inOrder.verify(rcfReader).sync(75); + + ioContext.setEndBinarySearch(true); + + // This should make the search linear, sync to the beginning of the block being searched + // [50, 100], set the comparison to be null, and the flag to reset the range should be unset + Assert.assertTrue(executeDoNext(hbsReader)); + inOrder.verify(rcfReader).sync(50); + Assert.assertFalse(ioContext.isBinarySearching()); + Assert.assertFalse(ioContext.shouldEndBinarySearch()); + } + + public void testEqualOpClass() throws Exception { + init(); + ioContext.setGenericUDFClassName(GenericUDFOPEqual.class.getName()); + Assert.assertTrue(ioContext.isBinarySearching()); + Assert.assertTrue(executeDoNext(hbsReader)); + ioContext.setIsBinarySearching(false); + ioContext.setComparison(-1); + Assert.assertTrue(executeDoNext(hbsReader)); + ioContext.setComparison(0); + Assert.assertTrue(executeDoNext(hbsReader)); + ioContext.setComparison(1); + Assert.assertFalse(executeDoNext(hbsReader)); + } + + public void testLessThanOpClass() throws Exception { + init(); + ioContext.setGenericUDFClassName(GenericUDFOPLessThan.class.getName()); + Assert.assertTrue(ioContext.isBinarySearching()); + Assert.assertTrue(executeDoNext(hbsReader)); + ioContext.setComparison(-1); + Assert.assertTrue(executeDoNext(hbsReader)); + ioContext.setComparison(0); + Assert.assertFalse(executeDoNext(hbsReader)); + ioContext.setComparison(1); + Assert.assertFalse(executeDoNext(hbsReader)); + } + + public void testLessThanOrEqualOpClass() throws Exception { + init(); + ioContext.setGenericUDFClassName(GenericUDFOPEqualOrLessThan.class.getName()); + Assert.assertTrue(ioContext.isBinarySearching()); + Assert.assertTrue(executeDoNext(hbsReader)); + ioContext.setComparison(-1); + Assert.assertTrue(executeDoNext(hbsReader)); + ioContext.setComparison(0); + Assert.assertTrue(executeDoNext(hbsReader)); + ioContext.setComparison(1); + Assert.assertFalse(executeDoNext(hbsReader)); + } + + public void testGreaterThanOpClass() throws Exception { + init(); + ioContext.setGenericUDFClassName(GenericUDFOPGreaterThan.class.getName()); + Assert.assertTrue(ioContext.isBinarySearching()); + Assert.assertTrue(executeDoNext(hbsReader)); + ioContext.setIsBinarySearching(false); + 
ioContext.setComparison(-1); + Assert.assertTrue(executeDoNext(hbsReader)); + ioContext.setComparison(0); + Assert.assertTrue(executeDoNext(hbsReader)); + ioContext.setComparison(1); + Assert.assertTrue(executeDoNext(hbsReader)); + } + + public void testGreaterThanOrEqualOpClass() throws Exception { + init(); + ioContext.setGenericUDFClassName(GenericUDFOPEqualOrGreaterThan.class.getName()); + Assert.assertTrue(ioContext.isBinarySearching()); + Assert.assertTrue(executeDoNext(hbsReader)); + ioContext.setIsBinarySearching(false); + ioContext.setComparison(-1); + Assert.assertTrue(executeDoNext(hbsReader)); + ioContext.setComparison(0); + Assert.assertTrue(executeDoNext(hbsReader)); + ioContext.setComparison(1); + Assert.assertTrue(executeDoNext(hbsReader)); + } + + public static void main(String[] args) throws Exception { + new TestHiveBinarySearchRecordReader().testNonLinearGreaterThan(); + new TestHiveBinarySearchRecordReader().testNonLinearLessThan(); + new TestHiveBinarySearchRecordReader().testNonLinearEqualTo(); + new TestHiveBinarySearchRecordReader().testHitLastBlock(); + new TestHiveBinarySearchRecordReader().testHitSamePositionTwice(); + new TestHiveBinarySearchRecordReader().testResetRange(); + new TestHiveBinarySearchRecordReader().testEqualOpClass(); + new TestHiveBinarySearchRecordReader().testLessThanOpClass(); + new TestHiveBinarySearchRecordReader().testLessThanOrEqualOpClass(); + new TestHiveBinarySearchRecordReader().testGreaterThanOpClass(); + new TestHiveBinarySearchRecordReader().testGreaterThanOrEqualOpClass(); + } +} Index: ql/src/test/queries/clientpositive/index_compact_binary_search.q =================================================================== --- ql/src/test/queries/clientpositive/index_compact_binary_search.q (revision 0) +++ ql/src/test/queries/clientpositive/index_compact_binary_search.q (revision 0) @@ -0,0 +1,89 @@ +SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; +SET hive.default.fileformat=TextFile; + +CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD; +ALTER INDEX src_index ON src REBUILD; + +SET hive.optimize.index.filter=true; +SET hive.optimize.index.filter.compact.minsize=1; +SET hive.index.compact.binary.search=true; + +SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook; + +SELECT * FROM src WHERE key = '0'; + +SELECT * FROM src WHERE key < '1'; + +SELECT * FROM src WHERE key <= '0'; + +SELECT * FROM src WHERE key > '8'; + +SELECT * FROM src WHERE key >= '9'; + +SET hive.exec.post.hooks=; + +DROP INDEX src_index ON src; + +SET hive.default.fileformat=RCFILE; + +CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD; +ALTER INDEX src_index ON src REBUILD; + +SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook; + +SELECT * FROM src WHERE key = '0'; + +SELECT * FROM src WHERE key < '1'; + +SELECT * FROM src WHERE key <= '0'; + +SELECT * FROM src WHERE key > '8'; + +SELECT * FROM src WHERE key >= '9'; + +SET hive.exec.post.hooks=; + +DROP INDEX src_index ON src; + +SET hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; +SET hive.default.fileformat=TextFile; + +CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD; +ALTER INDEX src_index ON src REBUILD; + +SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook; + +SELECT * FROM src WHERE key = '0'; + +SELECT * FROM src WHERE key < '1'; + +SELECT * FROM src WHERE key <= 
'0'; + +SELECT * FROM src WHERE key > '8'; + +SELECT * FROM src WHERE key >= '9'; + +SET hive.exec.post.hooks=; + +DROP INDEX src_index ON src; + +SET hive.default.fileformat=RCFILE; + +CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD; +ALTER INDEX src_index ON src REBUILD; + +SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook; + +SELECT * FROM src WHERE key = '0'; + +SELECT * FROM src WHERE key < '1'; + +SELECT * FROM src WHERE key <= '0'; + +SELECT * FROM src WHERE key > '8'; + +SELECT * FROM src WHERE key >= '9'; + +SET hive.exec.post.hooks=; + +DROP INDEX src_index ON src; \ No newline at end of file Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java (revision 1183507) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java (working copy) @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCase; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFWhen; import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; @@ -81,12 +82,12 @@ void evaluate() throws HiveException { obj = eval.evaluate(rowObject); } - + public Object get() throws HiveException { return obj; } } - + public ExprNodeGenericFuncEvaluator(ExprNodeGenericFuncDesc expr) { this.expr = expr; children = new ExprNodeEvaluator[expr.getChildExprs().size()]; @@ -152,4 +153,27 @@ return genericUDF.evaluate(deferredChildren); } + /** + * If the genericUDF is a base comparison, it returns an integer based on the result of comparing + * the two sides of the UDF, like the compareTo method in Comparable. + * + * If the genericUDF is not a base comparison, or there is an error executing the comparison, it + * returns null. 
+ * @param row + * @return + * @throws HiveException + */ + public Integer compare(Object row) throws HiveException { + if (!(genericUDF instanceof GenericUDFBaseCompare)) { + return null; + } + + rowObject = row; + if (isEager) { + for (int i = 0; i < deferredChildren.length; i++) { + ((EagerExprObject) deferredChildren[i]).evaluate(); + } + } + return ((GenericUDFBaseCompare)genericUDF).compare(deferredChildren); + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 1183507) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy) @@ -526,6 +526,9 @@ conf.set("hive.index.compact.file", work.getIndexIntermediateFile()); conf.set("hive.index.blockfilter.file", work.getIndexIntermediateFile()); } + + // Intentionally overwrites anything the user may have put here + conf.setBoolean("hive.input.format.sorted", work.isInputFormatSorted()); } public boolean mapStarted() { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (revision 1183507) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (working copy) @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.FilterDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; @@ -49,6 +50,8 @@ private transient ExprNodeEvaluator conditionEvaluator; private transient PrimitiveObjectInspector conditionInspector; private transient int consecutiveFails; + private transient int consecutiveSearches; + private transient IOContext ioContext; transient int heartbeatInterval; public FilterOperator() { @@ -56,6 +59,7 @@ filtered_count = new LongWritable(); passed_count = new LongWritable(); consecutiveFails = 0; + consecutiveSearches = 0; } @Override @@ -67,6 +71,7 @@ statsMap.put(Counter.FILTERED, filtered_count); statsMap.put(Counter.PASSED, passed_count); conditionInspector = null; + ioContext = IOContext.get(); } catch (Throwable e) { throw new HiveException(e); } @@ -80,7 +85,47 @@ conditionInspector = (PrimitiveObjectInspector) conditionEvaluator .initialize(rowInspector); } + + // If the input is sorted, and we are executing a search based on the arguments to this filter, + // set the comparison in the IOContext and the type of the UDF + if (conf.isSortedFilter() && ioContext.useSorted()) { + if (!(conditionEvaluator instanceof ExprNodeGenericFuncEvaluator)) { + LOG.error("Attempted to use the fact data is sorted when the conditionEvaluator is not " + + "of type ExprNodeGenericFuncEvaluator"); + ioContext.setUseSorted(false); + return; + } else { + ioContext.setComparison(((ExprNodeGenericFuncEvaluator)conditionEvaluator).compare(row)); + } + + if (ioContext.getGenericUDFClassName() == null) { + ioContext.setGenericUDFClassName( + ((ExprNodeGenericFuncEvaluator)conditionEvaluator).genericUDF.getClass().getName()); + } + + // If we are currently searching the data for a place to begin, do not return data yet + if (ioContext.isBinarySearching()) { + consecutiveSearches++; + // In case we're searching through an especially large set of data, send a heartbeat in + // order to avoid timeout + if (((consecutiveSearches 
% heartbeatInterval) == 0) && (reporter != null)) { + reporter.progress(); + } + return; + } + } + Object condition = conditionEvaluator.evaluate(row); + + // If we are currently performing a binary search on the input, don't forward the results + // Currently this value is set when a query is optimized using a compact index. The map reduce + // job responsible for scanning and filtering the index sets this value. It remains set + // throughout the binary search executed by the HiveBinarySearchRecordResder until a starting + // point for a linear scan has been identified, at which point this value is unset. + if (ioContext.isBinarySearching()) { + return; + } + Boolean ret = (Boolean) conditionInspector .getPrimitiveJavaObject(condition); if (Boolean.TRUE.equals(ret)) { Index: ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java (revision 1183507) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java (working copy) @@ -68,6 +68,8 @@ private org.apache.hadoop.hive.ql.plan.ExprNodeDesc predicate; private boolean isSamplingPred; private transient sampleDesc sampleDescr; + // Is this a filter that should perform a comparison for sorted searches + private boolean isSortedFilter; public FilterDesc() { } @@ -116,4 +118,12 @@ this.sampleDescr = sampleDescr; } + public boolean isSortedFilter() { + return isSortedFilter; + } + + public void setSortedFilter(boolean isSortedFilter) { + this.isSortedFilter = isSortedFilter; + } + } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 1183507) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy) @@ -87,6 +87,9 @@ private boolean mapperCannotSpanPartns; + // used to indicate the input is sorted, and so a BinarySearchRecordReader shoudl be used + private boolean inputFormatSorted = false; + public MapredWork() { aliasToPartnInfo = new LinkedHashMap(); } @@ -437,6 +440,14 @@ this.opParseCtxMap = opParseCtxMap; } + public boolean isInputFormatSorted() { + return inputFormatSorted; + } + + public void setInputFormatSorted(boolean inputFormatSorted) { + this.inputFormatSorted = inputFormatSorted; + } + public void resolveDynamicPartitionMerge(HiveConf conf, Path path, TableDesc tblDesc, ArrayList aliases, PartitionDesc partDesc) { pathToAliases.put(path.toString(), aliases); Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (revision 1183507) +++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (working copy) @@ -43,12 +43,12 @@ import org.apache.hadoop.hive.serde2.columnar.LazyDecompressionCallback; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile.Metadata; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.VersionMismatchException; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.io.SequenceFile.Metadata; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionInputStream; import org.apache.hadoop.io.compress.CompressionOutputStream; @@ -61,7 +61,7 @@ * RCFiles, short of 
Record Columnar File, are flat files * consisting of binary key/value pairs, which shares much similarity with * SequenceFile. - * + * * RCFile stores columns of a table in a record columnar way. It first * partitions rows horizontally into row splits. and then it vertically * partitions each row split in a columnar way. RCFile first stores the meta @@ -75,7 +75,7 @@ * RCFile provides {@link Writer}, {@link Reader} and classes for * writing, reading respectively. *

- * + * *

* RCFile stores columns of a table in a record columnar way. It first * partitions rows horizontally into row splits, and then it vertically @@ -83,21 +83,21 @@ * data of a row split, as the key part of a record, and all the data of a row * split as the value part. *

- * + * *

* RCFile compresses values in a more fine-grained manner than record level * compression. However, it currently does not support compressing the key part * yet. The actual compression algorithm used to compress key and/or values can * be specified by using the appropriate {@link CompressionCodec}. *

- * + * *

* The {@link Reader} is used to read and explain the bytes of RCFile. *

- * + * *

RCFile Formats

- * - * + * + * * *
    *
  • version - 3 bytes of magic header SEQ, followed by 1 byte of @@ -113,7 +113,7 @@ *
  • metadata - {@link Metadata} for this file.
  • *
  • sync - A sync marker to denote end of the header.
  • *
- * + * *
RCFile Format
*
    *
  • Header
  • @@ -143,7 +143,7 @@ *
* * - * + * */ public class RCFile { @@ -177,7 +177,7 @@ /** * KeyBuffer is the key of each record in RCFile. Its on-disk layout is as * below: - * + * *
    *
  • record length in bytes, it is the sum of bytes used to store the key * part and the value part.
  • @@ -204,7 +204,7 @@ private int numberRows = 0; // how many columns private int columnNumber = 0; - + // return the number of columns recorded in this file's header public int getColumnNumber() { return columnNumber; @@ -227,7 +227,7 @@ /** * add in a new column's meta data. - * + * * @param columnValueLen * this total bytes number of this column's values in this split * @param colValLenBuffer @@ -277,7 +277,7 @@ /** * get number of bytes to store the keyBuffer. - * + * * @return number of bytes used to store this KeyBuffer on disk * @throws IOException */ @@ -370,7 +370,7 @@ Decompressor valDecompressor = null; NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer(); CompressionInputStream deflatFilter = null; - + public ValueBuffer() throws IOException { } @@ -522,7 +522,7 @@ CodecPool.returnDecompressor(valDecompressor); } } - + @Override public int compareTo(Object arg0) { throw new RuntimeException("compareTo not supported in class " @@ -533,7 +533,7 @@ /** * Write KeyBuffer/ValueBuffer pairs to a RCFile. RCFile's format is * compatible with SequenceFile's. - * + * */ public static class Writer { @@ -664,7 +664,7 @@ /** * Constructs a RCFile Writer. - * + * * @param fs * the file system used * @param conf @@ -680,7 +680,7 @@ /** * Constructs a RCFile Writer. - * + * * @param fs * the file system used * @param conf @@ -699,9 +699,9 @@ } /** - * + * * Constructs a RCFile Writer. - * + * * @param fs * the file system used * @param conf @@ -837,7 +837,7 @@ * column number in the file, zero bytes are appended for the empty columns. * If its size() is greater then the column number in the file, the exceeded * columns' bytes are ignored. - * + * * @param val * @throws IOException */ @@ -936,7 +936,7 @@ bufferedRecords = 0; columnBufferSize = 0; } - + /** * flush a block out without doing anything except compressing the key part. */ @@ -945,7 +945,7 @@ checkAndWriteSync(); // sync out.writeInt(recordLen); // total record length out.writeInt(keyLength); // key portion length - + if(this.isCompressed()) { //compress key and write key out keyCompressionBuffer.reset(); @@ -1001,7 +1001,7 @@ /** * Read KeyBuffer/ValueBuffer pairs from a RCFile. - * + * */ public static class Reader { private static class SelectedColumn { @@ -1032,7 +1032,7 @@ private final ValueBuffer currentValue; - private boolean[] skippedColIDs = null; + private final boolean[] skippedColIDs = null; private int readRowsIndexInBuffer = 0; @@ -1242,7 +1242,7 @@ /** * Set the current byte position in the input file. - * + * *

    * The position passed must be a position returned by * {@link RCFile.Writer#getLength()} when writing this file. To seek to an @@ -1254,6 +1254,18 @@ in.seek(position); } + /** + * Resets the values which determine if there are more rows in the buffer + * + * This can be used after one calls seek or sync, if one called next before that. + * Otherwise, the seek or sync will have no effect, it will continue to get rows from the + * buffer built up from the call to next. + */ + public synchronized void resetBuffer() { + readRowsIndexInBuffer = 0; + recordsNumInValBuffer = 0; + } + /** Seek to the next sync mark past a given position. */ public synchronized void sync(long position) throws IOException { if (position + SYNC_SIZE >= end) { @@ -1309,7 +1321,7 @@ /** * Read and return the next record length, potentially skipping over a sync * block. - * + * * @return the length of the next record or -1 if there is no next record * @throws IOException */ @@ -1407,7 +1419,7 @@ currentValue.readFields(in); currentValue.inited = true; } - + public boolean nextBlock() throws IOException { int keyLength = nextKeyBuffer(); if(keyLength > 0) { @@ -1430,7 +1442,7 @@ * Calling getColumn() with not change the result of * {@link #next(LongWritable)} and * {@link #getCurrentRow(BytesRefArrayWritable)}. - * + * * @param columnID * @throws IOException */ @@ -1459,7 +1471,7 @@ ValueBuffer.LazyDecompressionCallbackImpl decompCallBack = null; boolean decompressed = currentValue.decompressedFlag[selColIdx]; if (decompressed) { - uncompData = + uncompData = currentValue.loadedColumnsValueBuffer[selColIdx].getData(); } else { decompCallBack = currentValue.lazyDecompressCallbackObjs[selColIdx]; @@ -1485,7 +1497,7 @@ * current value buffer. It will influence the result of * {@link #next(LongWritable)} and * {@link #getCurrentRow(BytesRefArrayWritable)} - * + * * @return whether there still has records or not * @throws IOException */ @@ -1500,7 +1512,7 @@ * of rows passed by, because {@link #seek(long)}, * {@link #nextColumnsBatch()} can change the underlying key buffer and * value buffer. - * + * * @return next row number * @throws IOException */ @@ -1567,7 +1579,7 @@ /** * get the current row used,make sure called {@link #next(LongWritable)} * first. 
- * + * * @throws IOException */ public synchronized void getCurrentRow(BytesRefArrayWritable ret) throws IOException { @@ -1596,11 +1608,11 @@ for (int j = 0; j < selectedColumns.length; ++j) { SelectedColumn col = selectedColumns[j]; int i = col.colIndex; - + BytesRefWritable ref = ret.unCheckedGet(i); - + colAdvanceRow(j, col); - + if (currentValue.decompressedFlag[j]) { ref.set(currentValue.loadedColumnsValueBuffer[j].getData(), col.rowReadIndex, col.prvLength); @@ -1611,14 +1623,14 @@ col.rowReadIndex += col.prvLength; } } else { - // This version of the loop eliminates a condition check and branch + // This version of the loop eliminates a condition check and branch // and is measurably faster (20% or so) for (int j = 0; j < selectedColumns.length; ++j) { SelectedColumn col = selectedColumns[j]; int i = col.colIndex; - + BytesRefWritable ref = ret.unCheckedGet(i); - + colAdvanceRow(j, col); ref.set(currentValue.loadedColumnsValueBuffer[j].getData(), col.rowReadIndex, col.prvLength); @@ -1666,7 +1678,7 @@ public String toString() { return file.toString(); } - + public boolean isCompressedRCFile() { return this.decompress; } @@ -1689,7 +1701,7 @@ public KeyBuffer getCurrentKeyBufferObj() { return this.currentKey; } - + /** * return the ValueBuffer object used in the reader. Internally in each * reader, there is only one ValueBuffer object, which gets reused for every @@ -1698,7 +1710,7 @@ public ValueBuffer getCurrentValueBufferObj() { return this.currentValue; } - + //return the current block's length public int getCurrentBlockLength() { return this.currentRecordLength; @@ -1713,11 +1725,11 @@ public int getCurrentCompressedKeyLen() { return this.compressedKeyLen; } - + //return the CompressionCodec used for this file public CompressionCodec getCompressionCodec() { return this.codec; } - + } } Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java (revision 1183507) +++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java (working copy) @@ -129,6 +129,14 @@ in.seek(pos); } + public void sync(long pos) throws IOException { + in.sync(pos); + } + + public void resetBuffer() { + in.resetBuffer(); + } + public long getStart() { return start; } Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinarySearchRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinarySearchRecordReader.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinarySearchRecordReader.java (revision 0) @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; +import org.apache.hadoop.hive.ql.exec.ExecMapper; +import org.apache.hadoop.hive.ql.io.IOContext.Comparison; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; + +/** + * HiveBinarySearchRecordReader is a wrapper on RecordReader. It should only be used if the data + * is going to a FilterOperator, which filters by comparing a value in the data with a constant, + * using one of the comparisons =, <, >, <=, >=, and the data is sorted on that value in ascending + * order. If the RecordReader's underlying format is an RCFile, this object can perform a binary + * search to find the block to begin reading from, and stop reading once it can be determined no + * other entries will match the filter. + */ +public abstract class HiveBinarySearchRecordReader + extends HiveContextAwareRecordReader { + + private static final Log LOG = LogFactory.getLog(HiveBinarySearchRecordReader.class.getName()); + + private long rangeStart; + private long rangeEnd; + private long splitEnd; + private long previousPosition = -1; + private boolean wasUsingSortedSearch = false; + private String genericUDFClassName = null; + private final List stopComparisons = new ArrayList(); + + protected RecordReader recordReader; + protected JobConf jobConf; + protected boolean isSorted = false; + + public HiveBinarySearchRecordReader(JobConf conf) throws IOException { + this(null, conf); + } + + public HiveBinarySearchRecordReader(RecordReader recordReader) { + this.recordReader = recordReader; + } + + public HiveBinarySearchRecordReader(RecordReader recordReader, JobConf conf) + throws IOException { + this.recordReader = recordReader; + this.jobConf = conf; + + this.isSorted = jobConf.getBoolean("hive.input.format.sorted", false); + } + + public void setRecordReader(RecordReader recordReader) { + this.recordReader = recordReader; + + this.isSorted = false; + } + + @Override + public float getProgress() throws IOException { + if (this.getIOContext().isBinarySearching()) { + return 0; + } else { + return recordReader.getProgress(); + } + } + + public boolean doSortedNext(K key, V value) throws IOException { + if (ExecMapper.getDone()) { + return false; + } + + if (this.getIOContext().shouldEndBinarySearch() || + (!this.getIOContext().useSorted() && this.wasUsingSortedSearch)) { + beginLinearSearch(); + this.wasUsingSortedSearch = false; + this.getIOContext().setEndBinarySearch(false); + } + + if (this.getIOContext().useSorted()) { + if (this.genericUDFClassName == null && + this.getIOContext().getGenericUDFClassName() != null) { + setGenericUDFClassName(this.getIOContext().getGenericUDFClassName()); + } + + if (this.getIOContext().isBinarySearching()) { + // Proceed with a binary search + if (this.getIOContext().getComparison() != null) { + 
switch (this.getIOContext().getComparison()) { + case GREATER: + case EQUAL: + // Indexes have only one entry per value, could go linear from here, if we want to + // use this for any sorted table, we'll need to continue the search + rangeEnd = previousPosition; + break; + case LESS: + rangeStart = previousPosition; + break; + default: + break; + } + } + + long position = (rangeStart + rangeEnd) / 2; + sync(position); + + long newPosition = getSyncedPosition(); + // If the newPosition is the same as the previousPosition, we've reached the end of the + // binary search, if the new position at least as big as the size of the split, any + // matching rows must be in the final block, so we can end the binary search. + if (newPosition == previousPosition || newPosition >= splitEnd) { + this.getIOContext().setIsBinarySearching(false); + sync(rangeStart); + } + + previousPosition = newPosition; + } else if (foundAllTargets()) { + // Found all possible rows which will not be filtered + return false; + } + } + + try { + return recordReader.next(key, value); + } catch (Exception e) { + return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf); + } + } + + @Override + public void initIOContext(FileSplit split, JobConf job, + Class inputFormatClass, RecordReader recordReader) throws IOException { + // By default, don't use a binary search. + this.getIOContext().resetSortingValues(); + + super.initIOContext(split, job, inputFormatClass, recordReader); + + this.rangeStart = split.getStart(); + this.rangeEnd = split.getStart() + split.getLength(); + this.splitEnd = rangeEnd; + if (recordReader instanceof RCFileRecordReader && rangeEnd != 0 && this.isSorted) { + // Binary search only works if we know the size of the split, and the recordReader is an + // RCFileRecordReader + this.getIOContext().setUseSorted(true); + this.getIOContext().setIsBinarySearching(true); + this.wasUsingSortedSearch = true; + } else { + // Use the defalut methods for next in the child class + this.isSorted = false; + } + } + + private void sync(long position) throws IOException { + ((RCFileRecordReader)recordReader).sync(position); + ((RCFileRecordReader)recordReader).resetBuffer(); + } + + private long getSyncedPosition() throws IOException { + return recordReader.getPos(); + } + /** + * This uses the name of the generic UDF being used by the filter to determine whether we should + * perform a binary search, and what the comparisons we should use to signal the end of the + * linear scan are. 
+ * @param genericUDFClassName + * @throws IOException + */ + private void setGenericUDFClassName(String genericUDFClassName) throws IOException { + this.genericUDFClassName = genericUDFClassName; + if (genericUDFClassName.equals(GenericUDFOPEqual.class.getName())) { + stopComparisons.add(Comparison.GREATER); + } else if (genericUDFClassName.equals(GenericUDFOPLessThan.class.getName())) { + stopComparisons.add(Comparison.EQUAL); + stopComparisons.add(Comparison.GREATER); + if (this.getIOContext().isBinarySearching()) { + beginLinearSearch(); + } + } else if (genericUDFClassName.equals(GenericUDFOPEqualOrLessThan.class.getName())) { + stopComparisons.add(Comparison.GREATER); + if (this.getIOContext().isBinarySearching()) { + beginLinearSearch(); + } + } else if (genericUDFClassName.equals(GenericUDFOPGreaterThan.class.getName()) || + genericUDFClassName.equals(GenericUDFOPEqualOrGreaterThan.class.getName())) { + // Do nothing + } else { + // This is an unsupported operator + LOG.debug(genericUDFClassName + " is not the name of a supported class. " + + "Continuing linearly."); + if (this.getIOContext().isBinarySearching()) { + beginLinearSearch(); + } + } + } + + /** + * This should be called after the binary search is finished and before the linear scan begins + * @throws IOException + */ + private void beginLinearSearch() throws IOException { + sync(rangeStart); + this.getIOContext().setIsBinarySearching(false); + this.wasUsingSortedSearch = false; + } + + /** + * Returns true if the current comparison is in the list of stop comparisons, i.e. we've found + * all records which won't be filtered + * @return + */ + public boolean foundAllTargets() { + if (this.getIOContext().getComparison() == null || + !stopComparisons.contains(this.getIOContext().getComparison())) { + return false; + } + + return true; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java (revision 1183507) +++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java (working copy) @@ -30,7 +30,6 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; /** @@ -40,14 +39,12 @@ * @param */ public class CombineHiveRecordReader - extends HiveContextAwareRecordReader { + extends HiveBinarySearchRecordReader { - private final RecordReader recordReader; - public CombineHiveRecordReader(InputSplit split, Configuration conf, Reporter reporter, Integer partition) throws IOException { - JobConf job = (JobConf) conf; - CombineHiveInputSplit hsplit = new CombineHiveInputSplit(job, + super((JobConf)conf); + CombineHiveInputSplit hsplit = new CombineHiveInputSplit(jobConf, (InputSplitShim) split); String inputFormatClassName = hsplit.inputFormatClassName(); Class inputFormatClass = null; @@ -58,15 +55,16 @@ + inputFormatClassName); } InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache( - inputFormatClass, job); + inputFormatClass, jobConf); // create a split for the given partition FileSplit fsplit = new FileSplit(hsplit.getPaths()[partition], hsplit .getStartOffsets()[partition], hsplit.getLengths()[partition], hsplit .getLocations()); - this.recordReader = inputFormat.getRecordReader(fsplit, job, reporter); - this.initIOContext(fsplit, job, inputFormatClass, this.recordReader); + 
this.setRecordReader(inputFormat.getRecordReader(fsplit, jobConf, reporter)); + + this.initIOContext(fsplit, jobConf, inputFormatClass, this.recordReader); } @Override @@ -86,7 +84,12 @@ return recordReader.getPos(); } + @Override public float getProgress() throws IOException { + if (isSorted) { + return super.getProgress(); + } + return recordReader.getProgress(); } @@ -95,6 +98,10 @@ if (ExecMapper.getDone()) { return false; } - return recordReader.next(key, value); + if (this.isSorted) { + return doSortedNext(key, value); + } else { + return recordReader.next(key, value); + } } } Index: ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (revision 1183507) +++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (working copy) @@ -45,6 +45,24 @@ boolean isBlockPointer; boolean ioExceptions; + // Are we using the fact the input is sorted + boolean useSorted = false; + // Are we currently performing a binary search + boolean isBinarySearching = false; + // Do we want to end the binary search + boolean endBinarySearch = false; + // The result of the comparison of the last row processed + Comparison comparison = null; + // The class name of the generic UDF being used by the filter + String genericUDFClassName = null; + + public static enum Comparison { + GREATER, + LESS, + EQUAL, + UNKNOWN + } + String inputFile; public IOContext() { @@ -102,4 +120,69 @@ public boolean getIOExceptions() { return ioExceptions; } + + public boolean useSorted() { + return useSorted; + } + + public void setUseSorted(boolean useSorted) { + this.useSorted = useSorted; + } + + public boolean isBinarySearching() { + return isBinarySearching; + } + + public void setIsBinarySearching(boolean isBinarySearching) { + this.isBinarySearching = isBinarySearching; + } + + public boolean shouldEndBinarySearch() { + return endBinarySearch; + } + + public void setEndBinarySearch(boolean endBinarySearch) { + this.endBinarySearch = endBinarySearch; + } + + public Comparison getComparison() { + return comparison; + } + + public void setComparison(Integer comparison) { + if (comparison == null && this.isBinarySearching) { + // Nothing we can do here, so just proceed normally from now on + endBinarySearch = true; + } else { + if (comparison == null) { + this.comparison = Comparison.UNKNOWN; + } else if (comparison.intValue() < 0) { + this.comparison = Comparison.LESS; + } else if (comparison.intValue() > 0) { + this.comparison = Comparison.GREATER; + } else { + this.comparison = Comparison.EQUAL; + } + } + } + + public String getGenericUDFClassName() { + return genericUDFClassName; + } + + public void setGenericUDFClassName(String genericUDFClassName) { + this.genericUDFClassName = genericUDFClassName; + } + + /** + * The thread local IOContext is static, we may need to restart the search if, for instance, + * multiple files are being searched as part of a CombinedHiveRecordReader + */ + public void resetSortingValues() { + this.useSorted = false; + this.isBinarySearching = false; + this.endBinarySearch = false; + this.comparison = null; + this.genericUDFClassName = null; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (revision 1183507) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (working copy) @@ -32,23 +32,21 @@ * 
reading the data when some global flag ExecMapper.getDone() is set. */ public class HiveRecordReader - extends HiveContextAwareRecordReader { + extends HiveBinarySearchRecordReader { - private final RecordReader recordReader; - - private JobConf jobConf; + public HiveRecordReader(RecordReader recordReader) throws IOException { - this.recordReader = recordReader; + super(recordReader); } - + public HiveRecordReader(RecordReader recordReader, JobConf conf) throws IOException { - this.recordReader = recordReader; - this.jobConf = conf; + super(recordReader, conf); } + @Override public void doClose() throws IOException { recordReader.close(); } @@ -65,7 +63,12 @@ return recordReader.getPos(); } + @Override public float getProgress() throws IOException { + if (isSorted) { + return super.getProgress(); + } + return recordReader.getProgress(); } @@ -75,7 +78,11 @@ return false; } try { - return recordReader.next(key, value); + if (isSorted) { + return doSortedNext(key, value); + } else { + return recordReader.next(key, value); + } } catch (Exception e) { return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf); } Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision 1183507) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (working copy) @@ -33,8 +33,6 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.io.HiveIOExceptionHandler; -import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; @@ -174,7 +172,7 @@ } JobConf job; - + public void configure(JobConf job) { this.job = job; } @@ -207,7 +205,7 @@ Reporter reporter) throws IOException { HiveInputSplit hsplit = (HiveInputSplit) split; - + InputSplit inputSplit = hsplit.getInputSplit(); String inputFormatClassName = null; Class inputFormatClass = null; @@ -249,7 +247,7 @@ rr.initIOContext(hsplit, job, inputFormatClass, innerReader); return rr; } - + protected Map pathToPartitionInfo; MapredWork mrwork = null; @@ -382,11 +380,11 @@ if (this.mrwork == null) { init(job); } - + if(this.mrwork.getPathToAliases() == null) { return; } - + ArrayList aliases = new ArrayList(); Iterator>> iterator = this.mrwork .getPathToAliases().entrySet().iterator(); Index: ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (revision 1183507) +++ ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (working copy) @@ -18,7 +18,10 @@ package org.apache.hadoop.hive.ql.index.compact; +import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Set; @@ -27,11 +30,14 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Index; import 
org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.exec.FilterOperator; +import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; @@ -39,14 +45,18 @@ import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; import org.apache.hadoop.hive.ql.index.IndexSearchCondition; import org.apache.hadoop.hive.ql.index.TableBasedIndexHandler; +import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; -import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate; import org.apache.hadoop.hive.ql.optimizer.IndexUtils; import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; @@ -57,6 +67,10 @@ public class CompactIndexHandler extends TableBasedIndexHandler { private Configuration configuration; + // The names of the partition columns + private Set partitionCols; + // Whether or not the conditions have been met to use the fact the index is sorted + private boolean useSorted; private static final Log LOG = LogFactory.getLog(CompactIndexHandler.class.getName()); @@ -170,13 +184,83 @@ Driver driver = new Driver(queryConf); driver.compile(qlCommand.toString(), false); + if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) { + // For now, only works if the predicate is a single condition + MapredWork work = null; + for (Task task : driver.getPlan().getRootTasks()) { + // The index query should have one and only one map reduce task in the root tasks + // Otherwise something is wrong, log the problem and continue using the default format + if (task.getWork() instanceof MapredWork) { + if (work != null) { + LOG.error("Tried to use a binary search on a compact index but there were an " + + "unexpected number (>1) of root level map reduce tasks in the " + + "reentrant query plan."); + work.setInputformat(null); + break; + } + work = (MapredWork)task.getWork(); + String inputFormat = work.getInputformat(); + if (inputFormat == null) { + inputFormat = HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVEINPUTFORMAT); + } + // We can only perform a binary search with HiveInputFormat and CombineHiveInputFormat + if (!inputFormat.equals(HiveInputFormat.class.getName()) && + !inputFormat.equals(CombineHiveInputFormat.class.getName())) { + work = null; + break; + } + + work.setInputFormatSorted(true); + } + } + + if (work != null) { + // Find the filter operator which acts on the index column and mark it + findIndexColumnFilter(work.getAliasToWork().values()); + } + } + + queryContext.addAdditionalSemanticInputs(driver.getPlan().getInputs()); 
queryContext.setQueryTasks(driver.getPlan().getRootTasks()); return; } /** + * Does a depth first search on the operator tree looking for a filter operator whose predicate + * has one child which is a column which is not in the partition + * @param operators + * @return whether or not it has found its target + */ + private boolean findIndexColumnFilter(Collection> operators) { + for (Operator op : operators) { + if (op instanceof FilterOperator && + ((FilterOperator)op).getConf().getPredicate().getChildren().size() == 2) { + ExprNodeColumnDesc columnDesc = null; + ExprNodeDesc predicate = ((FilterOperator)op).getConf().getPredicate(); + if (predicate.getChildren().get(0) instanceof ExprNodeColumnDesc) { + columnDesc = (ExprNodeColumnDesc)predicate.getChildren().get(0); + } else if (predicate.getChildren().get(1) instanceof ExprNodeColumnDesc) { + columnDesc = (ExprNodeColumnDesc)predicate.getChildren().get(1); + } + + // Is this the target + if (columnDesc != null && !partitionCols.contains(columnDesc.getColumn())) { + ((FilterOperator)op).getConf().setSortedFilter(true); + return true; + } + } + + // If the target has been found, no need to continue + if (findIndexColumnFilter(op.getChildOperators())) { + return true; + } + } + return false; + } + + /** * Split the predicate into the piece we can deal with (pushed), and the one we can't (residual) * @param predicate * @param index @@ -193,6 +277,20 @@ return null; } + int numIndexCols = 0; + for (IndexSearchCondition searchCondition : searchConditions) { + if (!partitionCols.contains(searchCondition.getColumnDesc().getColumn())) { + numIndexCols++; + } + } + + // For now, only works if the predicate has a single condition on an index column + if (numIndexCols == 1) { + useSorted = true; + } else { + useSorted = false; + } + DecomposedPredicate decomposedPredicate = new DecomposedPredicate(); decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(searchConditions); decomposedPredicate.residualPredicate = residualPredicate; @@ -224,12 +322,14 @@ // partitioned columns are treated as if they have indexes so that the partitions // are used during the index query generation + partitionCols = new HashSet(); for (Partition part : queryPartitions) { if (part.getSpec().isEmpty()) { continue; // empty partitions are from whole tables, so we don't want to add them in } for (String column : part.getSpec().keySet()) { analyzer.allowColumnName(column); + partitionCols.add(column); } } Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseCompare.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseCompare.java (revision 1183507) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseCompare.java (working copy) @@ -22,11 +22,13 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ReturnObjectInspectorResolver; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; @@ -159,7 +161,35 @@ } + public Integer compare(DeferredObject[] arguments) throws HiveException { + Object o0,o1; + o0 = arguments[0].get(); + if (o0 == null) { + return null; + } + o1 = arguments[1].get(); + if (o1 == null) { + return null; + } + if (compareType == CompareType.NEED_CONVERT) { + Object converted_o0 = converter0.convert(o0); + if (converted_o0 == null) { + return null; + } + Object converted_o1 = converter1.convert(o1); + if (converted_o1 == null) { + return null; + } + return ObjectInspectorUtils.compare( + converted_o0, compareOI, + converted_o1, compareOI); + } else { + return ObjectInspectorUtils.compare( + o0, argumentOIs[0], o1, argumentOIs[1]); + } + } + @Override public String getDisplayString(String[] children) { assert (children.length == 2);
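
Editor's note: the record-reader hunks above route sorted input through a doSortedNext(...) path, but the HiveBinarySearchRecordReader class itself is not part of this excerpt. The following is a minimal, self-contained sketch of the control flow such a reader could use, steering a binary search over a seekable input with the per-row comparison result. Comparison mirrors the enum added to IOContext above; SeekableReader, seekTo, compareCurrentRowToTarget and findStartOffset are hypothetical names, not the patch's API.

import java.io.IOException;

/**
 * Illustrative only: how a binary-search-aware reader could narrow the scan range
 * before falling back to a normal linear scan. Assumes offsets are in sort order.
 */
public class SortedSearchSketch {

  public enum Comparison { GREATER, LESS, EQUAL, UNKNOWN }

  /** Minimal stand-in for a reader that can reposition itself by byte offset. */
  public interface SeekableReader {
    void seekTo(long offset) throws IOException;                 // jump to a block/sync point
    Comparison compareCurrentRowToTarget() throws IOException;   // e.g. fed by the filter's compare result
  }

  /**
   * Narrow [low, high) down to the first offset whose rows may satisfy an equality
   * predicate on the sort column; the caller then scans forward from the result.
   */
  public static long findStartOffset(SeekableReader reader, long low, long high)
      throws IOException {
    while (low < high) {
      long mid = low + (high - low) / 2;
      reader.seekTo(mid);
      Comparison cmp = reader.compareCurrentRowToTarget();
      if (cmp == Comparison.UNKNOWN) {
        // Mirrors IOContext.setComparison(null) during a search: stop steering, scan from 'low'.
        return low;
      } else if (cmp == Comparison.LESS) {
        // Current row sorts before the target; discard the left half.
        low = mid + 1;
      } else {
        // Current row is >= target; the answer is at or before 'mid'.
        high = mid;
      }
    }
    return low;
  }
}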
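
Editor's note: the patch adds the consumer side (IOContext.setComparison) and the raw comparison source (GenericUDFBaseCompare.compare), but the FilterOperator change it references (setSortedFilter) is not in this excerpt. The snippet below is an assumed sketch of how the two added pieces could be wired together; IOContext.get() is the usual thread-local accessor and the class and method names here are ours.

import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;

/** Sketch of the producer side of IOContext.setComparison(...); illustrative only. */
public class SortedFilterForwardingSketch {
  /**
   * How a filter marked as acting on the sorted index column might publish each
   * row's comparison result for the record reader to consume.
   */
  public static void publishComparison(GenericUDFBaseCompare udf, DeferredObject[] children)
      throws HiveException {
    IOContext ioContext = IOContext.get();
    if (!ioContext.useSorted()) {
      return;                      // normal scan, nothing to steer
    }
    // compare(...) is the method added to GenericUDFBaseCompare in this patch; it returns
    // null when either operand is NULL, which setComparison(...) treats as "end the binary
    // search" while searching, or UNKNOWN otherwise.
    ioContext.setComparison(udf.compare(children));
  }
}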
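
Editor's note: the CompactIndexHandler hunk gates the feature on two conditions that are easy to miss in the diff: the reentrant index query must run under HiveInputFormat or CombineHiveInputFormat, and the pushed predicate must contain exactly one condition on a non-partition index column. The helper below just restates those checks in isolation; the class name and signature are illustrative, not part of the patch.

import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;

/** Hedged restatement of the gating checks visible in the CompactIndexHandler hunk. */
public final class BinarySearchGateSketch {
  public static boolean canUseBinarySearch(String inputFormatClassName, int indexColumnConditions) {
    // (a) only the two input formats that understand the sorted flag qualify
    boolean supportedFormat =
        HiveInputFormat.class.getName().equals(inputFormatClassName)
            || CombineHiveInputFormat.class.getName().equals(inputFormatClassName);
    // (b) exactly one search condition on a non-partition index column
    return supportedFormat && indexColumnConditions == 1;
  }
}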