diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 8c7a78b9c5..79e41d9178 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -388,6 +388,10 @@ public void addToStat(String statType, long amount) {
    *
    * A new FSP is created for each partition, so this only requires the bucket numbering and that
    * is mapped in directly as an index.
+   *
+   * This relies on ReduceSinkOperator to shuffle update/delete rows by
+   * UDFToInteger(RecordIdentifier), i.e. by writerId in ROW__ID.
+   * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer#getPartitionColsFromBucketColsForUpdateDelete(Operator, boolean)}
    */
   public int createDynamicBucket(int bucketNum) {
     // this assumes all paths are bucket names (which means no lookup is needed)
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 6be0c74f4e..8cabf960db 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -1122,7 +1122,7 @@ public Options clone() {
         }
         continue;
       }
-      for (Path deltaFile : getDeltaFiles(delta, bucket, conf, mergerOptions, isBucketed)) {
+      for (Path deltaFile : getDeltaFiles(delta, bucket, mergerOptions)) {
        FileSystem fs = deltaFile.getFileSystem(conf);
        if(!fs.exists(deltaFile)) {
          /**
@@ -1262,53 +1262,12 @@ private Options modifyForNonAcidSchemaRead(Options baseOptions, long writeId, Pa
    * This determines the set of {@link ReaderPairAcid} to create for a given delta/.
    * For unbucketed tables {@code bucket} can be thought of as a write tranche.
    */
-  static Path[] getDeltaFiles(Path deltaDirectory, int bucket, Configuration conf,
-                              Options mergerOptions, boolean isBucketed) throws IOException {
-    if(isBucketed) {
-      /**
-       * for bucketed tables (for now) we always trust that the N in bucketN file name means that
-       * all records have {@link RecordIdentifier#getBucketProperty()} encoding bucketId = N. This
-       * means that a delete event in bucketN can only modify an insert in another bucketN file for
-       * the same N. (Down the road we may trust it only in certain delta dirs)
-       *
-       * Compactor takes all types of deltas for a given bucket. For regular read, any file that
-       * contains (only) insert events is treated as base and only
-       * delete_delta/ are treated as deltas.
-       */
-      assert (!mergerOptions.isCompacting &&
-          deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
-      ) || mergerOptions.isCompacting : "Unexpected delta: " + deltaDirectory;
-      Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket);
-      return new Path[]{deltaFile};
-    }
-    /**
-     * For unbucketed tables insert events are also stored in bucketN files but here N is
-     * the writer ID. We can trust that N matches info in {@link RecordIdentifier#getBucketProperty()}
-     * delta_x_y but it's not required since we can't trust N for delete_delta_x_x/bucketN.
-     * Thus we always have to take all files in a delete_delta.
-     * For regular read, any file that has (only) insert events is treated as base so
-     * {@link deltaDirectory} can only be delete_delta and so we take all files in it.
-     * For compacting, every split contains base/bN + delta(s)/bN + delete_delta(s){all buckets} for
-     * a given N.
-     */
-    if(deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
-      //it's not wrong to take all delete events for bucketed tables but it's more efficient
-      //to only take those that belong to the 'bucket' assuming we trust the file name
-      //un-bucketed table - get all files
-      FileSystem fs = deltaDirectory.getFileSystem(conf);
-      FileStatus[] dataFiles = fs.listStatus(deltaDirectory, AcidUtils.bucketFileFilter);
-      Path[] deltaFiles = new Path[dataFiles.length];
-      int i = 0;
-      for (FileStatus stat : dataFiles) {
-        deltaFiles[i++] = stat.getPath();
-      }//todo: need a test where we actually have more than 1 file
-      return deltaFiles;
-    }
-    //if here it must be delta_x_y - insert events only, so we must be compacting
-    assert mergerOptions.isCompacting() : "Expected to be called as part of compaction";
-    Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket);
-    return new Path[] {deltaFile};
-
+  static Path[] getDeltaFiles(Path deltaDirectory, int bucket, Options mergerOptions) {
+    assert (!mergerOptions.isCompacting &&
+        deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
+    ) || mergerOptions.isCompacting : "Unexpected delta: " + deltaDirectory +
+        "(isCompacting=" + mergerOptions.isCompacting() + ")";
+    return new Path[] {AcidUtils.createBucketFile(deltaDirectory, bucket)};
   }
 
   @VisibleForTesting
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 66280b2da1..2f809de2fa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -1420,19 +1420,8 @@ private void checkBucketId() throws IOException {
   /**
    * Whenever we are reading a batch, we must ensure that all the records in the batch
    * have the same bucket id as the bucket id of the split. If not, throw exception.
-   * NOTE: this assertion might not hold, once virtual bucketing is in place. However,
-   * it should be simple to fix that case. Just replace check for bucket equality with
-   * a check for valid bucket mapping. Until virtual bucketing is added, it means
-   * either the split computation got messed up or we found some corrupted records.
    */
   private void checkBucketId(int bucketPropertyFromRecord) throws IOException {
-    if(!isBucketedTable) {
-      /**
-       * in this case a file inside a delete_delta_x_y/bucketN may contain any value for
-       * bucketId in {@link RecordIdentifier#getBucketProperty()}
-       */
-      return;
-    }
     int bucketIdFromRecord = BucketCodec.determineVersion(bucketPropertyFromRecord)
       .decodeWriterId(bucketPropertyFromRecord);
     if(bucketIdFromRecord != bucketForSplit) {
@@ -1534,7 +1523,7 @@ public int compareTo(CompressedOwid other) {
       for (Path deleteDeltaDir : deleteDeltaDirs) {
         FileSystem fs = deleteDeltaDir.getFileSystem(conf);
         Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
-          conf, new OrcRawRecordMerger.Options().isCompacting(false), isBucketedTable);
+          new OrcRawRecordMerger.Options().isCompacting(false));
         for (Path deleteDeltaFile : deleteDeltaFiles) {
           // NOTE: Calling last flush length below is more for future-proofing when we have
           // streaming deletes. But currently we don't support streaming deletes, and this can