diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java
index c21be8d..f4004e7 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java
@@ -39,6 +39,8 @@
   /**
    * For Writable
    */
+  private transient long length = -1;
+
   public HBaseSplit() {
     super((Path) null, 0, 0, (String[]) null);
     tableSplit = new TableSplit();
@@ -101,6 +103,9 @@ public void write(DataOutput out) throws IOException {

   @Override
   public long getLength() {
+    if (length >= 0) {
+      return length;
+    }
     long val = 0;
     try {
       val = isTableSplit ? tableSplit.getLength() : snapshotSplit.getLength();
@@ -109,6 +114,10 @@ public long getLength() {
     }
   }

+  public void setLength(long length) {
+    this.length = length;
+  }
+
   @Override
   public String[] getLocations() throws IOException {
     return isTableSplit ? tableSplit.getLocations() : snapshotSplit.getLocations();
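Note on the pattern above: the new field is transient and defaults to -1, so a split carries an externally computed length only on the node that set it, and getLength() falls back to the wrapped split's own estimate otherwise. A minimal sketch of that contract, using a hypothetical stand-in class rather than the real HBaseSplit:

```java
// Hypothetical stand-in mirroring HBaseSplit's length-caching contract.
class LazyLengthSplit {
  private transient long length = -1; // -1 means "not computed yet"
  private final long fallback;        // the wrapped split's own estimate

  LazyLengthSplit(long fallback) {
    this.fallback = fallback;
  }

  void setLength(long length) {
    this.length = length;
  }

  long getLength() {
    // Prefer the externally computed value; otherwise fall back.
    return length >= 0 ? length : fallback;
  }
}
```

Because the field is transient, serialized copies of the split revert to the fallback estimate; only the process that called setLength() sees the enriched value.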
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index 4ac0803..99a1fa2 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -20,13 +20,22 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -36,7 +45,9 @@
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
+import org.apache.hadoop.hive.ql.exec.LazySplitInfoProvider;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
 import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
@@ -57,6 +68,7 @@
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -76,7 +88,7 @@
  * such as column pruning and filter pushdown.
  */
 public class HiveHBaseTableInputFormat extends TableInputFormatBase
-    implements InputFormat<ImmutableBytesWritable, ResultWritable> {
+    implements InputFormat<ImmutableBytesWritable, ResultWritable>, LazySplitInfoProvider {

   static final Log LOG = LogFactory.getLog(HiveHBaseTableInputFormat.class);
@@ -106,6 +118,8 @@

     return new RecordReader<ImmutableBytesWritable, ResultWritable>() {

+      private transient long pos;
+
       @Override
       public void close() throws IOException {
         recordReader.close();
@@ -123,7 +137,7 @@ public ResultWritable createValue() {

       @Override
       public long getPos() throws IOException {
-        return 0;
+        return pos;
       }

       @Override
@@ -148,8 +162,14 @@ public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws
         next = recordReader.nextKeyValue();

         if (next) {
-          rowKey.set(recordReader.getCurrentValue().getRow());
-          value.setResult(recordReader.getCurrentValue());
+          Result result = recordReader.getCurrentValue();
+          rowKey.set(result.getRow());
+          value.setResult(result);
+
+          for (Cell cell : result.rawCells()) {
+            pos += cell.getValueLength();
+          }
+          pos += result.getRow().length;
         }
       } catch (InterruptedException e) {
         throw new IOException(e);
@@ -422,4 +442,81 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
     return results;
   }
+
+  private boolean getStorageFormatOfKey(String spec, String defaultFormat) throws IOException {
+
+    String[] mapInfo = spec.split("#");
+    boolean tblLevelDefault = "binary".equalsIgnoreCase(defaultFormat);
+
+    switch (mapInfo.length) {
+      case 1:
+        return tblLevelDefault;
+
+      case 2:
+        String storageType = mapInfo[1];
+        if (storageType.equals("-")) {
+          return tblLevelDefault;
+        } else if ("string".startsWith(storageType)) {
+          return false;
+        } else if ("binary".startsWith(storageType)) {
+          return true;
+        }
+        // unrecognized storage type: fall through to the error case
+      default:
+        throw new IOException("Malformed string: " + spec);
+    }
+  }
+
+  @Override
+  public void setSplitInfo(JobConf jobConf, InputSplit[] splits) {
+    Configuration conf = HBaseConfiguration.create(new Configuration(jobConf));
+
+    try {
+      flushTable(conf, getHTable().getTableName()); // flush first, so on-disk HFiles reflect the table's current size
+
+      Path rootPath = FSUtils.getRootDir(conf);
+      Path tablePath = FSUtils.getTableDir(rootPath, getHTable().getName());
+
+      FileSystem fs = tablePath.getFileSystem(conf);
+
+      final Set<String> families = toString(getHTable().getTableDescriptor().getFamiliesKeys());
+      for (InputSplit split : splits) {
+        byte[] start = ((HBaseSplit) split).getTableSplit().getStartRow();
+        HRegionInfo region = getHTable().getRegionLocation(start).getRegionInfo();
+        Path regionPath = new Path(tablePath, region.getEncodedName());
+        long splitLength = 0;
+        for (FileStatus status : fs.globStatus(new Path(regionPath, "**/*"), new PathFilter() {
+          public boolean accept(Path path) {
+            return families.contains(path.getParent().getName());
+          }
+        })) {
+          splitLength += status.getLen();
+        }
+        ((HBaseSplit) split).setLength(splitLength);
+      }
+    } catch (Exception e) {
+      // best effort only: fall back to the splits' own length estimates
+      LOG.warn("Failed to compute HBase split lengths", e);
+      for (InputSplit split : splits) {
+        ((HBaseSplit) split).setLength(-1); // reset to "unknown"
+      }
+    }
+  }
+
+  private void flushTable(Configuration conf, byte[] tableName) throws Exception {
+    HBaseAdmin hadmin = new HBaseAdmin(conf);
+    try {
+      hadmin.flush(tableName);
+    } finally {
+      IOUtils.closeStream(hadmin);
+    }
+  }
+
+  private Set<String> toString(Set<byte[]> families) {
+    Set<String> names = new HashSet<String>();
+    for (byte[] family : families) {
+      names.add(new String(family));
+    }
+    return names;
+  }
 }
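The PathFilter inside setSplitInfo above is what keeps the size estimate honest: a file counts toward a region's length only when its parent directory is named after one of the table's column families, which excludes compaction temp files and recovery artifacts. A self-contained sketch of that rule; the directory and file names are made up for illustration:

```java
import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Sketch of the filtering rule used by setSplitInfo when summing HFile sizes.
public class FamilyPathFilterDemo {
  public static void main(String[] args) {
    // Column families of the table, as in setSplitInfo's "families" set.
    Set<String> families = new HashSet<String>(Arrays.asList("cf"));

    // Region layout is <table>/<encodedRegionName>/<family>/<hfile>.
    String[] regionFiles = {
        "1588230740/cf/8e33f0a7c2b94d3",       // HFile under family "cf": counted
        "1588230740/.tmp/5b7d1c0e9f",          // compaction temp file: skipped
        "1588230740/recovered.edits/000000001" // recovery artifact: skipped
    };
    for (String file : regionFiles) {
      String family = new File(file).getParentFile().getName();
      System.out.println(file + " -> counted=" + families.contains(family));
    }
  }
}
```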
diff --git hbase-handler/src/test/queries/positive/nonmr_fetch.q hbase-handler/src/test/queries/positive/nonmr_fetch.q
new file mode 100644
index 0000000..01b9c02
--- /dev/null
+++ hbase-handler/src/test/queries/positive/nonmr_fetch.q
@@ -0,0 +1,52 @@
+CREATE TABLE hbase_nonmr (key int, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string");
+
+INSERT OVERWRITE TABLE hbase_nonmr SELECT * FROM src;
+
+set hive.fetch.task.conversion=more;
+
+-- backward compatible (more)
+explain
+select * from hbase_nonmr limit 10;
+select * from hbase_nonmr limit 10;
+
+-- select expression
+explain
+select cast(key as int) * 10, upper(value) from hbase_nonmr limit 10;
+select cast(key as int) * 10, upper(value) from hbase_nonmr limit 10;
+
+-- filter
+explain
+select key from hbase_nonmr where key < 100 limit 10;
+select key from hbase_nonmr where key < 100 limit 10;
+
+-- bucket sampling
+explain
+select * from hbase_nonmr TABLESAMPLE (BUCKET 1 OUT OF 40 ON key);
+select * from hbase_nonmr TABLESAMPLE (BUCKET 1 OUT OF 40 ON key);
+
+-- split sampling
+explain
+select * from hbase_nonmr TABLESAMPLE (0.25 PERCENT);
+select * from hbase_nonmr TABLESAMPLE (0.25 PERCENT);
+
+-- non deterministic func
+explain
+select key, value, BLOCK__OFFSET__INSIDE__FILE from hbase_nonmr where rand() > 1;
+select key, value, BLOCK__OFFSET__INSIDE__FILE from hbase_nonmr where rand() > 1;
+
+-- negative, groupby
+explain select key, count(value) from hbase_nonmr group by key;
+
+-- negative, distinct
+explain select distinct key, value from hbase_nonmr;
+
+-- negative, CTAS
+explain create table hbase_nonmrx as select distinct key, value from hbase_nonmr;
+
+-- negative, analyze
+explain analyze table hbase_nonmr compute statistics;
+
+-- negative, join
+explain select * from hbase_nonmr join hbase_nonmr hbase_nonmr2 on hbase_nonmr.key=hbase_nonmr2.key;
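The TABLESAMPLE (0.25 PERCENT) case above is the one that exercises the new split-length plumbing: percent sampling can only pick a sensible subset of splits when each split reports a real byte length. A back-of-the-envelope model of the sampling arithmetic, with illustrative numbers (the actual logic lives in FetchOperator.splitSampling, shown later in this patch):

```java
// Walk splits from a seeded start index, keep whole splits until the target
// size is reached, and shrink the last one to the remaining byte budget.
public class SplitSampleDemo {
  public static void main(String[] args) {
    long[] splitLengths = {400L, 300L, 300L}; // bytes per split (illustrative)
    double percent = 25.0;                    // sample 25% of the input
    int seed = 1;

    long totalSize = 0;
    for (long len : splitLengths) {
      totalSize += len;
    }
    long targetSize = (long) (totalSize * percent / 100.0); // 250 bytes

    long consumed = 0;
    int startIndex = seed % splitLengths.length;
    for (int i = 0; i < splitLengths.length; i++) {
      int idx = (startIndex + i) % splitLengths.length;
      long len = splitLengths[idx];
      if (consumed + len >= targetSize) {
        // the final split is read only partially ("shrunk")
        System.out.println("split " + idx + ": read " + (targetSize - consumed) + " of " + len + " bytes");
        break;
      }
      consumed += len;
      System.out.println("split " + idx + ": read all " + len + " bytes");
    }
  }
}
```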
diff --git hbase-handler/src/test/results/positive/nonmr_fetch.q.out hbase-handler/src/test/results/positive/nonmr_fetch.q.out
new file mode 100644
index 0000000..28c19df
--- /dev/null
+++ hbase-handler/src/test/results/positive/nonmr_fetch.q.out
@@ -0,0 +1,548 @@
+PREHOOK: query: CREATE TABLE hbase_nonmr (key int, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@hbase_nonmr
+POSTHOOK: query: CREATE TABLE hbase_nonmr (key int, value string)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@hbase_nonmr
+PREHOOK: query: INSERT OVERWRITE TABLE hbase_nonmr SELECT * FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@hbase_nonmr
+POSTHOOK: query: INSERT OVERWRITE TABLE hbase_nonmr SELECT * FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@hbase_nonmr
+PREHOOK: query: -- backward compatible (more)
+explain
+select * from hbase_nonmr limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: -- backward compatible (more)
+explain
+select * from hbase_nonmr limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: 10
+      Processor Tree:
+        TableScan
+          alias: hbase_nonmr
+          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+          Select Operator
+            expressions: key (type: int), value (type: string)
+            outputColumnNames: _col0, _col1
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            Limit
+              Number of rows: 10
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              ListSink
+
+PREHOOK: query: select * from hbase_nonmr limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_nonmr
+#### A masked pattern was here ####
+POSTHOOK: query: select * from hbase_nonmr limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_nonmr
+#### A masked pattern was here ####
+0 val_0
+10 val_10
+100 val_100
+103 val_103
+104 val_104
+105 val_105
+11 val_11
+111 val_111
+113 val_113
+114 val_114
+PREHOOK: query: -- select expression
+explain
+select cast(key as int) * 10, upper(value) from hbase_nonmr limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: -- select expression
+explain
+select cast(key as int) * 10, upper(value) from hbase_nonmr limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: 10
+      Processor Tree:
+        TableScan
+          alias: hbase_nonmr
+          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+          Select Operator
+            expressions: (key * 10) (type: int), upper(value) (type: string)
+            outputColumnNames: _col0, _col1
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            Limit
+              Number of rows: 10
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              ListSink
+
+PREHOOK: query: select cast(key as int) * 10, upper(value) from hbase_nonmr limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_nonmr
+#### A masked pattern was here ####
+POSTHOOK: query: select cast(key as int) * 10, upper(value) from hbase_nonmr limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_nonmr
+#### A masked pattern was here ####
+0 VAL_0
+100 VAL_10
+1000 VAL_100
+1030 VAL_103
+1040 VAL_104
+1050 VAL_105
+110 VAL_11
+1110 VAL_111
+1130 VAL_113
+1140 VAL_114
+PREHOOK: query: -- filter
+explain
+select key from hbase_nonmr where key < 100 limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: -- filter
+explain
+select key from hbase_nonmr where key < 100 limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: 10
+      Processor Tree:
+        TableScan
+          alias: hbase_nonmr
+          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+          Filter Operator
+            predicate: (key < 100) (type: boolean)
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            Select Operator
+              expressions: key (type: int)
+              outputColumnNames: _col0
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              Limit
+                Number of rows: 10
+                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                ListSink
+
+PREHOOK: query: select key from hbase_nonmr where key < 100 limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_nonmr
+#### A masked pattern was here ####
+POSTHOOK: query: select key from hbase_nonmr where key < 100 limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_nonmr
+#### A masked pattern was here ####
+0
+10
+11
+12
+15
+17
+18
+19
+2
+20
+PREHOOK: query: -- bucket sampling
+explain
+select * from hbase_nonmr TABLESAMPLE (BUCKET 1 OUT OF 40 ON key)
+PREHOOK: type: QUERY
+POSTHOOK: query: -- bucket sampling
+explain
+select * from hbase_nonmr TABLESAMPLE (BUCKET 1 OUT OF 40 ON key)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: hbase_nonmr
+          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+          Filter Operator
+            predicate: (((hash(key) & 2147483647) % 40) = 0) (type: boolean)
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            Select Operator
+              expressions: key (type: int), value (type: string)
+              outputColumnNames: _col0, _col1
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              ListSink
+
+PREHOOK: query: select * from hbase_nonmr TABLESAMPLE (BUCKET 1 OUT OF 40 ON key)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_nonmr
+#### A masked pattern was here ####
+POSTHOOK: query: select * from hbase_nonmr TABLESAMPLE (BUCKET 1 OUT OF 40 ON key)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_nonmr
+#### A masked pattern was here ####
+0 val_0
+120 val_120
+160 val_160
+200 val_200
+280 val_280
+360 val_360
+400 val_400
+480 val_480
+80 val_80
+PREHOOK: query: -- split sampling
+explain
+select * from hbase_nonmr TABLESAMPLE (0.25 PERCENT)
+PREHOOK: type: QUERY
+POSTHOOK: query: -- split sampling
+explain
+select * from hbase_nonmr TABLESAMPLE (0.25 PERCENT)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: hbase_nonmr
+          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+          Select Operator
+            expressions: key (type: int), value (type: string)
+            outputColumnNames: _col0, _col1
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            ListSink
+
+PREHOOK: query: select * from hbase_nonmr TABLESAMPLE (0.25 PERCENT)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_nonmr
+#### A masked pattern was here ####
+POSTHOOK: query: select * from hbase_nonmr TABLESAMPLE (0.25 PERCENT)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_nonmr
+#### A masked pattern was here ####
+0 val_0
+10 val_10
+100 val_100
+103 val_103
+PREHOOK: query: -- non deterministic func
+explain
+select key, value, BLOCK__OFFSET__INSIDE__FILE from hbase_nonmr where rand() > 1
+PREHOOK: type: QUERY
+POSTHOOK: query: -- non deterministic func
+explain
+select key, value, BLOCK__OFFSET__INSIDE__FILE from hbase_nonmr where rand() > 1
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: hbase_nonmr
+          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+          Filter Operator
+            predicate: (rand() > 1) (type: boolean)
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            Select Operator
+              expressions: key (type: int), value (type: string), BLOCK__OFFSET__INSIDE__FILE (type: bigint)
+              outputColumnNames: _col0, _col1, _col2
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              ListSink
+
+PREHOOK: query: select key, value, BLOCK__OFFSET__INSIDE__FILE from hbase_nonmr where rand() > 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@hbase_nonmr
+#### A masked pattern was here ####
+POSTHOOK: query: select key, value, BLOCK__OFFSET__INSIDE__FILE from hbase_nonmr where rand() > 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@hbase_nonmr
+#### A masked pattern was here ####
+PREHOOK: query: -- negative, groupby
+explain select key, count(value) from hbase_nonmr group by key
+PREHOOK: type: QUERY
+POSTHOOK: query: -- negative, groupby
+explain select key, count(value) from hbase_nonmr group by key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: hbase_nonmr
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            Select Operator
+              expressions: key (type: int), value (type: string)
+              outputColumnNames: key, value
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              Group By Operator
+                aggregations: count(value)
+                keys: key (type: int)
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: int)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: int)
+                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                  value expressions: _col1 (type: bigint)
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: count(VALUE._col0)
+          keys: KEY._col0 (type: int)
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+          Select Operator
+            expressions: _col0 (type: int), _col1 (type: bigint)
+            outputColumnNames: _col0, _col1
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: -- negative, distinct
+explain select distinct key, value from hbase_nonmr
+PREHOOK: type: QUERY
+POSTHOOK: query: -- negative, distinct
+explain select distinct key, value from hbase_nonmr
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: hbase_nonmr
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            Select Operator
+              expressions: key (type: int), value (type: string)
+              outputColumnNames: key, value
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              Group By Operator
+                keys: key (type: int), value (type: string)
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: int), _col1 (type: string)
+                  sort order: ++
+                  Map-reduce partition columns: _col0 (type: int), _col1 (type: string)
+                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+      Reduce Operator Tree:
+        Group By Operator
+          keys: KEY._col0 (type: int), KEY._col1 (type: string)
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+          Select Operator
+            expressions: _col0 (type: int), _col1 (type: string)
+            outputColumnNames: _col0, _col1
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: -- negative, CTAS
+explain create table hbase_nonmrx as select distinct key, value from hbase_nonmr
+PREHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: query: -- negative, CTAS
+explain create table hbase_nonmrx as select distinct key, value from hbase_nonmr
+POSTHOOK: type: CREATETABLE_AS_SELECT
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+  Stage-3 depends on stages: Stage-0
+  Stage-2 depends on stages: Stage-3
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: hbase_nonmr
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            Select Operator
+              expressions: key (type: int), value (type: string)
+              outputColumnNames: key, value
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              Group By Operator
+                keys: key (type: int), value (type: string)
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: int), _col1 (type: string)
+                  sort order: ++
+                  Map-reduce partition columns: _col0 (type: int), _col1 (type: string)
+                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+      Reduce Operator Tree:
+        Group By Operator
+          keys: KEY._col0 (type: int), KEY._col1 (type: string)
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+          Select Operator
+            expressions: _col0 (type: int), _col1 (type: string)
+            outputColumnNames: _col0, _col1
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  name: default.hbase_nonmrx
+
+  Stage: Stage-0
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
+  Stage: Stage-3
+      Create Table Operator:
+        Create Table
+          columns: key int, value string
+          input format: org.apache.hadoop.mapred.TextInputFormat
+          output format: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
+          serde name: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+          name: default.hbase_nonmrx
+
+  Stage: Stage-2
+    Stats-Aggr Operator
+
+PREHOOK: query: -- negative, analyze
+explain analyze table hbase_nonmr compute statistics
+PREHOOK: type: QUERY
+POSTHOOK: query: -- negative, analyze
+explain analyze table hbase_nonmr compute statistics
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+  Stage-1 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-0
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: hbase_nonmr
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: COMPLETE
+
+  Stage: Stage-1
+    Stats-Aggr Operator
+
+PREHOOK: query: -- negative, join
+explain select * from hbase_nonmr join hbase_nonmr hbase_nonmr2 on hbase_nonmr.key=hbase_nonmr2.key
+PREHOOK: type: QUERY
+POSTHOOK: query: -- negative, join
+explain select * from hbase_nonmr join hbase_nonmr hbase_nonmr2 on hbase_nonmr.key=hbase_nonmr2.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: hbase_nonmr2
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            Filter Operator
+              predicate: key is not null (type: boolean)
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              Reduce Output Operator
+                key expressions: key (type: int)
+                sort order: +
+                Map-reduce partition columns: key (type: int)
+                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                value expressions: value (type: string)
+          TableScan
+            alias: hbase_nonmr
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            Filter Operator
+              predicate: key is not null (type: boolean)
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              Reduce Output Operator
+                key expressions: key (type: int)
+                sort order: +
+                Map-reduce partition columns: key (type: int)
+                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                value expressions: value (type: string)
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          condition expressions:
+            0 {KEY.reducesinkkey0} {VALUE._col0}
+            1 {KEY.reducesinkkey0} {VALUE._col0}
+          outputColumnNames: _col0, _col1, _col5, _col6
+          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+          Select Operator
+            expressions: _col0 (type: int), _col1 (type: string), _col5 (type: int), _col6 (type: string)
+            outputColumnNames: _col0, _col1, _col2, _col3
+            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
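Before the FetchOperator change below, a condensed sketch (not the actual code) of how the new hook is meant to be called: a LazySplitInfoProvider input format gets one chance to attach real byte lengths before split sizes are totaled for sampling. The class and method names here are illustrative:

```java
import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.LazySplitInfoProvider;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

public class SamplingSetupSketch {
  static long totalLength(JobConf job, InputFormat<?, ?> inputFormat,
      InputSplit[] splits) throws IOException {
    if (inputFormat instanceof LazySplitInfoProvider) {
      // best effort; splits keep their own estimates if enrichment fails
      ((LazySplitInfoProvider) inputFormat).setSplitInfo(job, splits);
    }
    long totalSize = 0;
    for (InputSplit split : splits) {
      totalSize += split.getLength();
    }
    return totalSize;
  }
}
```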
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index b94f1e4..9fb92f7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -445,7 +445,7 @@ static void setFetchOperatorContext(JobConf conf,
       inputSplits[i] = new FetchInputFormatSplit(splits[i], formatter.getName());
     }
     if (work.getSplitSample() != null) {
-      inputSplits = splitSampling(work.getSplitSample(), inputSplits);
+      inputSplits = splitSampling(work.getSplitSample(), inputSplits, splits);
     }
     this.inputSplits = inputSplits;
@@ -522,26 +522,31 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException {

   private FetchInputFormatSplit[] splitSampling(SplitSample splitSample,
-      FetchInputFormatSplit[] splits) {
+      FetchInputFormatSplit[] splits, InputSplit[] original) {
+    if (inputFormat instanceof LazySplitInfoProvider) {
+      ((LazySplitInfoProvider) inputFormat).setSplitInfo(job, original);
+    }
     long totalSize = 0;
-    for (FetchInputFormatSplit split: splits) {
-      totalSize += split.getLength();
+    for (FetchInputFormatSplit split : splits) {
+      totalSize += split.getLength();
     }
-    List<FetchInputFormatSplit> result = new ArrayList<FetchInputFormatSplit>();
     long targetSize = splitSample.getTargetSize(totalSize);
+    if (targetSize == 0) {
+      return splits;
+    }
+    List<FetchInputFormatSplit> result = new ArrayList<FetchInputFormatSplit>();
     int startIndex = splitSample.getSeedNum() % splits.length;
-    long size = 0;
+
+    long consumed = 0;
     for (int i = 0; i < splits.length; i++) {
       FetchInputFormatSplit split = splits[(startIndex + i) % splits.length];
       result.add(split);
-      long splitgLength = split.getLength();
-      if (size + splitgLength >= targetSize) {
-        if (size + splitgLength > targetSize) {
-          split.shrinkedLength = targetSize - size;
-        }
+      long splitLength = split.getLength();
+      if (consumed + splitLength >= targetSize) {
+        split.shrinkedLength = targetSize - consumed;
         break;
       }
-      size += splitgLength;
+      consumed += splitLength;
     }
     return result.toArray(new FetchInputFormatSplit[result.size()]);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/LazySplitInfoProvider.java ql/src/java/org/apache/hadoop/hive/ql/exec/LazySplitInfoProvider.java
new file mode 100644
index 0000000..956ce0a
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/LazySplitInfoProvider.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Provides split info (length, etc.) lazily, just before it is needed.
+ */
+public interface LazySplitInfoProvider {
+
+  void setSplitInfo(JobConf job, InputSplit[] splits);
+}
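For reference, a hypothetical implementor of the new interface, illustrating the intended contract: enrich the splits in place and treat failure as non-fatal, since callers fall back to each split's own getLength() estimate. The class name is made up; the real implementor in this patch is HiveHBaseTableInputFormat above:

```java
package org.apache.hadoop.hive.ql.exec;

import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

public class LoggingSplitInfoProvider implements LazySplitInfoProvider {

  @Override
  public void setSplitInfo(JobConf job, InputSplit[] splits) {
    try {
      for (InputSplit split : splits) {
        // a real provider would compute and attach a byte length here
        System.out.println("split " + split + " length=" + split.getLength());
      }
    } catch (Exception e) {
      // best effort: leave splits untouched; callers use getLength() as-is
    }
  }
}
```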