diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index e2d40055dc23f09b4b9ae7634a54a30054c04ccb..553113e9aae0fb7224a0a9473c4c91df7afc2e70 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -148,6 +148,8 @@ RecordWriter[] outWriters; RecordUpdater[] updaters; Stat stat; + int acidLastBucket = -1; + int acidFileOffset = -1; public FSPaths(Path specPath) { tmpPath = Utilities.toTempPath(specPath); @@ -761,20 +763,22 @@ public void process(Object row, int tag) throws HiveException { Object recId = ((StructObjectInspector)rowInspector).getStructFieldData(row, recIdField); int bucketNum = bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField)); - if(fpaths.updaters[bucketMap.get(bucketNum)] == null){ - fpaths.updaters[bucketMap.get(bucketNum)] = HiveFileFormatUtils.getAcidRecordUpdater( - jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[bucketMap.get(bucketNum)], + if (fpaths.acidLastBucket != bucketNum) { + fpaths.acidLastBucket = bucketNum; + // Switch files + fpaths.updaters[++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater( + jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[fpaths.acidFileOffset], rowInspector, reporter, 0); if (isDebugEnabled) { LOG.debug("Created updater for bucket number " + bucketNum + " using file " + - fpaths.outPaths[bucketMap.get(bucketNum)]); + fpaths.outPaths[fpaths.acidFileOffset]); } } if (conf.getWriteType() == AcidUtils.Operation.UPDATE) { - fpaths.updaters[bucketMap.get(bucketNum)].update(conf.getTransactionId(), row); + fpaths.updaters[fpaths.acidFileOffset].update(conf.getTransactionId(), row); } else if (conf.getWriteType() == AcidUtils.Operation.DELETE) { - fpaths.updaters[bucketMap.get(bucketNum)].delete(conf.getTransactionId(), row); + fpaths.updaters[fpaths.acidFileOffset].delete(conf.getTransactionId(), row); } else { throw new HiveException("Unknown write type " + conf.getWriteType().toString()); }