diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 0da886b..b5345ad 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -105,7 +105,7 @@ private transient JobConf job; private transient WritableComparable key; private transient Writable value; - private transient Writable[] vcValues; + private transient Object[] vcValues; private transient Deserializer serde; private transient Deserializer tblSerde; private transient Converter partTblObjectInspectorConverter; @@ -141,8 +141,7 @@ private void initialize() { List names = new ArrayList(vcCols.size()); List inspectors = new ArrayList(vcCols.size()); for (VirtualColumn vc : vcCols) { - inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector( - vc.getTypeInfo())); + inspectors.add(vc.getObjectInspector()); names.add(vc.getName()); } vcsOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index d5de58e..80c3f1e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -140,7 +141,7 @@ public int hashCode() { String tableName; String partName; List vcs; - Writable[] vcValues; + Object[] vcValues; private boolean isPartitioned() { return partObjectInspector != null; @@ -165,7 +166,7 @@ public StructObjectInspector getRowObjectInspector() { * op. 
* * @param hconf - * @param mrwork + * @param mapWork * @throws HiveException */ public void initializeAsRoot(Configuration hconf, MapWork mapWork) @@ -250,13 +251,13 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, // The op may not be a TableScan for mapjoins // Consider the query: select /*+MAPJOIN(a)*/ count(*) FROM T1 a JOIN T2 b ON a.key = b.key; - // In that case, it will be a Select, but the rowOI need not be ammended + // In that case, it will be a Select, but the rowOI need not be amended if (ctx.op instanceof TableScanOperator) { TableScanOperator tsOp = (TableScanOperator) ctx.op; TableScanDesc tsDesc = tsOp.getConf(); if (tsDesc != null && tsDesc.hasVirtualCols()) { opCtx.vcs = tsDesc.getVirtualCols(); - opCtx.vcValues = new Writable[opCtx.vcs.size()]; + opCtx.vcValues = new Object[opCtx.vcs.size()]; opCtx.vcsObjectInspector = VirtualColumn.getVCSObjectInspector(opCtx.vcs); if (opCtx.isPartitioned()) { opCtx.rowWithPartAndVC = Arrays.copyOfRange(opCtx.rowWithPart, 0, 3); @@ -550,13 +551,13 @@ public void process(Writable value) throws HiveException { } } - public static Writable[] populateVirtualColumnValues(ExecMapperContext ctx, - List vcs, Writable[] vcValues, Deserializer deserializer) { + public static Object[] populateVirtualColumnValues(ExecMapperContext ctx, + List vcs, Object[] vcValues, Deserializer deserializer) { if (vcs == null) { return vcValues; } if (vcValues == null) { - vcValues = new Writable[vcs.size()]; + vcValues = new Object[vcs.size()]; } for (int i = 0; i < vcs.size(); i++) { VirtualColumn vc = vcs.get(i); @@ -602,6 +603,12 @@ public void process(Writable value) throws HiveException { old.set(current); } } + else if(vc.equals(VirtualColumn.ROWID)) { + if(vcValues[i] == null) { + vcValues[i] = new Object[RecordIdentifier.Field.values().length]; + } + RecordIdentifier.StructInfo.toArray(ctx.getIoCxt().getCurrentRowId(), (Object[])vcValues[i]); + } } return vcValues; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java index 2f5f60c..f936482 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java @@ -155,7 +155,7 @@ public void configure(JobConf job) { } } } - + @Override public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { if (oc == null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java index 71a9dd4..8745ba1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.RecordReader; @@ -87,10 +87,14 @@ * To support transitions between non-ACID layouts to ACID layouts, the input * formats are expected to support both layouts and detect the correct one. 
* + * @param This really should be RecordIdentifier, but OrcInputFormat already shipped; more + * specifically, OrcInputFormat works in both vectorized and regular mode, and the latter doesn't + * support Acid and so should return a NullWritable key. We could make it return a dummy + * RecordIdentifier, but that seems misleading. * @param The row type */ -public interface AcidInputFormat - extends InputFormat, InputFormatChecker { +public interface AcidInputFormat + extends InputFormat, InputFormatChecker { /** * Options for controlling the record readers. diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java index f874d86..5940e14 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java @@ -20,17 +20,13 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.FooterBuffer; @@ -42,16 +38,13 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.util.ReflectionUtils; /** This class prepares an IOContext, and provides the ability to perform a binary search on the * data.
The binary search can be used by setting the value of inputFormatSorted in the @@ -119,7 +112,12 @@ public boolean next(K key, V value) throws IOException { } updateIOContext(); try { - return doNext(key, value); + boolean retVal = doNext(key, value); + if(key instanceof RecordIdentifier) { + RecordIdentifier ri = (RecordIdentifier)key; + ioCxtRef.currentRowId.set(ri); + } + return retVal; } catch (IOException e) { ioCxtRef.setIOExceptions(true); throw e; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java index 8cbf32f..c65872a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java @@ -58,6 +58,7 @@ public static void clear() { long currentRow; boolean isBlockPointer; boolean ioExceptions; + final RecordIdentifier currentRowId = new RecordIdentifier(); // Are we using the fact the input is sorted boolean useSorted = false; @@ -111,6 +112,14 @@ public void setCurrentRow(long currentRow) { this.currentRow = currentRow; } + /** + * Tables using AcidInputFormat support a row id. + * @return the RecordIdentifier (ROW__ID) of the row most recently read + */ + public RecordIdentifier getCurrentRowId() { + return currentRowId; + } + public boolean isBlockPointer() { return isBlockPointer; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java index 38a0d6b..d491a8c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java @@ -19,16 +19,76 @@ package org.apache.hadoop.hive.ql.io; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** - * Gives the Record identifer information for the current record. + * Gives the Record identifier information for the current record. */ public class RecordIdentifier implements WritableComparable { + /** + * This is in support of {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#ROWID}. + * Contains metadata about each field in RecordIdentifier that needs to be part of ROWID, + * which is represented as a struct; see {@link org.apache.hadoop.hive.ql.io.RecordIdentifier.StructInfo}. + * Every field of RecordIdentifier that should be part of ROWID must appear in this enum, which + * really means it should be part of VirtualColumn (so consider a ROWID-specific subclass). + */ + public static enum Field { + // note: the enum names match the field names in the struct + transactionId(TypeInfoFactory.longTypeInfo, + PrimitiveObjectInspectorFactory.javaLongObjectInspector), + bucketId(TypeInfoFactory.intTypeInfo, PrimitiveObjectInspectorFactory.javaIntObjectInspector), + rowId(TypeInfoFactory.longTypeInfo, PrimitiveObjectInspectorFactory.javaLongObjectInspector); + public final TypeInfo fieldType; + public final ObjectInspector fieldOI; + Field(TypeInfo fieldType, ObjectInspector fieldOI) { + this.fieldType = fieldType; + this.fieldOI = fieldOI; + } + } + /** + * RecordIdentifier is passed along the operator tree as a struct.
This class contains a few + * utilities for that. + */ + public static final class StructInfo { + private static final List fieldNames = new ArrayList(Field.values().length); + private static final List fieldTypes = new ArrayList(fieldNames.size()); + private static final List fieldOis = + new ArrayList(fieldNames.size()); + static { + for(Field f : Field.values()) { + fieldNames.add(f.name()); + fieldTypes.add(f.fieldType); + fieldOis.add(f.fieldOI); + } + } + public static final TypeInfo typeInfo = + TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypes); + public static final ObjectInspector oi = + ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOis); + + /** + * Copies the relevant fields from {@code ri} to {@code struct}. + * @param ri + * @param struct must have length Field.values().length + */ + public static void toArray(RecordIdentifier ri, Object[] struct) { + assert struct != null && struct.length == Field.values().length; + struct[Field.transactionId.ordinal()] = ri.getTransactionId(); + struct[Field.bucketId.ordinal()] = ri.getBucketId(); + struct[Field.rowId.ordinal()] = ri.getRowId(); + } + } + private long transactionId; private int bucketId; private long rowId; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 7edb3c2..be398f1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -62,6 +63,8 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; @@ -96,9 +99,9 @@ * that added this event. Insert and update events include the entire row, while * delete events have null for row.
*/ -public class OrcInputFormat implements InputFormat, +public class OrcInputFormat implements InputFormat, InputFormatChecker, VectorizedInputFormatInterface, - AcidInputFormat { + AcidInputFormat { private static final Log LOG = LogFactory.getLog(OrcInputFormat.class); static final HadoopShims SHIMS = ShimLoader.getHadoopShims(); @@ -125,7 +128,7 @@ private static final double MIN_INCLUDED_LOCATION = 0.80; private static class OrcRecordReader - implements org.apache.hadoop.mapred.RecordReader, + implements org.apache.hadoop.mapred.RecordReader, StatsProvidingRecordReader { private final RecordReader reader; private final long offset; @@ -148,7 +151,7 @@ } @Override - public boolean next(NullWritable key, OrcStruct value) throws IOException { + public boolean next(WritableComparable key, OrcStruct value) throws IOException { if (reader.hasNext()) { reader.next(value); progress = reader.getProgress(); @@ -973,9 +976,10 @@ private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics, this.types = types; } } + @SuppressWarnings("unchecked") - private org.apache.hadoop.mapred.RecordReader + private org.apache.hadoop.mapred.RecordReader createVectorizedReader(InputSplit split, JobConf conf, Reporter reporter ) throws IOException { return (org.apache.hadoop.mapred.RecordReader) @@ -983,13 +987,13 @@ private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics, } @Override - public org.apache.hadoop.mapred.RecordReader + public org.apache.hadoop.mapred.RecordReader getRecordReader(InputSplit inputSplit, JobConf conf, Reporter reporter) throws IOException { boolean vectorMode = Utilities.isVectorMode(conf); // if HiveCombineInputFormat gives us FileSplits instead of OrcSplits, - // we know it is not ACID. + // we know it is not ACID. (see the check in CombineHiveInputFormat.getSplits() that ensures this) if (inputSplit.getClass() == FileSplit.class) { if (vectorMode) { return createVectorizedReader(inputSplit, conf, reporter); @@ -1002,61 +1006,64 @@ private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics, OrcSplit split = (OrcSplit) inputSplit; reporter.setStatus(inputSplit.toString()); + Options options = new Options(conf).reporter(reporter); + final RowReader inner = getReader(inputSplit, options); // if we are strictly old-school, just use the old code if (split.isOriginal() && split.getDeltas().isEmpty()) { if (vectorMode) { return createVectorizedReader(inputSplit, conf, reporter); } else { - return new OrcRecordReader(OrcFile.createReader(split.getPath(), - OrcFile.readerOptions(conf)), conf, split); + return new RIAwareRecordReader(inner); } } - Options options = new Options(conf).reporter(reporter); - final RowReader inner = getReader(inputSplit, options); if (vectorMode) { return (org.apache.hadoop.mapred.RecordReader) new VectorizedOrcAcidRowReader(inner, conf, (FileSplit) inputSplit); } - final RecordIdentifier id = inner.createKey(); - - // Return a RecordReader that is compatible with the Hive 0.12 reader - // with NullWritable for the key instead of RecordIdentifier. - return new org.apache.hadoop.mapred.RecordReader(){ - @Override - public boolean next(NullWritable nullWritable, - OrcStruct orcStruct) throws IOException { - return inner.next(id, orcStruct); - } + return new RIAwareRecordReader(inner); + } + // A RecordReader that wraps an ACID RowReader and, unlike the Hive 0.12 reader (which used + // NullWritable for the key), exposes the underlying RecordIdentifier as the key.
+ public static class RIAwareRecordReader implements org.apache.hadoop.mapred.RecordReader { + private final RowReader inner; + public final RecordIdentifier id; + private RIAwareRecordReader(final RowReader inner) { + this.inner = inner; + id = inner.createKey(); + } + @Override + public boolean next(WritableComparable ri, + OrcStruct orcStruct) throws IOException { + return inner.next((RecordIdentifier)ri, orcStruct); + } // (2) and here we want to load the RecordIdentifier - @Override - public NullWritable createKey() { - return NullWritable.get(); - } + @Override + public WritableComparable createKey() { + return inner.createKey(); + } - @Override - public OrcStruct createValue() { - return inner.createValue(); - } + @Override + public OrcStruct createValue() { + return inner.createValue(); // (1) so here we want to augment this struct with the RecordIdentifier + } - @Override - public long getPos() throws IOException { - return inner.getPos(); - } + @Override + public long getPos() throws IOException { + return inner.getPos(); + } - @Override - public void close() throws IOException { - inner.close(); - } + @Override + public void close() throws IOException { + inner.close(); + } - @Override - public float getProgress() throws IOException { - return inner.getProgress(); - } - }; + @Override + public float getProgress() throws IOException { + return inner.getProgress(); + } } - @Override public RowReader getReader(InputSplit inputSplit, Options options) throws IOException { @@ -1111,7 +1118,7 @@ public boolean next(RecordIdentifier recordIdentifier, boolean result; // filter out the deleted records do { - result = records.next(recordIdentifier, innerRecord); + result = records.next(recordIdentifier, innerRecord); // the first 5 fields of innerRecord are op, originalTransaction, etc.; the last is an OrcStruct } while (result && OrcRecordUpdater.getOperation(innerRecord) == OrcRecordUpdater.DELETE_OPERATION); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java index ca90fc5..d99258e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.*; import java.io.IOException; @@ -41,7 +42,7 @@ * batch.
*/ class VectorizedOrcAcidRowReader - implements org.apache.hadoop.mapred.RecordReader { private final AcidInputFormat.RowReader innerReader; private final RecordIdentifier key; @@ -75,7 +76,7 @@ } @Override - public boolean next(NullWritable nullWritable, + public boolean next(RecordIdentifier ri, VectorizedRowBatch vectorizedRowBatch ) throws IOException { vectorizedRowBatch.reset(); @@ -104,12 +105,13 @@ public boolean next(NullWritable nullWritable, } catch (HiveException he) { throw new IOException("error iterating", he); } + ri.set(key); return true; } @Override - public NullWritable createKey() { - return NullWritable.get(); + public RecordIdentifier createKey() { + return new RecordIdentifier(); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java index d7f914a..c5a45fe 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java @@ -31,9 +31,9 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.InputFormatChecker; -import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; @@ -44,11 +44,11 @@ /** * A MapReduce/Hive input format for ORC files. */ -public class VectorizedOrcInputFormat extends FileInputFormat +public class VectorizedOrcInputFormat extends FileInputFormat implements InputFormatChecker, VectorizedInputFormatInterface { static class VectorizedOrcRecordReader - implements RecordReader { + implements RecordReader { private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader; private final long offset; private final long length; @@ -76,7 +76,7 @@ } @Override - public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { + public boolean next(WritableComparable key, VectorizedRowBatch value) throws IOException { if (!reader.hasNext()) { return false; @@ -100,7 +100,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti } @Override - public NullWritable createKey() { + public WritableComparable createKey() { return NullWritable.get(); } @@ -135,7 +135,7 @@ public VectorizedOrcInputFormat() { } @Override - public RecordReader + public RecordReader getRecordReader(InputSplit inputSplit, JobConf conf, Reporter reporter) throws IOException { FileSplit fSplit = (FileSplit)inputSplit; diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java index 0637d46..29c8330 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java @@ -25,11 +25,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; public class VirtualColumn implements Serializable { @@ -41,6 +43,10 @@ public static VirtualColumn ROWOFFSET = new VirtualColumn("ROW__OFFSET__INSIDE__BLOCK", (PrimitiveTypeInfo)TypeInfoFactory.longTypeInfo); public static VirtualColumn RAWDATASIZE = new VirtualColumn("RAW__DATA__SIZE", (PrimitiveTypeInfo)TypeInfoFactory.longTypeInfo); + /** + * {@link org.apache.hadoop.hive.ql.io.RecordIdentifier} + */ + public static VirtualColumn ROWID = new VirtualColumn("ROW__ID", RecordIdentifier.StructInfo.typeInfo, true, RecordIdentifier.StructInfo.oi); /** * GROUPINGID is used with GROUP BY GROUPINGS SETS, ROLLUP and CUBE. @@ -53,23 +59,26 @@ new VirtualColumn("GROUPING__ID", (PrimitiveTypeInfo) TypeInfoFactory.intTypeInfo); public static VirtualColumn[] VIRTUAL_COLUMNS = - new VirtualColumn[] {FILENAME, BLOCKOFFSET, ROWOFFSET, RAWDATASIZE, GROUPINGID}; + new VirtualColumn[] {FILENAME, BLOCKOFFSET, ROWOFFSET, RAWDATASIZE, GROUPINGID, ROWID}; private String name; - private PrimitiveTypeInfo typeInfo; + private TypeInfo typeInfo; private boolean isHidden = true; + private ObjectInspector oi; public VirtualColumn() { } public VirtualColumn(String name, PrimitiveTypeInfo typeInfo) { - this(name, typeInfo, true); + this(name, typeInfo, true, + PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(typeInfo)); } - VirtualColumn(String name, PrimitiveTypeInfo typeInfo, boolean isHidden) { + VirtualColumn(String name, TypeInfo typeInfo, boolean isHidden, ObjectInspector oi) { this.name = name; this.typeInfo = typeInfo; this.isHidden = isHidden; + this.oi = oi; } public static List getStatsRegistry(Configuration conf) { @@ -87,11 +96,12 @@ public VirtualColumn(String name, PrimitiveTypeInfo typeInfo) { if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEROWOFFSET)) { l.add(ROWOFFSET); } + l.add(ROWID); return l; } - public PrimitiveTypeInfo getTypeInfo() { + public TypeInfo getTypeInfo() { return typeInfo; } @@ -118,6 +128,9 @@ public boolean getIsHidden() { public void setIsHidden(boolean isHidden) { this.isHidden = isHidden; } + public ObjectInspector getObjectInspector() { + return oi; + } @Override public boolean equals(Object o) { @@ -144,8 +157,7 @@ public static StructObjectInspector getVCSObjectInspector(List vc List inspectors = new ArrayList(vcs.size()); for (VirtualColumn vc : vcs) { names.add(vc.getName()); - inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector( - vc.getTypeInfo())); + inspectors.add(vc.oi); } return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors); } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 01c5500..5ede81b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobClient; @@ -485,13 +486,13 @@ public 
float getProgress() throws IOException { } static class CompactorMap - implements Mapper { + implements Mapper { JobConf jobConf; RecordWriter writer; @Override - public void map(NullWritable key, CompactorInputSplit split, + public void map(WritableComparable key, CompactorInputSplit split, OutputCollector nullWritableVOutputCollector, Reporter reporter) throws IOException { // This will only get called once, since CompactRecordReader only returns one record, diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index 7f5134e..98519bf 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -276,7 +276,7 @@ private void addFile(HiveConf conf, Table t, Partition p, long minTxn, long maxT } } - static class MockInputFormat implements AcidInputFormat { + static class MockInputFormat implements AcidInputFormat { @Override public AcidInputFormat.RowReader getReader(InputSplit split, @@ -315,7 +315,7 @@ private void addFile(HiveConf conf, Table t, Partition p, long minTxn, long maxT } @Override - public RecordReader getRecordReader(InputSplit inputSplit, JobConf entries, + public RecordReader getRecordReader(InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException { return null; }
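
Usage sketch (not part of the patch): a minimal example, assuming only the classes and members added above, of how RecordIdentifier.StructInfo is meant to be used. It mirrors the populateVirtualColumnValues() change in MapOperator, which copies the current RecordIdentifier into an Object[] shaped like the ROW__ID struct. The class name RowIdStructSketch is made up for illustration.

import org.apache.hadoop.hive.ql.io.RecordIdentifier;

public class RowIdStructSketch {
  public static void main(String[] args) {
    // Key as produced by an ACID reader; here just a freshly constructed one.
    RecordIdentifier ri = new RecordIdentifier();
    // ROW__ID travels through the operator tree as an Object[] shaped like
    // struct<transactionId:bigint,bucketId:int,rowId:bigint>.
    Object[] rowIdStruct = new Object[RecordIdentifier.Field.values().length];
    RecordIdentifier.StructInfo.toArray(ri, rowIdStruct);
    // Each slot is addressed by its Field ordinal, matching the field order of StructInfo.oi.
    System.out.println("transactionId = "
        + rowIdStruct[RecordIdentifier.Field.transactionId.ordinal()]);
    System.out.println("bucketId = "
        + rowIdStruct[RecordIdentifier.Field.bucketId.ordinal()]);
    System.out.println("rowId = "
        + rowIdStruct[RecordIdentifier.Field.rowId.ordinal()]);
  }
}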
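
A second sketch, under the same assumptions, showing the virtual-column struct inspector that MapOperator and FetchOperator build once ROW__ID is registered. It assumes VirtualColumn.getVCSObjectInspector keeps the List-of-VirtualColumn signature shown above; RowIdInspectorSketch is a made-up name.

import java.util.Arrays;

import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

public class RowIdInspectorSketch {
  public static void main(String[] args) {
    // Build the struct OI for just the ROW__ID virtual column, the same way
    // MapOperator.initObjectInspector() does for a table scan that uses virtual columns.
    StructObjectInspector vcsOI =
        VirtualColumn.getVCSObjectInspector(Arrays.asList(VirtualColumn.ROWID));
    // Expected to print something like
    // struct<row__id:struct<transactionid:bigint,bucketid:int,rowid:bigint>>.
    System.out.println(vcsOI.getTypeName());
  }
}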