Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1183507)
+++ conf/hive-default.xml (working copy)
@@ -1109,6 +1109,12 @@
+<property>
+  <name>hive.index.compact.binary.search</name>
+  <value>false</value>
+  <description>Whether or not to use a binary search to find the entries in an index table that match the filter, where possible</description>
+</property>
+
 <property>
   <name>hive.exim.uri.scheme.whitelist</name>
   <value>hdfs,pfile</value>
   <description>A comma separated list of acceptable URI schemes for import and export.</description>
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1183507)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -389,6 +389,7 @@
HIVEOPTINDEXFILTER_COMPACT_MAXSIZE("hive.optimize.index.filter.compact.maxsize", (long) -1), // infinity
HIVE_INDEX_COMPACT_QUERY_MAX_ENTRIES("hive.index.compact.query.max.entries", (long) 10000000), // 10M
HIVE_INDEX_COMPACT_QUERY_MAX_SIZE("hive.index.compact.query.max.size", (long) 10 * 1024 * 1024 * 1024), // 10G
+ HIVE_INDEX_COMPACT_BINARY_SEARCH("hive.index.compact.binary.search", false),
// Statistics
HIVESTATSAUTOGATHER("hive.stats.autogather", true),
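
Note: a minimal sketch (not part of the patch) of toggling the new flag through HiveConf; setBoolVar
and getBoolVar are the existing accessors for boolean ConfVars:

    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH, true);
    // Reads back 'true' here; without the set call it returns the default, false.
    boolean useBinarySearch = conf.getBoolVar(HiveConf.ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH);
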
Index: ql/src/test/results/clientpositive/index_compact_binary_search.q.out
===================================================================
--- ql/src/test/results/clientpositive/index_compact_binary_search.q.out (revision 0)
+++ ql/src/test/results/clientpositive/index_compact_binary_search.q.out (revision 0)
@@ -0,0 +1,159 @@
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+POSTHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+POSTHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: ALTERINDEX_REBUILD
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+POSTHOOK: query: ALTER INDEX src_index ON src REBUILD
+POSTHOOK: type: ALTERINDEX_REBUILD
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@default__src_src_index__
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: SELECT * FROM src WHERE key = '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-33-25_362_9037436441485749853/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key < '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-33-36_737_915702574002966361/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key <= '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-33-45_460_2820974923545239100/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key > '8'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-33-53_484_7730503182025797843/-mr-10000
+86 val_86
+98 val_98
+82 val_82
+92 val_92
+83 val_83
+84 val_84
+96 val_96
+95 val_95
+98 val_98
+85 val_85
+87 val_87
+90 val_90
+95 val_95
+80 val_80
+90 val_90
+83 val_83
+9 val_9
+97 val_97
+84 val_84
+90 val_90
+97 val_97
+PREHOOK: query: SELECT * FROM src WHERE key >= '9'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-34-01_644_2645522069983623952/-mr-10000
+98 val_98
+92 val_92
+96 val_96
+95 val_95
+98 val_98
+90 val_90
+95 val_95
+90 val_90
+9 val_9
+97 val_97
+90 val_90
+97 val_97
+PREHOOK: query: DROP INDEX src_index ON src
+PREHOOK: type: DROPINDEX
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: ALTERINDEX_REBUILD
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+PREHOOK: query: SELECT * FROM src WHERE key = '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-34-16_817_2468472390975224229/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key < '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-34-25_022_2579452550595602420/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key <= '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-34-33_138_3044294309369249675/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key > '8'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-34-41_270_9170656333370391768/-mr-10000
+86 val_86
+98 val_98
+82 val_82
+92 val_92
+83 val_83
+84 val_84
+96 val_96
+95 val_95
+98 val_98
+85 val_85
+87 val_87
+90 val_90
+95 val_95
+80 val_80
+90 val_90
+83 val_83
+9 val_9
+97 val_97
+84 val_84
+90 val_90
+97 val_97
+PREHOOK: query: SELECT * FROM src WHERE key >= '9'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-10-25_18-34-49_478_2699176147331878951/-mr-10000
+98 val_98
+92 val_92
+96 val_96
+95 val_95
+98 val_98
+90 val_90
+95 val_95
+90 val_90
+9 val_9
+97 val_97
+90 val_90
+97 val_97
+PREHOOK: query: DROP INDEX src_index ON src
+PREHOOK: type: DROPINDEX
Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java (revision 0)
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java (revision 0)
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+import org.apache.hadoop.hive.ql.io.HiveSortedInputFormat;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+
+public class VerifyHiveSortedInputFormatUsedHook implements ExecuteWithHookContext {
+
+ public void run(HookContext hookContext) {
+ if (hookContext.getHookType().equals(HookType.POST_EXEC_HOOK)) {
+
+ // Go through the root tasks, and verify the input format of the map reduce task(s) is
+ // HiveSortedInputFormat
+ ArrayList<Task<? extends Serializable>> rootTasks =
+ hookContext.getQueryPlan().getRootTasks();
+ for (Task<? extends Serializable> rootTask : rootTasks) {
+ if (rootTask.getWork() instanceof MapredWork) {
+ Assert.assertEquals("HiveSortedInputFormat was not used for the root map reduce task.",
+ ((MapredWork)rootTask.getWork()).getInputformat(),
+ HiveSortedInputFormat.class.getName());
+ }
+ }
+ }
+ }
+}
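
Note: a sketch of wiring this hook up programmatically for a test session, assuming the existing
hive.exec.post.hooks variable (HiveConf.ConfVars.POSTEXECHOOKS); the new .q test below does the
same thing via SET:

    HiveConf conf = new HiveConf();
    // Post-execution hooks are given as a comma separated list of ExecuteWithHookContext classes.
    conf.setVar(HiveConf.ConfVars.POSTEXECHOOKS,
        VerifyHiveSortedInputFormatUsedHook.class.getName());
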
Index: ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java (revision 0)
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java (revision 0)
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.mockito.InOrder;
+
+/**
+ * TestHiveBinarySearchRecordReader.
+ *
+ */
+public class TestHiveBinarySearchRecordReader extends TestCase {
+
+ private RCFileRecordReader rcfReader;
+ private JobConf conf;
+ private TestHiveInputSplit hiveSplit;
+ private HiveBinarySearchRecordReader hbsReader;
+ private IOContext ioContext;
+
+ private static class TestHiveInputSplit extends HiveInputSplit {
+
+ @Override
+ public long getStart() {
+ return 0;
+ }
+
+ @Override
+ public long getLength() {
+ return 100;
+ }
+
+ @Override
+ public Path getPath() {
+ return new Path("/");
+ }
+ }
+
+ private void resetIOContext() {
+ ioContext = IOContext.get();
+ ioContext.setUseSorted(false);
+ ioContext.setIsBinarySearching(false);
+ ioContext.setEndBinarySearch(false);
+ ioContext.setComparison(null);
+ ioContext.setGenericUDFClassName(null);
+ }
+
+ private void init() throws IOException {
+ resetIOContext();
+ rcfReader = mock(RCFileRecordReader.class);
+ when(rcfReader.next((LongWritable)anyObject(),
+ (BytesRefArrayWritable) anyObject())).thenReturn(true);
+ // Since the start is 0, and the length is 100, the first call to sync should be with the value
+ // 50 so return that for getPos()
+ when(rcfReader.getPos()).thenReturn(50L);
+ conf = new JobConf();
+ hiveSplit = new TestHiveInputSplit();
+ hbsReader = new HiveBinarySearchRecordReader(rcfReader, conf);
+ hbsReader.initIOContext(hiveSplit, conf, Class.class, rcfReader);
+ }
+
+ private boolean executeDoNext(HiveBinarySearchRecordReader hbsReader) throws IOException {
+ return hbsReader.doNext(hbsReader.createKey(), hbsReader.createValue());
+ }
+
+ public void testNonLinearGreaterThan() throws Exception {
+ init();
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(50);
+
+ ioContext.setComparison(1);
+ when(rcfReader.getPos()).thenReturn(25L);
+
+ // By setting the comparison to greater, the search should use the block [0, 50]
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(25);
+ }
+
+ public void testNonLinearLessThan() throws Exception {
+ init();
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(50);
+
+ ioContext.setComparison(-1);
+ when(rcfReader.getPos()).thenReturn(75L);
+
+ // By setting the comparison to less, the search should use the block [50, 100]
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(75);
+ }
+
+ public void testNonLinearEqualTo() throws Exception {
+ init();
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(50);
+
+ ioContext.setComparison(0);
+ when(rcfReader.getPos()).thenReturn(25L);
+
+ // By setting the comparison to equal, the search should use the block [0, 50]
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(25);
+ }
+
+ public void testHitLastBlock() throws Exception {
+ init();
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(50);
+
+ ioContext.setComparison(-1);
+ when(rcfReader.getPos()).thenReturn(100L);
+
+ // When sync is called, getPos will return 100, the value signaling the end of the file. This
+ // should result in a call to sync to the beginning of the block it was searching [50, 100], and
+ // it should continue normally
+ Assert.assertTrue(executeDoNext(hbsReader));
+ InOrder inOrder = inOrder(rcfReader);
+ inOrder.verify(rcfReader).sync(75);
+ inOrder.verify(rcfReader).sync(50);
+ Assert.assertFalse(ioContext.isBinarySearching());
+ }
+
+ public void testHitSamePositionTwice() throws Exception {
+ init();
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(50);
+
+ ioContext.setComparison(1);
+
+ // When getPos is called it should return the same value, signaling the end of the search, so
+ // the search should continue linearly and it should sync to the beginning of the block [0, 50]
+ Assert.assertTrue(executeDoNext(hbsReader));
+ InOrder inOrder = inOrder(rcfReader);
+ inOrder.verify(rcfReader).sync(25);
+ inOrder.verify(rcfReader).sync(0);
+ Assert.assertFalse(ioContext.isBinarySearching());
+ }
+
+ public void testResetRange() throws Exception {
+ init();
+ InOrder inOrder = inOrder(rcfReader);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ inOrder.verify(rcfReader).sync(50);
+
+ ioContext.setComparison(-1);
+ when(rcfReader.getPos()).thenReturn(75L);
+
+ Assert.assertTrue(executeDoNext(hbsReader));
+ inOrder.verify(rcfReader).sync(75);
+
+ ioContext.setEndBinarySearch(true);
+
+ // This should make the search linear, sync to the beginning of the block being searched
+ // [50, 100], set the comparison to be null, and the flag to reset the range should be unset
+ Assert.assertTrue(executeDoNext(hbsReader));
+ inOrder.verify(rcfReader).sync(50);
+ Assert.assertFalse(ioContext.isBinarySearching());
+ Assert.assertFalse(ioContext.shouldEndBinarySearch());
+ }
+
+ public void testEqualOpClass() throws Exception {
+ init();
+ ioContext.setGenericUDFClassName(GenericUDFOPEqual.class.getName());
+ Assert.assertTrue(ioContext.isBinarySearching());
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setIsBinarySearching(false);
+ ioContext.setComparison(-1);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(0);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(1);
+ Assert.assertFalse(executeDoNext(hbsReader));
+ }
+
+ public void testLessThanOpClass() throws Exception {
+ init();
+ ioContext.setGenericUDFClassName(GenericUDFOPLessThan.class.getName());
+ Assert.assertTrue(ioContext.isBinarySearching());
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(-1);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(0);
+ Assert.assertFalse(executeDoNext(hbsReader));
+ ioContext.setComparison(1);
+ Assert.assertFalse(executeDoNext(hbsReader));
+ }
+
+ public void testLessThanOrEqualOpClass() throws Exception {
+ init();
+ ioContext.setGenericUDFClassName(GenericUDFOPEqualOrLessThan.class.getName());
+ Assert.assertTrue(ioContext.isBinarySearching());
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(-1);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(0);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(1);
+ Assert.assertFalse(executeDoNext(hbsReader));
+ }
+
+ public void testGreaterThanOpClass() throws Exception {
+ init();
+ ioContext.setGenericUDFClassName(GenericUDFOPGreaterThan.class.getName());
+ Assert.assertTrue(ioContext.isBinarySearching());
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setIsBinarySearching(false);
+ ioContext.setComparison(-1);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(0);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(1);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ }
+
+ public void testGreaterThanOrEqualOpClass() throws Exception {
+ init();
+ ioContext.setGenericUDFClassName(GenericUDFOPEqualOrGreaterThan.class.getName());
+ Assert.assertTrue(ioContext.isBinarySearching());
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setIsBinarySearching(false);
+ ioContext.setComparison(-1);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(0);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(1);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ }
+
+ public static void main(String[] args) throws Exception {
+ new TestHiveBinarySearchRecordReader().testNonLinearGreaterThan();
+ new TestHiveBinarySearchRecordReader().testNonLinearLessThan();
+ new TestHiveBinarySearchRecordReader().testNonLinearEqualTo();
+ new TestHiveBinarySearchRecordReader().testHitLastBlock();
+ new TestHiveBinarySearchRecordReader().testHitSamePositionTwice();
+ new TestHiveBinarySearchRecordReader().testResetRange();
+ new TestHiveBinarySearchRecordReader().testEqualOpClass();
+ new TestHiveBinarySearchRecordReader().testLessThanOpClass();
+ new TestHiveBinarySearchRecordReader().testLessThanOrEqualOpClass();
+ new TestHiveBinarySearchRecordReader().testGreaterThanOpClass();
+ new TestHiveBinarySearchRecordReader().testGreaterThanOrEqualOpClass();
+ }
+}
Index: ql/src/test/queries/clientpositive/index_compact_binary_search.q
===================================================================
--- ql/src/test/queries/clientpositive/index_compact_binary_search.q (revision 0)
+++ ql/src/test/queries/clientpositive/index_compact_binary_search.q (revision 0)
@@ -0,0 +1,45 @@
+SET hive.default.fileformat=TextFile;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.optimize.index.filter=true;
+SET hive.optimize.index.filter.compact.minsize=1;
+SET hive.index.compact.binary.search=true;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
+
+SET hive.default.fileformat=RCFILE;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java (working copy)
@@ -24,6 +24,7 @@
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCase;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFWhen;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
@@ -81,12 +82,12 @@
void evaluate() throws HiveException {
obj = eval.evaluate(rowObject);
}
-
+
public Object get() throws HiveException {
return obj;
}
}
-
+
public ExprNodeGenericFuncEvaluator(ExprNodeGenericFuncDesc expr) {
this.expr = expr;
children = new ExprNodeEvaluator[expr.getChildExprs().size()];
@@ -152,4 +153,27 @@
return genericUDF.evaluate(deferredChildren);
}
+ /**
+ * If the genericUDF is a base comparison, it returns an integer based on the result of comparing
+ * the two sides of the UDF, like the compareTo method in Comparable.
+ *
+ * If the genericUDF is not a base comparison, or there is an error executing the comparison, it
+ * returns null.
+ * @param row
+ * @return
+ * @throws HiveException
+ */
+ public Integer compare(Object row) throws HiveException {
+ if (!(genericUDF instanceof GenericUDFBaseCompare)) {
+ return null;
+ }
+
+ rowObject = row;
+ if (isEager) {
+ for (int i = 0; i < deferredChildren.length; i++) {
+ ((EagerExprObject) deferredChildren[i]).evaluate();
+ }
+ }
+ return ((GenericUDFBaseCompare)genericUDF).compare(deferredChildren);
+ }
}
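
Note: a sketch of how the new compare(row) hook is consumed (it mirrors the FilterOperator change
later in this patch); conditionEvaluator and row are assumed to be in scope:

    // Only meaningful when the root UDF is a GenericUDFBaseCompare, e.g. key = '0'.
    if (conditionEvaluator instanceof ExprNodeGenericFuncEvaluator) {
      Integer cmp = ((ExprNodeGenericFuncEvaluator) conditionEvaluator).compare(row);
      // cmp < 0: left side sorts before the constant; 0: equal; > 0: after; null: not comparable.
      IOContext.get().setComparison(cmp);
    }
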
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (working copy)
@@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -49,6 +50,8 @@
private transient ExprNodeEvaluator conditionEvaluator;
private transient PrimitiveObjectInspector conditionInspector;
private transient int consecutiveFails;
+ private transient int consecutiveSearches;
+ private transient IOContext ioContext;
transient int heartbeatInterval;
public FilterOperator() {
@@ -56,6 +59,7 @@
filtered_count = new LongWritable();
passed_count = new LongWritable();
consecutiveFails = 0;
+ consecutiveSearches = 0;
}
@Override
@@ -67,6 +71,7 @@
statsMap.put(Counter.FILTERED, filtered_count);
statsMap.put(Counter.PASSED, passed_count);
conditionInspector = null;
+ ioContext = IOContext.get();
} catch (Throwable e) {
throw new HiveException(e);
}
@@ -80,7 +85,43 @@
conditionInspector = (PrimitiveObjectInspector) conditionEvaluator
.initialize(rowInspector);
}
+
+ // If the input is sorted, and we are executing a search based on the arguments to this filter,
+ // set the comparison in the IOContext and the type of the UDF
+ if (conf.isSortedFilter() && ioContext.useSorted()) {
+ if (!(conditionEvaluator instanceof ExprNodeGenericFuncEvaluator)) {
+ LOG.error("Attempted to use the fact data is sorted when the conditionEvaluator is not " +
+ "of type ExprNodeGenericFuncEvaluator");
+ ioContext.setUseSorted(false);
+ return;
+ } else {
+ ioContext.setComparison(((ExprNodeGenericFuncEvaluator)conditionEvaluator).compare(row));
+ }
+
+ if (ioContext.getGenericUDFClassName() == null) {
+ ioContext.setGenericUDFClassName(
+ ((ExprNodeGenericFuncEvaluator)conditionEvaluator).genericUDF.getClass().getName());
+ }
+
+ // If we are currently searching the data for a place to begin, do not return data yet
+ if (ioContext.isBinarySearching()) {
+ consecutiveSearches++;
+ // In case we're searching through an especially large set of data, send a heartbeat in
+ // order to avoid a timeout
+ if (((consecutiveSearches % heartbeatInterval) == 0) && (reporter != null)) {
+ reporter.progress();
+ }
+ return;
+ }
+ }
+
Object condition = conditionEvaluator.evaluate(row);
+
+ // If we are currently performing a binary search on the input, don't forward the results
+ if (ioContext.isBinarySearching()) {
+ return;
+ }
+
Boolean ret = (Boolean) conditionInspector
.getPrimitiveJavaObject(condition);
if (Boolean.TRUE.equals(ret)) {
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java (working copy)
@@ -68,6 +68,8 @@
private org.apache.hadoop.hive.ql.plan.ExprNodeDesc predicate;
private boolean isSamplingPred;
private transient sampleDesc sampleDescr;
+ // Is this a filter that should perform a comparison for sorted searches
+ private boolean isSortedFilter;
public FilterDesc() {
}
@@ -116,4 +118,12 @@
this.sampleDescr = sampleDescr;
}
+ public boolean isSortedFilter() {
+ return isSortedFilter;
+ }
+
+ public void setSortedFilter(boolean isSortedFilter) {
+ this.isSortedFilter = isSortedFilter;
+ }
+
}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (working copy)
@@ -43,12 +43,12 @@
import org.apache.hadoop.hive.serde2.columnar.LazyDecompressionCallback;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
@@ -61,7 +61,7 @@
* RCFiles, short of Record Columnar File, are flat files
* consisting of binary key/value pairs, which shares much similarity with
* SequenceFile.
- *
+ *
* RCFile stores columns of a table in a record columnar way. It first
* partitions rows horizontally into row splits. and then it vertically
* partitions each row split in a columnar way. RCFile first stores the meta
@@ -75,7 +75,7 @@
* RCFile provides {@link Writer}, {@link Reader} and classes for
* writing, reading respectively.
*
- *
+ *
*
* RCFile stores columns of a table in a record columnar way. It first
* partitions rows horizontally into row splits. and then it vertically
@@ -83,21 +83,21 @@
* data of a row split, as the key part of a record, and all the data of a row
* split as the value part.
*
- *
+ *
*
* RCFile compresses values in a more fine-grained manner then record level
* compression. However, It currently does not support compress the key part
* yet. The actual compression algorithm used to compress key and/or values can
* be specified by using the appropriate {@link CompressionCodec}.
*
- *
+ *
*
* The {@link Reader} is used to read and explain the bytes of RCFile.
*
- *
+ *
*
- *
- *
+ *
+ *
*
*
* - version - 3 bytes of magic header SEQ, followed by 1 byte of
@@ -113,7 +113,7 @@
*
- metadata - {@link Metadata} for this file.
* - sync - A sync marker to denote end of the header.
*
- *
+ *
* RCFile Format
*
* - Header
@@ -143,7 +143,7 @@
*
*
*
- *
+ *
*/
public class RCFile {
@@ -177,7 +177,7 @@
/**
* KeyBuffer is the key of each record in RCFile. Its on-disk layout is as
* below:
- *
+ *
*
* - record length in bytes,it is the sum of bytes used to store the key
* part and the value part.
@@ -204,7 +204,7 @@
private int numberRows = 0;
// how many columns
private int columnNumber = 0;
-
+
// return the number of columns recorded in this file's header
public int getColumnNumber() {
return columnNumber;
@@ -227,7 +227,7 @@
/**
* add in a new column's meta data.
- *
+ *
* @param columnValueLen
* this total bytes number of this column's values in this split
* @param colValLenBuffer
@@ -277,7 +277,7 @@
/**
* get number of bytes to store the keyBuffer.
- *
+ *
* @return number of bytes used to store this KeyBuffer on disk
* @throws IOException
*/
@@ -370,7 +370,7 @@
Decompressor valDecompressor = null;
NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
CompressionInputStream deflatFilter = null;
-
+
public ValueBuffer() throws IOException {
}
@@ -522,7 +522,7 @@
CodecPool.returnDecompressor(valDecompressor);
}
}
-
+
@Override
public int compareTo(Object arg0) {
throw new RuntimeException("compareTo not supported in class "
@@ -533,7 +533,7 @@
/**
* Write KeyBuffer/ValueBuffer pairs to a RCFile. RCFile's format is
* compatible with SequenceFile's.
- *
+ *
*/
public static class Writer {
@@ -664,7 +664,7 @@
/**
* Constructs a RCFile Writer.
- *
+ *
* @param fs
* the file system used
* @param conf
@@ -680,7 +680,7 @@
/**
* Constructs a RCFile Writer.
- *
+ *
* @param fs
* the file system used
* @param conf
@@ -699,9 +699,9 @@
}
/**
- *
+ *
* Constructs a RCFile Writer.
- *
+ *
* @param fs
* the file system used
* @param conf
@@ -837,7 +837,7 @@
* column number in the file, zero bytes are appended for the empty columns.
* If its size() is greater then the column number in the file, the exceeded
* columns' bytes are ignored.
- *
+ *
* @param val
* @throws IOException
*/
@@ -936,7 +936,7 @@
bufferedRecords = 0;
columnBufferSize = 0;
}
-
+
/**
* flush a block out without doing anything except compressing the key part.
*/
@@ -945,7 +945,7 @@
checkAndWriteSync(); // sync
out.writeInt(recordLen); // total record length
out.writeInt(keyLength); // key portion length
-
+
if(this.isCompressed()) {
//compress key and write key out
keyCompressionBuffer.reset();
@@ -1001,7 +1001,7 @@
/**
* Read KeyBuffer/ValueBuffer pairs from a RCFile.
- *
+ *
*/
public static class Reader {
private static class SelectedColumn {
@@ -1032,7 +1032,7 @@
private final ValueBuffer currentValue;
- private boolean[] skippedColIDs = null;
+ private final boolean[] skippedColIDs = null;
private int readRowsIndexInBuffer = 0;
@@ -1242,7 +1242,7 @@
/**
* Set the current byte position in the input file.
- *
+ *
*
* The position passed must be a position returned by
* {@link RCFile.Writer#getLength()} when writing this file. To seek to an
@@ -1254,6 +1254,18 @@
in.seek(position);
}
+ /**
+ * Resets the values which determine if there are more rows in the buffer
+ *
+ * This should be used after a call to seek or sync, if next was called before that.
+ * Otherwise, the seek or sync will have no effect; the reader will continue to return rows
+ * from the buffer built up by the earlier call to next.
+ */
+ public synchronized void resetBuffer() {
+ readRowsIndexInBuffer = 0;
+ recordsNumInValBuffer = 0;
+ }
+
/** Seek to the next sync mark past a given position. */
public synchronized void sync(long position) throws IOException {
if (position + SYNC_SIZE >= end) {
@@ -1309,7 +1321,7 @@
/**
* Read and return the next record length, potentially skipping over a sync
* block.
- *
+ *
* @return the length of the next record or -1 if there is no next record
* @throws IOException
*/
@@ -1407,7 +1419,7 @@
currentValue.readFields(in);
currentValue.inited = true;
}
-
+
public boolean nextBlock() throws IOException {
int keyLength = nextKeyBuffer();
if(keyLength > 0) {
@@ -1430,7 +1442,7 @@
* Calling getColumn() with not change the result of
* {@link #next(LongWritable)} and
* {@link #getCurrentRow(BytesRefArrayWritable)}.
- *
+ *
* @param columnID
* @throws IOException
*/
@@ -1459,7 +1471,7 @@
ValueBuffer.LazyDecompressionCallbackImpl decompCallBack = null;
boolean decompressed = currentValue.decompressedFlag[selColIdx];
if (decompressed) {
- uncompData =
+ uncompData =
currentValue.loadedColumnsValueBuffer[selColIdx].getData();
} else {
decompCallBack = currentValue.lazyDecompressCallbackObjs[selColIdx];
@@ -1485,7 +1497,7 @@
* current value buffer. It will influence the result of
* {@link #next(LongWritable)} and
* {@link #getCurrentRow(BytesRefArrayWritable)}
- *
+ *
* @return whether there still has records or not
* @throws IOException
*/
@@ -1500,7 +1512,7 @@
* of rows passed by, because {@link #seek(long)},
* {@link #nextColumnsBatch()} can change the underlying key buffer and
* value buffer.
- *
+ *
* @return next row number
* @throws IOException
*/
@@ -1567,7 +1579,7 @@
/**
* get the current row used,make sure called {@link #next(LongWritable)}
* first.
- *
+ *
* @throws IOException
*/
public synchronized void getCurrentRow(BytesRefArrayWritable ret) throws IOException {
@@ -1596,11 +1608,11 @@
for (int j = 0; j < selectedColumns.length; ++j) {
SelectedColumn col = selectedColumns[j];
int i = col.colIndex;
-
+
BytesRefWritable ref = ret.unCheckedGet(i);
-
+
colAdvanceRow(j, col);
-
+
if (currentValue.decompressedFlag[j]) {
ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
col.rowReadIndex, col.prvLength);
@@ -1611,14 +1623,14 @@
col.rowReadIndex += col.prvLength;
}
} else {
- // This version of the loop eliminates a condition check and branch
+ // This version of the loop eliminates a condition check and branch
// and is measurably faster (20% or so)
for (int j = 0; j < selectedColumns.length; ++j) {
SelectedColumn col = selectedColumns[j];
int i = col.colIndex;
-
+
BytesRefWritable ref = ret.unCheckedGet(i);
-
+
colAdvanceRow(j, col);
ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
col.rowReadIndex, col.prvLength);
@@ -1666,7 +1678,7 @@
public String toString() {
return file.toString();
}
-
+
public boolean isCompressedRCFile() {
return this.decompress;
}
@@ -1689,7 +1701,7 @@
public KeyBuffer getCurrentKeyBufferObj() {
return this.currentKey;
}
-
+
/**
* return the ValueBuffer object used in the reader. Internally in each
* reader, there is only one ValueBuffer object, which gets reused for every
@@ -1698,7 +1710,7 @@
public ValueBuffer getCurrentValueBufferObj() {
return this.currentValue;
}
-
+
//return the current block's length
public int getCurrentBlockLength() {
return this.currentRecordLength;
@@ -1713,11 +1725,11 @@
public int getCurrentCompressedKeyLen() {
return this.compressedKeyLen;
}
-
+
//return the CompressionCodec used for this file
public CompressionCodec getCompressionCodec() {
return this.codec;
}
-
+
}
}
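
Note: a sketch (with hypothetical reader and position variables) of why the new resetBuffer() call
accompanies sync() when rows have already been read; without it the reader keeps returning rows
from the previously buffered row group:

    reader.sync(position);   // seek to the next sync mark past 'position'
    reader.resetBuffer();    // discard the buffered row-group read counters
    LongWritable rowId = new LongWritable();
    while (reader.next(rowId)) {
      // rows now come from the newly synced row group onwards
    }
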
Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java (working copy)
@@ -129,6 +129,14 @@
in.seek(pos);
}
+ public void sync(long pos) throws IOException {
+ in.sync(pos);
+ }
+
+ public void resetBuffer() {
+ in.resetBuffer();
+ }
+
public long getStart() {
return start;
}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveSortedInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveSortedInputFormat.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveSortedInputFormat.java (revision 0)
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * Input format for performing queries on data that meets the requirements specified in
+ * HiveBinarySearchRecordReader.
+ */
+public class HiveSortedInputFormat extends HiveInputFormat {
+
+ public HiveSortedInputFormat() {
+ super();
+ }
+
+ @Override
+ protected HiveRecordReader getHiveRecordReader(RecordReader innerReader, JobConf job)
+ throws IOException {
+ return new HiveBinarySearchRecordReader(innerReader, job);
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinarySearchRecordReader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinarySearchRecordReader.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinarySearchRecordReader.java (revision 0)
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
+import org.apache.hadoop.hive.ql.exec.ExecMapper;
+import org.apache.hadoop.hive.ql.io.IOContext.Comparison;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * HiveBinarySearchRecordReader is a wrapper around a RecordReader. It should only be used if the data
+ * is going to a FilterOperator, which filters by comparing a value in the data with a constant,
+ * using one of the comparisons =, <, >, <=, >=, and the data is sorted on that value in ascending
+ * order. If the RecordReader's underlying format is an RCFile, this object can perform a binary
+ * search to find the block to begin reading from, and stop reading once it can be determined no
+ * other entries will match the filter.
+ */
+public class HiveBinarySearchRecordReader<K extends WritableComparable, V extends Writable>
+ extends HiveRecordReader<K, V> {
+
+ private static final Log LOG = LogFactory.getLog(HiveBinarySearchRecordReader.class.getName());
+
+ private long rangeStart;
+ private long rangeEnd;
+ private long splitEnd;
+ private long previousPosition = -1;
+ private boolean wasUsingSortedSearch = false;
+ private String genericUDFClassName = null;
+ private final List<Comparison> stopComparisons = new ArrayList<Comparison>();
+
+ public HiveBinarySearchRecordReader(RecordReader recordReader, JobConf conf)
+ throws IOException {
+ super(recordReader, conf);
+ }
+
+ @Override
+ public void doClose() throws IOException {
+ recordReader.close();
+ }
+
+ @Override
+ public K createKey() {
+ return (K) recordReader.createKey();
+ }
+
+ @Override
+ public V createValue() {
+ return (V) recordReader.createValue();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return recordReader.getPos();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ if (this.getIOContext().isBinarySearching()) {
+ return 0;
+ } else {
+ return recordReader.getProgress();
+ }
+ }
+
+ @Override
+ public boolean doNext(K key, V value) throws IOException {
+ if (ExecMapper.getDone()) {
+ return false;
+ }
+
+ if (this.getIOContext().shouldEndBinarySearch() ||
+ (!this.getIOContext().useSorted() && this.wasUsingSortedSearch)) {
+ beginLinearSearch();
+ this.wasUsingSortedSearch = false;
+ this.getIOContext().setEndBinarySearch(false);
+ }
+
+ if (this.getIOContext().useSorted()) {
+ if (this.genericUDFClassName == null &&
+ this.getIOContext().getGenericUDFClassName() != null) {
+ setGenericUDFClassName(this.getIOContext().getGenericUDFClassName());
+ }
+
+ if (this.getIOContext().isBinarySearching()) {
+ // Proceed with a binary search
+ if (this.getIOContext().getComparison() != null) {
+ switch (this.getIOContext().getComparison()) {
+ case GREATER:
+ case EQUAL:
+ // Indexes have only one entry per value, so we could go linear from here; if we want to
+ // use this for any sorted table, we'll need to continue the search
+ rangeEnd = previousPosition;
+ break;
+ case LESS:
+ rangeStart = previousPosition;
+ break;
+ default:
+ break;
+ }
+ }
+
+ long position = (rangeStart + rangeEnd) / 2;
+ sync(position);
+
+ long newPosition = getPos();
+ // If the newPosition is the same as the previousPosition, we've reached the end of the
+ // binary search. If the new position is at least as big as the size of the split, any
+ // matching rows must be in the final block, so we can end the binary search.
+ if (newPosition == previousPosition || newPosition >= splitEnd) {
+ this.getIOContext().setIsBinarySearching(false);
+ sync(rangeStart);
+ }
+
+ previousPosition = newPosition;
+ } else if (foundAllTargets()) {
+ // Found all possible rows which will not be filtered
+ return false;
+ }
+ }
+
+ try {
+ return recordReader.next(key, value);
+ } catch (Exception e) {
+ return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf);
+ }
+ }
+
+ @Override
+ public void initIOContext(FileSplit split, JobConf job,
+ Class inputFormatClass, RecordReader recordReader) throws IOException {
+ super.initIOContext(split, job, inputFormatClass, recordReader);
+ this.rangeStart = split.getStart();
+ this.rangeEnd = split.getStart() + split.getLength();
+ this.splitEnd = rangeEnd;
+ if (recordReader instanceof RCFileRecordReader && rangeEnd != 0) {
+ // Binary search only works if we know the size of the split, and the recordReader is an
+ // RCFileRecordReader
+ this.getIOContext().setUseSorted(true);
+ this.getIOContext().setIsBinarySearching(true);
+ this.wasUsingSortedSearch = true;
+ }
+ }
+
+ private void sync(long position) throws IOException {
+ ((RCFileRecordReader)recordReader).sync(position);
+ ((RCFileRecordReader)recordReader).resetBuffer();
+ }
+
+ /**
+ * This uses the name of the generic UDF being used by the filter to determine whether we should
+ * perform a binary search, and which comparisons should signal the end of the
+ * linear scan.
+ * @param genericUDFClassName
+ * @throws IOException
+ */
+ private void setGenericUDFClassName(String genericUDFClassName) throws IOException {
+ this.genericUDFClassName = genericUDFClassName;
+ if (genericUDFClassName.equals(GenericUDFOPEqual.class.getName())) {
+ stopComparisons.add(Comparison.GREATER);
+ } else if (genericUDFClassName.equals(GenericUDFOPLessThan.class.getName())) {
+ stopComparisons.add(Comparison.EQUAL);
+ stopComparisons.add(Comparison.GREATER);
+ if (this.getIOContext().isBinarySearching()) {
+ beginLinearSearch();
+ }
+ } else if (genericUDFClassName.equals(GenericUDFOPEqualOrLessThan.class.getName())) {
+ stopComparisons.add(Comparison.GREATER);
+ if (this.getIOContext().isBinarySearching()) {
+ beginLinearSearch();
+ }
+ } else if (genericUDFClassName.equals(GenericUDFOPGreaterThan.class.getName()) ||
+ genericUDFClassName.equals(GenericUDFOPEqualOrGreaterThan.class.getName())) {
+ // Do nothing
+ } else {
+ // This is an unsupported operator
+ LOG.debug(genericUDFClassName + " is not the name of a supported class. " +
+ "Continuing linearly.");
+ if (this.getIOContext().isBinarySearching()) {
+ beginLinearSearch();
+ }
+ }
+ }
+
+ /**
+ * This should be called after the binary search is finished and before the linear scan begins
+ * @throws IOException
+ */
+ private void beginLinearSearch() throws IOException {
+ sync(rangeStart);
+ this.getIOContext().setIsBinarySearching(false);
+ this.wasUsingSortedSearch = false;
+ }
+
+ /**
+ * Returns true if the current comparison is in the list of stop comparisons, i.e. we've found
+ * all records which won't be filtered
+ * @return
+ */
+ public boolean foundAllTargets() {
+ if (this.getIOContext().getComparison() == null ||
+ !stopComparisons.contains(this.getIOContext().getComparison())) {
+ return false;
+ }
+
+ return true;
+ }
+}
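
Note: the per-call search step in doNext() above, condensed for readability (same fields as the
class; comparison is the value published by the FilterOperator through the IOContext):

    // 1. Shrink the window using the comparison of the row read in the previous iteration.
    if (comparison == Comparison.GREATER || comparison == Comparison.EQUAL) {
      rangeEnd = previousPosition;
    } else if (comparison == Comparison.LESS) {
      rangeStart = previousPosition;
    }
    // 2. Jump to the RCFile sync point nearest the middle of the window.
    long position = (rangeStart + rangeEnd) / 2;
    sync(position);
    long newPosition = getPos();
    // 3. If the position stopped moving, or passed the end of the split, end the binary search
    //    and fall back to a linear scan from rangeStart.
    if (newPosition == previousPosition || newPosition >= splitEnd) {
      sync(rangeStart);
    }
    previousPosition = newPosition;
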
Index: ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (working copy)
@@ -45,6 +45,24 @@
boolean isBlockPointer;
boolean ioExceptions;
+ // Are we using the fact the input is sorted
+ boolean useSorted = false;
+ // Are we currently performing a binary search
+ boolean isBinarySearching = false;
+ // Do we want to end the binary search
+ boolean endBinarySearch = false;
+ // The result of the comparison of the last row processed
+ Comparison comparison = null;
+ // The class name of the generic UDF being used by the filter
+ String genericUDFClassName = null;
+
+ public static enum Comparison {
+ GREATER,
+ LESS,
+ EQUAL,
+ UNKNOWN
+ }
+
String inputFile;
public IOContext() {
@@ -102,4 +120,57 @@
public boolean getIOExceptions() {
return ioExceptions;
}
+
+ public boolean useSorted() {
+ return useSorted;
+ }
+
+ public void setUseSorted(boolean useSorted) {
+ this.useSorted = useSorted;
+ }
+
+ public boolean isBinarySearching() {
+ return isBinarySearching;
+ }
+
+ public void setIsBinarySearching(boolean isBinarySearching) {
+ this.isBinarySearching = isBinarySearching;
+ }
+
+ public boolean shouldEndBinarySearch() {
+ return endBinarySearch;
+ }
+
+ public void setEndBinarySearch(boolean endBinarySearch) {
+ this.endBinarySearch = endBinarySearch;
+ }
+
+ public Comparison getComparison() {
+ return comparison;
+ }
+
+ public void setComparison(Integer comparison) {
+ if (comparison == null && this.isBinarySearching) {
+ // Nothing we can do here, so just proceed normally from now on
+ endBinarySearch = true;
+ } else {
+ if (comparison == null) {
+ this.comparison = Comparison.UNKNOWN;
+ } else if (comparison.intValue() < 0) {
+ this.comparison = Comparison.LESS;
+ } else if (comparison.intValue() > 0) {
+ this.comparison = Comparison.GREATER;
+ } else {
+ this.comparison = Comparison.EQUAL;
+ }
+ }
+ }
+
+ public String getGenericUDFClassName() {
+ return genericUDFClassName;
+ }
+
+ public void setGenericUDFClassName(String genericUDFClassName) {
+ this.genericUDFClassName = genericUDFClassName;
+ }
}
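
Note: a small sketch of the handshake these new fields enable; the FilterOperator publishes the
UDF class name and the comparison result, and the record reader reads them back on its next call:

    IOContext ctx = IOContext.get();
    ctx.setGenericUDFClassName(GenericUDFOPEqual.class.getName());
    ctx.setComparison(-1);   // the last row sorted before the filter constant
    assert ctx.getComparison() == IOContext.Comparison.LESS;
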
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (working copy)
@@ -34,21 +34,22 @@
public class HiveRecordReader<K extends WritableComparable, V extends Writable>
 extends HiveContextAwareRecordReader<K, V> {
- private final RecordReader recordReader;
-
- private JobConf jobConf;
+ protected final RecordReader recordReader;
+ protected JobConf jobConf;
+
public HiveRecordReader(RecordReader recordReader)
throws IOException {
this.recordReader = recordReader;
}
-
+
public HiveRecordReader(RecordReader recordReader, JobConf conf)
throws IOException {
this.recordReader = recordReader;
this.jobConf = conf;
}
+ @Override
public void doClose() throws IOException {
recordReader.close();
}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (working copy)
@@ -33,8 +33,6 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.io.HiveIOExceptionHandler;
-import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -174,7 +172,7 @@
}
JobConf job;
-
+
public void configure(JobConf job) {
this.job = job;
}
@@ -207,7 +205,7 @@
Reporter reporter) throws IOException {
HiveInputSplit hsplit = (HiveInputSplit) split;
-
+
InputSplit inputSplit = hsplit.getInputSplit();
String inputFormatClassName = null;
Class inputFormatClass = null;
@@ -245,11 +243,16 @@
innerReader = HiveIOExceptionHandlerUtil
.handleRecordReaderCreationException(e, cloneJobConf);
}
- HiveRecordReader rr = new HiveRecordReader(innerReader, job);
+ HiveRecordReader rr = getHiveRecordReader(innerReader, job);
rr.initIOContext(hsplit, job, inputFormatClass, innerReader);
return rr;
}
-
+
+ protected HiveRecordReader getHiveRecordReader(RecordReader innerReader, JobConf job)
+ throws IOException {
+ return new HiveRecordReader(innerReader, job);
+ }
+
protected Map<String, PartitionDesc> pathToPartitionInfo;
MapredWork mrwork = null;
@@ -382,11 +385,11 @@
if (this.mrwork == null) {
init(job);
}
-
+
if(this.mrwork.getPathToAliases() == null) {
return;
}
-
+
ArrayList<String> aliases = new ArrayList<String>();
Iterator<Entry<String, ArrayList<String>>> iterator = this.mrwork
.getPathToAliases().entrySet().iterator();
Index: ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java (working copy)
@@ -52,7 +52,7 @@
// IndexBucket
static class IBucket {
private String name = null;
- private SortedSet<Long> offsets = new TreeSet<Long>();
+ private final SortedSet<Long> offsets = new TreeSet<Long>();
public IBucket(String n) {
name = n;
@@ -70,6 +70,7 @@
return offsets;
}
+ @Override
public boolean equals(Object obj) {
if (obj.getClass() != this.getClass()) {
return false;
@@ -91,10 +92,10 @@
ignoreHdfsLoc = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INDEX_IGNORE_HDFS_LOC);
if (indexFiles != null && indexFiles.size() > 0) {
- FileSystem fs = FileSystem.get(conf);
List<Path> paths = new ArrayList<Path>();
for (String indexFile : indexFiles) {
Path indexFilePath = new Path(indexFile);
+ FileSystem fs = indexFilePath.getFileSystem(conf);
FileStatus indexStat = fs.getFileStatus(indexFilePath);
if (indexStat.isDir()) {
FileStatus[] fss = fs.listStatus(indexFilePath);
@@ -113,6 +114,7 @@
long lineCounter = 0;
for (Path indexFinalPath : paths) {
+ FileSystem fs = indexFinalPath.getFileSystem(conf);
FSDataInputStream ifile = fs.open(indexFinalPath);
LineReader lr = new LineReader(ifile, conf);
try {
Index: ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (working copy)
@@ -18,7 +18,10 @@
package org.apache.hadoop.hive.ql.index.compact;
+import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
@@ -27,11 +30,14 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -39,14 +45,17 @@
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.index.TableBasedIndexHandler;
+import org.apache.hadoop.hive.ql.io.HiveSortedInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate;
import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
@@ -57,6 +66,10 @@
public class CompactIndexHandler extends TableBasedIndexHandler {
private Configuration configuration;
+ // The names of the partition columns
+ private Set<String> partitionCols;
+ // Whether or not the conditions have been met to use the fact that the index is sorted
+ private boolean useSorted;
private static final Log LOG = LogFactory.getLog(CompactIndexHandler.class.getName());
@@ -170,13 +183,73 @@
Driver driver = new Driver(queryConf);
driver.compile(qlCommand.toString(), false);
+ if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) {
+ // For now, only works if the predicate is a single condition
+ MapredWork work = null;
+ for (Task<? extends Serializable> task : driver.getPlan().getRootTasks()) {
+ // The index query should have one and only one map reduce task in the root tasks
+ // Otherwise something is wrong, log the problem and continue using the default format
+ if (task.getWork() instanceof MapredWork) {
+ if (work != null) {
+ LOG.error("Tried to use a binary search on a compact index but there were an " +
+ "unexpected number (>1) of root level map reduce tasks in the " +
+ "reentrant query plan.");
+ work.setInputformat(null);
+ break;
+ }
+ work = (MapredWork)task.getWork();
+ if (work.getInputformat() == null) {
+ work.setInputformat(HiveSortedInputFormat.class.getName());
+ }
+ }
+ }
+ if (work != null) {
+ // Find the filter operator which acts on the non-partition column and mark it
+ findNonPartitionFilter(work.getAliasToWork().values());
+ }
+ }
+
+
queryContext.addAdditionalSemanticInputs(driver.getPlan().getInputs());
queryContext.setQueryTasks(driver.getPlan().getRootTasks());
return;
}
/**
+ * Does a depth first search on the operator tree looking for a filter operator whose predicate
+ * has one child which is a column that is not a partition column
+ * @param operators
+ * @return whether or not it has found its target
+ */
+ private boolean findNonPartitionFilter(Collection<Operator<? extends Serializable>> operators) {
+ for (Operator<? extends Serializable> op : operators) {
+ if (op instanceof FilterOperator &&
+ ((FilterOperator)op).getConf().getPredicate().getChildren().size() == 2) {
+ ExprNodeColumnDesc columnDesc = null;
+ ExprNodeDesc predicate = ((FilterOperator)op).getConf().getPredicate();
+ if (predicate.getChildren().get(0) instanceof ExprNodeColumnDesc) {
+ columnDesc = (ExprNodeColumnDesc)predicate.getChildren().get(0);
+ } else if (predicate.getChildren().get(1) instanceof ExprNodeColumnDesc) {
+ columnDesc = (ExprNodeColumnDesc)predicate.getChildren().get(1);
+ }
+
+ // Is this the target
+ if (columnDesc != null && !partitionCols.contains(columnDesc.getColumn())) {
+ ((FilterOperator)op).getConf().setSortedFilter(true);
+ return true;
+ }
+ }
+
+ // If the target has been found, no need to continue
+ if (findNonPartitionFilter(op.getChildOperators())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Split the predicate into the piece we can deal with (pushed), and the one we can't (residual)
* @param predicate
* @param index
@@ -193,6 +266,20 @@
return null;
}
+ int numNonPartitionCols = 0;
+ for (IndexSearchCondition searchCondition : searchConditions) {
+ if (!partitionCols.contains(searchCondition.getColumnDesc().getColumn())) {
+ numNonPartitionCols++;
+ }
+ }
+
+ // For now, only works if the predicate has a single non-partition condition
+ if (numNonPartitionCols == 1) {
+ useSorted = true;
+ } else {
+ useSorted = false;
+ }
+
DecomposedPredicate decomposedPredicate = new DecomposedPredicate();
decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(searchConditions);
decomposedPredicate.residualPredicate = residualPredicate;
@@ -224,12 +311,14 @@
// partitioned columns are treated as if they have indexes so that the partitions
// are used during the index query generation
+ partitionCols = new HashSet<String>();
for (Partition part : queryPartitions) {
if (part.getSpec().isEmpty()) {
continue; // empty partitions are from whole tables, so we don't want to add them in
}
for (String column : part.getSpec().keySet()) {
analyzer.allowColumnName(column);
+ partitionCols.add(column);
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseCompare.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseCompare.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseCompare.java (working copy)
@@ -22,11 +22,13 @@
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ReturnObjectInspectorResolver;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
@@ -159,7 +161,35 @@
}
+ public Integer compare(DeferredObject[] arguments) throws HiveException {
+ Object o0, o1;
+ o0 = arguments[0].get();
+ if (o0 == null) {
+ return null;
+ }
+ o1 = arguments[1].get();
+ if (o1 == null) {
+ return null;
+ }
+ if (compareType == CompareType.NEED_CONVERT) {
+ Object converted_o0 = converter0.convert(o0);
+ if (converted_o0 == null) {
+ return null;
+ }
+ Object converted_o1 = converter1.convert(o1);
+ if (converted_o1 == null) {
+ return null;
+ }
+ return ObjectInspectorUtils.compare(
+ converted_o0, compareOI,
+ converted_o1, compareOI);
+ } else {
+ return ObjectInspectorUtils.compare(
+ o0, argumentOIs[0], o1, argumentOIs[1]);
+ }
+ }
+
@Override
public String getDisplayString(String[] children) {
assert (children.length == 2);