diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java
index 434a5b8ae5..bd3a13f074 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java
@@ -24,7 +24,6 @@
 import java.util.Arrays;
 
-import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +46,7 @@
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -69,6 +70,7 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.RecordReader;
 
 /**
@@ -78,10 +80,11 @@
  * so that the data produced after wrapping a vectorized reader would conform to the original OIs.
  */
 public abstract class BatchToRowReader<StructType, UnionType>
-    implements RecordReader<NullWritable, Object> {
+    implements RecordReader<WritableComparable, Object> {
   protected static final Logger LOG = LoggerFactory.getLogger(BatchToRowReader.class);
 
-  private final NullWritable key;
+  private final RecordIdentifier recordIdentifier;
+  private final NullWritable nullWritable;
   private final VectorizedRowBatch batch;
   private final RecordReader<NullWritable, VectorizedRowBatch> vrbReader;
@@ -89,10 +92,13 @@
   private final boolean[] included;
   private int rowInBatch = 0;
 
+  private int rowIdIdx = -1;
+
   public BatchToRowReader(RecordReader<NullWritable, VectorizedRowBatch> vrbReader,
       VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) {
     this.vrbReader = vrbReader;
-    this.key = vrbReader.createKey();
+    this.recordIdentifier = new RecordIdentifier();
+    this.nullWritable = vrbReader.createKey();
     this.batch = vrbReader.createValue();
     this.schema = Lists.newArrayList(vrbCtx.getRowColumnTypeInfos());
     // TODO: does this include partition columns?
@@ -104,6 +110,14 @@ public BatchToRowReader(RecordReader<NullWritable, VectorizedRowBatch> vrbReader
     } else {
       Arrays.fill(included, true);
     }
+    // Add virtual columns
+    int vcsIdx = vrbCtx.getDataColumnCount() + vrbCtx.getPartitionColumnCount();
+    for (int vcIdx = vcsIdx; vcIdx < vcsIdx + vrbCtx.getNeededVirtualColumns().length; vcIdx++) {
+      included[vcIdx] = true;
+      if (vrbCtx.getNeededVirtualColumns()[vcIdx - vcsIdx].equals(VirtualColumn.ROWID)) {
+        rowIdIdx = vcIdx;
+      }
+    }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Including the columns " + DebugUtils.toString(included));
     }
@@ -116,10 +130,11 @@ public BatchToRowReader(RecordReader<NullWritable, VectorizedRowBatch> vrbReader
   protected abstract UnionType createUnionObject(List<TypeInfo> childrenTypes, Object previous);
   protected abstract void setUnion(UnionType unionObj, byte tag, Object object);
   protected abstract Object getUnionField(UnionType unionObj);
+  protected abstract void populateRecordIdentifier(StructType structObj, int i, RecordIdentifier ri);
 
   @Override
-  public NullWritable createKey() {
-    return key;
+  public WritableComparable createKey() {
+    return rowIdIdx < 0 ? nullWritable : recordIdentifier;
   }
 
   @Override
@@ -138,7 +153,7 @@ public float getProgress() throws IOException {
   }
 
   @Override
-  public boolean next(NullWritable key, Object previous) throws IOException {
+  public boolean next(WritableComparable key, Object previous) throws IOException {
     if (!ensureBatch()) {
       return false;
     }
@@ -149,6 +164,10 @@ public boolean next(NullWritable key, Object previous) throws IOException {
       try {
         setStructCol(value, i,
             nextValue(batch.cols[i], rowInBatch, schema.get(i), getStructCol(value, i)));
+        if (i == rowIdIdx) {
+          // Populate key
+          populateRecordIdentifier(value, i, (RecordIdentifier) key);
+        }
       } catch (Throwable t) {
         LOG.error("Error at row " + rowInBatch + "/" + batch.size + ", column " + i + "/"
             + schema.size() + " " + batch.cols[i], t);
@@ -166,7 +185,7 @@ private boolean ensureBatch() throws IOException {
     if (rowInBatch >= batch.size) {
       rowInBatch = 0;
-      return vrbReader.next(key, batch) && batch.size > 0;
+      return vrbReader.next(nullWritable, batch) && batch.size > 0;
     }
     return true;
   }
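For context on the key-type change above: createKey() now hands back either the shared NullWritable (no ROW__ID requested) or the reusable RecordIdentifier that next() refreshes per row via populateRecordIdentifier(). Below is a minimal consumer sketch, assuming only the RecordReader API visible in this patch; the class and method names are illustrative, not part of Hive.

import java.io.IOException;

import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.RecordReader;

// Hypothetical helper, not part of the patch.
class BatchToRowConsumerSketch {
  static void drain(RecordReader<WritableComparable, Object> reader) throws IOException {
    WritableComparable key = reader.createKey(); // NullWritable, or RecordIdentifier when ROW__ID is projected
    Object row = reader.createValue();
    while (reader.next(key, row)) {
      if (key instanceof RecordIdentifier) {
        // next() has already refreshed this object via populateRecordIdentifier(...)
        RecordIdentifier ri = (RecordIdentifier) key;
        // ri carries the ACID (writeId, bucketId, rowId) triple for the current row
      }
      // ... consume row ...
    }
    reader.close();
  }
}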
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java
index c9ff592d4e..85f1ed1375 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java
@@ -19,15 +19,20 @@
 
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier.Field;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.ql.io.BatchToRowReader;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.RecordReader;
 
 /** BatchToRowReader that returns the rows readable by ORC IOs.
  */
 public class OrcOiBatchToRowReader extends BatchToRowReader<OrcStruct, OrcUnion> {
+
   public OrcOiBatchToRowReader(RecordReader<NullWritable, VectorizedRowBatch> vrbReader,
       VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) {
     super(vrbReader, vrbCtx, includedCols);
   }
@@ -68,4 +73,17 @@ protected Object getUnionField(OrcUnion unionObj) {
   protected void setUnion(OrcUnion unionObj, byte tag, Object object) {
     unionObj.set(tag, object);
   }
+
+  @Override
+  protected void populateRecordIdentifier(OrcStruct structObj, int i, RecordIdentifier ri) {
+    OrcStruct rowId = (OrcStruct) structObj.getFieldValue(i);
+    if (rowId == null) {
+      ri.setValues(0, 0, 0);
+      return;
+    }
+    ri.setValues(((LongWritable) rowId.getFieldValue(Field.writeId.ordinal())).get(),
+        ((IntWritable) rowId.getFieldValue(Field.bucketId.ordinal())).get(),
+        ((LongWritable) rowId.getFieldValue(Field.rowId.ordinal())).get());
+  }
+
 }
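The override above unpacks the nested ROW__ID struct by the ordinals of RecordIdentifier.Field (writeId, bucketId, rowId), falling back to a (0, 0, 0) sentinel when the batch carries no materialized ROW__ID. A small sketch of the resulting key contract, grounded in the setValues(...) call shown in the patch; the literal values are made up:

import org.apache.hadoop.hive.ql.io.RecordIdentifier;

// Illustrative only: the triple below mirrors what the override extracts per row.
public class RecordIdentifierSketch {
  public static void main(String[] args) {
    RecordIdentifier ri = new RecordIdentifier();
    ri.setValues(3L /* writeId */, 536870912 /* encoded bucket property */, 9L /* rowId */);
    // RecordIdentifier orders rows by this triple, which is what makes it usable
    // as the WritableComparable handed out from createKey() in the base class.
  }
}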
diff --git a/ql/src/test/queries/clientpositive/orc_llap_nonvector.q b/ql/src/test/queries/clientpositive/orc_llap_nonvector.q
index 4dfb259005..95a0384cca 100644
--- a/ql/src/test/queries/clientpositive/orc_llap_nonvector.q
+++ b/ql/src/test/queries/clientpositive/orc_llap_nonvector.q
@@ -41,5 +41,16 @@ explain
 select cint, cstring1 from orc_llap_nonvector limit 1025;
 select cint, cstring1 from orc_llap_nonvector limit 1025;
 
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+create table orc_llap_nonvector_2 stored as orc tblproperties('transactional'='true') as
+select *, rand(1234) rdm from alltypesorc order by rdm;
+
+explain
+select ROW__ID from orc_llap_nonvector_2 limit 10;
+select ROW__ID from orc_llap_nonvector_2 limit 10;
+
 DROP TABLE orc_create_staging_n3;
 DROP TABLE orc_llap_nonvector;
+DROP TABLE orc_llap_nonvector_2;
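The two `set` commands above are the standard prerequisites for creating and reading transactional tables. A sketch of the same session setup done programmatically, assuming an embedded HiveConf; the configuration keys are exactly the ones used in the .q file:

import org.apache.hadoop.hive.conf.HiveConf;

// Same session configuration as the `set` commands in the .q file above.
public class AcidSessionConfSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    conf.set("hive.support.concurrency", "true");
    conf.set("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
  }
}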
diff --git a/ql/src/test/results/clientpositive/llap/orc_llap_nonvector.q.out b/ql/src/test/results/clientpositive/llap/orc_llap_nonvector.q.out
index 364da93bb7..c108be03fa 100644
--- a/ql/src/test/results/clientpositive/llap/orc_llap_nonvector.q.out
+++ b/ql/src/test/results/clientpositive/llap/orc_llap_nonvector.q.out
@@ -1292,6 +1292,92 @@ NULL	NULL
 1053814436	By4JbbLm4g1Kyq67Er
 528534767	cvLH6Eat2yFsyy7p
 NULL	NULL
+PREHOOK: query: create table orc_llap_nonvector_2 stored as orc tblproperties('transactional'='true') as
+select *, rand(1234) rdm from alltypesorc order by rdm
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: database:default
+PREHOOK: Output: default@orc_llap_nonvector_2
+POSTHOOK: query: create table orc_llap_nonvector_2 stored as orc tblproperties('transactional'='true') as
+select *, rand(1234) rdm from alltypesorc order by rdm
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@alltypesorc
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@orc_llap_nonvector_2
+POSTHOOK: Lineage: orc_llap_nonvector_2.cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.cboolean1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.cboolean2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.csmallint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.cstring2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.ctimestamp1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.ctimestamp2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.ctinyint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
+POSTHOOK: Lineage: orc_llap_nonvector_2.rdm EXPRESSION []
+PREHOOK: query: explain
+select ROW__ID from orc_llap_nonvector_2 limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select ROW__ID from orc_llap_nonvector_2 limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: orc_llap_nonvector_2
+                  Statistics: Num rows: 12288 Data size: 4468050 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    expressions: ROW__ID (type: struct<writeid:bigint,bucketid:int,rowid:bigint>)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 12288 Data size: 933888 Basic stats: COMPLETE Column stats: COMPLETE
+                    Limit
+                      Number of rows: 10
+                      Statistics: Num rows: 10 Data size: 760 Basic stats: COMPLETE Column stats: COMPLETE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 10 Data size: 760 Basic stats: COMPLETE Column stats: COMPLETE
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: llap
+            LLAP IO: may be used (ACID table)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 10
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select ROW__ID from orc_llap_nonvector_2 limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_llap_nonvector_2
+#### A masked pattern was here ####
+POSTHOOK: query: select ROW__ID from orc_llap_nonvector_2 limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_llap_nonvector_2
+#### A masked pattern was here ####
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":0}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":1}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":2}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":3}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":4}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":5}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":6}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":7}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":8}
+{"writeid":### Masked writeid ###,"bucketid":536870912,"rowid":9}
 PREHOOK: query: DROP TABLE orc_create_staging_n3
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@orc_create_staging_n3
@@ -1308,3 +1394,11 @@ POSTHOOK: query: DROP TABLE orc_llap_nonvector
 POSTHOOK: type: DROPTABLE
 POSTHOOK: Input: default@orc_llap_nonvector
 POSTHOOK: Output: default@orc_llap_nonvector
+PREHOOK: query: DROP TABLE orc_llap_nonvector_2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@orc_llap_nonvector_2
+PREHOOK: Output: default@orc_llap_nonvector_2
+POSTHOOK: query: DROP TABLE orc_llap_nonvector_2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@orc_llap_nonvector_2
+POSTHOOK: Output: default@orc_llap_nonvector_2
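A note on the constant bucketid in the ROW__ID rows above: 536870912 is 1 << 29, an encoded bucket property rather than a plain bucket number. The sketch below decodes it, assuming the bit layout documented on Hive's BucketCodec V1 (top 3 bits codec version, 1 reserved bit, 12 bucket-id bits, 4 reserved bits, 12 statement-id bits); BucketCodec itself remains the authoritative decoder.

// Decodes the bucket property printed in every ROW__ID row above.
public class BucketPropertyDecodeSketch {
  public static void main(String[] args) {
    int encoded = 536870912;                 // "bucketid" from the output above
    int version = encoded >>> 29;            // -> 1 (BucketCodec.V1)
    int bucketId = (encoded >>> 16) & 0xFFF; // -> 0: every row landed in bucket 0
    int statementId = encoded & 0xFFF;       // -> 0
    System.out.printf("version=%d bucket=%d statement=%d%n", version, bucketId, statementId);
  }
}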