Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1183507)
+++ conf/hive-default.xml (working copy)
@@ -1109,6 +1109,12 @@
+  <name>hive.index.compact.binary.search</name>
+  <value>true</value>
+  <description>Whether or not to use a binary search to find the entries in an index table that match the filter, where possible</description>
+</property>
+
+<property>
  <name>hive.exim.uri.scheme.whitelist</name>
  <value>hdfs,pfile</value>
  <description>A comma separated list of acceptable URI schemes for import and export.</description>
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1183507)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -389,6 +389,7 @@
HIVEOPTINDEXFILTER_COMPACT_MAXSIZE("hive.optimize.index.filter.compact.maxsize", (long) -1), // infinity
HIVE_INDEX_COMPACT_QUERY_MAX_ENTRIES("hive.index.compact.query.max.entries", (long) 10000000), // 10M
HIVE_INDEX_COMPACT_QUERY_MAX_SIZE("hive.index.compact.query.max.size", (long) 10 * 1024 * 1024 * 1024), // 10G
+ HIVE_INDEX_COMPACT_BINARY_SEARCH("hive.index.compact.binary.search", true),
// Statistics
HIVESTATSAUTOGATHER("hive.stats.autogather", true),
Index: ql/src/test/results/clientpositive/index_compact_binary_search.q.out
===================================================================
--- ql/src/test/results/clientpositive/index_compact_binary_search.q.out (revision 0)
+++ ql/src/test/results/clientpositive/index_compact_binary_search.q.out (revision 0)
@@ -0,0 +1,459 @@
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+POSTHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+POSTHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: ALTERINDEX_REBUILD
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+POSTHOOK: query: ALTER INDEX src_index ON src REBUILD
+POSTHOOK: type: ALTERINDEX_REBUILD
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@default__src_src_index__
+POSTHOOK: Lineage: default__src_src_index__._bucketname SIMPLE [(src)src.FieldSchema(name:INPUT__FILE__NAME, type:string, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__._offsets EXPRESSION [(src)src.FieldSchema(name:BLOCK__OFFSET__INSIDE__FILE, type:bigint, comment:), ]
+POSTHOOK: Lineage: default__src_src_index__.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: SELECT * FROM src WHERE key = '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-28-16_162_4530544379793726356/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key < '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-28-24_910_8422770577218536886/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key <= '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-28-33_506_4731041870462994974/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key > '8'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-28-42_116_5213638754635953384/-mr-10000
+86 val_86
+98 val_98
+82 val_82
+92 val_92
+83 val_83
+84 val_84
+96 val_96
+95 val_95
+98 val_98
+85 val_85
+87 val_87
+90 val_90
+95 val_95
+80 val_80
+90 val_90
+83 val_83
+9 val_9
+97 val_97
+84 val_84
+90 val_90
+97 val_97
+PREHOOK: query: SELECT * FROM src WHERE key >= '9'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-28-50_540_3532759530680718240/-mr-10000
+98 val_98
+92 val_92
+96 val_96
+95 val_95
+98 val_98
+90 val_90
+95 val_95
+90 val_90
+9 val_9
+97 val_97
+90 val_90
+97 val_97
+PREHOOK: query: DROP INDEX src_index ON src
+PREHOOK: type: DROPINDEX
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: ALTERINDEX_REBUILD
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+PREHOOK: query: SELECT * FROM src WHERE key = '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-29-07_718_5321452601706822472/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key < '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-29-16_481_5807706878522087187/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key <= '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-29-24_683_6289978315989311017/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key > '8'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-29-33_397_8866306423858815160/-mr-10000
+86 val_86
+98 val_98
+82 val_82
+92 val_92
+83 val_83
+84 val_84
+96 val_96
+95 val_95
+98 val_98
+85 val_85
+87 val_87
+90 val_90
+95 val_95
+80 val_80
+90 val_90
+83 val_83
+9 val_9
+97 val_97
+84 val_84
+90 val_90
+97 val_97
+PREHOOK: query: SELECT * FROM src WHERE key >= '9'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-29-42_150_3909923166888031983/-mr-10000
+98 val_98
+92 val_92
+96 val_96
+95 val_95
+98 val_98
+90 val_90
+95 val_95
+90 val_90
+9 val_9
+97 val_97
+90 val_90
+97 val_97
+PREHOOK: query: DROP INDEX src_index ON src
+PREHOOK: type: DROPINDEX
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: ALTERINDEX_REBUILD
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+PREHOOK: query: SELECT * FROM src WHERE key = '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-29-58_166_3678265151794926604/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key < '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-30-06_723_4428955348995805938/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key <= '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-30-15_252_6268745784357839984/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key > '8'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-30-23_915_5724065339829448949/-mr-10000
+86 val_86
+98 val_98
+82 val_82
+92 val_92
+83 val_83
+84 val_84
+96 val_96
+95 val_95
+98 val_98
+85 val_85
+87 val_87
+90 val_90
+95 val_95
+80 val_80
+90 val_90
+83 val_83
+9 val_9
+97 val_97
+84 val_84
+90 val_90
+97 val_97
+PREHOOK: query: SELECT * FROM src WHERE key >= '9'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-30-32_371_334426532692286168/-mr-10000
+98 val_98
+92 val_92
+96 val_96
+95 val_95
+98 val_98
+90 val_90
+95 val_95
+90 val_90
+9 val_9
+97 val_97
+90 val_90
+97 val_97
+PREHOOK: query: DROP INDEX src_index ON src
+PREHOOK: type: DROPINDEX
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: ALTERINDEX_REBUILD
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+PREHOOK: query: SELECT * FROM src WHERE key = '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-30-47_153_9199966558974802063/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key < '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-30-55_387_6708123102148378807/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key <= '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-31-03_712_8643764022963931262/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key > '8'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-31-12_217_4545846333420817182/-mr-10000
+86 val_86
+98 val_98
+82 val_82
+92 val_92
+83 val_83
+84 val_84
+96 val_96
+95 val_95
+98 val_98
+85 val_85
+87 val_87
+90 val_90
+95 val_95
+80 val_80
+90 val_90
+83 val_83
+9 val_9
+97 val_97
+84 val_84
+90 val_90
+97 val_97
+PREHOOK: query: SELECT * FROM src WHERE key >= '9'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-31-20_405_7128617548959801569/-mr-10000
+98 val_98
+92 val_92
+96 val_96
+95 val_95
+98 val_98
+90 val_90
+95 val_95
+90 val_90
+9 val_9
+97 val_97
+90 val_90
+97 val_97
+PREHOOK: query: DROP INDEX src_index ON src
+PREHOOK: type: DROPINDEX
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: ALTERINDEX_REBUILD
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+PREHOOK: query: SELECT * FROM src WHERE key = '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-31-35_592_7263189839927522587/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key < '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-31-44_027_7622604667919537891/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key <= '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-31-52_682_8498170072495224496/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key > '8'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-32-01_171_3483974250307194429/-mr-10000
+86 val_86
+98 val_98
+82 val_82
+92 val_92
+83 val_83
+84 val_84
+96 val_96
+95 val_95
+98 val_98
+85 val_85
+87 val_87
+90 val_90
+95 val_95
+80 val_80
+90 val_90
+83 val_83
+9 val_9
+97 val_97
+84 val_84
+90 val_90
+97 val_97
+PREHOOK: query: SELECT * FROM src WHERE key >= '9'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-32-09_626_7969856304834644252/-mr-10000
+98 val_98
+92 val_92
+96 val_96
+95 val_95
+98 val_98
+90 val_90
+95 val_95
+90 val_90
+9 val_9
+97 val_97
+90 val_90
+97 val_97
+PREHOOK: query: DROP INDEX src_index ON src
+PREHOOK: type: DROPINDEX
+PREHOOK: query: CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD
+PREHOOK: type: CREATEINDEX
+PREHOOK: query: ALTER INDEX src_index ON src REBUILD
+PREHOOK: type: ALTERINDEX_REBUILD
+PREHOOK: Input: default@src
+PREHOOK: Output: default@default__src_src_index__
+PREHOOK: query: SELECT * FROM src WHERE key = '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-32-24_952_7986831589488854044/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key < '1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-32-33_434_6857699378136194430/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key <= '0'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-32-41_908_1172462155728735633/-mr-10000
+0 val_0
+0 val_0
+0 val_0
+PREHOOK: query: SELECT * FROM src WHERE key > '8'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-32-52_719_4367930844159597815/-mr-10000
+86 val_86
+98 val_98
+82 val_82
+92 val_92
+83 val_83
+84 val_84
+96 val_96
+95 val_95
+98 val_98
+85 val_85
+87 val_87
+90 val_90
+95 val_95
+80 val_80
+90 val_90
+83 val_83
+9 val_9
+97 val_97
+84 val_84
+90 val_90
+97 val_97
+PREHOOK: query: SELECT * FROM src WHERE key >= '9'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@default__src_src_index__
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/var/folders/Y1/Y1Kf7th8FAawW1lYb6Tt+l+pemQ/-Tmp-/kevinwilfong/hive_2011-11-03_17-33-01_910_3573322999685212570/-mr-10000
+98 val_98
+92 val_92
+96 val_96
+95 val_95
+98 val_98
+90 val_90
+95 val_95
+90 val_90
+9 val_9
+97 val_97
+90 val_90
+97 val_97
+PREHOOK: query: DROP INDEX src_index ON src
+PREHOOK: type: DROPINDEX
Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java (revision 0)
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyHiveSortedInputFormatUsedHook.java (revision 0)
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+
+public class VerifyHiveSortedInputFormatUsedHook implements ExecuteWithHookContext {
+
+ public void run(HookContext hookContext) {
+ if (hookContext.getHookType().equals(HookType.POST_EXEC_HOOK)) {
+
+ // Go through the root tasks, and verify the input format of the map reduce task(s) is
+ // HiveSortedInputFormat
+ ArrayList<Task<? extends Serializable>> rootTasks =
+ hookContext.getQueryPlan().getRootTasks();
+ for (Task<? extends Serializable> rootTask : rootTasks) {
+ if (rootTask.getWork() instanceof MapredWork) {
+ Assert.assertTrue("The root map reduce task's input was not marked as sorted.",
+ ((MapredWork)rootTask.getWork()).isInputFormatSorted());
+ }
+ }
+ }
+ }
+}
Index: ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java (revision 0)
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java (revision 0)
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.mockito.InOrder;
+
+/**
+ * TestHiveBinarySearchRecordReader.
+ *
+ */
+public class TestHiveBinarySearchRecordReader extends TestCase {
+
+ private RCFileRecordReader rcfReader;
+ private JobConf conf;
+ private TestHiveInputSplit hiveSplit;
+ private HiveContextAwareRecordReader hbsReader;
+ private IOContext ioContext;
+
+ private static class TestHiveInputSplit extends HiveInputSplit {
+
+ @Override
+ public long getStart() {
+ return 0;
+ }
+
+ @Override
+ public long getLength() {
+ return 100;
+ }
+
+ @Override
+ public Path getPath() {
+ return new Path("/");
+ }
+ }
+
+ private static class TestHiveRecordReader<K extends WritableComparable, V extends Writable>
+ extends HiveContextAwareRecordReader<K, V> {
+
+ public TestHiveRecordReader(RecordReader recordReader, JobConf conf) throws IOException {
+ super(recordReader, conf);
+ }
+
+ @Override
+ public K createKey() {
+ return null;
+ }
+
+ @Override
+ public V createValue() {
+ return null;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public boolean doNext(K key, V value) throws IOException {
+ return super.doNext(key, value);
+ }
+
+ @Override
+ public void doClose() throws IOException {
+
+ }
+
+ }
+
+ private void resetIOContext() {
+ ioContext = IOContext.get();
+ ioContext.setUseSorted(false);
+ ioContext.setIsBinarySearching(false);
+ ioContext.setEndBinarySearch(false);
+ ioContext.setComparison(null);
+ ioContext.setGenericUDFClassName(null);
+ }
+
+ private void init() throws IOException {
+ resetIOContext();
+ rcfReader = mock(RCFileRecordReader.class);
+ when(rcfReader.next((LongWritable)anyObject(),
+ (BytesRefArrayWritable) anyObject())).thenReturn(true);
+ // Since the start is 0, and the length is 100, the first call to sync should be with the value
+ // 50 so return that for getPos()
+ when(rcfReader.getPos()).thenReturn(50L);
+ conf = new JobConf();
+ conf.setBoolean("hive.input.format.sorted", true);
+ hiveSplit = new TestHiveInputSplit();
+ hbsReader = new TestHiveRecordReader(rcfReader, conf);
+ hbsReader.initIOContext(hiveSplit, conf, Class.class, rcfReader);
+ }
+
+ private boolean executeDoNext(HiveContextAwareRecordReader hbsReader) throws IOException {
+ return hbsReader.next(hbsReader.createKey(), hbsReader.createValue());
+ }
+
+ public void testNonLinearGreaterThan() throws Exception {
+ init();
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(50);
+
+ ioContext.setComparison(1);
+ when(rcfReader.getPos()).thenReturn(25L);
+
+ // By setting the comparison to greater, the search should use the block [0, 50]
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(25);
+ }
+
+ public void testNonLinearLessThan() throws Exception {
+ init();
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(50);
+
+ ioContext.setComparison(-1);
+ when(rcfReader.getPos()).thenReturn(75L);
+
+ // By setting the comparison to less, the search should use the block [50, 100]
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(75);
+ }
+
+ public void testNonLinearEqualTo() throws Exception {
+ init();
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(50);
+
+ ioContext.setComparison(0);
+ when(rcfReader.getPos()).thenReturn(25L);
+
+ // By setting the comparison to equal, the search should use the block [0, 50]
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(25);
+ }
+
+ public void testHitLastBlock() throws Exception {
+ init();
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(50);
+
+ ioContext.setComparison(-1);
+ when(rcfReader.getPos()).thenReturn(100L);
+
+ // When sync is called it will return 100, the value signaling the end of the file, this should
+ // result in a call to sync to the beginning of the block it was searching [50, 100], and it
+ // should continue normally
+ Assert.assertTrue(executeDoNext(hbsReader));
+ InOrder inOrder = inOrder(rcfReader);
+ inOrder.verify(rcfReader).sync(75);
+ inOrder.verify(rcfReader).sync(50);
+ Assert.assertFalse(ioContext.isBinarySearching());
+ }
+
+ public void testHitSamePositionTwice() throws Exception {
+ init();
+ Assert.assertTrue(executeDoNext(hbsReader));
+ verify(rcfReader).sync(50);
+
+ ioContext.setComparison(1);
+
+ // When getPos is called it should return the same value, signaling the end of the search, so
+ // the search should continue linearly and it should sync to the beginning of the block [0, 50]
+ Assert.assertTrue(executeDoNext(hbsReader));
+ InOrder inOrder = inOrder(rcfReader);
+ inOrder.verify(rcfReader).sync(25);
+ inOrder.verify(rcfReader).sync(0);
+ Assert.assertFalse(ioContext.isBinarySearching());
+ }
+
+ public void testResetRange() throws Exception {
+ init();
+ InOrder inOrder = inOrder(rcfReader);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ inOrder.verify(rcfReader).sync(50);
+
+ ioContext.setComparison(-1);
+ when(rcfReader.getPos()).thenReturn(75L);
+
+ Assert.assertTrue(executeDoNext(hbsReader));
+ inOrder.verify(rcfReader).sync(75);
+
+ ioContext.setEndBinarySearch(true);
+
+ // This should make the search linear, sync to the beginning of the block being searched
+ // [50, 100], set the comparison to be null, and the flag to reset the range should be unset
+ Assert.assertTrue(executeDoNext(hbsReader));
+ inOrder.verify(rcfReader).sync(50);
+ Assert.assertFalse(ioContext.isBinarySearching());
+ Assert.assertFalse(ioContext.shouldEndBinarySearch());
+ }
+
+ public void testEqualOpClass() throws Exception {
+ init();
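+ // For =, the binary search should stay enabled and the linear scan should stop once a
+ // GREATER comparison is seen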
+ ioContext.setGenericUDFClassName(GenericUDFOPEqual.class.getName());
+ Assert.assertTrue(ioContext.isBinarySearching());
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setIsBinarySearching(false);
+ ioContext.setComparison(-1);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(0);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(1);
+ Assert.assertFalse(executeDoNext(hbsReader));
+ }
+
+ public void testLessThanOpClass() throws Exception {
+ init();
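+ // For <, the binary search should be abandoned immediately and the scan should stop at the
+ // first EQUAL or GREATER comparison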
+ ioContext.setGenericUDFClassName(GenericUDFOPLessThan.class.getName());
+ Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertFalse(ioContext.isBinarySearching());
+ ioContext.setComparison(-1);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(0);
+ Assert.assertFalse(executeDoNext(hbsReader));
+ ioContext.setComparison(1);
+ Assert.assertFalse(executeDoNext(hbsReader));
+ }
+
+ public void testLessThanOrEqualOpClass() throws Exception {
+ init();
+ ioContext.setGenericUDFClassName(GenericUDFOPEqualOrLessThan.class.getName());
+ Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertFalse(ioContext.isBinarySearching());
+ ioContext.setComparison(-1);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(0);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(1);
+ Assert.assertFalse(executeDoNext(hbsReader));
+ }
+
+ public void testGreaterThanOpClass() throws Exception {
+ init();
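+ // For >, the binary search should stay enabled and no comparison result should stop the scan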
+ ioContext.setGenericUDFClassName(GenericUDFOPGreaterThan.class.getName());
+ Assert.assertTrue(ioContext.isBinarySearching());
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setIsBinarySearching(false);
+ ioContext.setComparison(-1);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(0);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(1);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ }
+
+ public void testGreaterThanOrEqualOpClass() throws Exception {
+ init();
+ ioContext.setGenericUDFClassName(GenericUDFOPEqualOrGreaterThan.class.getName());
+ Assert.assertTrue(ioContext.isBinarySearching());
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setIsBinarySearching(false);
+ ioContext.setComparison(-1);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(0);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ ioContext.setComparison(1);
+ Assert.assertTrue(executeDoNext(hbsReader));
+ }
+
+ public static void main(String[] args) throws Exception {
+ new TestHiveBinarySearchRecordReader().testNonLinearGreaterThan();
+ new TestHiveBinarySearchRecordReader().testNonLinearLessThan();
+ new TestHiveBinarySearchRecordReader().testNonLinearEqualTo();
+ new TestHiveBinarySearchRecordReader().testHitLastBlock();
+ new TestHiveBinarySearchRecordReader().testHitSamePositionTwice();
+ new TestHiveBinarySearchRecordReader().testResetRange();
+ new TestHiveBinarySearchRecordReader().testEqualOpClass();
+ new TestHiveBinarySearchRecordReader().testLessThanOpClass();
+ new TestHiveBinarySearchRecordReader().testLessThanOrEqualOpClass();
+ new TestHiveBinarySearchRecordReader().testGreaterThanOpClass();
+ new TestHiveBinarySearchRecordReader().testGreaterThanOrEqualOpClass();
+ }
+}
Index: ql/src/test/queries/clientpositive/index_compact_binary_search.q
===================================================================
--- ql/src/test/queries/clientpositive/index_compact_binary_search.q (revision 0)
+++ ql/src/test/queries/clientpositive/index_compact_binary_search.q (revision 0)
@@ -0,0 +1,132 @@
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+SET hive.default.fileformat=TextFile;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.optimize.index.filter=true;
+SET hive.optimize.index.filter.compact.minsize=1;
+SET hive.index.compact.binary.search=true;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
+
+SET hive.default.fileformat=RCFILE;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+SET hive.default.fileformat=TextFile;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
+
+SET hive.default.fileformat=RCFILE;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+SET hive.default.fileformat=TextFile;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
+
+SET hive.default.fileformat=RCFILE;
+
+CREATE INDEX src_index ON TABLE src(key) as 'COMPACT' WITH DEFERRED REBUILD;
+ALTER INDEX src_index ON src REBUILD;
+
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyHiveSortedInputFormatUsedHook;
+
+SELECT * FROM src WHERE key = '0';
+
+SELECT * FROM src WHERE key < '1';
+
+SELECT * FROM src WHERE key <= '0';
+
+SELECT * FROM src WHERE key > '8';
+
+SELECT * FROM src WHERE key >= '9';
+
+SET hive.exec.post.hooks=;
+
+DROP INDEX src_index ON src;
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java (working copy)
@@ -24,6 +24,7 @@
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCase;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFWhen;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
@@ -81,12 +82,12 @@
void evaluate() throws HiveException {
obj = eval.evaluate(rowObject);
}
-
+
public Object get() throws HiveException {
return obj;
}
}
-
+
public ExprNodeGenericFuncEvaluator(ExprNodeGenericFuncDesc expr) {
this.expr = expr;
children = new ExprNodeEvaluator[expr.getChildExprs().size()];
@@ -152,4 +153,27 @@
return genericUDF.evaluate(deferredChildren);
}
+ /**
+ * If the genericUDF is a base comparison, it returns an integer based on the result of comparing
+ * the two sides of the UDF, like the compareTo method in Comparable.
+ *
+ * If the genericUDF is not a base comparison, or there is an error executing the comparison, it
+ * returns null.
+ * @param row the input row to evaluate the comparison over
+ * @return the result of the comparison, or null if it could not be performed
+ * @throws HiveException
+ */
+ public Integer compare(Object row) throws HiveException {
+ if (!(genericUDF instanceof GenericUDFBaseCompare)) {
+ return null;
+ }
+
+ rowObject = row;
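+ // Evaluate the deferred children for this row before invoking the comparison UDF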
+ if (isEager) {
+ for (int i = 0; i < deferredChildren.length; i++) {
+ ((EagerExprObject) deferredChildren[i]).evaluate();
+ }
+ }
+ return ((GenericUDFBaseCompare)genericUDF).compare(deferredChildren);
+ }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy)
@@ -526,6 +526,9 @@
conf.set("hive.index.compact.file", work.getIndexIntermediateFile());
conf.set("hive.index.blockfilter.file", work.getIndexIntermediateFile());
}
+
+ // Intentionally overwrites anything the user may have put here
+ conf.setBoolean("hive.input.format.sorted", work.isInputFormatSorted());
}
public boolean mapStarted() {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (working copy)
@@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -49,6 +50,8 @@
private transient ExprNodeEvaluator conditionEvaluator;
private transient PrimitiveObjectInspector conditionInspector;
private transient int consecutiveFails;
+ private transient int consecutiveSearches;
+ private transient IOContext ioContext;
transient int heartbeatInterval;
public FilterOperator() {
@@ -56,6 +59,7 @@
filtered_count = new LongWritable();
passed_count = new LongWritable();
consecutiveFails = 0;
+ consecutiveSearches = 0;
}
@Override
@@ -67,6 +71,7 @@
statsMap.put(Counter.FILTERED, filtered_count);
statsMap.put(Counter.PASSED, passed_count);
conditionInspector = null;
+ ioContext = IOContext.get();
} catch (Throwable e) {
throw new HiveException(e);
}
@@ -80,7 +85,47 @@
conditionInspector = (PrimitiveObjectInspector) conditionEvaluator
.initialize(rowInspector);
}
+
+ // If the input is sorted, and we are executing a search based on the arguments to this filter,
+ // set the comparison in the IOContext and the type of the UDF
+ if (conf.isSortedFilter() && ioContext.useSorted()) {
+ if (!(conditionEvaluator instanceof ExprNodeGenericFuncEvaluator)) {
+ LOG.error("Attempted to use the fact that the data is sorted when the conditionEvaluator is " +
+ "not of type ExprNodeGenericFuncEvaluator");
+ ioContext.setUseSorted(false);
+ return;
+ } else {
+ ioContext.setComparison(((ExprNodeGenericFuncEvaluator)conditionEvaluator).compare(row));
+ }
+
+ if (ioContext.getGenericUDFClassName() == null) {
+ ioContext.setGenericUDFClassName(
+ ((ExprNodeGenericFuncEvaluator)conditionEvaluator).genericUDF.getClass().getName());
+ }
+
+ // If we are currently searching the data for a place to begin, do not return data yet
+ if (ioContext.isBinarySearching()) {
+ consecutiveSearches++;
+ // In case we're searching through an especially large set of data, send a heartbeat in
+ // order to avoid timeout
+ if (((consecutiveSearches % heartbeatInterval) == 0) && (reporter != null)) {
+ reporter.progress();
+ }
+ return;
+ }
+ }
+
Object condition = conditionEvaluator.evaluate(row);
+
+ // If we are currently performing a binary search on the input, don't forward the results
+ // Currently this value is set when a query is optimized using a compact index. The map reduce
+ // job responsible for scanning and filtering the index sets this value. It remains set
+ // throughout the binary search executed by the HiveBinarySearchRecordReader until a starting
+ // point for a linear scan has been identified, at which point this value is unset.
+ if (ioContext.isBinarySearching()) {
+ return;
+ }
+
Boolean ret = (Boolean) conditionInspector
.getPrimitiveJavaObject(condition);
if (Boolean.TRUE.equals(ret)) {
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java (working copy)
@@ -68,6 +68,8 @@
private org.apache.hadoop.hive.ql.plan.ExprNodeDesc predicate;
private boolean isSamplingPred;
private transient sampleDesc sampleDescr;
+ // Is this a filter that should perform a comparison for sorted searches
+ private boolean isSortedFilter;
public FilterDesc() {
}
@@ -116,4 +118,12 @@
this.sampleDescr = sampleDescr;
}
+ public boolean isSortedFilter() {
+ return isSortedFilter;
+ }
+
+ public void setSortedFilter(boolean isSortedFilter) {
+ this.isSortedFilter = isSortedFilter;
+ }
+
}
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy)
@@ -87,6 +87,9 @@
private boolean mapperCannotSpanPartns;
+ // used to indicate the input is sorted, and so a BinarySearchRecordReader should be used
+ private boolean inputFormatSorted = false;
+
public MapredWork() {
aliasToPartnInfo = new LinkedHashMap();
}
@@ -437,6 +440,14 @@
this.opParseCtxMap = opParseCtxMap;
}
+ public boolean isInputFormatSorted() {
+ return inputFormatSorted;
+ }
+
+ public void setInputFormatSorted(boolean inputFormatSorted) {
+ this.inputFormatSorted = inputFormatSorted;
+ }
+
public void resolveDynamicPartitionMerge(HiveConf conf, Path path,
TableDesc tblDesc, ArrayList aliases, PartitionDesc partDesc) {
pathToAliases.put(path.toString(), aliases);
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (working copy)
@@ -19,28 +19,73 @@
package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
+import org.apache.hadoop.hive.ql.io.IOContext.Comparison;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
+/** This class prepares an IOContext, and provides the ability to perform a binary search on the
+ * data. The binary search can be used by setting the value of inputFormatSorted in the
+ * MapredWork to true, but it should only be used if the data is going to a FilterOperator,
+ * which filters by comparing a value in the data with a constant, using one of the comparisons
+ * =, <, >, <=, >=. If the RecordReader's underlying format is an RCFile, this object can perform
+ * a binary search to find the block to begin reading from, and stop reading once it can be
+ * determined no other entries will match the filter.
+ */
public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader<K, V> {
+ private static final Log LOG = LogFactory.getLog(HiveContextAwareRecordReader.class.getName());
+
private boolean initDone = false;
+ private long rangeStart;
+ private long rangeEnd;
+ private long splitEnd;
+ private long previousPosition = -1;
+ private boolean wasUsingSortedSearch = false;
+ private String genericUDFClassName = null;
+ private final List<Comparison> stopComparisons = new ArrayList<Comparison>();
- /**
- * Reads the next key/value pair from the input for processing.
- *
- * @param key the key to read data into
- * @param value the value to read data into
- * @return true if a key/value was read, false if at EOF
- */
- public abstract boolean doNext(K key, V value) throws IOException;
+ protected RecordReader recordReader;
+ protected JobConf jobConf;
+ protected boolean isSorted = false;
+ public HiveContextAwareRecordReader(JobConf conf) throws IOException {
+ this(null, conf);
+ }
+
+ public HiveContextAwareRecordReader(RecordReader recordReader) {
+ this.recordReader = recordReader;
+ }
+
+ public HiveContextAwareRecordReader(RecordReader recordReader, JobConf conf)
+ throws IOException {
+ this.recordReader = recordReader;
+ this.jobConf = conf;
+
+ this.isSorted = jobConf.getBoolean("hive.input.format.sorted", false);
+ }
+
+ public void setRecordReader(RecordReader recordReader) {
+ this.recordReader = recordReader;
+
+ this.isSorted = false;
+ }
+
/**
* Close this {@link InputSplit} to future operations.
*
@@ -120,6 +165,7 @@
public void initIOContext(FileSplit split, JobConf job,
Class inputFormatClass, RecordReader recordReader) throws IOException {
+
boolean blockPointer = false;
long blockStart = -1;
FileSplit fileSplit = (FileSplit) split;
@@ -142,5 +188,161 @@
in.close();
}
this.initIOContext(blockStart, blockPointer, split.getPath().toString());
+
+ this.initIOContextSortedProps(split, recordReader);
}
+
+ public void initIOContextSortedProps(FileSplit split, RecordReader recordReader) {
+ this.getIOContext().resetSortingValues();
+
+ this.rangeStart = split.getStart();
+ this.rangeEnd = split.getStart() + split.getLength();
+ this.splitEnd = rangeEnd;
+ if (recordReader instanceof RCFileRecordReader && rangeEnd != 0 && this.isSorted) {
+ // Binary search only works if we know the size of the split, and the recordReader is an
+ // RCFileRecordReader
+ this.getIOContext().setUseSorted(true);
+ this.getIOContext().setIsBinarySearching(true);
+ this.wasUsingSortedSearch = true;
+ } else {
+ // Use the default methods for next in the child class
+ this.isSorted = false;
+ }
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ if (this.getIOContext().isBinarySearching()) {
+ return 0;
+ } else {
+ return recordReader.getProgress();
+ }
+ }
+
+ public boolean doNext(K key, V value) throws IOException {
+ if (this.isSorted) {
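+ // If the binary search was cancelled via the IOContext, or sorted search was switched off
+ // while it was in use, fall back to a linear scan from the start of the current range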
+ if (this.getIOContext().shouldEndBinarySearch() ||
+ (!this.getIOContext().useSorted() && this.wasUsingSortedSearch)) {
+ beginLinearSearch();
+ this.wasUsingSortedSearch = false;
+ this.getIOContext().setEndBinarySearch(false);
+ }
+
+ if (this.getIOContext().useSorted()) {
+ if (this.genericUDFClassName == null &&
+ this.getIOContext().getGenericUDFClassName() != null) {
+ setGenericUDFClassName(this.getIOContext().getGenericUDFClassName());
+ }
+
+ if (this.getIOContext().isBinarySearching()) {
+ // Proceed with a binary search
+ if (this.getIOContext().getComparison() != null) {
+ switch (this.getIOContext().getComparison()) {
+ case GREATER:
+ case EQUAL:
+ // Indexes have only one entry per value, could go linear from here, if we want to
+ // use this for any sorted table, we'll need to continue the search
+ rangeEnd = previousPosition;
+ break;
+ case LESS:
+ rangeStart = previousPosition;
+ break;
+ default:
+ break;
+ }
+ }
+
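+ // Seek to the next RCFile sync point after the midpoint of the remaining range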
+ long position = (rangeStart + rangeEnd) / 2;
+ sync(position);
+
+ long newPosition = getSyncedPosition();
+ // If the newPosition is the same as the previousPosition, we've reached the end of the
+ // binary search; if the new position is at least as big as the size of the split, any
+ // matching rows must be in the final block, so we can end the binary search.
+ if (newPosition == previousPosition || newPosition >= splitEnd) {
+ this.getIOContext().setIsBinarySearching(false);
+ sync(rangeStart);
+ }
+
+ previousPosition = newPosition;
+ } else if (foundAllTargets()) {
+ // Found all possible rows which will not be filtered
+ return false;
+ }
+ }
+ }
+
+ try {
+ return recordReader.next(key, value);
+ } catch (Exception e) {
+ return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf);
+ }
+ }
+
+ private void sync(long position) throws IOException {
+ ((RCFileRecordReader)recordReader).sync(position);
+ ((RCFileRecordReader)recordReader).resetBuffer();
+ }
+
+ private long getSyncedPosition() throws IOException {
+ return recordReader.getPos();
+ }
+ /**
+ * This uses the name of the generic UDF being used by the filter to determine whether we should
+ * perform a binary search, and which comparisons should signal the end of the
+ * linear scan.
+ * @param genericUDFClassName
+ * @throws IOException
+ */
+ private void setGenericUDFClassName(String genericUDFClassName) throws IOException {
+ this.genericUDFClassName = genericUDFClassName;
+ if (genericUDFClassName.equals(GenericUDFOPEqual.class.getName())) {
+ stopComparisons.add(Comparison.GREATER);
+ } else if (genericUDFClassName.equals(GenericUDFOPLessThan.class.getName())) {
+ stopComparisons.add(Comparison.EQUAL);
+ stopComparisons.add(Comparison.GREATER);
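+ // For < the matching rows are at the beginning of the split, so skip the binary search and
+ // scan linearly from the start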
+ if (this.getIOContext().isBinarySearching()) {
+ beginLinearSearch();
+ }
+ } else if (genericUDFClassName.equals(GenericUDFOPEqualOrLessThan.class.getName())) {
+ stopComparisons.add(Comparison.GREATER);
+ if (this.getIOContext().isBinarySearching()) {
+ beginLinearSearch();
+ }
+ } else if (genericUDFClassName.equals(GenericUDFOPGreaterThan.class.getName()) ||
+ genericUDFClassName.equals(GenericUDFOPEqualOrGreaterThan.class.getName())) {
+ // Do nothing
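+ // For > and >= the linear scan runs from where the binary search stops to the end of the
+ // split, so no stop comparison is needed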
+ } else {
+ // This is an unsupported operator
+ LOG.debug(genericUDFClassName + " is not the name of a supported class. " +
+ "Continuing linearly.");
+ if (this.getIOContext().isBinarySearching()) {
+ beginLinearSearch();
+ }
+ }
+ }
+
+ /**
+ * This should be called after the binary search is finished and before the linear scan begins
+ * @throws IOException
+ */
+ private void beginLinearSearch() throws IOException {
+ sync(rangeStart);
+ this.getIOContext().setIsBinarySearching(false);
+ this.wasUsingSortedSearch = false;
+ }
+
+ /**
+ * Returns true if the current comparison is in the list of stop comparisons, i.e. we've found
+ * all records which won't be filtered
+ * @return
+ */
+ public boolean foundAllTargets() {
+ if (this.getIOContext().getComparison() == null ||
+ !stopComparisons.contains(this.getIOContext().getComparison())) {
+ return false;
+ }
+
+ return true;
+ }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (working copy)
@@ -43,12 +43,12 @@
import org.apache.hadoop.hive.serde2.columnar.LazyDecompressionCallback;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
@@ -61,7 +61,7 @@
* RCFiles, short of Record Columnar File, are flat files
* consisting of binary key/value pairs, which shares much similarity with
* SequenceFile.
- *
+ *
* RCFile stores columns of a table in a record columnar way. It first
* partitions rows horizontally into row splits. and then it vertically
* partitions each row split in a columnar way. RCFile first stores the meta
@@ -75,7 +75,7 @@
* RCFile provides {@link Writer}, {@link Reader} and classes for
* writing, reading respectively.
*
- *
+ *
*
* RCFile stores columns of a table in a record columnar way. It first
* partitions rows horizontally into row splits. and then it vertically
@@ -83,21 +83,21 @@
* data of a row split, as the key part of a record, and all the data of a row
* split as the value part.
*
- *
+ *
*
* RCFile compresses values in a more fine-grained manner then record level
* compression. However, It currently does not support compress the key part
* yet. The actual compression algorithm used to compress key and/or values can
* be specified by using the appropriate {@link CompressionCodec}.
*
- *
+ *
*
* The {@link Reader} is used to read and explain the bytes of RCFile.
*
- *
+ *
*
- *
- *
+ *
+ *
*
*
* - version - 3 bytes of magic header SEQ, followed by 1 byte of
@@ -113,7 +113,7 @@
*
- metadata - {@link Metadata} for this file.
* - sync - A sync marker to denote end of the header.
*
- *
+ *
* RCFile Format
*
* - Header
@@ -143,7 +143,7 @@
*
*
*
- *
+ *
*/
public class RCFile {
@@ -177,7 +177,7 @@
/**
* KeyBuffer is the key of each record in RCFile. Its on-disk layout is as
* below:
- *
+ *
*
* - record length in bytes,it is the sum of bytes used to store the key
* part and the value part.
@@ -204,7 +204,7 @@
private int numberRows = 0;
// how many columns
private int columnNumber = 0;
-
+
// return the number of columns recorded in this file's header
public int getColumnNumber() {
return columnNumber;
@@ -227,7 +227,7 @@
/**
* add in a new column's meta data.
- *
+ *
* @param columnValueLen
* this total bytes number of this column's values in this split
* @param colValLenBuffer
@@ -277,7 +277,7 @@
/**
* get number of bytes to store the keyBuffer.
- *
+ *
* @return number of bytes used to store this KeyBuffer on disk
* @throws IOException
*/
@@ -370,7 +370,7 @@
Decompressor valDecompressor = null;
NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
CompressionInputStream deflatFilter = null;
-
+
public ValueBuffer() throws IOException {
}
@@ -522,7 +522,7 @@
CodecPool.returnDecompressor(valDecompressor);
}
}
-
+
@Override
public int compareTo(Object arg0) {
throw new RuntimeException("compareTo not supported in class "
@@ -533,7 +533,7 @@
/**
* Write KeyBuffer/ValueBuffer pairs to a RCFile. RCFile's format is
* compatible with SequenceFile's.
- *
+ *
*/
public static class Writer {
@@ -664,7 +664,7 @@
/**
* Constructs a RCFile Writer.
- *
+ *
* @param fs
* the file system used
* @param conf
@@ -680,7 +680,7 @@
/**
* Constructs a RCFile Writer.
- *
+ *
* @param fs
* the file system used
* @param conf
@@ -699,9 +699,9 @@
}
/**
- *
+ *
* Constructs a RCFile Writer.
- *
+ *
* @param fs
* the file system used
* @param conf
@@ -837,7 +837,7 @@
* column number in the file, zero bytes are appended for the empty columns.
* If its size() is greater then the column number in the file, the exceeded
* columns' bytes are ignored.
- *
+ *
* @param val
* @throws IOException
*/
@@ -936,7 +936,7 @@
bufferedRecords = 0;
columnBufferSize = 0;
}
-
+
/**
* flush a block out without doing anything except compressing the key part.
*/
@@ -945,7 +945,7 @@
checkAndWriteSync(); // sync
out.writeInt(recordLen); // total record length
out.writeInt(keyLength); // key portion length
-
+
if(this.isCompressed()) {
//compress key and write key out
keyCompressionBuffer.reset();
@@ -1001,7 +1001,7 @@
/**
* Read KeyBuffer/ValueBuffer pairs from a RCFile.
- *
+ *
*/
public static class Reader {
private static class SelectedColumn {
@@ -1032,7 +1032,7 @@
private final ValueBuffer currentValue;
- private boolean[] skippedColIDs = null;
+ private final boolean[] skippedColIDs = null;
private int readRowsIndexInBuffer = 0;
@@ -1242,7 +1242,7 @@
/**
* Set the current byte position in the input file.
- *
+ *
*
* The position passed must be a position returned by
* {@link RCFile.Writer#getLength()} when writing this file. To seek to an
@@ -1254,6 +1254,18 @@
in.seek(position);
}
+ /**
+ * Resets the values which determine if there are more rows in the buffer
+ *
+ * This can be used after one calls seek or sync, if one called next before that.
+ * Otherwise, the seek or sync will have no effect, it will continue to get rows from the
+ * buffer built up from the call to next.
+ */
+ public synchronized void resetBuffer() {
+ readRowsIndexInBuffer = 0;
+ recordsNumInValBuffer = 0;
+ }
+
/** Seek to the next sync mark past a given position. */
public synchronized void sync(long position) throws IOException {
if (position + SYNC_SIZE >= end) {
@@ -1309,7 +1321,7 @@
/**
* Read and return the next record length, potentially skipping over a sync
* block.
- *
+ *
* @return the length of the next record or -1 if there is no next record
* @throws IOException
*/
@@ -1407,7 +1419,7 @@
currentValue.readFields(in);
currentValue.inited = true;
}
-
+
public boolean nextBlock() throws IOException {
int keyLength = nextKeyBuffer();
if(keyLength > 0) {
@@ -1430,7 +1442,7 @@
* Calling getColumn() with not change the result of
* {@link #next(LongWritable)} and
* {@link #getCurrentRow(BytesRefArrayWritable)}.
- *
+ *
* @param columnID
* @throws IOException
*/
@@ -1459,7 +1471,7 @@
ValueBuffer.LazyDecompressionCallbackImpl decompCallBack = null;
boolean decompressed = currentValue.decompressedFlag[selColIdx];
if (decompressed) {
- uncompData =
+ uncompData =
currentValue.loadedColumnsValueBuffer[selColIdx].getData();
} else {
decompCallBack = currentValue.lazyDecompressCallbackObjs[selColIdx];
@@ -1485,7 +1497,7 @@
* current value buffer. It will influence the result of
* {@link #next(LongWritable)} and
* {@link #getCurrentRow(BytesRefArrayWritable)}
- *
+ *
* @return whether there still has records or not
* @throws IOException
*/
@@ -1500,7 +1512,7 @@
* of rows passed by, because {@link #seek(long)},
* {@link #nextColumnsBatch()} can change the underlying key buffer and
* value buffer.
- *
+ *
* @return next row number
* @throws IOException
*/
@@ -1567,7 +1579,7 @@
/**
* get the current row used,make sure called {@link #next(LongWritable)}
* first.
- *
+ *
* @throws IOException
*/
public synchronized void getCurrentRow(BytesRefArrayWritable ret) throws IOException {
@@ -1596,11 +1608,11 @@
for (int j = 0; j < selectedColumns.length; ++j) {
SelectedColumn col = selectedColumns[j];
int i = col.colIndex;
-
+
BytesRefWritable ref = ret.unCheckedGet(i);
-
+
colAdvanceRow(j, col);
-
+
if (currentValue.decompressedFlag[j]) {
ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
col.rowReadIndex, col.prvLength);
@@ -1611,14 +1623,14 @@
col.rowReadIndex += col.prvLength;
}
} else {
- // This version of the loop eliminates a condition check and branch
+ // This version of the loop eliminates a condition check and branch
// and is measurably faster (20% or so)
for (int j = 0; j < selectedColumns.length; ++j) {
SelectedColumn col = selectedColumns[j];
int i = col.colIndex;
-
+
BytesRefWritable ref = ret.unCheckedGet(i);
-
+
colAdvanceRow(j, col);
ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
col.rowReadIndex, col.prvLength);
@@ -1666,7 +1678,7 @@
public String toString() {
return file.toString();
}
-
+
public boolean isCompressedRCFile() {
return this.decompress;
}
@@ -1689,7 +1701,7 @@
public KeyBuffer getCurrentKeyBufferObj() {
return this.currentKey;
}
-
+
/**
* return the ValueBuffer object used in the reader. Internally in each
* reader, there is only one ValueBuffer object, which gets reused for every
@@ -1698,7 +1710,7 @@
public ValueBuffer getCurrentValueBufferObj() {
return this.currentValue;
}
-
+
//return the current block's length
public int getCurrentBlockLength() {
return this.currentRecordLength;
@@ -1713,11 +1725,11 @@
public int getCurrentCompressedKeyLen() {
return this.compressedKeyLen;
}
-
+
//return the CompressionCodec used for this file
public CompressionCodec getCompressionCodec() {
return this.codec;
}
-
+
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java (working copy)
@@ -129,6 +129,14 @@
in.seek(pos);
}
+ public void sync(long pos) throws IOException {
+ in.sync(pos);
+ }
+
+ public void resetBuffer() {
+ in.resetBuffer();
+ }
+
public long getStart() {
return start;
}
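Reviewer note: the two delegations above exist so that a wrapping reader can re-position an RCFileRecordReader mid-split. The helper below is hypothetical and only mimics a single probe of the binary search this patch enables; the midpoint arithmetic is illustrative.

import java.io.IOException;
import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileSplit;

public class RCFileProbeExample {
  // Position the reader at the first sync mark past the middle of the split and
  // read one row from there, discarding anything buffered by earlier next() calls.
  static boolean probeMiddle(RCFileRecordReader reader, FileSplit split,
      LongWritable key, BytesRefArrayWritable value) throws IOException {
    long middle = split.getStart() + split.getLength() / 2;
    reader.sync(middle);
    reader.resetBuffer();
    return reader.next(key, value);
  }
}
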
Index: ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java (working copy)
@@ -30,7 +30,6 @@
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
/**
@@ -42,12 +41,10 @@
public class CombineHiveRecordReader
extends HiveContextAwareRecordReader {
- private final RecordReader recordReader;
-
public CombineHiveRecordReader(InputSplit split, Configuration conf,
Reporter reporter, Integer partition) throws IOException {
- JobConf job = (JobConf) conf;
- CombineHiveInputSplit hsplit = new CombineHiveInputSplit(job,
+ super((JobConf)conf);
+ CombineHiveInputSplit hsplit = new CombineHiveInputSplit(jobConf,
(InputSplitShim) split);
String inputFormatClassName = hsplit.inputFormatClassName();
Class inputFormatClass = null;
@@ -58,15 +55,16 @@
+ inputFormatClassName);
}
InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(
- inputFormatClass, job);
+ inputFormatClass, jobConf);
// create a split for the given partition
FileSplit fsplit = new FileSplit(hsplit.getPaths()[partition], hsplit
.getStartOffsets()[partition], hsplit.getLengths()[partition], hsplit
.getLocations());
- this.recordReader = inputFormat.getRecordReader(fsplit, job, reporter);
- this.initIOContext(fsplit, job, inputFormatClass, this.recordReader);
+ this.setRecordReader(inputFormat.getRecordReader(fsplit, jobConf, reporter));
+
+ this.initIOContext(fsplit, jobConf, inputFormatClass, this.recordReader);
}
@Override
@@ -86,7 +84,12 @@
return recordReader.getPos();
}
+ @Override
public float getProgress() throws IOException {
+ if (isSorted) {
+ return super.getProgress();
+ }
+
return recordReader.getProgress();
}
@@ -95,6 +98,6 @@
if (ExecMapper.getDone()) {
return false;
}
- return recordReader.next(key, value);
+ return super.doNext(key, value);
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (working copy)
@@ -45,6 +45,24 @@
boolean isBlockPointer;
boolean ioExceptions;
+ // Are we using the fact that the input is sorted
+ boolean useSorted = false;
+ // Are we currently performing a binary search
+ boolean isBinarySearching = false;
+ // Do we want to end the binary search
+ boolean endBinarySearch = false;
+ // The result of the comparison of the last row processed
+ Comparison comparison = null;
+ // The class name of the generic UDF being used by the filter
+ String genericUDFClassName = null;
+
+ public static enum Comparison {
+ GREATER,
+ LESS,
+ EQUAL,
+ UNKNOWN
+ }
+
String inputFile;
public IOContext() {
@@ -102,4 +120,69 @@
public boolean getIOExceptions() {
return ioExceptions;
}
+
+ public boolean useSorted() {
+ return useSorted;
+ }
+
+ public void setUseSorted(boolean useSorted) {
+ this.useSorted = useSorted;
+ }
+
+ public boolean isBinarySearching() {
+ return isBinarySearching;
+ }
+
+ public void setIsBinarySearching(boolean isBinarySearching) {
+ this.isBinarySearching = isBinarySearching;
+ }
+
+ public boolean shouldEndBinarySearch() {
+ return endBinarySearch;
+ }
+
+ public void setEndBinarySearch(boolean endBinarySearch) {
+ this.endBinarySearch = endBinarySearch;
+ }
+
+ public Comparison getComparison() {
+ return comparison;
+ }
+
+ public void setComparison(Integer comparison) {
+ if (comparison == null && this.isBinarySearching) {
+ // Nothing we can do here, so just proceed normally from now on
+ endBinarySearch = true;
+ } else {
+ if (comparison == null) {
+ this.comparison = Comparison.UNKNOWN;
+ } else if (comparison.intValue() < 0) {
+ this.comparison = Comparison.LESS;
+ } else if (comparison.intValue() > 0) {
+ this.comparison = Comparison.GREATER;
+ } else {
+ this.comparison = Comparison.EQUAL;
+ }
+ }
+ }
+
+ public String getGenericUDFClassName() {
+ return genericUDFClassName;
+ }
+
+ public void setGenericUDFClassName(String genericUDFClassName) {
+ this.genericUDFClassName = genericUDFClassName;
+ }
+
+ /**
+ * The thread-local IOContext is static; we may need to restart the search if, for instance,
+ * multiple files are being searched as part of a CombineHiveRecordReader.
+ */
+ public void resetSortingValues() {
+ this.useSorted = false;
+ this.isBinarySearching = false;
+ this.endBinarySearch = false;
+ this.comparison = null;
+ this.genericUDFClassName = null;
+ }
}
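Reviewer note: a short sketch of how the new sorted-search state is expected to be driven. The call sequence is illustrative only; the accessors themselves are the ones added above.

import org.apache.hadoop.hive.ql.io.IOContext;

public class IOContextComparisonExample {
  public static void main(String[] args) {
    IOContext ctx = new IOContext();
    ctx.setUseSorted(true);
    ctx.setIsBinarySearching(true);

    // The filter reports the sign of comparing the current row against the search key.
    ctx.setComparison(Integer.valueOf(-1));
    System.out.println(ctx.getComparison());          // LESS

    ctx.setComparison(Integer.valueOf(1));
    System.out.println(ctx.getComparison());          // GREATER

    // A null comparison while binary searching means the search cannot continue,
    // so the context flags that the reader should fall back to a normal scan.
    ctx.setComparison(null);
    System.out.println(ctx.shouldEndBinarySearch());  // true

    // Before moving on to the next file of a combined split, the state is cleared.
    ctx.resetSortingValues();
  }
}
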
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (working copy)
@@ -20,7 +20,6 @@
import java.io.IOException;
-import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
import org.apache.hadoop.hive.ql.exec.ExecMapper;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -34,21 +33,19 @@
public class HiveRecordReader
extends HiveContextAwareRecordReader {
- private final RecordReader recordReader;
-
- private JobConf jobConf;
+
public HiveRecordReader(RecordReader recordReader)
throws IOException {
- this.recordReader = recordReader;
+ super(recordReader);
}
-
+
public HiveRecordReader(RecordReader recordReader, JobConf conf)
throws IOException {
- this.recordReader = recordReader;
- this.jobConf = conf;
+ super(recordReader, conf);
}
+ @Override
public void doClose() throws IOException {
recordReader.close();
}
@@ -65,7 +62,12 @@
return recordReader.getPos();
}
+ @Override
public float getProgress() throws IOException {
+ if (isSorted) {
+ return super.getProgress();
+ }
+
return recordReader.getProgress();
}
@@ -74,11 +76,7 @@
if (ExecMapper.getDone()) {
return false;
}
- try {
- return recordReader.next(key, value);
- } catch (Exception e) {
- return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf);
- }
+ return super.doNext(key, value);
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (working copy)
@@ -33,8 +33,6 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.io.HiveIOExceptionHandler;
-import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -174,7 +172,7 @@
}
JobConf job;
-
+
public void configure(JobConf job) {
this.job = job;
}
@@ -207,7 +205,7 @@
Reporter reporter) throws IOException {
HiveInputSplit hsplit = (HiveInputSplit) split;
-
+
InputSplit inputSplit = hsplit.getInputSplit();
String inputFormatClassName = null;
Class inputFormatClass = null;
@@ -249,7 +247,7 @@
rr.initIOContext(hsplit, job, inputFormatClass, innerReader);
return rr;
}
-
+
protected Map<String, PartitionDesc> pathToPartitionInfo;
MapredWork mrwork = null;
@@ -382,11 +380,11 @@
if (this.mrwork == null) {
init(job);
}
-
+
if(this.mrwork.getPathToAliases() == null) {
return;
}
-
+
ArrayList<String> aliases = new ArrayList<String>();
Iterator<Entry<String, ArrayList<String>>> iterator = this.mrwork
.getPathToAliases().entrySet().iterator();
Index: ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (working copy)
@@ -23,12 +23,10 @@
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
/**
* BucketizedHiveRecordReader is a wrapper on a list of RecordReader. It behaves
@@ -39,68 +37,66 @@
extends HiveContextAwareRecordReader {
protected final BucketizedHiveInputSplit split;
protected final InputFormat inputFormat;
- protected final JobConf jobConf;
protected final Reporter reporter;
- protected RecordReader curReader;
protected long progress;
protected int idx;
-
+
public BucketizedHiveRecordReader(InputFormat inputFormat,
BucketizedHiveInputSplit bucketizedSplit, JobConf jobConf,
Reporter reporter) throws IOException {
+ super(jobConf);
+
this.split = bucketizedSplit;
this.inputFormat = inputFormat;
- this.jobConf = jobConf;
this.reporter = reporter;
initNextRecordReader();
}
+ @Override
public void doClose() throws IOException {
- if (curReader != null) {
- curReader.close();
- curReader = null;
+ if (recordReader != null) {
+ recordReader.close();
+ recordReader = null;
}
idx = 0;
}
public K createKey() {
- return (K) curReader.createKey();
+ return (K) recordReader.createKey();
}
public V createValue() {
- return (V) curReader.createValue();
+ return (V) recordReader.createValue();
}
public long getPos() throws IOException {
- if (curReader != null) {
- return curReader.getPos();
+ if (recordReader != null) {
+ return recordReader.getPos();
} else {
return 0;
}
}
+ @Override
public float getProgress() throws IOException {
// The calculation is strongly dependent on the assumption that all splits
// came from the same file
- return Math.min(1.0f, ((curReader == null) ? progress : curReader.getPos())
- / (float) (split.getLength()));
+ return Math.min(1.0f, ((recordReader == null || this.getIOContext().isBinarySearching()) ?
+ progress : recordReader.getPos()) / (float) (split.getLength()));
}
+ @Override
public boolean doNext(K key, V value) throws IOException {
- while ((curReader == null) || !doNextWithExceptionHandler(key, value)) {
+ while ((recordReader == null) || !doNextWithExceptionHandler(key, value)) {
if (!initNextRecordReader()) {
return false;
}
}
return true;
}
-
+
private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
- try {
- return curReader.next(key, value);
- } catch (Exception e) {
- return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf);
- }
+ return super.doNext(key, value);
}
/**
@@ -108,9 +104,9 @@
* BucketizedHiveRecordReader.
*/
protected boolean initNextRecordReader() throws IOException {
- if (curReader != null) {
- curReader.close();
- curReader = null;
+ if (recordReader != null) {
+ recordReader.close();
+ recordReader = null;
if (idx > 0) {
progress += split.getLength(idx - 1); // done processing so far
}
@@ -123,11 +119,16 @@
// get a record reader for the idx-th chunk
try {
- curReader = inputFormat.getRecordReader(split.getSplit(idx), jobConf,
+ recordReader = inputFormat.getRecordReader(split.getSplit(idx), jobConf,
reporter);
} catch (Exception e) {
- curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jobConf);
+ recordReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jobConf);
}
+
+ // if we're performing a binary search, we need to restart it
+ if (isSorted) {
+ initIOContextSortedProps((FileSplit) split.getSplit(idx), recordReader);
+ }
idx++;
return true;
}
Index: ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (working copy)
@@ -18,7 +18,10 @@
package org.apache.hadoop.hive.ql.index.compact;
+import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
@@ -27,11 +30,14 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -39,14 +45,19 @@
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.index.TableBasedIndexHandler;
+import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate;
import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
@@ -57,6 +68,10 @@
public class CompactIndexHandler extends TableBasedIndexHandler {
private Configuration configuration;
+ // The names of the partition columns
+ private Set<String> partitionCols;
+ // Whether the conditions have been met to use the fact that the index is sorted
+ private boolean useSorted;
private static final Log LOG = LogFactory.getLog(CompactIndexHandler.class.getName());
@@ -170,13 +185,85 @@
Driver driver = new Driver(queryConf);
driver.compile(qlCommand.toString(), false);
+ if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) {
+ // For now, only works if the predicate is a single condition
+ MapredWork work = null;
+ for (Task<? extends Serializable> task : driver.getPlan().getRootTasks()) {
+ // The index query should have one and only one map reduce task in the root tasks
+ // Otherwise something is wrong; log the problem and continue using the default format
+ if (task.getWork() instanceof MapredWork) {
+ if (work != null) {
+ LOG.error("Tried to use a binary search on a compact index but there were an " +
+ "unexpected number (>1) of root level map reduce tasks in the " +
+ "reentrant query plan.");
+ work.setInputformat(null);
+ break;
+ }
+ work = (MapredWork)task.getWork();
+ String inputFormat = work.getInputformat();
+ if (inputFormat == null) {
+ inputFormat = HiveConf.getVar(pctx.getConf(), HiveConf.ConfVars.HIVEINPUTFORMAT);
+ }
+ // We can only perform a binary search with HiveInputFormat, CombineHiveInputFormat,
+ // and BucketizedHiveInputFormat
+ if (!inputFormat.equals(HiveInputFormat.class.getName()) &&
+ !inputFormat.equals(CombineHiveInputFormat.class.getName()) &&
+ !inputFormat.equals(BucketizedHiveInputFormat.class.getName())) {
+ work = null;
+ break;
+ }
+
+ work.setInputFormatSorted(true);
+ }
+ }
+
+ if (work != null) {
+ // Find the filter operator which acts on the index column and mark it
+ findIndexColumnFilter(work.getAliasToWork().values());
+ }
+ }
+
+
queryContext.addAdditionalSemanticInputs(driver.getPlan().getInputs());
queryContext.setQueryTasks(driver.getPlan().getRootTasks());
return;
}
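Reviewer note: the gate applied above (config flag plus input-format check) can be read in isolation as the small sketch below. canUseBinarySearch is a hypothetical helper; in the patch the same check is additionally guarded by useSorted, i.e. a single condition on an index column.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;

public class BinarySearchGateExample {
  static boolean canUseBinarySearch(HiveConf conf, String planInputFormat) {
    if (!conf.getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH)) {
      return false;
    }
    // Fall back to the session-level input format when the plan does not set one.
    String inputFormat = (planInputFormat != null)
        ? planInputFormat
        : HiveConf.getVar(conf, ConfVars.HIVEINPUTFORMAT);
    return inputFormat.equals(HiveInputFormat.class.getName())
        || inputFormat.equals(CombineHiveInputFormat.class.getName())
        || inputFormat.equals(BucketizedHiveInputFormat.class.getName());
  }

  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    conf.setBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH, true);
    System.out.println(canUseBinarySearch(conf, HiveInputFormat.class.getName()));
  }
}
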
/**
+ * Does a depth-first search on the operator tree looking for a filter operator whose predicate
+ * has one child that is a column not among the partition columns
+ * @param operators
+ * @return whether or not it has found its target
+ */
+ private boolean findIndexColumnFilter(Collection<Operator<? extends Serializable>> operators) {
+ for (Operator<? extends Serializable> op : operators) {
+ if (op instanceof FilterOperator &&
+ ((FilterOperator)op).getConf().getPredicate().getChildren().size() == 2) {
+ ExprNodeColumnDesc columnDesc = null;
+ ExprNodeDesc predicate = ((FilterOperator)op).getConf().getPredicate();
+ if (predicate.getChildren().get(0) instanceof ExprNodeColumnDesc) {
+ columnDesc = (ExprNodeColumnDesc)predicate.getChildren().get(0);
+ } else if (predicate.getChildren().get(1) instanceof ExprNodeColumnDesc) {
+ columnDesc = (ExprNodeColumnDesc)predicate.getChildren().get(1);
+ }
+
+ // Is this the target
+ if (columnDesc != null && !partitionCols.contains(columnDesc.getColumn())) {
+ ((FilterOperator)op).getConf().setSortedFilter(true);
+ return true;
+ }
+ }
+
+ // If the target has been found, no need to continue
+ if (findIndexColumnFilter(op.getChildOperators())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Split the predicate into the piece we can deal with (pushed), and the one we can't (residual)
* @param predicate
* @param index
@@ -193,6 +280,20 @@
return null;
}
+ int numIndexCols = 0;
+ for (IndexSearchCondition searchCondition : searchConditions) {
+ if (!partitionCols.contains(searchCondition.getColumnDesc().getColumn())) {
+ numIndexCols++;
+ }
+ }
+
+ // For now, only works if the predicate has a single condition on an index column
+ if (numIndexCols == 1) {
+ useSorted = true;
+ } else {
+ useSorted = false;
+ }
+
DecomposedPredicate decomposedPredicate = new DecomposedPredicate();
decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(searchConditions);
decomposedPredicate.residualPredicate = residualPredicate;
@@ -224,12 +325,14 @@
// partitioned columns are treated as if they have indexes so that the partitions
// are used during the index query generation
+ partitionCols = new HashSet<String>();
for (Partition part : queryPartitions) {
if (part.getSpec().isEmpty()) {
continue; // empty partitions are from whole tables, so we don't want to add them in
}
for (String column : part.getSpec().keySet()) {
analyzer.allowColumnName(column);
+ partitionCols.add(column);
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseCompare.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseCompare.java (revision 1183507)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBaseCompare.java (working copy)
@@ -22,11 +22,13 @@
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ReturnObjectInspectorResolver;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
@@ -159,7 +161,35 @@
}
+ public Integer compare(DeferredObject[] arguments) throws HiveException {
+ Object o0,o1;
+ o0 = arguments[0].get();
+ if (o0 == null) {
+ return null;
+ }
+ o1 = arguments[1].get();
+ if (o1 == null) {
+ return null;
+ }
+ if (compareType == CompareType.NEED_CONVERT) {
+ Object converted_o0 = converter0.convert(o0);
+ if (converted_o0 == null) {
+ return null;
+ }
+ Object converted_o1 = converter1.convert(o1);
+ if (converted_o1 == null) {
+ return null;
+ }
+ return ObjectInspectorUtils.compare(
+ converted_o0, compareOI,
+ converted_o1, compareOI);
+ } else {
+ return ObjectInspectorUtils.compare(
+ o0, argumentOIs[0], o1, argumentOIs[1]);
+ }
+ }
+
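Reviewer note: a hedged sketch exercising the new compare() hook on a concrete comparison UDF. ConstantDeferred is a throwaway DeferredObject written against the single-method interface at this revision, and the direction comments only illustrate how a sorted-input reader could interpret the sign of the result.

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

public class CompareHookExample {
  // Minimal DeferredObject that simply hands back a pre-built value.
  private static final class ConstantDeferred implements DeferredObject {
    private final Object value;
    ConstantDeferred(Object value) { this.value = value; }
    public Object get() { return value; }
  }

  public static void main(String[] args) throws Exception {
    GenericUDFOPEqual udf = new GenericUDFOPEqual();
    ObjectInspector stringOI = PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    udf.initialize(new ObjectInspector[] { stringOI, stringOI });

    DeferredObject[] row = new DeferredObject[] {
        new ConstantDeferred(new Text("100")),   // value read from the (sorted) file
        new ConstantDeferred(new Text("250"))    // constant from the filter predicate
    };

    Integer cmp = udf.compare(row);
    if (cmp == null) {
      System.out.println("comparison unknown -> abandon the binary search");
    } else if (cmp.intValue() < 0) {
      System.out.println("row key < target -> keep searching the later part of the file");
    } else if (cmp.intValue() > 0) {
      System.out.println("row key > target -> keep searching the earlier part of the file");
    } else {
      System.out.println("row key == target");
    }
  }
}
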
@Override
public String getDisplayString(String[] children) {
assert (children.length == 2);