diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index d4e61d8..7e9dc19 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -29,12 +30,15 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveKey; @@ -62,6 +66,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; @@ -100,6 +105,10 @@ private transient List keyWritables; private transient List keys; private transient int numKeyColToRead; + private StructField recIdField; // field to find record identifier in + private StructField bucketField; // field bucket is in in record id + private StructObjectInspector recIdInspector; // OI for inspecting record id + private IntObjectInspector bucketInspector; // OI for inspecting bucket id /** * RecordWriter. @@ -117,7 +126,10 @@ Path[] outPaths; Path[] finalPaths; RecordWriter[] outWriters; + RecordUpdater[] updaters; Stat stat; + int acidLastBucket = -1; + int acidFileOffset = -1; public FSPaths() { } @@ -128,6 +140,8 @@ public FSPaths(Path specPath) { outPaths = new Path[numFiles]; finalPaths = new Path[numFiles]; outWriters = new RecordWriter[numFiles]; + updaters = new RecordUpdater[numFiles]; + LOG.debug("Created slots for " + numFiles); stat = new Stat(); } @@ -168,6 +182,15 @@ public void closeWriters(boolean abort) throws HiveException { } } } + try { + for (int i = 0; i < updaters.length; i++) { + if (updaters[i] != null) { + updaters[i].close(abort); + } + } + } catch (IOException e) { + throw new HiveException(e); + } } private void commit(FileSystem fs) throws HiveException { @@ -177,7 +200,21 @@ private void commit(FileSystem fs) throws HiveException { && !fs.exists(finalPaths[idx].getParent())) { fs.mkdirs(finalPaths[idx].getParent()); } - if (!fs.rename(outPaths[idx], finalPaths[idx])) { + boolean needToRename = true; + if (conf.getWriteType() == AcidUtils.Operation.UPDATE || + conf.getWriteType() == AcidUtils.Operation.DELETE) { + // If we're updating or deleting there may be no file to close. This can happen + // because the where clause strained out all of the records for a given bucket. So + // before attempting the rename below, check if our file exists. If it doesn't, + // then skip the rename. If it does try it. 
We could just blindly try the rename + // and avoid the extra stat, but that would mask other errors. + try { + FileStatus stat = fs.getFileStatus(outPaths[idx]); + } catch (FileNotFoundException fnfe) { + needToRename = false; + } + } + if (needToRename && !fs.rename(outPaths[idx], finalPaths[idx])) { throw new HiveException("Unable to rename output from: " + outPaths[idx] + " to: " + finalPaths[idx]); } @@ -350,6 +387,16 @@ protected void initializeOp(Configuration hconf) throws HiveException { valToPaths.put("", fsp); // special entry for non-DP case } } + + if (conf.getWriteType() == AcidUtils.Operation.UPDATE || + conf.getWriteType() == AcidUtils.Operation.DELETE) { + // ROW__ID is always in the first field + recIdField = ((StructObjectInspector)outputObjInspector).getAllStructFieldRefs().get(0); + recIdInspector = (StructObjectInspector)recIdField.getFieldObjectInspector(); + // bucket is the second field in the record id + bucketField = recIdInspector.getAllStructFieldRefs().get(1); + bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector(); + } initializeChildren(hconf); } catch (HiveException e) { throw e; @@ -420,6 +467,7 @@ protected void createBucketFiles(FSPaths fsp) throws HiveException { assert totalFiles == 1; } + int bucketNum = 0; if (multiFileSpray) { key.setHashCode(idx); @@ -436,7 +484,7 @@ protected void createBucketFiles(FSPaths fsp) throws HiveException { } } - int bucketNum = prtner.getBucket(key, null, totalFiles); + bucketNum = prtner.getBucket(key, null, totalFiles); if (seenBuckets.contains(bucketNum)) { continue; } @@ -462,7 +510,8 @@ protected void createBucketFiles(FSPaths fsp) throws HiveException { filesCreated = true; } - protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) throws HiveException { + protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) + throws HiveException { try { if (isNativeTable) { fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, null); @@ -493,11 +542,21 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) throws HiveExce Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc); // only create bucket files only if no dynamic partitions, // buckets of dynamic partitions will be created for each newly created partition - fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(), - outputClass, conf, fsp.outPaths[filesIdx], reporter); - // If the record writer provides stats, get it from there instead of the serde - statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter; - // increment the CREATED_FILES counter + if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) { + fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(), + outputClass, conf, fsp.outPaths[filesIdx], reporter); + // If the record writer provides stats, get it from there instead of the serde + statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof + StatsProvidingRecordWriter; + // increment the CREATED_FILES counter + } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) { + // Only set up the updater for insert. For update and delete we don't know unitl we see + // the row. + ObjectInspector inspector = bDynParts ? 
subSetOI : outputObjInspector; + int acidBucketNum = Integer.valueOf(taskId.substring(0, 6)); + fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(), + acidBucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter, -1); + } if (reporter != null) { reporter.incrCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP), Operator.HIVECOUNTERCREATEDFILES, 1); @@ -598,27 +657,47 @@ public void processOp(Object row, int tag) throws HiveException { } - RecordWriter rowOutWriter = null; - if (row_count != null) { row_count.set(row_count.get() + 1); } - if (!multiFileSpray) { - rowOutWriter = rowOutWriters[0]; + int writerOffset = findWriterOffset(row); + // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same + // for a given operator branch prediction should work quite nicely on it. + // RecordUpdateer expects to get the actual row, not a serialized version of it. Thus we + // pass the row rather than recordValue. + if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) { + rowOutWriters[writerOffset].write(recordValue); + } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) { + fpaths.updaters[writerOffset].insert(conf.getTransactionId(), row); } else { - int keyHashCode = 0; - for (int i = 0; i < partitionEval.length; i++) { - Object o = partitionEval[i].evaluate(row); - keyHashCode = keyHashCode * 31 - + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]); + // TODO I suspect we could skip much of the stuff above this in the function in the case + // of update and delete. But I don't understand all of the side effects of the above + // code and don't want to skip over it yet. + + // Find the bucket id, and switch buckets if need to + ObjectInspector rowInspector = bDynParts ? 
subSetOI : outputObjInspector; + Object recId = ((StructObjectInspector)rowInspector).getStructFieldData(row, recIdField); + int bucketNum = + bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField)); + if (fpaths.acidLastBucket != bucketNum) { + fpaths.acidLastBucket = bucketNum; + // Switch files + fpaths.updaters[++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater( + jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[fpaths.acidFileOffset], + rowInspector, reporter, 0); + LOG.debug("Created updater for bucket number " + bucketNum + " using file " + + fpaths.outPaths[fpaths.acidFileOffset]); + } + + if (conf.getWriteType() == AcidUtils.Operation.UPDATE) { + fpaths.updaters[fpaths.acidFileOffset].update(conf.getTransactionId(), row); + } else if (conf.getWriteType() == AcidUtils.Operation.DELETE) { + fpaths.updaters[fpaths.acidFileOffset].delete(conf.getTransactionId(), row); + } else { + throw new HiveException("Unknown write type " + conf.getWriteType().toString()); } - key.setHashCode(keyHashCode); - int bucketNum = prtner.getBucket(key, null, totalFiles); - int idx = bucketMap.get(bucketNum); - rowOutWriter = rowOutWriters[idx]; } - rowOutWriter.write(recordValue); } catch (IOException e) { throw new HiveException(e); } catch (SerDeException e) { @@ -627,6 +706,11 @@ public void processOp(Object row, int tag) throws HiveException { } protected boolean areAllTrue(boolean[] statsFromRW) { + // If we are doing an acid operation they will always all be true as RecordUpdaters always + // collect stats + if (conf.getWriteType() != AcidUtils.Operation.NOT_ACID) { + return true; + } for(boolean b : statsFromRW) { if (!b) { return false; @@ -635,6 +719,23 @@ protected boolean areAllTrue(boolean[] statsFromRW) { return true; } + private int findWriterOffset(Object row) throws HiveException { + if (!multiFileSpray) { + return 0; + } else { + int keyHashCode = 0; + for (int i = 0; i < partitionEval.length; i++) { + Object o = partitionEval[i].evaluate(row); + keyHashCode = keyHashCode * 31 + + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]); + } + key.setHashCode(keyHashCode); + int bucketNum = prtner.getBucket(key, null, totalFiles); + return bucketMap.get(bucketNum); + } + + } + /** * Lookup list bucketing path. 
* @param lbDirName @@ -733,8 +834,10 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive if (dpDir != null) { dpDir = appendToSource(lbDirName, dpDir); pathKey = dpDir; + int numericBucketNum = 0; if(conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) { String buckNum = row.get(row.size() - 1); + numericBucketNum = Integer.valueOf(buckNum); taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), buckNum); pathKey = appendToSource(taskId, dpDir); } @@ -756,13 +859,18 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive // since we are closing the previous fsp's record writers, we need to see if we can get // stats from the record writer and store in the previous fsp that is cached if (conf.isGatherStats() && isCollectRWStats) { - RecordWriter outWriter = prevFsp.outWriters[0]; - if (outWriter != null) { - SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats(); - if (stats != null) { + SerDeStats stats = null; + if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) { + RecordWriter outWriter = prevFsp.outWriters[0]; + if (outWriter != null) { + stats = ((StatsProvidingRecordWriter) outWriter).getStats(); + } + } else if (prevFsp.updaters[0] != null) { + stats = prevFsp.updaters[0].getStats(); + } + if (stats != null) { prevFsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); prevFsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount()); - } } } @@ -806,7 +914,12 @@ private String appendToSource(String appendDir, String srcDir) { // return the relative path corresponding to the row. // e.g., ds=2008-04-08/hr=11 private String getDynPartDirectory(List row, List dpColNames, int numDynParts) { - assert row.size() == numDynParts && numDynParts == dpColNames.size() : "data length is different from num of DP columns"; + // I don't think this assertion is right. When getDynParts (which calls this function) is + // called from startGroup, startGroup puts both the dynamic partitioning columns and the + // bucket columns in the row. That means this will always fail when called from there. I + // suspect this was put here when getDynParts was only called from processOp, + // which does in fact only pass the dynamic partitioning columns. Thus, I'm removing it. Alan. 
+ // assert row.size() == numDynParts && numDynParts == dpColNames.size() : "data length is different from num of DP columns"; return FileUtils.makePartName(dpColNames, row); } @@ -832,6 +945,7 @@ public void startGroup() throws HiveException { @Override public void closeOp(boolean abort) throws HiveException { + if (!bDynParts && !filesCreated) { createBucketFiles(fsp); } @@ -849,13 +963,25 @@ public void closeOp(boolean abort) throws HiveException { // record writer already gathers the statistics, it can simply return the // accumulated statistics which will be aggregated in case of spray writers if (conf.isGatherStats() && isCollectRWStats) { - for (int idx = 0; idx < fsp.outWriters.length; idx++) { - RecordWriter outWriter = fsp.outWriters[idx]; - if (outWriter != null) { - SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats(); - if (stats != null) { - fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); - fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount()); + if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) { + for (int idx = 0; idx < fsp.outWriters.length; idx++) { + RecordWriter outWriter = fsp.outWriters[idx]; + if (outWriter != null) { + SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats(); + if (stats != null) { + fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); + fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount()); + } + } + } + } else { + for (int i = 0; i < fsp.updaters.length; i++) { + if (fsp.updaters[i] != null) { + SerDeStats stats = fsp.updaters[i].getStats(); + if (stats != null) { + fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); + fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount()); + } } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index f584926..b1c4441 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -164,6 +164,8 @@ static long parseBase(Path path) { return result; } + public enum Operation { NOT_ACID, INSERT, UPDATE, DELETE } + public static interface Directory { /** diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index c3a83d4..93e284e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.compress.CompressionCodec; @@ -249,21 +250,8 @@ private static boolean checkTextInputFormat(FileSystem fs, HiveConf conf, public static RecordWriter getHiveRecordWriter(JobConf jc, TableDesc tableInfo, Class outputClass, FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException { - boolean storagehandlerofhivepassthru = false; - HiveOutputFormat hiveOutputFormat; + HiveOutputFormat hiveOutputFormat = getHiveOutputFormat(jc, tableInfo); try { - if (tableInfo.getJobProperties() != null) { - if (tableInfo.getJobProperties().get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) 
!= null) { - jc.set(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY,tableInfo.getJobProperties().get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY)); - storagehandlerofhivepassthru = true; - } - } - if (storagehandlerofhivepassthru) { - hiveOutputFormat = ReflectionUtils.newInstance(tableInfo.getOutputFileFormatClass(),jc); - } - else { - hiveOutputFormat = tableInfo.getOutputFileFormatClass().newInstance(); - } boolean isCompressed = conf.getCompressed(); JobConf jc_output = jc; if (isCompressed) { @@ -299,6 +287,73 @@ public static RecordWriter getRecordWriter(JobConf jc, return null; } + private static HiveOutputFormat getHiveOutputFormat(JobConf jc, TableDesc tableInfo) + throws HiveException { + boolean storagehandlerofhivepassthru = false; + HiveOutputFormat hiveOutputFormat; + try { + if (tableInfo.getJobProperties() != null) { + if (tableInfo.getJobProperties().get( + HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) != null) { + jc.set(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY, + tableInfo.getJobProperties() + .get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY)); + storagehandlerofhivepassthru = true; + } + } + if (storagehandlerofhivepassthru) { + return ReflectionUtils.newInstance(tableInfo.getOutputFileFormatClass(), jc); + } else { + return tableInfo.getOutputFileFormatClass().newInstance(); + } + } catch (Exception e) { + throw new HiveException(e); + } + } + + public static RecordUpdater getAcidRecordUpdater(JobConf jc, TableDesc tableInfo, int bucket, + FileSinkDesc conf, Path outPath, + ObjectInspector inspector, + Reporter reporter, int rowIdColNum) + throws HiveException, IOException { + HiveOutputFormat hiveOutputFormat = getHiveOutputFormat(jc, tableInfo); + AcidOutputFormat acidOutputFormat = null; + if (hiveOutputFormat instanceof AcidOutputFormat) { + acidOutputFormat = (AcidOutputFormat)hiveOutputFormat; + } else { + throw new HiveException("Unable to create RecordUpdater for HiveOutputFormat that does not " + + "implement AcidOutputFormat"); + } + // TODO not 100% sure about this. This call doesn't set the compression type in the conf + // file the way getHiveRecordWriter does, as ORC appears to read the value for itself. Not + // sure if this is correct or not. 
+ return getRecordUpdater(jc, acidOutputFormat, conf.getCompressed(), conf.getTransactionId(), + bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum); + } + + + private static RecordUpdater getRecordUpdater(JobConf jc, + AcidOutputFormat acidOutputFormat, + boolean isCompressed, + long txnId, + int bucket, + ObjectInspector inspector, + Properties tableProp, + Path outPath, + Reporter reporter, + int rowIdColNum) throws IOException { + return acidOutputFormat.getRecordUpdater(outPath, new AcidOutputFormat.Options(jc) + .isCompressed(isCompressed) + .tableProperties(tableProp) + .reporter(reporter) + .writingBase(false) + .minimumTransactionId(txnId) + .maximumTransactionId(txnId) + .bucket(bucket) + .inspector(inspector) + .recordIdColumn(rowIdColNum)); + } + public static PartitionDesc getPartitionDescFromPathRecursively( Map pathToPartitionInfo, Path dir, Map, Map> cacheMap) diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 301dde5..8b25c2b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.AcidUtils; /** * FileSinkDesc. @@ -84,6 +85,10 @@ private boolean statsCollectRawDataSize; + // Record what type of write this is. Default is non-ACID (ie old style). + private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID; + private long txnId = 0; // transaction id for this operation + public FileSinkDesc() { } @@ -137,6 +142,8 @@ public Object clone() throws CloneNotSupportedException { ret.setMaxStatsKeyPrefixLength(maxStatsKeyPrefixLength); ret.setStatsCollectRawDataSize(statsCollectRawDataSize); ret.setDpSortState(dpSortState); + ret.setWriteType(writeType); + ret.setTransactionId(txnId); return (Object) ret; } @@ -398,4 +405,20 @@ public DPSortState getDpSortState() { public void setDpSortState(DPSortState dpSortState) { this.dpSortState = dpSortState; } + + public void setWriteType(AcidUtils.Operation type) { + writeType = type; + } + + public AcidUtils.Operation getWriteType() { + return writeType; + } + + public void setTransactionId(long id) { + txnId = id; + } + + public long getTransactionId() { + return txnId; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java new file mode 100644 index 0000000..3fb770f --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -0,0 +1,757 @@ +package org.apache.hadoop.hive.ql.exec; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidTxnListImpl; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import 
org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.stats.StatsAggregator; +import org.apache.hadoop.hive.ql.stats.StatsPublisher; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Tests for {@link org.apache.hadoop.hive.ql.exec.FileSinkOperator} + */ +public class TestFileSinkOperator { + private static String PARTCOL_NAME = "partval"; + static final private Log LOG = LogFactory.getLog(TestFileSinkOperator.class.getName()); + + private static File tmpdir; + private static TableDesc nonAcidTableDescriptor; + private static TableDesc acidTableDescriptor; + private static ObjectInspector inspector; + private static List rows; + private static ValidTxnList txnList; + + private Path basePath; + private JobConf jc; + + @BeforeClass + public static void classSetup() { + Properties properties = new Properties(); + properties.setProperty(serdeConstants.SERIALIZATION_LIB, TFSOSerDe.class.getName()); + nonAcidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties); + properties = new Properties(properties); + properties.setProperty(hive_metastoreConstants.BUCKET_COUNT, "1"); + acidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties); + + tmpdir = new File(System.getProperty("java.io.tmpdir") + System.getProperty("file.separator") + + "testFileSinkOperator"); + tmpdir.mkdir(); + tmpdir.deleteOnExit(); + txnList = new ValidTxnListImpl(new long[]{}, 2); + } + + @Test + public void testNonAcidWrite() throws Exception { + setBasePath("write"); + setupData(DataFormat.SIMPLE); + FileSinkOperator op = 
getFileSink(AcidUtils.Operation.NOT_ACID, false, 0); + processRows(op); + confirmOutput(); + } + + @Test + public void testInsert() throws Exception { + setBasePath("insert"); + setupData(DataFormat.SIMPLE); + FileSinkOperator op = getFileSink(AcidUtils.Operation.INSERT, false, 1); + processRows(op); + Assert.assertEquals("10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); + confirmOutput(); + } + + @Test + public void testUpdate() throws Exception { + setBasePath("update"); + setupData(DataFormat.WITH_RECORD_ID); + FileSinkOperator op = getFileSink(AcidUtils.Operation.UPDATE, false, 2); + processRows(op); + Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); + confirmOutput(); + } + + @Test + public void testDelete() throws Exception { + setBasePath("delete"); + setupData(DataFormat.WITH_RECORD_ID); + FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, false, 2); + processRows(op); + Assert.assertEquals("-10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); + confirmOutput(); + } + + @Test + public void testNonAcidDynamicPartitioning() throws Exception { + setBasePath("writeDP"); + setupData(DataFormat.WITH_PARTITION_VALUE); + FileSinkOperator op = getFileSink(AcidUtils.Operation.NOT_ACID, true, 0); + processRows(op); + confirmOutput(); + } + + + @Test + public void testInsertDynamicPartitioning() throws Exception { + setBasePath("insertDP"); + setupData(DataFormat.WITH_PARTITION_VALUE); + FileSinkOperator op = getFileSink(AcidUtils.Operation.INSERT, true, 1); + processRows(op); + // We only expect 5 here because we'll get whichever of the partitions published its stats + // last. + Assert.assertEquals("5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); + confirmOutput(); + } + + @Test + public void testUpdateDynamicPartitioning() throws Exception { + setBasePath("updateDP"); + setupData(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE); + FileSinkOperator op = getFileSink(AcidUtils.Operation.UPDATE, true, 2); + processRows(op); + Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); + confirmOutput(); + } + + @Test + public void testDeleteDynamicPartitioning() throws Exception { + setBasePath("deleteDP"); + setupData(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE); + FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, true, 2); + processRows(op); + // We only expect -5 here because we'll get whichever of the partitions published its stats + // last. 
+ Assert.assertEquals("-5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT)); + confirmOutput(); + } + + + @Before + public void setup() throws Exception { + jc = new JobConf(); + jc.set(StatsSetupConst.STATS_TMP_LOC, File.createTempFile("TestFileSinkOperator", + "stats").getPath()); + jc.set(HiveConf.ConfVars.HIVE_STATS_DEFAULT_PUBLISHER.varname, + TFSOStatsPublisher.class.getName()); + jc.set(HiveConf.ConfVars.HIVE_STATS_DEFAULT_AGGREGATOR.varname, + TFSOStatsAggregator.class.getName()); + jc.set(HiveConf.ConfVars.HIVESTATSDBCLASS.varname, "custom"); + } + + private void setBasePath(String testName) { + basePath = new Path(new File(tmpdir, testName).getPath()); + + } + + private enum DataFormat {SIMPLE, WITH_RECORD_ID, WITH_PARTITION_VALUE, + WITH_RECORD_ID_AND_PARTITION_VALUE}; + + private void setupData(DataFormat format) { + + // Build object inspector + inspector = ObjectInspectorFactory.getReflectionObjectInspector + (TFSORow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + rows = new ArrayList(); + + switch (format) { + case SIMPLE: + // Build rows + for (int i = 0; i < 10; i++) { + rows.add( + new TFSORow( + new Text("mary had a little lamb") + ) + ); + } + break; + + case WITH_RECORD_ID: + for (int i = 0; i < 10; i++) { + rows.add( + new TFSORow( + new Text("its fleect was white as snow"), + new RecordIdentifier(1, 1, i) + ) + ); + } + break; + + case WITH_PARTITION_VALUE: + for (int i = 0; i < 10; i++) { + rows.add( + new TFSORow( + new Text("its fleect was white as snow"), + (i < 5) ? new Text("Monday") : new Text("Tuesday") + ) + ); + } + break; + + case WITH_RECORD_ID_AND_PARTITION_VALUE: + for (int i = 0; i < 10; i++) { + rows.add( + new TFSORow( + new Text("its fleect was white as snow"), + (i < 5) ? new Text("Monday") : new Text("Tuesday"), + new RecordIdentifier(1, 1, i) + ) + ); + } + break; + + default: + throw new RuntimeException("Unknown option!"); + } + } + + private FileSinkOperator getFileSink(AcidUtils.Operation writeType, + boolean dynamic, + long txnId) throws IOException, HiveException { + TableDesc tableDesc = null; + switch (writeType) { + case DELETE: + case UPDATE: + case INSERT: + tableDesc = acidTableDescriptor; + break; + + case NOT_ACID: + tableDesc = nonAcidTableDescriptor; + break; + } + FileSinkDesc desc = null; + if (dynamic) { + ArrayList partCols = new ArrayList(1); + partCols.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, PARTCOL_NAME, "a", true)); + Map partColMap= new LinkedHashMap(1); + partColMap.put(PARTCOL_NAME, null); + DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100); + Map partColNames = new HashMap(1); + partColNames.put(PARTCOL_NAME, PARTCOL_NAME); + dpCtx.setInputToDPCols(partColNames); + desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx); + } else { + desc = new FileSinkDesc(basePath, tableDesc, false); + } + desc.setWriteType(writeType); + desc.setGatherStats(true); + if (txnId > 0) desc.setTransactionId(txnId); + if (writeType != AcidUtils.Operation.NOT_ACID) desc.setTransactionId(1L); + + FileSinkOperator op = (FileSinkOperator)OperatorFactory.get(FileSinkDesc.class); + op.setConf(desc); + op.initialize(jc, new ObjectInspector[]{inspector}); + return op; + } + + private void processRows(FileSinkOperator op) throws HiveException { + for (TFSORow r : rows) op.processOp(r, 0); + op.jobCloseOp(jc, true); + op.close(false); + } + + private void confirmOutput() throws IOException, SerDeException { + Path[] paths = 
findFilesInBasePath(); + TFSOInputFormat input = new TFSOInputFormat(); + FileInputFormat.setInputPaths(jc, paths); + + InputSplit[] splits = input.getSplits(jc, 1); + RecordReader reader = input.getRecordReader(splits[0], jc, + Mockito.mock(Reporter.class)); + NullWritable key = reader.createKey(); + TFSORow value = reader.createValue(); + List results = new ArrayList(rows.size()); + List sortedRows = new ArrayList(rows.size()); + for (int i = 0; i < rows.size(); i++) { + Assert.assertTrue(reader.next(key, value)); + results.add(new TFSORow(value)); + sortedRows.add(new TFSORow(rows.get(i))); + } + Assert.assertFalse(reader.next(key, value)); + Collections.sort(results); + Collections.sort(sortedRows); + for (int i = 0; i < rows.size(); i++) { + Assert.assertTrue(sortedRows.get(i).equals(results.get(i))); + } + + } + + private Path[] findFilesInBasePath() throws IOException { + Path parent = basePath.getParent(); + String last = basePath.getName(); + Path tmpPath = new Path(parent, "_tmp." + last); + FileSystem fs = basePath.getFileSystem(jc); + List paths = new ArrayList(); + recurseOnPath(tmpPath, fs, paths); + return paths.toArray(new Path[paths.size()]); + } + + private void recurseOnPath(Path p, FileSystem fs, List paths) throws IOException { + if (fs.getFileStatus(p).isDir()) { + FileStatus[] stats = fs.listStatus(p); + for (FileStatus stat : stats) recurseOnPath(stat.getPath(), fs, paths); + } else { + paths.add(p); + } + } + + private static class TFSORow implements WritableComparable { + private RecordIdentifier recId; + private Text data; + private Text partVal; + + TFSORow() { + this(null, null, null); + } + + TFSORow(Text t) { + this(t, null, null); + } + + TFSORow(Text t, Text pv) { + this(t, pv, null); + } + + TFSORow(Text t, RecordIdentifier ri) { + this(t, null, ri); + } + + TFSORow(Text t, Text pv, RecordIdentifier ri) { + data = t; + partVal = pv; + recId = ri; + + } + + TFSORow(TFSORow other) { + this(other.data, other.partVal, other.recId); + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + data.write(dataOutput); + if (partVal == null) { + dataOutput.writeBoolean(false); + } else { + dataOutput.writeBoolean(true); + partVal.write(dataOutput); + } + if (recId == null) { + dataOutput.writeBoolean(false); + } else { + dataOutput.writeBoolean(true); + recId.write(dataOutput); + } + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + data = new Text(); + data.readFields(dataInput); + boolean notNull = dataInput.readBoolean(); + if (notNull) { + partVal = new Text(); + partVal.readFields(dataInput); + } + notNull = dataInput.readBoolean(); + if (notNull) { + recId = new RecordIdentifier(); + recId.readFields(dataInput); + } + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof TFSORow) { + TFSORow other = (TFSORow) obj; + if (data == null && other.data == null) return checkPartVal(other); + else if (data == null) return false; + else if (data.equals(other.data)) return checkPartVal(other); + else return false; + } else { + return false; + } + } + + private boolean checkPartVal(TFSORow other) { + if (partVal == null && other.partVal == null) return checkRecId(other); + else if (partVal == null) return false; + else if (partVal.equals(other.partVal)) return checkRecId(other); + else return false; + } + + private boolean checkRecId(TFSORow other) { + if (recId == null && other.recId == null) return true; + else if (recId == null) return false; + else return recId.equals(other.recId); + } + 
+ @Override + public int compareTo(TFSORow other) { + if (recId == null && other.recId == null) { + return comparePartVal(other); + } else if (recId == null) { + return -1; + } else { + int rc = recId.compareTo(other.recId); + if (rc == 0) return comparePartVal(other); + else return rc; + } + } + + private int comparePartVal(TFSORow other) { + if (partVal == null && other.partVal == null) { + return compareData(other); + } else if (partVal == null) { + return -1; + } else { + int rc = partVal.compareTo(other.partVal); + if (rc == 0) return compareData(other); + else return rc; + } + } + + private int compareData(TFSORow other) { + if (data == null && other.data == null) return 0; + else if (data == null) return -1; + else return data.compareTo(other.data); + } + } + + private static class TFSOInputFormat extends FileInputFormat + implements AcidInputFormat { + + FSDataInputStream in[] = null; + int readingFrom = -1; + + @Override + public RecordReader getRecordReader( + InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException { + if (in == null) { + Path paths[] = FileInputFormat.getInputPaths(entries); + in = new FSDataInputStream[paths.length]; + FileSystem fs = paths[0].getFileSystem(entries); + for (int i = 0; i < paths.length; i++) { + in[i] = fs.open(paths[i]); + } + readingFrom = 0; + } + return new RecordReader() { + + @Override + public boolean next(NullWritable nullWritable, TFSORow tfsoRecord) throws + IOException { + try { + tfsoRecord.readFields(in[readingFrom]); + return true; + } catch (EOFException e) { + in[readingFrom].close(); + if (++readingFrom >= in.length) return false; + else return next(nullWritable, tfsoRecord); + } + } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public TFSORow createValue() { + return new TFSORow(); + } + + @Override + public long getPos() throws IOException { + return 0L; + } + + @Override + public void close() throws IOException { + + } + + @Override + public float getProgress() throws IOException { + return 0.0f; + } + }; + } + + @Override + public RowReader getReader(InputSplit split, + Options options) throws + IOException { + return null; + } + + @Override + public RawReader getRawReader(Configuration conf, + boolean collapseEvents, + int bucket, + ValidTxnList validTxnList, + Path baseDirectory, + Path[] deltaDirectory) throws + IOException { + return null; + } + + @Override + public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList files) throws + IOException { + return false; + } + } + + public static class TFSOOutputFormat extends FileOutputFormat + implements AcidOutputFormat { + List records = new ArrayList(); + long numRecordsAdded = 0; + FSDataOutputStream out = null; + + @Override + public RecordUpdater getRecordUpdater(final Path path, final Options options) throws + IOException { + + final StructObjectInspector inspector = (StructObjectInspector)options.getInspector(); + return new RecordUpdater() { + @Override + public void insert(long currentTransaction, Object row) throws IOException { + addRow(row); + numRecordsAdded++; + } + + @Override + public void update(long currentTransaction, Object row) throws IOException { + addRow(row); + } + + @Override + public void delete(long currentTransaction, Object row) throws IOException { + addRow(row); + numRecordsAdded--; + } + + private void addRow(Object row) { + assert row instanceof TFSORow : "Expected TFSORow but got " + + row.getClass().getName(); + records.add((TFSORow)row); + } + + @Override + 
public void flush() throws IOException { + if (out == null) { + FileSystem fs = path.getFileSystem(options.getConfiguration()); + out = fs.create(path); + } + for (TFSORow r : records) r.write(out); + records.clear(); + out.flush(); + } + + @Override + public void close(boolean abort) throws IOException { + flush(); + out.close(); + } + + @Override + public SerDeStats getStats() { + SerDeStats stats = new SerDeStats(); + stats.setRowCount(numRecordsAdded); + return stats; + } + }; + } + + @Override + public FileSinkOperator.RecordWriter getRawRecordWriter(Path path, + Options options) throws + IOException { + return null; + } + + @Override + public FileSinkOperator.RecordWriter getHiveRecordWriter(final JobConf jc, + final Path finalOutPath, + Class valueClass, + boolean isCompressed, + Properties tableProperties, + Progressable progress) + throws IOException { + return new FileSinkOperator.RecordWriter() { + @Override + public void write(Writable w) throws IOException { + Assert.assertTrue(w instanceof TFSORow); + records.add((TFSORow) w); + } + + @Override + public void close(boolean abort) throws IOException { + if (out == null) { + FileSystem fs = finalOutPath.getFileSystem(jc); + out = fs.create(finalOutPath); + } + for (TFSORow r : records) r.write(out); + records.clear(); + out.flush(); + out.close(); + } + }; + } + + @Override + public RecordWriter getRecordWriter( + FileSystem fileSystem, JobConf entries, String s, Progressable progressable) throws + IOException { + return null; + } + + @Override + public void checkOutputSpecs(FileSystem fileSystem, JobConf entries) throws IOException { + + } + } + + public static class TFSOSerDe implements SerDe { + + @Override + public void initialize(Configuration conf, Properties tbl) throws SerDeException { + + } + + @Override + public Class getSerializedClass() { + return TFSORow.class; + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + assert obj instanceof TFSORow : "Expected TFSORow or decendent, got " + + obj.getClass().getName(); + return (TFSORow)obj; + } + + @Override + public Object deserialize(Writable blob) throws SerDeException { + assert blob instanceof TFSORow : "Expected TFSORow or decendent, got " + + blob.getClass().getName(); + return blob; + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return null; + } + + @Override + public SerDeStats getSerDeStats() { + return null; + } + } + + public static class TFSOStatsPublisher implements StatsPublisher { + static Map stats; + + @Override + public boolean init(Configuration hconf) { + return true; + } + + @Override + public boolean connect(Configuration hconf) { + return true; + } + + @Override + public boolean publishStat(String fileID, Map stats) { + this.stats = stats; + return true; + } + + @Override + public boolean closeConnection() { + return true; + } + } + + public static class TFSOStatsAggregator implements StatsAggregator { + + @Override + public boolean connect(Configuration hconf, Task sourceTask) { + return true; + } + + @Override + public String aggregateStats(String keyPrefix, String statType) { + return null; + } + + @Override + public boolean closeConnection() { + return true; + } + + @Override + public boolean cleanUp(String keyPrefix) { + return true; + } + } +}
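
Reviewer note (not part of the patch): a minimal caller-side sketch of the new FileSinkDesc fields added above. It assumes only what the diff itself introduces — setWriteType/setTransactionId, AcidUtils.Operation, and the (Path, TableDesc, boolean) constructor the test uses; the class and method names below are placeholders.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;

public class AcidFileSinkDescExample {
  // Builds a FileSinkDesc for an ACID insert. The write type defaults to
  // NOT_ACID, so plans that produce ACID writes must set both the operation
  // and the transaction id, as the patch does in its tests.
  public static FileSinkDesc buildInsertSink(Path targetDir, TableDesc table, long txnId) {
    FileSinkDesc desc = new FileSinkDesc(targetDir, table, false /* compressed */);
    desc.setWriteType(AcidUtils.Operation.INSERT);
    desc.setTransactionId(txnId);
    return desc;
  }
}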
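
Reviewer note (not part of the patch): an illustrative sketch of how the UPDATE/DELETE branch in processOp locates the bucket number inside ROW__ID with ObjectInspectors. The field positions (record identifier in column 0, bucket as the second field of the record identifier struct) follow the patch; the wrapper class and method names here are hypothetical.

import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;

public class RowIdBucketExample {
  // Returns the bucket id embedded in a row's ROW__ID for an update/delete plan.
  static int bucketOf(Object row, StructObjectInspector rowInspector) {
    // ROW__ID is always the first field of the row in UPDATE/DELETE plans.
    StructField recIdField = rowInspector.getAllStructFieldRefs().get(0);
    StructObjectInspector recIdInspector =
        (StructObjectInspector) recIdField.getFieldObjectInspector();
    // The bucket id is the second field of the record identifier struct.
    StructField bucketField = recIdInspector.getAllStructFieldRefs().get(1);
    IntObjectInspector bucketInspector =
        (IntObjectInspector) bucketField.getFieldObjectInspector();

    Object recId = rowInspector.getStructFieldData(row, recIdField);
    return bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField));
  }
}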
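
Reviewer note (not part of the patch): a hedged usage sketch of the new HiveFileFormatUtils.getAcidRecordUpdater helper from a caller's point of view. Bucket 0 and rowIdColNum -1 mirror the INSERT branch of createBucketForFileIdx; every argument is supplied by this hypothetical caller and the helper method name below is a placeholder.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

public class AcidRecordUpdaterExample {
  // Writes a single row through the ACID insert path and closes the updater.
  static void insertOneRow(JobConf jc, TableDesc table, FileSinkDesc sink,
                           Path outPath, ObjectInspector oi, Reporter reporter,
                           Object row) throws HiveException, IOException {
    RecordUpdater updater = HiveFileFormatUtils.getAcidRecordUpdater(
        jc, table, 0 /* bucket */, sink, outPath, oi, reporter, -1 /* rowIdColNum */);
    try {
      // RecordUpdater expects the actual row object, not a serialized value.
      updater.insert(sink.getTransactionId(), row);
    } finally {
      updater.close(false); // pass true to signal an aborted write instead
    }
  }
}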