diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 8c7a78b9c5..cadc76988c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -388,6 +388,9 @@ public void addToStat(String statType, long amount) { * * A new FSP is created for each partition, so this only requires the bucket numbering and that * is mapped in directly as an index. + * + * This relies on ReduceSinkOperator to shuffle update/delete rows by + * UDFToInteger(RecordIdentifier), i.e. by writerId in ROW__ID. */ public int createDynamicBucket(int bucketNum) { // this assumes all paths are bucket names (which means no lookup is needed) diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 6be0c74f4e..5f87d52ba4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -1292,6 +1292,10 @@ private Options modifyForNonAcidSchemaRead(Options baseOptions, long writeId, Pa * a given N. */ if(deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { + Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket); + if(true) { + return new Path[]{deltaFile}; + } //it's not wrong to take all delete events for bucketed tables but it's more efficient //to only take those that belong to the 'bucket' assuming we trust the file name //un-bucketed table - get all files diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 66280b2da1..29d3307373 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -1431,7 +1431,7 @@ private void checkBucketId(int bucketPropertyFromRecord) throws IOException { * in this case a file inside a delete_delta_x_y/bucketN may contain any value for * bucketId in {@link RecordIdentifier#getBucketProperty()} */ - return; + //return; } int bucketIdFromRecord = BucketCodec.determineVersion(bucketPropertyFromRecord) .decodeWriterId(bucketPropertyFromRecord);