diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index ffb6074..0fe33e0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.Operator; @@ -939,6 +940,12 @@ public CommandProcessorResponse run() public CommandProcessorResponse run(String command, boolean alreadyCompiled) throws CommandNeedRetryException { + try { + TxnDbUtil.prepDb(); + } + catch(Exception e) { + LOG.error(e); + } CommandProcessorResponse cpr = runInternal(command, alreadyCompiled); if(cpr.getResponseCode() == 0) { return cpr; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 0da886b..b5345ad 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -105,7 +105,7 @@ private transient JobConf job; private transient WritableComparable key; private transient Writable value; - private transient Writable[] vcValues; + private transient Object[] vcValues; private transient Deserializer serde; private transient Deserializer tblSerde; private transient Converter partTblObjectInspectorConverter; @@ -141,8 +141,7 @@ private void initialize() { List names = new ArrayList(vcCols.size()); List inspectors = new ArrayList(vcCols.size()); for (VirtualColumn vc : vcCols) { - inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector( - vc.getTypeInfo())); + inspectors.add(vc.getObjectInspector()); names.add(vc.getName()); } vcsOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 1dde78e..41c87ec 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveKey; @@ -117,6 +118,7 @@ Path[] outPaths; Path[] finalPaths; RecordWriter[] outWriters; + RecordUpdater[] updaters; Stat stat; public FSPaths() { @@ -128,6 +130,7 @@ public FSPaths(Path specPath) { outPaths = new Path[numFiles]; finalPaths = new Path[numFiles]; outWriters = new RecordWriter[numFiles]; + updaters = new RecordUpdater[numFiles]; stat = new Stat(); } @@ -168,6 +171,13 @@ public void closeWriters(boolean abort) throws HiveException { } } } + try { + for (int i = 0; i < updaters.length; i++) { + if (updaters[i] != null) updaters[i].close(false); + } + } catch (IOException e) { + throw new HiveException(e); + } } private void commit(FileSystem fs) throws HiveException { @@ -420,6 +430,7 @@ protected void createBucketFiles(FSPaths fsp) throws HiveException { assert 
totalFiles == 1; } + int bucketNum = 0; if (multiFileSpray) { key.setHashCode(idx); @@ -436,7 +447,7 @@ protected void createBucketFiles(FSPaths fsp) throws HiveException { } } - int bucketNum = prtner.getBucket(key, null, totalFiles); + bucketNum = prtner.getBucket(key, null, totalFiles); if (seenBuckets.contains(bucketNum)) { continue; } @@ -445,7 +456,7 @@ protected void createBucketFiles(FSPaths fsp) throws HiveException { bucketMap.put(bucketNum, filesIdx); taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum); } - createBucketForFileIdx(fsp, filesIdx); + createBucketForFileIdx(fsp, filesIdx, bucketNum); filesIdx++; } assert filesIdx == numFiles; @@ -462,7 +473,8 @@ protected void createBucketFiles(FSPaths fsp) throws HiveException { filesCreated = true; } - protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) throws HiveException { + protected void createBucketForFileIdx(FSPaths fsp, int filesIdx, int bucketNum) + throws HiveException { try { if (isNativeTable) { fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, null); @@ -493,11 +505,19 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) throws HiveExce Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc); // only create bucket files only if no dynamic partitions, // buckets of dynamic partitions will be created for each newly created partition - fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(), - outputClass, conf, fsp.outPaths[filesIdx], reporter); - // If the record writer provides stats, get it from there instead of the serde - statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter; - // increment the CREATED_FILES counter + if (conf.getWriteType() == FileSinkDesc.WriteType.NON_ACID) { + fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(), + outputClass, conf, fsp.outPaths[filesIdx], reporter); + // If the record writer provides stats, get it from there instead of the serde + statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof + StatsProvidingRecordWriter; + // increment the CREATED_FILES counter + } else { + ObjectInspector inspector = bDynParts ? subSetOI : outputObjInspector; + fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(), + bucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter); + statsFromRecordWriter[filesIdx] = true; + } if (reporter != null) { reporter.incrCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP), Operator.HIVECOUNTERCREATEDFILES, 1); @@ -598,27 +618,26 @@ public void processOp(Object row, int tag) throws HiveException { } - RecordWriter rowOutWriter = null; - if (row_count != null) { row_count.set(row_count.get() + 1); } - if (!multiFileSpray) { - rowOutWriter = rowOutWriters[0]; + int writerOffset = findWriterOffset(row); + // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same + // for a given operator branch prediction should work quite nicely on it. 
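For readability, the per-row dispatch this hunk introduces reduces to the following condensed sketch. It is illustrative only and uses the names added by this patch (WriteType, updaters, getTransactionId()); the switch form is equivalent to the if/else chain in the hunk.

    // Route each row either to the classic RecordWriter or to the new ACID RecordUpdater.
    int writerOffset = findWriterOffset(row);
    switch (conf.getWriteType()) {
      case NON_ACID:
        rowOutWriters[writerOffset].write(recordValue);
        break;
      case ACID_INSERT:
        fpaths.updaters[writerOffset].insert(conf.getTransactionId(), recordValue);
        break;
      case ACID_UPDATE:
        fpaths.updaters[writerOffset].update(conf.getTransactionId(), recordValue);
        break;
      case ACID_DELETE:
        fpaths.updaters[writerOffset].delete(conf.getTransactionId(), recordValue);
        break;
      default:
        throw new HiveException("Unknown write type " + conf.getWriteType());
    }
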
+ if (conf.getWriteType() == FileSinkDesc.WriteType.NON_ACID) { + rowOutWriters[writerOffset].write(recordValue); + } else if (conf.getWriteType() == FileSinkDesc.WriteType.ACID_INSERT) { + fpaths.updaters[writerOffset].insert(conf.getTransactionId(), recordValue); } else { - int keyHashCode = 0; - for (int i = 0; i < partitionEval.length; i++) { - Object o = partitionEval[i].evaluate(row); - keyHashCode = keyHashCode * 31 - + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]); + if (conf.getWriteType() == FileSinkDesc.WriteType.ACID_UPDATE) { + fpaths.updaters[writerOffset].update(conf.getTransactionId(), recordValue); + } else if (conf.getWriteType() == FileSinkDesc.WriteType.ACID_DELETE) { + fpaths.updaters[writerOffset].delete(conf.getTransactionId(), recordValue); + } else { + throw new HiveException("Unknown write type " + conf.getWriteType().toString()); } - key.setHashCode(keyHashCode); - int bucketNum = prtner.getBucket(key, null, totalFiles); - int idx = bucketMap.get(bucketNum); - rowOutWriter = rowOutWriters[idx]; } - rowOutWriter.write(recordValue); } catch (IOException e) { throw new HiveException(e); } catch (SerDeException e) { @@ -635,6 +654,23 @@ private boolean areAllTrue(boolean[] statsFromRW) { return true; } + private int findWriterOffset(Object row) throws HiveException { + if (!multiFileSpray) { + return 0; + } else { + int keyHashCode = 0; + for (int i = 0; i < partitionEval.length; i++) { + Object o = partitionEval[i].evaluate(row); + keyHashCode = keyHashCode * 31 + + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]); + } + key.setHashCode(keyHashCode); + int bucketNum = prtner.getBucket(key, null, totalFiles); + return bucketMap.get(bucketNum); + } + + } + /** * Lookup list bucketing path. * @param lbDirName @@ -733,8 +769,10 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive if (dpDir != null) { dpDir = appendToSource(lbDirName, dpDir); pathKey = dpDir; + int numericBucketNum = 0; if(conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) { String buckNum = row.get(row.size() - 1); + numericBucketNum = Integer.valueOf(buckNum); taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), buckNum); pathKey = appendToSource(taskId, dpDir); } @@ -756,13 +794,18 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive // since we are closing the previous fsp's record writers, we need to see if we can get // stats from the record writer and store in the previous fsp that is cached if (conf.isGatherStats() && isCollectRWStats) { - RecordWriter outWriter = prevFsp.outWriters[0]; - if (outWriter != null) { - SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats(); - if (stats != null) { + SerDeStats stats = null; + if (conf.getWriteType() == FileSinkDesc.WriteType.NON_ACID) { + RecordWriter outWriter = prevFsp.outWriters[0]; + if (outWriter != null) { + stats = ((StatsProvidingRecordWriter) outWriter).getStats(); + } + } else if (prevFsp.updaters[0] != null) { + stats = prevFsp.updaters[0].getStats(); + } + if (stats != null) { prevFsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); prevFsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount()); - } } } @@ -778,7 +821,7 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive } if(conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) { - createBucketForFileIdx(fsp2, 0); + createBucketForFileIdx(fsp2, 0, numericBucketNum); valToPaths.put(pathKey, fsp2); 
} } @@ -832,6 +875,7 @@ public void startGroup() throws HiveException { @Override public void closeOp(boolean abort) throws HiveException { + if (!bDynParts && !filesCreated) { createBucketFiles(fsp); } @@ -849,13 +893,25 @@ public void closeOp(boolean abort) throws HiveException { // record writer already gathers the statistics, it can simply return the // accumulated statistics which will be aggregated in case of spray writers if (conf.isGatherStats() && isCollectRWStats) { - for (int idx = 0; idx < fsp.outWriters.length; idx++) { - RecordWriter outWriter = fsp.outWriters[idx]; - if (outWriter != null) { - SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats(); - if (stats != null) { - fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); - fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount()); + if (conf.getWriteType() == FileSinkDesc.WriteType.NON_ACID) { + for (int idx = 0; idx < fsp.outWriters.length; idx++) { + RecordWriter outWriter = fsp.outWriters[idx]; + if (outWriter != null) { + SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats(); + if (stats != null) { + fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); + fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount()); + } + } + } + } else { + for (int i = 0; i < fsp.updaters.length; i++) { + if (fsp.updaters[i] != null) { + SerDeStats stats = fsp.updaters[i].getStats(); + if (stats != null) { + fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); + fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount()); + } } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index d5de58e..cad61ab 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -140,7 +140,7 @@ public int hashCode() { String tableName; String partName; List vcs; - Writable[] vcValues; + Object[] vcValues; private boolean isPartitioned() { return partObjectInspector != null; @@ -165,7 +165,7 @@ public StructObjectInspector getRowObjectInspector() { * op. 
* * @param hconf - * @param mrwork + * @param mapWork * @throws HiveException */ public void initializeAsRoot(Configuration hconf, MapWork mapWork) @@ -250,13 +250,13 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, // The op may not be a TableScan for mapjoins // Consider the query: select /*+MAPJOIN(a)*/ count(*) FROM T1 a JOIN T2 b ON a.key = b.key; - // In that case, it will be a Select, but the rowOI need not be ammended + // In that case, it will be a Select, but the rowOI need not be amended if (ctx.op instanceof TableScanOperator) { TableScanOperator tsOp = (TableScanOperator) ctx.op; TableScanDesc tsDesc = tsOp.getConf(); if (tsDesc != null && tsDesc.hasVirtualCols()) { opCtx.vcs = tsDesc.getVirtualCols(); - opCtx.vcValues = new Writable[opCtx.vcs.size()]; + opCtx.vcValues = new Object[opCtx.vcs.size()]; opCtx.vcsObjectInspector = VirtualColumn.getVCSObjectInspector(opCtx.vcs); if (opCtx.isPartitioned()) { opCtx.rowWithPartAndVC = Arrays.copyOfRange(opCtx.rowWithPart, 0, 3); @@ -550,13 +550,13 @@ public void process(Writable value) throws HiveException { } } - public static Writable[] populateVirtualColumnValues(ExecMapperContext ctx, - List vcs, Writable[] vcValues, Deserializer deserializer) { + public static Object[] populateVirtualColumnValues(ExecMapperContext ctx, + List vcs, Object[] vcValues, Deserializer deserializer) { if (vcs == null) { return vcValues; } if (vcValues == null) { - vcValues = new Writable[vcs.size()]; + vcValues = new Object[vcs.size()]; } for (int i = 0; i < vcs.size(); i++) { VirtualColumn vc = vcs.get(i); @@ -602,6 +602,17 @@ public void process(Writable value) throws HiveException { old.set(current); } } + else if(vc.equals(VirtualColumn.ROWID)) { + if(vcValues[i] == null) { + vcValues[i] = new Object[3]; + } + Object[] recordIdentifierStruct = (Object[])vcValues[i]; + //order of elements in the array should match RecordIdentifier.sti + //todo: add a method on RecordIdentifier to do this + recordIdentifierStruct[0] = ctx.getIoCxt().getCurrentRowId().getTransactionId(); + recordIdentifierStruct[1] = ctx.getIoCxt().getCurrentRowId().getBucketId(); + recordIdentifierStruct[2] = ctx.getIoCxt().getCurrentRowId().getRowId(); + } } return vcValues; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java index 4e0fd79..7fb4c46 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java @@ -155,7 +155,7 @@ public void configure(JobConf job) { } } } - + @Override public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { if (oc == null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java index f874d86..44c58b4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java @@ -20,21 +20,18 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.common.ObjectPair; import 
org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.FooterBuffer; import org.apache.hadoop.hive.ql.io.IOContext.Comparison; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; @@ -42,16 +39,13 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.util.ReflectionUtils; /** This class prepares an IOContext, and provides the ability to perform a binary search on the * data. The binary search can be used by setting the value of inputFormatSorted in the @@ -119,7 +113,13 @@ public boolean next(K key, V value) throws IOException { } updateIOContext(); try { - return doNext(key, value); + boolean retVal = doNext(key, value); + if(recordReader instanceof OrcInputFormat.RIAwareRecordReader) { + OrcInputFormat.RIAwareRecordReader ri = (OrcInputFormat.RIAwareRecordReader)recordReader; + ioCxtRef.currentRowId = new RecordIdentifier(); + ioCxtRef.currentRowId.set(ri.id); + } + return retVal; } catch (IOException e) { ioCxtRef.setIOExceptions(true); throw e; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index c3a83d4..d0d278e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.compress.CompressionCodec; @@ -249,21 +250,8 @@ private static boolean checkTextInputFormat(FileSystem fs, HiveConf conf, public static RecordWriter getHiveRecordWriter(JobConf jc, TableDesc tableInfo, Class outputClass, FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException { - boolean storagehandlerofhivepassthru = false; - HiveOutputFormat hiveOutputFormat; + HiveOutputFormat hiveOutputFormat = getHiveOutputFormat(jc, tableInfo); try { - if (tableInfo.getJobProperties() != null) { - if (tableInfo.getJobProperties().get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) != null) { - jc.set(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY,tableInfo.getJobProperties().get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY)); - storagehandlerofhivepassthru = true; - } - } - if (storagehandlerofhivepassthru) { - hiveOutputFormat = ReflectionUtils.newInstance(tableInfo.getOutputFileFormatClass(),jc); - } - else { - hiveOutputFormat = 
tableInfo.getOutputFileFormatClass().newInstance(); - } boolean isCompressed = conf.getCompressed(); JobConf jc_output = jc; if (isCompressed) { @@ -299,6 +287,71 @@ public static RecordWriter getRecordWriter(JobConf jc, return null; } + private static HiveOutputFormat getHiveOutputFormat(JobConf jc, TableDesc tableInfo) + throws HiveException { + boolean storagehandlerofhivepassthru = false; + HiveOutputFormat hiveOutputFormat; + try { + if (tableInfo.getJobProperties() != null) { + if (tableInfo.getJobProperties().get( + HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) != null) { + jc.set(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY, + tableInfo.getJobProperties() + .get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY)); + storagehandlerofhivepassthru = true; + } + } + if (storagehandlerofhivepassthru) { + return ReflectionUtils.newInstance(tableInfo.getOutputFileFormatClass(), jc); + } else { + return tableInfo.getOutputFileFormatClass().newInstance(); + } + } catch (Exception e) { + throw new HiveException(e); + } + } + + public static RecordUpdater getAcidRecordUpdater(JobConf jc, TableDesc tableInfo, int bucket, + FileSinkDesc conf, Path outPath, + ObjectInspector inspector, + Reporter reporter) + throws HiveException, IOException { + HiveOutputFormat hiveOutputFormat = getHiveOutputFormat(jc, tableInfo); + AcidOutputFormat acidOutputFormat = null; + if (hiveOutputFormat instanceof AcidOutputFormat) { + acidOutputFormat = (AcidOutputFormat)hiveOutputFormat; + } else { + throw new HiveException("Unable to create RecordUpdater for HiveOutputFormat that does not " + + "implement AcidOutputFormat"); + } + // TODO not 100% sure about this. This call doesn't set the compression type in the conf + // file the way getHiveRecordWriter does, as ORC appears to read the value for itself. Not + // sure if this is correct or not. 
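For context, the consumer of this helper is FileSinkOperator.createBucketForFileIdx() elsewhere in this diff, and the updater it returns follows a simple lifecycle. A hedged usage sketch (variable names such as bucketNum, fileSinkDesc, rowInspector and txnId are placeholders, not patch identifiers):

    RecordUpdater updater = HiveFileFormatUtils.getAcidRecordUpdater(
        jc, tableInfo, bucketNum, fileSinkDesc, outPath, rowInspector, reporter);
    updater.insert(txnId, row);            // or update(txnId, row) / delete(txnId, row)
    SerDeStats stats = updater.getStats(); // signed row-count delta, see OrcRecordUpdater below
    updater.close(false);                  // false = normal close, not an abort
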
+ return getRecordUpdater(jc, acidOutputFormat, conf.getCompressed(), conf.getTransactionId(), + bucket, inspector, tableInfo.getProperties(), outPath, reporter); + } + + + private static RecordUpdater getRecordUpdater(JobConf jc, + AcidOutputFormat acidOutputFormat, + boolean isCompressed, + long txnId, + int bucket, + ObjectInspector inspector, + Properties tableProp, + Path outPath, + Reporter reporter) throws IOException { + return acidOutputFormat.getRecordUpdater(outPath, new AcidOutputFormat.Options(jc) + .isCompressed(isCompressed) + .tableProperties(tableProp) + .reporter(reporter) + .writingBase(false) + .minimumTransactionId(txnId) + .maximumTransactionId(txnId) + .bucket(bucket) + .inspector(inspector)); + } + public static PartitionDesc getPartitionDescFromPathRecursively( Map pathToPartitionInfo, Path dir, Map, Map> cacheMap) diff --git ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java index 8cbf32f..d229d46 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java @@ -58,6 +58,7 @@ public static void clear() { long currentRow; boolean isBlockPointer; boolean ioExceptions; + RecordIdentifier currentRowId = new RecordIdentifier(711, 712, 713); // Are we using the fact the input is sorted boolean useSorted = false; @@ -111,6 +112,14 @@ public void setCurrentRow(long currentRow) { this.currentRow = currentRow; } + /** + * Tables using AcidInputFormat support a row id + * @return + */ + public RecordIdentifier getCurrentRowId() { + return currentRowId; + } + public boolean isBlockPointer() { return isBlockPointer; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java index 38a0d6b..2e02518 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java @@ -19,16 +19,41 @@ package org.apache.hadoop.hive.ql.io; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; /** * Gives the Record identifer information for the current record. 
*/ public class RecordIdentifier implements WritableComparable { + + public static final String ROW__ID__TXN__ID = "transactionId"; + public static final String ROW__ID__BKT__ID = "bucketId"; + public static final String ROW__ID__ROW__ID = "rowId"; + + private final static List fieldNames = + Arrays.asList(ROW__ID__TXN__ID, ROW__ID__BKT__ID, ROW__ID__ROW__ID); + public final static TypeInfo sti = TypeInfoFactory.getStructTypeInfo( + fieldNames, + Arrays.asList((TypeInfo)TypeInfoFactory.longTypeInfo, TypeInfoFactory.intTypeInfo, + TypeInfoFactory.longTypeInfo) + ); + public final static ObjectInspector oi = ObjectInspectorFactory.getStandardStructObjectInspector( + fieldNames, + Arrays.asList((ObjectInspector)PrimitiveObjectInspectorFactory.javaLongObjectInspector, + PrimitiveObjectInspectorFactory.javaIntObjectInspector, + PrimitiveObjectInspectorFactory.javaLongObjectInspector)); + private long transactionId; private int bucketId; private long rowId; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java index 192d216..093f40b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java @@ -40,26 +40,17 @@ void insert(long currentTransaction, /** * Update an old record with a new set of values. * @param currentTransaction the current transaction id - * @param originalTransaction the row's original transaction id - * @param rowId the original row id * @param row the new values for the row * @throws IOException */ - void update(long currentTransaction, - long originalTransaction, - long rowId, - Object row) throws IOException; + void update(long currentTransaction, Object row) throws IOException; /** * Delete a row from the table. 
* @param currentTransaction the current transaction id - * @param originalTransaction the rows original transaction id - * @param rowId the original row id * @throws IOException */ - void delete(long currentTransaction, - long originalTransaction, - long rowId) throws IOException; + void delete(long currentTransaction, Object row) throws IOException; /** * Flush the current set of rows to the underlying file system, so that diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 7edb3c2..f113e34 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -1002,61 +1002,64 @@ private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics, OrcSplit split = (OrcSplit) inputSplit; reporter.setStatus(inputSplit.toString()); + Options options = new Options(conf).reporter(reporter); + final RowReader inner = getReader(inputSplit, options); // if we are strictly old-school, just use the old code if (split.isOriginal() && split.getDeltas().isEmpty()) { if (vectorMode) { return createVectorizedReader(inputSplit, conf, reporter); } else { - return new OrcRecordReader(OrcFile.createReader(split.getPath(), - OrcFile.readerOptions(conf)), conf, split); + return new RIAwareRecordReader(inner); } } - Options options = new Options(conf).reporter(reporter); - final RowReader inner = getReader(inputSplit, options); if (vectorMode) { return (org.apache.hadoop.mapred.RecordReader) new VectorizedOrcAcidRowReader(inner, conf, (FileSplit) inputSplit); } - final RecordIdentifier id = inner.createKey(); - - // Return a RecordReader that is compatible with the Hive 0.12 reader - // with NullWritable for the key instead of RecordIdentifier. - return new org.apache.hadoop.mapred.RecordReader(){ - @Override - public boolean next(NullWritable nullWritable, - OrcStruct orcStruct) throws IOException { - return inner.next(id, orcStruct); - } + return new RIAwareRecordReader(inner); + } + //a RecordReader that is compatible with the Hive 0.12 reader + // with NullWritable for the key instead of RecordIdentifier. 
+ public static class RIAwareRecordReader implements org.apache.hadoop.mapred.RecordReader { + private final RowReader inner; + public RecordIdentifier id; + private RIAwareRecordReader(final RowReader inner) { + this.inner = inner; + id = inner.createKey(); + } + @Override + public boolean next(NullWritable nullWritable, + OrcStruct orcStruct) throws IOException { + return inner.next(id, orcStruct); + }//2)and here we want to load the RI - @Override - public NullWritable createKey() { - return NullWritable.get(); - } + @Override + public NullWritable createKey() { + return NullWritable.get(); + } - @Override - public OrcStruct createValue() { - return inner.createValue(); - } + @Override + public OrcStruct createValue() { + return inner.createValue();//1)so here we want to augment this struct with RecordIdentifier + } - @Override - public long getPos() throws IOException { - return inner.getPos(); - } + @Override + public long getPos() throws IOException { + return inner.getPos(); + } - @Override - public void close() throws IOException { - inner.close(); - } + @Override + public void close() throws IOException { + inner.close(); + } - @Override - public float getProgress() throws IOException { - return inner.getProgress(); - } - }; + @Override + public float getProgress() throws IOException { + return inner.getProgress(); + } } - @Override public RowReader getReader(InputSplit inputSplit, Options options) throws IOException { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index 00e0807..0a13211 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -211,18 +211,14 @@ public void insert(long currentTransaction, Object row) throws IOException { } @Override - public void update(long currentTransaction, long originalTransaction, - long rowId, Object row) throws IOException { + public void update(long currentTransaction, Object row) throws IOException { out.println("update " + path + " currTxn: " + currentTransaction + - " origTxn: " + originalTransaction + " row: " + rowId + " obj: " + - stringifyObject(row, inspector)); + " obj: " + stringifyObject(row, inspector)); } @Override - public void delete(long currentTransaction, long originalTransaction, - long rowId) throws IOException { - out.println("delete " + path + " currTxn: " + currentTransaction + - " origTxn: " + originalTransaction + " row: " + rowId); + public void delete(long currentTransaction, Object row) throws IOException { + out.println("delete " + path + " currTxn: " + currentTransaction + " obj: " + row); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 8f17c12..570a8d5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -28,21 +28,21 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; -import java.nio.charset.CharsetEncoder; import java.util.ArrayList; import java.util.List; @@ -88,7 +88,15 @@ private final IntWritable bucket = new IntWritable(); private final LongWritable rowId = new LongWritable(); private long insertedRows = 0; + // This records how many rows have been inserted or deleted. It is separate from insertedRows + // because that is monotonically increasing to give new unique row ids. + private long rowCountDelta = 0; private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder(); + private StructField rowIdField = null; + private StructField numericRowIdField = null; + private StructField originalTxnField = null; + private StructObjectInspector rowInspector; + private StructObjectInspector rowIdInspector; static class AcidStats { long inserts; @@ -176,7 +184,7 @@ public OrcOptions orcOptions(OrcFile.WriterOptions opts) { * @param rowInspector the row's object inspector * @return an object inspector for the event stream */ - static ObjectInspector createEventSchema(ObjectInspector rowInspector) { + static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { List fields = new ArrayList(); fields.add(new OrcStruct.Field("operation", PrimitiveObjectInspectorFactory.writableIntObjectInspector, OPERATION)); @@ -234,7 +242,8 @@ static ObjectInspector createEventSchema(ObjectInspector rowInspector) { writerOptions.bufferSize(DELTA_BUFFER_SIZE); writerOptions.stripeSize(DELTA_STRIPE_SIZE); } - writerOptions.inspector(createEventSchema(options.getInspector())); + rowInspector = (StructObjectInspector)options.getInspector(); + writerOptions.inspector(createEventSchema(findRowId(options.getInspector()))); this.writer = OrcFile.createWriter(this.path, writerOptions); item = new OrcStruct(FIELDS); item.setFieldValue(OPERATION, operation); @@ -244,13 +253,50 @@ static ObjectInspector createEventSchema(ObjectInspector rowInspector) { item.setFieldValue(ROW_ID, rowId); } - private void addEvent(int operation, long currentTransaction, - long originalTransaction, long rowId, - Object row) throws IOException { + private ObjectInspector findRowId(ObjectInspector inspector) { + if (!(inspector instanceof StructObjectInspector)) { + throw new RuntimeException("Serious problem, expected a StructObjectInspector, but got a " + + inspector.getClass().getName()); + } + RowIdStrippingObjectInspector newInspector = new RowIdStrippingObjectInspector(inspector); + rowIdField = newInspector.getRowId(); + if (rowIdField == null) { + return inspector; + } else { + List fields = + ((StructObjectInspector) rowIdField.getFieldObjectInspector()).getAllStructFieldRefs(); + for (StructField field : fields) { + LOG.debug("Found columnname " + field.getFieldName()); + if (RecordIdentifier.ROW__ID__TXN__ID.equalsIgnoreCase(field.getFieldName())) { + originalTxnField = field; + } else if (RecordIdentifier.ROW__ID__ROW__ID.equalsIgnoreCase(field.getFieldName())) { + numericRowIdField = field; + } + } + if (originalTxnField == null || numericRowIdField == null) { + throw new RuntimeException("Serious problem: Found rowId struct but could not find rowId" + + " or originalTxn field!"); + } + rowIdInspector = 
(StructObjectInspector)rowIdField.getFieldObjectInspector(); + return newInspector; + } + } + + private void addEvent(int operation, long currentTransaction, long rowId, + boolean readRowIdFromRow, Object row) + throws IOException { this.operation.set(operation); this.currentTransaction.set(currentTransaction); - this.originalTransaction.set(originalTransaction); + long originalTransaction = currentTransaction; + if (readRowIdFromRow) { + Object rowIdValue = rowInspector.getStructFieldData(row, rowIdField); + originalTransaction = PrimitiveObjectInspectorFactory.javaLongObjectInspector.get( + rowIdInspector.getStructFieldData(rowIdValue, originalTxnField)); + rowId = PrimitiveObjectInspectorFactory.javaLongObjectInspector.get( + rowIdInspector.getStructFieldData(rowIdValue, numericRowIdField)); + } this.rowId.set(rowId); + this.originalTransaction.set(originalTransaction); item.setFieldValue(OrcRecordUpdater.ROW, row); indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId); writer.addRow(item); @@ -261,28 +307,27 @@ public void insert(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; } - addEvent(INSERT_OPERATION, currentTransaction, currentTransaction, - insertedRows++, row); + + addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, false, row); + rowCountDelta++; } @Override - public void update(long currentTransaction, long originalTransaction, - long rowId, Object row) throws IOException { + public void update(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; } - addEvent(UPDATE_OPERATION, currentTransaction, originalTransaction, rowId, - row); + addEvent(UPDATE_OPERATION, currentTransaction, -1L, true, row); } @Override - public void delete(long currentTransaction, long originalTransaction, - long rowId) throws IOException { + public void delete(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; } - addEvent(DELETE_OPERATION, currentTransaction, originalTransaction, rowId, - null); + + addEvent(DELETE_OPERATION, currentTransaction, -1, true, row); + rowCountDelta--; } @Override @@ -317,7 +362,11 @@ public void close(boolean abort) throws IOException { @Override public SerDeStats getStats() { - return null; + SerDeStats stats = new SerDeStats(); + stats.setRowCount(rowCountDelta); + // Don't worry about setting raw data size diff. I have no idea how to calculate that + // without finding the row we are updating or deleting, which would be a mess. 
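The net effect of the delta bookkeeping above, shown in isolation (a hedged sketch; ru, txnId and the row variables are placeholders):

    // insert() adds one to the reported row count, delete() subtracts one, and
    // update() leaves it unchanged, so a delete-heavy statement reports a negative count.
    ru.insert(txnId, newRow);       // running delta: +1
    ru.update(txnId, changedRow);   // running delta: +1
    ru.delete(txnId, oldRow);       // running delta:  0
    ru.delete(txnId, otherRow);     // running delta: -1
    SerDeStats stats = ru.getStats();
    assert stats.getRowCount() == -1;
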
+ return stats; } @VisibleForTesting @@ -397,4 +446,62 @@ void addKey(int op, long transaction, int bucket, long rowId) { lastRowId = rowId; } } + + private static class RowIdStrippingObjectInspector extends StructObjectInspector { + private StructObjectInspector wrapped; + List fields; + StructField rowId; + + RowIdStrippingObjectInspector(ObjectInspector oi) { + if (!(oi instanceof StructObjectInspector)) { + throw new RuntimeException("Serious problem, expected a StructObjectInspector, " + + "but got a " + oi.getClass().getName()); + } + wrapped = (StructObjectInspector)oi; + fields = new ArrayList(wrapped.getAllStructFieldRefs().size()); + for (StructField field : wrapped.getAllStructFieldRefs()) { + if (VirtualColumn.ROWID.getName().equalsIgnoreCase(field.getFieldName())) { + rowId = field; + } else { + fields.add(field); + } + } + } + + @Override + public List getAllStructFieldRefs() { + return fields; + } + + @Override + public StructField getStructFieldRef(String fieldName) { + return wrapped.getStructFieldRef(fieldName); + } + + @Override + public Object getStructFieldData(Object data, StructField fieldRef) { + // For performance don't check that that the fieldRef isn't rowId everytime, + // just assume that the caller used getAllStructFieldRefs and thus doesn't have that fieldRef + return wrapped.getStructFieldData(data, fieldRef); + } + + @Override + public List getStructFieldsDataAsList(Object data) { + return wrapped.getStructFieldsDataAsList(data); + } + + @Override + public String getTypeName() { + return wrapped.getTypeName(); + } + + @Override + public Category getCategory() { + return wrapped.getCategory(); + } + + StructField getRowId() { + return rowId; + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java index 0637d46..8e67a3e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java @@ -25,11 +25,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; public class VirtualColumn implements Serializable { @@ -41,6 +43,10 @@ public static VirtualColumn ROWOFFSET = new VirtualColumn("ROW__OFFSET__INSIDE__BLOCK", (PrimitiveTypeInfo)TypeInfoFactory.longTypeInfo); public static VirtualColumn RAWDATASIZE = new VirtualColumn("RAW__DATA__SIZE", (PrimitiveTypeInfo)TypeInfoFactory.longTypeInfo); + /** + * {@link org.apache.hadoop.hive.ql.io.RecordIdentifier} + */ + public static VirtualColumn ROWID = new VirtualColumn("ROW__ID", RecordIdentifier.sti, true, RecordIdentifier.oi); /** * GROUPINGID is used with GROUP BY GROUPINGS SETS, ROLLUP and CUBE. 
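Because ROW__ID is a struct rather than a primitive, its value moves through the operators as a three-element Object[] in the field order fixed by RecordIdentifier.sti (see the MapOperator hunk earlier in this diff). A small self-contained sketch of reading such a value back through the inspector this patch defines; the literal values are invented for illustration:

    import org.apache.hadoop.hive.ql.io.RecordIdentifier;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

    public class RowIdInspectorSketch {
      public static void main(String[] args) {
        // Field order must match RecordIdentifier.sti: transactionId, bucketId, rowId.
        Object[] rowIdValue = new Object[] {7L, 0, 42L};
        StructObjectInspector soi = (StructObjectInspector) RecordIdentifier.oi;
        for (StructField f : soi.getAllStructFieldRefs()) {
          System.out.println(f.getFieldName() + " = " + soi.getStructFieldData(rowIdValue, f));
        }
        // prints: transactionId = 7, bucketId = 0, rowId = 42
      }
    }
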
@@ -53,23 +59,26 @@ new VirtualColumn("GROUPING__ID", (PrimitiveTypeInfo) TypeInfoFactory.intTypeInfo); public static VirtualColumn[] VIRTUAL_COLUMNS = - new VirtualColumn[] {FILENAME, BLOCKOFFSET, ROWOFFSET, RAWDATASIZE, GROUPINGID}; + new VirtualColumn[] {FILENAME, BLOCKOFFSET, ROWOFFSET, RAWDATASIZE, GROUPINGID, ROWID}; private String name; - private PrimitiveTypeInfo typeInfo; + private TypeInfo typeInfo; private boolean isHidden = true; + private ObjectInspector oi; public VirtualColumn() { } public VirtualColumn(String name, PrimitiveTypeInfo typeInfo) { - this(name, typeInfo, true); + this(name, typeInfo, true, + PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(typeInfo)); } - VirtualColumn(String name, PrimitiveTypeInfo typeInfo, boolean isHidden) { + VirtualColumn(String name, TypeInfo typeInfo, boolean isHidden, ObjectInspector oi) { this.name = name; this.typeInfo = typeInfo; this.isHidden = isHidden; + this.oi = oi; } public static List getStatsRegistry(Configuration conf) { @@ -87,11 +96,12 @@ public VirtualColumn(String name, PrimitiveTypeInfo typeInfo) { if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEROWOFFSET)) { l.add(ROWOFFSET); } + l.add(ROWID); return l; } - public PrimitiveTypeInfo getTypeInfo() { + public TypeInfo getTypeInfo() { return typeInfo; } @@ -118,6 +128,9 @@ public boolean getIsHidden() { public void setIsHidden(boolean isHidden) { this.isHidden = isHidden; } + public ObjectInspector getObjectInspector() { + return oi; + } @Override public boolean equals(Object o) { @@ -144,8 +157,7 @@ public static StructObjectInspector getVCSObjectInspector(List vc List inspectors = new ArrayList(vcs.size()); for (VirtualColumn vc : vcs) { names.add(vc.getName()); - inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector( - vc.getTypeInfo())); + inspectors.add(vc.oi); } return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 301dde5..8ed143d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -84,6 +84,11 @@ private boolean statsCollectRawDataSize; + // Record what type of write this is. Default is non-ACID (ie old style). 
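Based on how the new TestFileSinkOperator drives the operator further down in this patch, a plan that targets an ACID table is expected to mark the sink roughly like this (a hedged sketch; outputPath, acidTableDesc and currentTxnId are placeholders, and the planner-side changes are outside this diff):

    FileSinkDesc desc = new FileSinkDesc(outputPath, acidTableDesc, false);
    desc.setWriteType(FileSinkDesc.WriteType.ACID_INSERT);  // or ACID_UPDATE / ACID_DELETE
    desc.setTransactionId(currentTxnId);                    // stamped on every event written
    // NON_ACID sinks skip both setters and keep the old RecordWriter path.
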
+ public enum WriteType {NON_ACID, ACID_INSERT, ACID_UPDATE, ACID_DELETE}; + private WriteType writeType = WriteType.NON_ACID; + private long txnId = 0; // transaction id for this operation + public FileSinkDesc() { } @@ -137,6 +142,8 @@ public Object clone() throws CloneNotSupportedException { ret.setMaxStatsKeyPrefixLength(maxStatsKeyPrefixLength); ret.setStatsCollectRawDataSize(statsCollectRawDataSize); ret.setDpSortState(dpSortState); + ret.setWriteType(writeType); + ret.setTransactionId(txnId); return (Object) ret; } @@ -398,4 +405,20 @@ public DPSortState getDpSortState() { public void setDpSortState(DPSortState dpSortState) { this.dpSortState = dpSortState; } + + public void setWriteType(WriteType type) { + writeType = type; + } + + public WriteType getWriteType() { + return writeType; + } + + public void setTransactionId(long id) { + txnId = id; + } + + public long getTransactionId() { + return txnId; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java new file mode 100644 index 0000000..02196a2 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -0,0 +1,741 @@ +package org.apache.hadoop.hive.ql.exec; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidTxnListImpl; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.stats.StatsAggregator; +import org.apache.hadoop.hive.ql.stats.StatsPublisher; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import 
org.apache.hadoop.util.Progressable; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Tests for {@link org.apache.hadoop.hive.ql.exec.FileSinkOperator} + */ +public class TestFileSinkOperator { + private static final String TABLE_NAME = "fso_tab"; + private static String COL_NAME = "foo"; + private static String PARTCOL_NAME = "partcol"; + static final private Log LOG = LogFactory.getLog(TestFileSinkOperator.class.getName()); + + private static File tmpdir; + private static TableDesc nonAcidTableDescriptor; + private static TableDesc acidTableDescriptor; + private static ObjectInspector inspectorType = + PrimitiveObjectInspectorFactory.javaStringObjectInspector; + private static ObjectInspector rowIdInspectorType = RecordIdentifier.oi; + private static ObjectInspector partColInspectorType = + PrimitiveObjectInspectorFactory.javaStringObjectInspector; + private static ObjectInspector inspector; + private static ObjectInspector rowIdInspector; + private static ObjectInspector partColInspector; + private static ObjectInspector rowIdPartColInspector; + private static List rows; + private static List rowsWithRowId; + private static List rowsWithPartCol; + private static List rowsWithRowIdAndPartCol; + private static ValidTxnList txnList; + + private int nextFile = 1; + private Path basePath; + private JobConf jc; + + @BeforeClass + public static void classSetup() { + Properties properties = new Properties(); + properties.setProperty(serdeConstants.SERIALIZATION_LIB, TFSOSerDe.class.getName()); + nonAcidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties); + properties = new Properties(properties); + properties.setProperty(hive_metastoreConstants.BUCKET_COUNT, "1"); + acidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties); + + tmpdir = new File(System.getProperty("java.io.tmpdir") + System.getProperty("file.separator") + + "testFileSinkOperator"); + tmpdir.mkdir(); + tmpdir.deleteOnExit(); + setupData(); + txnList = new ValidTxnListImpl(new long[]{}, 2); + } + + @Test + public void testNonAcidWrite() throws Exception { + setBasePath("write"); + FileSinkDesc conf = getFileSinkDesc(FileSinkDesc.WriteType.NON_ACID, false); + FileSinkOperator op = (FileSinkOperator)OperatorFactory.get(FileSinkDesc.class); + op.setConf(conf); + op.initialize(jc, new ObjectInspector[]{inspector}); + + for (TFSORecord r : rows) op.processOp(r.getRow(), 0); + op.jobCloseOp(jc, true); + op.close(false); + + confirmOutput(rows); + } + + @Test + public void testInsert() throws Exception { + setBasePath("insert"); + FileSinkDesc conf = getFileSinkDesc(FileSinkDesc.WriteType.ACID_INSERT, false); + FileSinkOperator op = (FileSinkOperator)OperatorFactory.get(FileSinkDesc.class); + op.setConf(conf); + op.initialize(jc, new ObjectInspector[]{inspector}); + + // Handle transaction stuff + for (TFSORecord r : rows) op.processOp(r.getRow(), 0); + op.jobCloseOp(jc, true); + op.close(false); + Assert.assertEquals("10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); + + confirmOutput(rows); + } + + @Test + public void 
testUpdate() throws Exception { + setBasePath("update"); + FileSinkDesc conf = getFileSinkDesc(FileSinkDesc.WriteType.ACID_UPDATE, false); + FileSinkOperator op = (FileSinkOperator)OperatorFactory.get(FileSinkDesc.class); + op.setConf(conf); + op.initialize(jc, new ObjectInspector[]{rowIdInspector}); + + // Handle transaction stuff + for (TFSORecord r : rowsWithRowId) op.processOp(r.getRow(), 0); + op.jobCloseOp(jc, true); + op.close(false); + Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); + + confirmOutput(rowsWithRowId); + } + + @Test + public void testDelete() throws Exception { + setBasePath("delete"); + FileSinkDesc conf = getFileSinkDesc(FileSinkDesc.WriteType.ACID_DELETE, false); + FileSinkOperator op = (FileSinkOperator)OperatorFactory.get(FileSinkDesc.class); + op.setConf(conf); + op.initialize(jc, new ObjectInspector[]{rowIdInspector}); + + // Handle transaction stuff + for (TFSORecord r : rowsWithRowId) op.processOp(r.getRow(), 0); + op.jobCloseOp(jc, true); + op.close(false); + Assert.assertEquals("-10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); + + confirmOutput(rowsWithRowId); + } + + @Test + public void testNonAcidDynamicPartitioning() throws Exception { + setBasePath("writeDP"); + FileSinkDesc conf = getFileSinkDesc(FileSinkDesc.WriteType.NON_ACID, true); + FileSinkOperator op = (FileSinkOperator)OperatorFactory.get(FileSinkDesc.class); + op.setConf(conf); + op.initialize(jc, new ObjectInspector[]{partColInspector}); + + for (TFSORecord r : rowsWithPartCol) op.processOp(r.getRow(), 0); + op.jobCloseOp(jc, true); + op.close(false); + + confirmOutput(rowsWithPartCol); + } + + @Test + public void testInsertDynamicPartitioning() throws Exception { + setBasePath("insertDP"); + FileSinkDesc conf = getFileSinkDesc(FileSinkDesc.WriteType.ACID_INSERT, true); + FileSinkOperator op = (FileSinkOperator)OperatorFactory.get(FileSinkDesc.class); + op.setConf(conf); + op.initialize(jc, new ObjectInspector[]{rowIdPartColInspector}); + + // Handle transaction stuff + for (TFSORecord r : rowsWithRowIdAndPartCol) op.processOp(r.getRow(), 0); + op.jobCloseOp(jc, true); + op.close(false); + // We only expect 5 here because we'll get whichever of the partitions published its stats + // last. 
+ Assert.assertEquals("5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); + + confirmOutput(rowsWithRowIdAndPartCol); + } + + @Test + public void testUpdateDynamicPartitioning() throws Exception { + setBasePath("updateDP"); + FileSinkDesc conf = getFileSinkDesc(FileSinkDesc.WriteType.ACID_UPDATE, true); + FileSinkOperator op = (FileSinkOperator)OperatorFactory.get(FileSinkDesc.class); + op.setConf(conf); + op.initialize(jc, new ObjectInspector[]{rowIdPartColInspector}); + + // Handle transaction stuff + for (TFSORecord r : rowsWithRowIdAndPartCol) op.processOp(r.getRow(), 0); + op.jobCloseOp(jc, true); + op.close(false); + Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); + + confirmOutput(rowsWithRowIdAndPartCol); + } + + @Test + public void testDeleteDynamicPartitioning() throws Exception { + setBasePath("deleteDP"); + FileSinkDesc conf = getFileSinkDesc(FileSinkDesc.WriteType.ACID_DELETE, true); + FileSinkOperator op = (FileSinkOperator)OperatorFactory.get(FileSinkDesc.class); + op.setConf(conf); + op.initialize(jc, new ObjectInspector[]{rowIdPartColInspector}); + + // Handle transaction stuff + for (TFSORecord r : rowsWithRowIdAndPartCol) op.processOp(r.getRow(), 0); + op.jobCloseOp(jc, true); + op.close(false); + // We only expect -5 here because we'll get whichever of the partitions published its stats + // last. + Assert.assertEquals("-5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); + + confirmOutput(rowsWithRowIdAndPartCol); + } + + @Before + public void setup() throws Exception { + jc = new JobConf(); + jc.set(StatsSetupConst.STATS_TMP_LOC, File.createTempFile("TestFileSinkOperator", + "stats").getPath()); + jc.set(HiveConf.ConfVars.HIVE_STATS_DEFAULT_PUBLISHER.varname, + TFSOStatsPublisher.class.getName()); + jc.set(HiveConf.ConfVars.HIVE_STATS_DEFAULT_AGGREGATOR.varname, + TFSOStatsAggregator.class.getName()); + jc.set(HiveConf.ConfVars.HIVESTATSDBCLASS.varname, "custom"); + setupData(); + } + + private void setBasePath(String testName) { + basePath = new Path(new File(tmpdir, testName).getPath()); + + } + + private static void setupData() { + // Build object inspector + List names = new ArrayList(); + names.add(COL_NAME); + List oi = new ArrayList(1); + oi.add(inspectorType); + inspector = ObjectInspectorFactory.getStandardStructObjectInspector(names, oi); + + names.clear(); + names.add(COL_NAME); + names.add(VirtualColumn.ROWID.getName()); + oi.clear(); + oi.add(inspectorType); + oi.add(rowIdInspectorType); + rowIdInspector = ObjectInspectorFactory.getStandardStructObjectInspector(names, oi); + + names.clear(); + names.add(COL_NAME); + names.add(PARTCOL_NAME); + oi.clear(); + oi.add(inspectorType); + oi.add(partColInspectorType); + partColInspector = ObjectInspectorFactory.getStandardStructObjectInspector(names, oi); + + names.clear(); + names.add(COL_NAME); + names.add(PARTCOL_NAME); + names.add(VirtualColumn.ROWID.getName()); + oi.clear(); + oi.add(inspectorType); + oi.add(rowIdInspectorType); + oi.add(partColInspectorType); + rowIdPartColInspector = ObjectInspectorFactory.getStandardStructObjectInspector(names, oi); + + // Build rows + rows = new ArrayList(); + for (int i = 0; i < 10; i++) { + List row = new ArrayList(); + row.add(new Text("mary had a little lamb")); + rows.add(new TFSORecord(row)); + } + + rowsWithRowId = new ArrayList(); + for (int i = 0; i < 10; i++) { + List row = new ArrayList(); + row.add(new Text("its fleece was white as snow")); + row.add(new RecordIdentifier(1, 1, i)); + 
rowsWithRowId.add(new TFSORecord(row)); + } + + rowsWithPartCol = new ArrayList(); + for (int i = 0; i < 10; i++) { + List row = new ArrayList(); + row.add(new Text("and everywhere that Mary went")); + if (i < 5) row.add(new Text("Monday")); + else row.add(new Text("Tuesday")); + rowsWithPartCol.add(new TFSORecord(row)); + } + + rowsWithRowIdAndPartCol = new ArrayList(); + for (int i = 0; i < 10; i++) { + List row = new ArrayList(); + row.add(new Text("the lamb was sure to go")); + row.add(new RecordIdentifier(1, 1, i)); + if (i < 5) row.add(new Text("Monday")); + else row.add(new Text("Tuesday")); + rowsWithRowIdAndPartCol.add(new TFSORecord(row)); + } + } + + private FileSinkDesc getFileSinkDesc(FileSinkDesc.WriteType writeType, + boolean dynamic) throws IOException { + TableDesc tableDesc = null; + switch (writeType) { + case ACID_DELETE: + case ACID_UPDATE: + case ACID_INSERT: + tableDesc = acidTableDescriptor; + break; + + case NON_ACID: + tableDesc = nonAcidTableDescriptor; + break; + } + FileSinkDesc desc = null; + if (dynamic) { + ArrayList partCols = new ArrayList(1); + partCols.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, PARTCOL_NAME, "a", true)); + Map partColMap= new LinkedHashMap(1); + partColMap.put(PARTCOL_NAME, null); + DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100); + Map partColNames = new HashMap(1); + partColNames.put(PARTCOL_NAME, PARTCOL_NAME); + dpCtx.setInputToDPCols(partColNames); + desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx); + } else { + desc = new FileSinkDesc(basePath, tableDesc, false); + } + desc.setWriteType(writeType); + desc.setGatherStats(true); + if (writeType != FileSinkDesc.WriteType.NON_ACID) desc.setTransactionId(1L); + + return desc; + } + + private void confirmOutput(List expectedRows) + throws IOException, SerDeException { + Path[] paths = findFilesInBasePath(); + TFSOInputFormat input = new TFSOInputFormat(); + FileInputFormat.setInputPaths(jc, paths); + + InputSplit[] splits = input.getSplits(jc, 1); + RecordReader reader = input.getRecordReader(splits[0], jc, + Mockito.mock(Reporter.class)); + NullWritable key = reader.createKey(); + TFSORecord value = reader.createValue(); + for (int i = 0; i < expectedRows.size(); i++) { + Assert.assertTrue(reader.next(key, value)); + Assert.assertTrue(expectedRows.get(i).equals(value)); + } + Assert.assertFalse(reader.next(key, value)); + } + + private Path[] findFilesInBasePath() throws IOException { + Path parent = basePath.getParent(); + String last = basePath.getName(); + Path tmpPath = new Path(parent, "_tmp." 
+ last); + FileSystem fs = basePath.getFileSystem(jc); + List paths = new ArrayList(); + recurseOnPath(tmpPath, fs, paths); + return paths.toArray(new Path[paths.size()]); + } + + private void recurseOnPath(Path p, FileSystem fs, List paths) throws IOException { + if (fs.getFileStatus(p).isDir()) { + FileStatus[] stats = fs.listStatus(p); + for (FileStatus stat : stats) recurseOnPath(stat.getPath(), fs, paths); + } else { + paths.add(p); + } + } + + private static class TFSORecord implements Writable { + private List row; + + TFSORecord() { + row = new ArrayList(); + } + + TFSORecord(List r) { + row = r; + } + + List getRow() { + return row; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeInt(row.size()); + for (Writable w : row) { + if (w instanceof Text) dataOutput.writeByte(0); + else if (w instanceof RecordIdentifier) dataOutput.writeByte(1); + else throw new RuntimeException("unknown type"); + w.write(dataOutput); + } + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + row.clear(); + int sz = dataInput.readInt(); + for (int i = 0; i < sz; i++) { + Writable w = null; + byte type = dataInput.readByte(); + switch (type) { + case 0: w = new Text(); break; + case 1: w = new RecordIdentifier(); break; + default: throw new RuntimeException("Unknown type " + type); + } + w.readFields(dataInput); + row.add(w); + } + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof TFSORecord) { + TFSORecord other = (TFSORecord)obj; + if (row.size() != other.row.size()) return false; + for (int i = 0; i < row.size(); i++) { + if (!row.get(i).equals(other.row.get(i))) return false; + } + return true; + } else { + return false; + } + } + } + + private static class TFSOInputFormat extends FileInputFormat + implements AcidInputFormat { + + FSDataInputStream in[] = null; + int readingFrom = -1; + + @Override + public RecordReader getRecordReader( + InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException { + if (in == null) { + Path paths[] = FileInputFormat.getInputPaths(entries); + in = new FSDataInputStream[paths.length]; + FileSystem fs = paths[0].getFileSystem(entries); + for (int i = 0; i < paths.length; i++) { + in[i] = fs.open(paths[i]); + } + readingFrom = 0; + } + return new RecordReader() { + + @Override + public boolean next(NullWritable nullWritable, TFSORecord tfsoRecord) throws + IOException { + try { + tfsoRecord.readFields(in[readingFrom]); + return true; + } catch (EOFException e) { + in[readingFrom].close(); + if (++readingFrom >= in.length) return false; + else return next(nullWritable, tfsoRecord); + } + } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public TFSORecord createValue() { + return new TFSORecord(); + } + + @Override + public long getPos() throws IOException { + return 0L; + } + + @Override + public void close() throws IOException { + + } + + @Override + public float getProgress() throws IOException { + return 0.0f; + } + }; + } + + @Override + public RowReader getReader(InputSplit split, + Options options) throws + IOException { + return null; + } + + @Override + public RawReader getRawReader(Configuration conf, + boolean collapseEvents, + int bucket, + ValidTxnList validTxnList, + Path baseDirectory, + Path[] deltaDirectory) throws + IOException { + return null; + } + + @Override + public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList files) throws + IOException { + return false; + } 
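/*
 * Illustrative aside (not part of the patch): TFSORecord serializes each cell behind a
 * one-byte type tag (0 = Text, 1 = RecordIdentifier), which is how the InputFormat above can
 * read heterogeneous rows back without any schema. A minimal round-trip sketch, written as if
 * it sat inside this test class (the helper name is hypothetical, and it needs
 * java.io.ByteArray{In,Out}putStream and java.io.Data{In,Out}putStream on top of the imports
 * the test already uses):
 */
private static void tfsoRecordRoundTripSketch() throws IOException {
  List<Writable> cells = new ArrayList<Writable>();
  cells.add(new Text("its fleece was white as snow"));
  cells.add(new RecordIdentifier(1, 1, 0));             // (origTxn, bucket, rowId), as above

  TFSORecord original = new TFSORecord(cells);
  ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  original.write(new DataOutputStream(buffer));         // writes the size, then tag + value per cell

  TFSORecord copy = new TFSORecord();
  copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
  Assert.assertEquals(original, copy);                  // equals() compares the cells one by one
}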
+ } + + public static class TFSOOutputFormat extends FileOutputFormat + implements AcidOutputFormat { + List records = new ArrayList(); + long numRecordsAdded = 0; + FSDataOutputStream out = null; + + @Override + public RecordUpdater getRecordUpdater(final Path path, final Options options) throws + IOException { + return new RecordUpdater() { + @Override + public void insert(long currentTransaction, Object row) throws IOException { + addRow(row); + numRecordsAdded++; + } + + @Override + public void update(long currentTransaction, Object row) throws IOException { + addRow(row); + } + + @Override + public void delete(long currentTransaction, Object row) throws IOException { + addRow(row); + numRecordsAdded--; + } + + private void addRow(Object row) { + Assert.assertTrue(row instanceof TFSORecord); + records.add((TFSORecord)row); + } + + @Override + public void flush() throws IOException { + if (out == null) { + FileSystem fs = path.getFileSystem(options.getConfiguration()); + out = fs.create(path); + } + for (TFSORecord r : records) r.write(out); + records.clear(); + out.flush(); + } + + @Override + public void close(boolean abort) throws IOException { + flush(); + out.close(); + } + + @Override + public SerDeStats getStats() { + SerDeStats stats = new SerDeStats(); + stats.setRowCount(numRecordsAdded); + return stats; + } + }; + } + + @Override + public FileSinkOperator.RecordWriter getRawRecordWriter(Path path, + Options options) throws + IOException { + return null; + } + + @Override + public FileSinkOperator.RecordWriter getHiveRecordWriter(final JobConf jc, + final Path finalOutPath, + Class valueClass, + boolean isCompressed, + Properties tableProperties, + Progressable progress) + throws IOException { + return new FileSinkOperator.RecordWriter() { + @Override + public void write(Writable w) throws IOException { + Assert.assertTrue(w instanceof TFSORecord); + records.add((TFSORecord) w); + } + + @Override + public void close(boolean abort) throws IOException { + if (out == null) { + FileSystem fs = finalOutPath.getFileSystem(jc); + out = fs.create(finalOutPath); + } + for (TFSORecord r : records) r.write(out); + records.clear(); + out.flush(); + out.close(); + } + }; + } + + @Override + public RecordWriter getRecordWriter( + FileSystem fileSystem, JobConf entries, String s, Progressable progressable) throws + IOException { + return null; + } + + @Override + public void checkOutputSpecs(FileSystem fileSystem, JobConf entries) throws IOException { + + } + } + + public static class TFSOSerDe implements SerDe { + + @Override + public void initialize(Configuration conf, Properties tbl) throws SerDeException { + + } + + @Override + public Class getSerializedClass() { + return TFSORecord.class; + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + return new TFSORecord((List)obj); + } + + @Override + public Object deserialize(Writable blob) throws SerDeException { + Assert.assertTrue(blob instanceof TFSORecord); + return ((TFSORecord)blob).getRow(); + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return rowIdInspector; + } + + @Override + public SerDeStats getSerDeStats() { + return null; + } + } + + public static class TFSOStatsPublisher implements StatsPublisher { + static Map stats; + + @Override + public boolean init(Configuration hconf) { + return true; + } + + @Override + public boolean connect(Configuration hconf) { + return true; + } + + @Override + public boolean publishStat(String 
fileID, Map stats) { + this.stats = stats; + return true; + } + + @Override + public boolean closeConnection() { + return true; + } + } + + public static class TFSOStatsAggregator implements StatsAggregator { + + @Override + public boolean connect(Configuration hconf, Task sourceTask) { + return true; + } + + @Override + public String aggregateStats(String keyPrefix, String statType) { + return null; + } + + @Override + public boolean closeConnection() { + return true; + } + + @Override + public boolean cleanUp(String keyPrefix) { + return true; + } + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 35e30b8..94b98f2 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -973,7 +973,7 @@ public void testInOutFormat() throws Exception { List fields =inspector.getAllStructFieldRefs(); IntObjectInspector intInspector = (IntObjectInspector) fields.get(0).getFieldObjectInspector(); - assertEquals(0.0, reader.getProgress(), 0.00001); + assertEquals(0.33, reader.getProgress(), 0.1); while (reader.next(key, value)) { assertEquals(++rowNum, intInspector.get(inspector. getStructFieldData(serde.deserialize(value), fields.get(0)))); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index b4ce4a0..ab629dc 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -454,9 +454,16 @@ public void testNewBase() throws Exception { static class MyRow { Text col1; + RecordIdentifier ROW__ID; + MyRow(String val) { col1 = new Text(val); } + + MyRow(String val, long rowId, long origTxn, int bucket) { + col1 = new Text(val); + ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); + } } static String getValue(OrcStruct event) { @@ -534,11 +541,11 @@ public void testNewBaseAndDelta() throws Exception { // write a delta ru = of.getRecordUpdater(root, options.writingBase(false) .minimumTransactionId(200).maximumTransactionId(200)); - ru.update(200, 0, 0, new MyRow("update 1")); - ru.update(200, 0, 2, new MyRow("update 2")); - ru.update(200, 0, 3, new MyRow("update 3")); - ru.delete(200, 0, 7); - ru.delete(200, 0, 8); + ru.update(200, new MyRow("update 1", 0, 0, BUCKET)); + ru.update(200, new MyRow("update 2", 2, 0, BUCKET)); + ru.update(200, new MyRow("update 3", 3, 0, BUCKET)); + ru.delete(200, new MyRow("", 7, 0, BUCKET)); + ru.delete(200, new MyRow("", 8, 0, BUCKET)); ru.close(false); ValidTxnList txnList = new ValidTxnListImpl("200:"); @@ -607,13 +614,13 @@ public void testNewBaseAndDelta() throws Exception { assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET, 7, 200), id); - assertEquals(null, OrcRecordUpdater.getRow(event)); + assertEquals(1, OrcRecordUpdater.getRow(event).getNumFields()); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET, 8, 200), id); - assertEquals(null, OrcRecordUpdater.getRow(event)); + assertEquals(1, OrcRecordUpdater.getRow(event).getNumFields()); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, @@ -693,7 +700,7 @@ public void 
testNewBaseAndDelta() throws Exception { assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET, 7, 200), id); - assertEquals(null, OrcRecordUpdater.getRow(event)); + assertEquals(1, OrcRecordUpdater.getRow(event).getNumFields()); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, @@ -705,7 +712,7 @@ public void testNewBaseAndDelta() throws Exception { assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET, 8, 200), id); - assertEquals(null, OrcRecordUpdater.getRow(event)); + assertEquals(1, OrcRecordUpdater.getRow(event).getNumFields()); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, @@ -747,6 +754,7 @@ public void testNewBaseAndDelta() throws Exception { Text mytext; float myfloat; double mydouble; + RecordIdentifier ROW__ID; BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble) { this.myint = myint; @@ -754,6 +762,21 @@ public void testNewBaseAndDelta() throws Exception { this.mytext = new Text(mytext); this.myfloat = myfloat; this.mydouble = mydouble; + ROW__ID = null; + } + + BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble, + long rowId, long origTxn, int bucket) { + this.myint = myint; + this.mylong = mylong; + this.mytext = new Text(mytext); + this.myfloat = myfloat; + this.mydouble = mydouble; + ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); + } + + BigRow(long rowId, long origTxn, int bucket) { + ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); } } @@ -808,10 +831,10 @@ synchronized void addedRow() throws IOException { "ignore.7"}; for(int i=0; i < values.length; ++i) { if (values[i] != null) { - ru.update(1, 0, i, new BigRow(i, i, values[i], i, i)); + ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); } } - ru.delete(100, 0, 9); + ru.delete(100, new BigRow(9, 0, BUCKET)); ru.close(false); // write a delta @@ -820,10 +843,10 @@ synchronized void addedRow() throws IOException { values = new String[]{null, null, "1.0", null, null, null, null, "3.1"}; for(int i=0; i < values.length; ++i) { if (values[i] != null) { - ru.update(2, 0, i, new BigRow(i, i, values[i], i, i)); + ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); } } - ru.delete(100, 0, 8); + ru.delete(100, new BigRow(8, 0, BUCKET)); ru.close(false); InputFormat inf = new OrcInputFormat(); @@ -908,10 +931,10 @@ synchronized void addedRow() throws IOException { "ignore.7"}; for(int i=0; i < values.length; ++i) { if (values[i] != null) { - ru.update(1, 0, i, new BigRow(i, i, values[i], i, i)); + ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); } } - ru.delete(100, 0, 9); + ru.delete(100, new BigRow(9, 0, BUCKET)); ru.close(false); // write a delta @@ -920,10 +943,10 @@ synchronized void addedRow() throws IOException { values = new String[]{null, null, "1.0", null, null, null, null, "3.1"}; for(int i=0; i < values.length; ++i) { if (values[i] != null) { - ru.update(2, 0, i, new BigRow(i, i, values[i], i, i)); + ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); } } - ru.delete(100, 0, 8); + ru.delete(100, new BigRow(8, 0, BUCKET)); ru.close(false); InputFormat inf = new OrcInputFormat(); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java index b53bd85..3604d5d 100644 --- 
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java @@ -23,7 +23,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.io.IntWritable; @@ -63,9 +65,18 @@ public void testAccessors() throws Exception { static class MyRow { Text field; + RecordIdentifier ROW__ID; + MyRow(String val) { field = new Text(val); + ROW__ID = null; + } + + MyRow(String val, long rowId, long origTxn, int bucket) { + field = new Text(val); + ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); } + } @Test @@ -95,6 +106,10 @@ public void testWriter() throws Exception { updater.insert(12, new MyRow("fourth")); updater.insert(12, new MyRow("fifth")); updater.flush(); + + // Check the stats + assertEquals(5L, updater.getStats().getRowCount()); + Path bucketPath = AcidUtils.createFilename(root, options); Path sidePath = OrcRecordUpdater.getSideFile(bucketPath); DataInputStream side = fs.open(sidePath); @@ -158,6 +173,8 @@ public void testWriter() throws Exception { reader = OrcFile.createReader(bucketPath, new OrcFile.ReaderOptions(conf).filesystem(fs)); assertEquals(6, reader.getNumberOfRows()); + assertEquals(6L, updater.getStats().getRowCount()); + assertEquals(false, fs.exists(sidePath)); } @@ -171,17 +188,19 @@ public void testUpdates() throws Exception { inspector = ObjectInspectorFactory.getReflectionObjectInspector (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } + int bucket = 20; AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .filesystem(fs) - .bucket(20) + .bucket(bucket) .writingBase(false) .minimumTransactionId(100) .maximumTransactionId(100) .inspector(inspector) .reporter(Reporter.NULL); RecordUpdater updater = new OrcRecordUpdater(root, options); - updater.update(100, 10, 30, new MyRow("update")); - updater.delete(100, 40, 60); + updater.update(100, new MyRow("update", 30, 10, bucket)); + updater.delete(100, new MyRow("", 60, 40, bucket)); + assertEquals(-1L, updater.getStats().getRowCount()); updater.close(false); Path bucketPath = AcidUtils.createFilename(root, options); @@ -208,7 +227,7 @@ public void testUpdates() throws Exception { assertEquals(40, OrcRecordUpdater.getOriginalTransaction(row)); assertEquals(20, OrcRecordUpdater.getBucket(row)); assertEquals(60, OrcRecordUpdater.getRowId(row)); - assertEquals(null, OrcRecordUpdater.getRow(row)); + assertEquals(1, OrcRecordUpdater.getRow(row).getNumFields()); assertEquals(false, rows.hasNext()); } }
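/*
 * Illustrative aside (not part of the patch): with the new RecordUpdater contract the row
 * object carries its own ROW__ID (a RecordIdentifier of origTxn, bucket, rowId) instead of
 * the updater taking separate bucket/rowId arguments, and the stats assertions above all
 * follow one bookkeeping rule reported through SerDeStats: insert +1, delete -1, update 0
 * (row count 5 after five inserts, 6 once a sixth row is written, -1 after one update plus
 * one delete). A counting stub that mirrors that rule -- not any real updater -- with a
 * hypothetical class name:
 */
import org.apache.hadoop.hive.serde2.SerDeStats;

public class RowCountBookkeepingSketch {
  private long rowCountDelta = 0;

  void insert() { rowCountDelta++; }        // each insert adds a row
  void update() { /* an update rewrites a row, so the count is unchanged */ }
  void delete() { rowCountDelta--; }        // each delete removes a row

  SerDeStats stats() {
    SerDeStats stats = new SerDeStats();
    stats.setRowCount(rowCountDelta);
    return stats;
  }

  public static void main(String[] args) {
    RowCountBookkeepingSketch inserts = new RowCountBookkeepingSketch();
    for (int i = 0; i < 5; i++) inserts.insert();
    System.out.println(inserts.stats().getRowCount());   // 5, matching testWriter() above
    inserts.insert();
    System.out.println(inserts.stats().getRowCount());   // 6, once a sixth row is written

    RowCountBookkeepingSketch updates = new RowCountBookkeepingSketch();
    updates.update();
    updates.delete();
    System.out.println(updates.stats().getRowCount());   // -1, matching testUpdates() above
  }
}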