diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index fa7a8f0..8c4bca0 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -53,7 +53,7 @@ private int currentBucketId = 0; private final Path partitionPath; - final AcidOutputFormat outf; + final AcidOutputFormat outf; protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf) throws ConnectionError, StreamingException { @@ -70,7 +70,7 @@ protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf) + endPoint); } String outFormatName = this.tbl.getSd().getOutputFormat(); - outf = (AcidOutputFormat) ReflectionUtils.newInstance(Class.forName(outFormatName), conf); + outf = (AcidOutputFormat) ReflectionUtils.newInstance(Class.forName(outFormatName), conf); } catch (MetaException e) { throw new ConnectionError(endPoint, e); } catch (NoSuchObjectException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 0da886b..89fff81 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -105,7 +105,7 @@ private transient JobConf job; private transient WritableComparable key; private transient Writable value; - private transient Writable[] vcValues; + private transient Object[] vcValues; private transient Deserializer serde; private transient Deserializer tblSerde; private transient Converter partTblObjectInspectorConverter; @@ -141,12 +141,11 @@ private void initialize() { List names = new ArrayList(vcCols.size()); List inspectors = new ArrayList(vcCols.size()); for (VirtualColumn vc : vcCols) { - inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector( - vc.getTypeInfo())); + inspectors.add(vc.getObjectInspector()); names.add(vc.getName()); } vcsOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors); - vcValues = new Writable[vcCols.size()]; + vcValues = new Object[vcCols.size()]; } isPartitioned = work.isPartitioned(); tblDataDone = false; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index d5de58e..b1f8358 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -140,7 +141,7 @@ public int hashCode() { String tableName; String partName; List vcs; - Writable[] vcValues; + Object[] vcValues; private boolean isPartitioned() { return partObjectInspector != null; @@ -165,7 +166,7 @@ public StructObjectInspector getRowObjectInspector() { * op. 
* * @param hconf - * @param mrwork + * @param mapWork * @throws HiveException */ public void initializeAsRoot(Configuration hconf, MapWork mapWork) @@ -250,13 +251,13 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, // The op may not be a TableScan for mapjoins // Consider the query: select /*+MAPJOIN(a)*/ count(*) FROM T1 a JOIN T2 b ON a.key = b.key; - // In that case, it will be a Select, but the rowOI need not be ammended + // In that case, it will be a Select, but the rowOI need not be amended if (ctx.op instanceof TableScanOperator) { TableScanOperator tsOp = (TableScanOperator) ctx.op; TableScanDesc tsDesc = tsOp.getConf(); if (tsDesc != null && tsDesc.hasVirtualCols()) { opCtx.vcs = tsDesc.getVirtualCols(); - opCtx.vcValues = new Writable[opCtx.vcs.size()]; + opCtx.vcValues = new Object[opCtx.vcs.size()]; opCtx.vcsObjectInspector = VirtualColumn.getVCSObjectInspector(opCtx.vcs); if (opCtx.isPartitioned()) { opCtx.rowWithPartAndVC = Arrays.copyOfRange(opCtx.rowWithPart, 0, 3); @@ -550,13 +551,13 @@ public void process(Writable value) throws HiveException { } } - public static Writable[] populateVirtualColumnValues(ExecMapperContext ctx, - List vcs, Writable[] vcValues, Deserializer deserializer) { + public static Object[] populateVirtualColumnValues(ExecMapperContext ctx, + List vcs, Object[] vcValues, Deserializer deserializer) { if (vcs == null) { return vcValues; } if (vcValues == null) { - vcValues = new Writable[vcs.size()]; + vcValues = new Object[vcs.size()]; } for (int i = 0; i < vcs.size(); i++) { VirtualColumn vc = vcs.get(i); @@ -602,6 +603,19 @@ public void process(Writable value) throws HiveException { old.set(current); } } + else if(vc.equals(VirtualColumn.ROWID)) { + if(ctx.getIoCxt().ri == null) { + vcValues[i] = null; + } + else { + if(vcValues[i] == null) { + vcValues[i] = new Object[RecordIdentifier.Field.values().length]; + } + RecordIdentifier.StructInfo.toArray(ctx.getIoCxt().ri, (Object[])vcValues[i]); + ctx.getIoCxt().ri = null;//so we don't accidentally cache the value; shouldn't + //happen since IO layer either knows how to produce ROW__ID or not - but to be safe + } + } } return vcValues; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java index 4e0fd79..7fb4c46 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java @@ -155,7 +155,7 @@ public void configure(JobConf job) { } } } - + @Override public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { if (oc == null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java index 71a9dd4..2f63524 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.RecordReader; @@ -86,11 +86,20 @@ *

* To support transitions between non-ACID layouts to ACID layouts, the input * formats are expected to support both layouts and detect the correct one. - * - * @param The row type + *

+ * A note on the KEY of this InputFormat. + * For row-at-a-time processing, KEY can conveniently pass RowId into the operator + * pipeline. For vectorized execution the KEY could perhaps represent a range in the batch. + * Since {@link org.apache.hadoop.hive.ql.io.orc.OrcInputFormat} is declared to return a + * {@code NullWritable} key, {@link org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader} is defined + * to provide access to the RowId. Other implementations of AcidInputFormat can use either + * mechanism. + *

+ * + * @param The row type */ -public interface AcidInputFormat - extends InputFormat, InputFormatChecker { +public interface AcidInputFormat + extends InputFormat, InputFormatChecker { /** * Options for controlling the record readers. @@ -140,7 +149,7 @@ public Reporter getReporter() { * @return a record reader * @throws IOException */ - public RowReader getReader(InputSplit split, + public RowReader getReader(InputSplit split, Options options) throws IOException; public static interface RawReader @@ -162,11 +171,18 @@ public Reporter getReporter() { * @return a record reader * @throws IOException */ - RawReader getRawReader(Configuration conf, + RawReader getRawReader(Configuration conf, boolean collapseEvents, int bucket, ValidTxnList validTxnList, Path baseDirectory, Path[] deltaDirectory ) throws IOException; + + /** + * A RecordReader returned by an AcidInputFormat working in row-at-a-time mode should implement AcidRecordReader. + */ + public interface AcidRecordReader extends RecordReader { + RecordIdentifier getRecordIdentifier(); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index 6b330e1..88e7106 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.Reporter; import java.io.IOException; @@ -34,7 +34,7 @@ * An extension for OutputFormats that want to implement ACID transactions.
* @param the row type of the file */ -public interface AcidOutputFormat extends HiveOutputFormat { +public interface AcidOutputFormat extends HiveOutputFormat { /** * Options to control how the files are written diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java index f874d86..c5f6c1e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java @@ -20,17 +20,13 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.FooterBuffer; @@ -42,16 +38,13 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.util.ReflectionUtils; /** This class prepares an IOContext, and provides the ability to perform a binary search on the * data. 
The binary search can be used by setting the value of inputFormatSorted in the @@ -119,7 +112,18 @@ public boolean next(K key, V value) throws IOException { } updateIOContext(); try { - return doNext(key, value); + boolean retVal = doNext(key, value); + if(retVal) { + if(key instanceof RecordIdentifier) { + //supports AcidInputFormat which uses the KEY to pass ROW__ID info + ioCxtRef.ri = (RecordIdentifier)key; + } + else if(recordReader instanceof AcidInputFormat.AcidRecordReader) { + //supports AcidInputFormat which does not use the KEY to pass ROW__ID info + ioCxtRef.ri = ((AcidInputFormat.AcidRecordReader) recordReader).getRecordIdentifier(); + } + } + return retVal; } catch (IOException e) { ioCxtRef.setIOExceptions(true); throw e; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java index 8cbf32f..914dd3d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java @@ -69,6 +69,10 @@ public static void clear() { Comparison comparison = null; // The class name of the generic UDF being used by the filter String genericUDFClassName = null; + /** + * supports {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#ROWID} + */ + public RecordIdentifier ri; public static enum Comparison { GREATER, diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java index 38a0d6b..cdde3dc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java @@ -19,16 +19,81 @@ package org.apache.hadoop.hive.ql.io; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; /** - * Gives the Record identifer information for the current record. + * Gives the Record identifier information for the current record. */ public class RecordIdentifier implements WritableComparable { + /** + * This is in support of {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#ROWID} + * Contains metadata about each field in RecordIdentifier that needs to be part of ROWID + * which is represented as a struct {@link org.apache.hadoop.hive.ql.io.RecordIdentifier.StructInfo}. + * Each field of RecordIdentifier which should be part of ROWID should be in this enum... which + * really means that it should be part of VirtualColumn (so make a subclass for rowid).
+ */ + public static enum Field { + //note the enum names match field names in the struct + transactionId(TypeInfoFactory.longTypeInfo, + PrimitiveObjectInspectorFactory.javaLongObjectInspector), + bucketId(TypeInfoFactory.intTypeInfo, PrimitiveObjectInspectorFactory.javaIntObjectInspector), + rowId(TypeInfoFactory.longTypeInfo, PrimitiveObjectInspectorFactory.javaLongObjectInspector); + public final TypeInfo fieldType; + public final ObjectInspector fieldOI; + Field(TypeInfo fieldType, ObjectInspector fieldOI) { + this.fieldType = fieldType; + this.fieldOI = fieldOI; + } + } + /** + * RecordIdentifier is passed along the operator tree as a struct. This class contains a few + * utilities for that. + */ + public static final class StructInfo { + private static final List fieldNames = new ArrayList(Field.values().length); + private static final List fieldTypes = new ArrayList(fieldNames.size()); + private static final List fieldOis = + new ArrayList(fieldNames.size()); + static { + for(Field f : Field.values()) { + fieldNames.add(f.name()); + fieldTypes.add(f.fieldType); + fieldOis.add(f.fieldOI); + } + } + public static final TypeInfo typeInfo = + TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypes); + public static final ObjectInspector oi = + ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOis); + + /** + * Copies relevant fields from {@code ri} to {@code struct} + * @param ri + * @param struct must be of size Field.values().size() + */ + public static void toArray(RecordIdentifier ri, Object[] struct) { + assert struct != null && struct.length == Field.values().length; + if(ri == null) { + Arrays.fill(struct, null); + return; + } + struct[Field.transactionId.ordinal()] = ri.getTransactionId(); + struct[Field.bucketId.ordinal()] = ri.getBucketId(); + struct[Field.rowId.ordinal()] = ri.getRowId(); + } + } + private long transactionId; private int bucketId; private long rowId; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 7edb3c2..2fcc207 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -98,7 +98,7 @@ */ public class OrcInputFormat implements InputFormat, InputFormatChecker, VectorizedInputFormatInterface, - AcidInputFormat { + AcidInputFormat { private static final Log LOG = LogFactory.getLog(OrcInputFormat.class); static final HadoopShims SHIMS = ShimLoader.getHadoopShims(); @@ -989,7 +989,7 @@ private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics, boolean vectorMode = Utilities.isVectorMode(conf); // if HiveCombineInputFormat gives us FileSplits instead of OrcSplits, - // we know it is not ACID. + // we know it is not ACID. 
(see a check in CombineHiveInputFormat.getSplits() that ensures this) if (inputSplit.getClass() == FileSplit.class) { if (vectorMode) { return createVectorizedReader(inputSplit, conf, reporter); @@ -998,62 +998,75 @@ private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics, ((FileSplit) inputSplit).getPath(), OrcFile.readerOptions(conf)), conf, (FileSplit) inputSplit); } - + OrcSplit split = (OrcSplit) inputSplit; reporter.setStatus(inputSplit.toString()); - // if we are strictly old-school, just use the old code + Options options = new Options(conf).reporter(reporter); + final RowReader inner = getReader(inputSplit, options); + + + /*Even though there are no delta files, we still need to produce row ids so that an + * UPDATE or DELETE statement would work on a table which didn't have any previous updates*/ if (split.isOriginal() && split.getDeltas().isEmpty()) { if (vectorMode) { return createVectorizedReader(inputSplit, conf, reporter); } else { - return new OrcRecordReader(OrcFile.createReader(split.getPath(), - OrcFile.readerOptions(conf)), conf, split); + return new NullKeyRecordReader(inner, conf); } } - Options options = new Options(conf).reporter(reporter); - final RowReader inner = getReader(inputSplit, options); if (vectorMode) { return (org.apache.hadoop.mapred.RecordReader) new VectorizedOrcAcidRowReader(inner, conf, (FileSplit) inputSplit); } - final RecordIdentifier id = inner.createKey(); + return new NullKeyRecordReader(inner, conf); + } + /** + * A RecordReader that is compatible with the Hive 0.12 reader + * with NullWritable for the key instead of RecordIdentifier. + */ + public static final class NullKeyRecordReader implements AcidRecordReader { + private final RecordIdentifier id; + private final RowReader inner; - // Return a RecordReader that is compatible with the Hive 0.12 reader - // with NullWritable for the key instead of RecordIdentifier.
- return new org.apache.hadoop.mapred.RecordReader(){ - @Override - public boolean next(NullWritable nullWritable, - OrcStruct orcStruct) throws IOException { - return inner.next(id, orcStruct); - } + public RecordIdentifier getRecordIdentifier() { + return id; + } + private NullKeyRecordReader(RowReader inner, Configuration conf) { + this.inner = inner; + id = inner.createKey(); + } + @Override + public boolean next(NullWritable nullWritable, + OrcStruct orcStruct) throws IOException { + return inner.next(id, orcStruct); + } - @Override - public NullWritable createKey() { - return NullWritable.get(); - } + @Override + public NullWritable createKey() { + return NullWritable.get(); + } - @Override - public OrcStruct createValue() { - return inner.createValue(); - } + @Override + public OrcStruct createValue() { + return inner.createValue(); + } - @Override - public long getPos() throws IOException { - return inner.getPos(); - } + @Override + public long getPos() throws IOException { + return inner.getPos(); + } - @Override - public void close() throws IOException { - inner.close(); - } + @Override + public void close() throws IOException { + inner.close(); + } - @Override - public float getProgress() throws IOException { - return inner.getProgress(); - } - }; + @Override + public float getProgress() throws IOException { + return inner.getProgress(); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index 00e0807..2749c7f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -50,7 +50,7 @@ * A Hive OutputFormat for ORC files. */ public class OrcOutputFormat extends FileOutputFormat - implements AcidOutputFormat { + implements AcidOutputFormat { private static class OrcRecordWriter implements RecordWriter, diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java index 0637d46..29c8330 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java @@ -25,11 +25,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; public class VirtualColumn implements Serializable { @@ -41,6 +43,10 @@ public static VirtualColumn ROWOFFSET = new VirtualColumn("ROW__OFFSET__INSIDE__BLOCK", (PrimitiveTypeInfo)TypeInfoFactory.longTypeInfo); public static VirtualColumn RAWDATASIZE = new VirtualColumn("RAW__DATA__SIZE", (PrimitiveTypeInfo)TypeInfoFactory.longTypeInfo); + /** + * {@link org.apache.hadoop.hive.ql.io.RecordIdentifier} + */ + public static VirtualColumn ROWID = new VirtualColumn("ROW__ID", RecordIdentifier.StructInfo.typeInfo, true, RecordIdentifier.StructInfo.oi); /** * GROUPINGID is used with GROUP BY GROUPINGS SETS, ROLLUP and CUBE. 
@@ -53,23 +59,26 @@ new VirtualColumn("GROUPING__ID", (PrimitiveTypeInfo) TypeInfoFactory.intTypeInfo); public static VirtualColumn[] VIRTUAL_COLUMNS = - new VirtualColumn[] {FILENAME, BLOCKOFFSET, ROWOFFSET, RAWDATASIZE, GROUPINGID}; + new VirtualColumn[] {FILENAME, BLOCKOFFSET, ROWOFFSET, RAWDATASIZE, GROUPINGID, ROWID}; private String name; - private PrimitiveTypeInfo typeInfo; + private TypeInfo typeInfo; private boolean isHidden = true; + private ObjectInspector oi; public VirtualColumn() { } public VirtualColumn(String name, PrimitiveTypeInfo typeInfo) { - this(name, typeInfo, true); + this(name, typeInfo, true, + PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(typeInfo)); } - VirtualColumn(String name, PrimitiveTypeInfo typeInfo, boolean isHidden) { + VirtualColumn(String name, TypeInfo typeInfo, boolean isHidden, ObjectInspector oi) { this.name = name; this.typeInfo = typeInfo; this.isHidden = isHidden; + this.oi = oi; } public static List getStatsRegistry(Configuration conf) { @@ -87,11 +96,12 @@ public VirtualColumn(String name, PrimitiveTypeInfo typeInfo) { if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEROWOFFSET)) { l.add(ROWOFFSET); } + l.add(ROWID); return l; } - public PrimitiveTypeInfo getTypeInfo() { + public TypeInfo getTypeInfo() { return typeInfo; } @@ -118,6 +128,9 @@ public boolean getIsHidden() { public void setIsHidden(boolean isHidden) { this.isHidden = isHidden; } + public ObjectInspector getObjectInspector() { + return oi; + } @Override public boolean equals(Object o) { @@ -144,8 +157,7 @@ public static StructObjectInspector getVCSObjectInspector(List vc List inspectors = new ArrayList(vcs.size()); for (VirtualColumn vc : vcs) { names.add(vc.getName()); - inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector( - vc.getTypeInfo())); + inspectors.add(vc.oi); } return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors); } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 01c5500..1a83c64 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobClient; @@ -485,20 +486,21 @@ public float getProgress() throws IOException { } static class CompactorMap - implements Mapper { + implements Mapper { JobConf jobConf; RecordWriter writer; @Override - public void map(NullWritable key, CompactorInputSplit split, + public void map(WritableComparable key, CompactorInputSplit split, OutputCollector nullWritableVOutputCollector, Reporter reporter) throws IOException { // This will only get called once, since CompactRecordReader only returns one record, // the input split. // Based on the split we're passed we go instantiate the real reader and then iterate on it // until it finishes. 
- AcidInputFormat aif = + @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class + AcidInputFormat aif = instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME)); ValidTxnList txnList = new ValidTxnListImpl(jobConf.get(ValidTxnList.VALID_TXNS_KEY)); @@ -541,7 +543,8 @@ private void getWriter(Reporter reporter, ObjectInspector inspector, .bucket(bucket); // Instantiate the underlying output format - AcidOutputFormat aof = + @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class + AcidOutputFormat aof = instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME)); writer = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options); diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index 7f5134e..ec1379d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -36,9 +36,9 @@ import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.Progressable; import org.apache.thrift.TException; @@ -276,7 +276,7 @@ private void addFile(HiveConf conf, Table t, Partition p, long minTxn, long maxT } } - static class MockInputFormat implements AcidInputFormat { + static class MockInputFormat implements AcidInputFormat { @Override public AcidInputFormat.RowReader getReader(InputSplit split, @@ -315,7 +315,7 @@ private void addFile(HiveConf conf, Table t, Partition p, long minTxn, long maxT } @Override - public RecordReader getRecordReader(InputSplit inputSplit, JobConf entries, + public RecordReader getRecordReader(InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException { return null; } @@ -398,7 +398,7 @@ public float getProgress() throws IOException { // This class isn't used and I suspect does totally the wrong thing. It's only here so that I // can provide some output format to the tables and partitions I create. I actually write to // those tables directory. - static class MockOutputFormat implements AcidOutputFormat { + static class MockOutputFormat implements AcidOutputFormat { @Override public RecordUpdater getRecordUpdater(Path path, Options options) throws @@ -420,7 +420,7 @@ public RecordUpdater getRecordUpdater(Path path, Options options) throws } @Override - public RecordWriter getRecordWriter(FileSystem fileSystem, JobConf entries, + public RecordWriter getRecordWriter(FileSystem fileSystem, JobConf entries, String s, Progressable progressable) throws IOException {
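The following is a minimal, illustrative sketch (not part of the patch) of how the pieces introduced above fit together: a row-at-a-time reader surfaces a RecordIdentifier either through its KEY or through AcidInputFormat.AcidRecordReader.getRecordIdentifier(), HiveContextAwareRecordReader.next() stashes it in IOContext.ri, and MapOperator.populateVirtualColumnValues() converts it into the ROW__ID struct via RecordIdentifier.StructInfo.toArray(). The class and method names below (RowIdSketch, toRowIdStruct, dumpRowIds) are hypothetical; only the Hive APIs added or touched by this patch are assumed.

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.mapred.RecordReader;

public class RowIdSketch {

  // Builds the Object[] struct used for the ROW__ID virtual column, mirroring what
  // MapOperator.populateVirtualColumnValues() does for VirtualColumn.ROWID.
  static Object[] toRowIdStruct(RecordIdentifier ri) {
    Object[] struct = new Object[RecordIdentifier.Field.values().length];
    RecordIdentifier.StructInfo.toArray(ri, struct); // fills {transactionId, bucketId, rowId}
    return struct;
  }

  // Drains a reader and prints each record's ROW__ID, using the same two-way check that
  // HiveContextAwareRecordReader.next() performs when it fills IOContext.ri.
  static <K, V> void dumpRowIds(RecordReader<K, V> reader) throws IOException {
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      RecordIdentifier ri = null;
      if (key instanceof RecordIdentifier) {
        // the reader passes ROW__ID information through its KEY
        ri = (RecordIdentifier) key;
      } else if (reader instanceof AcidInputFormat.AcidRecordReader) {
        // the reader keeps a NullWritable KEY but implements AcidRecordReader,
        // e.g. OrcInputFormat.NullKeyRecordReader
        ri = ((AcidInputFormat.AcidRecordReader) reader).getRecordIdentifier();
      }
      // a null RecordIdentifier yields {null, null, null}, same as a non-ACID source
      System.out.println(Arrays.toString(toRowIdStruct(ri)));
    }
  }
}

Both lookup paths are kept so that OrcInputFormat can stay compatible with the Hive 0.12 NullWritable-keyed reader while still exposing ROW__ID, as described in the AcidInputFormat javadoc above.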