diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index c2319bb..c08986a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -370,6 +371,20 @@ public void addToStat(String statType, long amount) {
     public Collection getStoredStats() {
       return stat.getStoredStats();
     }
+
+    public int createDynamicBucket(int bucketNum) {
+      // this assumes all paths are bucket names (which means no lookup is needed)
+      int writerOffset = bucketNum;
+      if (updaters.length <= writerOffset) {
+        this.updaters = Arrays.copyOf(updaters, writerOffset + 1);
+        this.outPaths = Arrays.copyOf(outPaths, writerOffset + 1);
+        this.finalPaths = Arrays.copyOf(finalPaths, writerOffset + 1);
+        String bucketName = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
+        this.finalPaths[writerOffset] = new Path(bDynParts ? parent : buildTmpPath(), bucketName);
+        fpaths.outPaths[writerOffset] = new Path(buildTaskOutputTempPath(), bucketName);
+      }
+      return writerOffset;
+    }
   } // class FSPaths

   private static final long serialVersionUID = 1L;
@@ -976,31 +991,12 @@ public void process(Object row, int tag) throws HiveException {
               " from data but no mapping in 'bucketMap'." + extraMsg);
         }
         writerOffset = bucketMap.get(bucketNum);
+      } else if (!isBucketed) {
+        writerOffset = fpaths.createDynamicBucket(bucketNum);
       }
       if (fpaths.updaters[writerOffset] == null) {
-        /*data for delete commands always have ROW__ID which implies that the bucket ID
-         * for each row is known. RecordUpdater creates bucket_N file based on 'bucketNum' thus
-         * delete events always land in the proper bucket_N file. This could even handle
-         * cases where multiple writers are writing bucket_N file for the same N in which case
-         * Hive.copyFiles() will make one of them bucket_N_copy_M in the final location. The
-         * reset of acid (read path) doesn't know how to handle copy_N files except for 'original'
-         * files (HIVE-16177)*/
-        int writerId = -1;
-        if(!isBucketed) {
-          assert !multiFileSpray;
-          assert writerOffset == 0;
-          /**For un-bucketed tables, Deletes with ROW__IDs with different 'bucketNum' values can
-           * be written to the same bucketN file.
-           * N in this case is writerId and there is no relationship
-           * between the file name and any property of the data in it. Inserts will be written
-           * to bucketN file such that all {@link RecordIdentifier#getBucketProperty()} indeed
-           * contain writerId=N.
-           * Since taskId is unique (at least per statementId and thus
-           * per [delete_]delta_x_y_stmtId/) there will not be any copy_N files.*/
-          writerId = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId));
-        }
         fpaths.updaters[writerOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
-          jc, conf.getTableInfo(), writerId >= 0 ? writerId : bucketNum, conf,
+          jc, conf.getTableInfo(), bucketNum, conf,
           fpaths.outPaths[writerOffset], rowInspector, reporter, 0);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index c2bcedd..b0a21e1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6865,6 +6865,12 @@ private Operator genBucketingSortingDest(String dest, Operator input, QB qb,
         partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
       }
     }
+    else {
+      if(updating(dest) || deleting(dest)) {
+        partnCols = getPartitionColsFromBucketColsForUpdateDelete(input, true);
+        enforceBucketing = true;
+      }
+    }

     if ((dest_tab.getSortCols() != null) &&
         (dest_tab.getSortCols().size() > 0)) {
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index f071531..8469454 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -96,6 +96,13 @@ public void testNoBuckets() throws Exception {
     Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
     Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00001"));

+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
+    rs = runStatementOnDriver("explain update nobuckets set c3 = 17 where c3 in(0,1)");
+    LOG.warn("Query Plan: ");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+
     runStatementOnDriver("update nobuckets set c3 = 17 where c3 in(0,1)");
     rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID");
     LOG.warn("after update");
@@ -106,18 +113,21 @@ public void testNoBuckets() throws Exception {
     Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00000"));
     Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00001"));
-    //so update has 1 writer which creates bucket0 where both new rows land
+    //the update still has 1 writer, but each updated row now lands in the bucket file matching its original bucketid
     Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17\t"));
    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000002_0000002_0000/bucket_00000"));
-    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t17\t"));
-    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000002_0000002_0000/bucket_00000"));
+    // update for "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t1\t"
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17\t"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000002_0000002_0000/bucket_00001"));

     Set expectedFiles = new HashSet<>();
-    //both delete events land in a single bucket0. Each has a different ROW__ID.bucketId value (even writerId in it is different)
+    //both delete events land in the buckets corresponding to the original ROW__IDs
     expectedFiles.add("ts/delete_delta_0000002_0000002_0000/bucket_00000");
+    expectedFiles.add("ts/delete_delta_0000002_0000002_0000/bucket_00001");
     expectedFiles.add("nobuckets/delta_0000001_0000001_0000/bucket_00000");
     expectedFiles.add("nobuckets/delta_0000001_0000001_0000/bucket_00001");
     expectedFiles.add("nobuckets/delta_0000002_0000002_0000/bucket_00000");
+    expectedFiles.add("nobuckets/delta_0000002_0000002_0000/bucket_00001");
     //check that we get the right files on disk
     assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
     //todo: it would be nice to check the contents of the files... could use orc.FileDump - it has
@@ -136,6 +146,7 @@ public void testNoBuckets() throws Exception {
     │   └── bucket_00001
     ├── delete_delta_0000002_0000002_0000
     │   └── bucket_00000
+    │   └── bucket_00001
     ├── delta_0000001_0000001_0000
     │   ├── bucket_00000
     │   └── bucket_00001
@@ -146,16 +157,18 @@ public void testNoBuckets() throws Exception {
     Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nobuckets/base_0000002/bucket_00000"));
     Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17\t"));
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nobuckets/base_0000002/bucket_00000"));
-    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t17\t"));
-    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/base_0000002/bucket_00000"));
-    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/base_0000002/bucket_00001"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17\t"));
     Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/base_0000002/bucket_00001"));

     expectedFiles.clear();
     expectedFiles.add("delete_delta_0000002_0000002_0000/bucket_00000");
+    expectedFiles.add("delete_delta_0000002_0000002_0000/bucket_00001");
     expectedFiles.add("uckets/delta_0000001_0000001_0000/bucket_00000");
     expectedFiles.add("uckets/delta_0000001_0000001_0000/bucket_00001");
     expectedFiles.add("uckets/delta_0000002_0000002_0000/bucket_00000");
+    expectedFiles.add("uckets/delta_0000002_0000002_0000/bucket_00001");
     expectedFiles.add("/warehouse/nobuckets/base_0000002/bucket_00000");
     expectedFiles.add("/warehouse/nobuckets/base_0000002/bucket_00001");
     assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/nobuckets");
@@ -375,7 +388,8 @@ logical bucket (tranche)
       {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t12\t12", "warehouse/t/000000_0_copy_1"},
       {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"},
       {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"},
-      {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/delta_10000001_10000001_0000/bucket_00000"},
+      // update for "{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t60\t80"
+      {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t60\t88", "warehouse/t/delta_10000001_10000001_0000/bucket_00001"},
     };
     rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME");
     checkExpected(rs, expected3,"after converting to acid (no compaction with updates)");
@@ -403,8 +417,8 @@ logical bucket (tranche)
         "warehouse/t/base_10000002/bucket_00000"},
       {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60",
         "warehouse/t/base_10000002/bucket_00000"},
-      {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t60\t88",
-        "warehouse/t/base_10000002/bucket_00000"},
+      {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t60\t88",
+        "warehouse/t/base_10000002/bucket_00001"},
     };
     checkExpected(rs, expected4,"after major compact");
   }
diff --git ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
index 80bbba4..f9a17a5 100644
--- ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
+++ ql/src/test/results/clientpositive/llap/acid_no_buckets.q.out
@@ -130,6 +130,7 @@ STAGE PLANS:
                 Reduce Output Operator
                   key expressions: _col0 (type: struct)
                   sort order: +
+                  Map-reduce partition columns: UDFToInteger(_col0) (type: int)
                   Statistics: Num rows: 500 Data size: 308500 Basic stats: COMPLETE Column stats: PARTIAL
                   value expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string)
             Execution mode: llap
@@ -312,6 +313,7 @@ STAGE PLANS:
                 Reduce Output Operator
                   key expressions: _col0 (type: struct)
                   sort order: +
+                  Map-reduce partition columns: UDFToInteger(_col0) (type: int)
                   Statistics: Num rows: 101 Data size: 44844 Basic stats: COMPLETE Column stats: PARTIAL
                   value expressions: _col1 (type: string), _col2 (type: string)
             Execution mode: llap
@@ -1138,6 +1140,7 @@ STAGE PLANS:
                       keyColumnNums: [4]
                       native: true
                       nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                      partitionColumnNums: [5]
                       valueColumnNums: [0, 6, 2]
             Execution mode: vectorized, llap
             LLAP IO: may be used (ACID table)
@@ -1336,6 +1339,7 @@ STAGE PLANS:
                      keyColumnNums: [4]
                       native: true
                       nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                      partitionColumnNums: [5]
                       valueColumnNums: [2, 3]
             Execution mode: vectorized, llap
             LLAP IO: may be used (ACID table)
@@ -1355,7 +1359,7 @@ STAGE PLANS:
                 neededVirtualColumns: [ROWID]
                 partitionColumnCount: 2
                 partitionColumns: ds:string, hr:string
-                scratchColumnTypeNames: []
+                scratchColumnTypeNames: [bigint]
         Reducer 2
             Execution mode: vectorized, llap
             Reduce Vectorization:
diff --git ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
index 2f34aae..34809ec 100644
--- ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
+++ ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
@@ -788,6 +788,7 @@ STAGE PLANS:
                 Reduce Output Operator
                   key expressions: _col0 (type: struct)
                   sort order: +
+                  Map-reduce partition columns: UDFToInteger(_col0) (type: int)
                   Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: COMPLETE
                   value expressions: _col1 (type: int), _col2 (type: decimal(10,2)), _col3 (type: bigint)
         Reducer 3
@@ -1598,6 +1599,7 @@ STAGE PLANS:
                 Reduce Output Operator
                   key expressions: _col0 (type: struct)
                   sort order: +
+                  Map-reduce partition columns: UDFToInteger(_col0) (type: int)
                   Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: COMPLETE
                   value expressions: _col1 (type: int), _col2 (type: decimal(10,2)), _col3 (type: bigint)
         Reducer 3
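
Reviewer note: as a standalone illustration of what FSPaths.createDynamicBucket() in the patch above is doing, the sketch below (not Hive code; the class name DynamicBucketPaths and the bucketFileName helper are hypothetical stand-ins) shows the same grow-on-demand pattern: the parallel per-bucket arrays are extended with Arrays.copyOf whenever a bucket number beyond the current array length arrives, so an update/delete writer on an unbucketed ACID table can target whichever bucket_N file the incoming ROW__ID's bucket number selects.

import java.util.Arrays;

// Minimal sketch (hypothetical, not part of Hive): grow-on-demand per-bucket
// paths, mirroring the pattern used by FSPaths.createDynamicBucket() above.
public class DynamicBucketPaths {
  private String[] finalPaths = new String[0];
  private String[] outPaths = new String[0];

  // Returns the writer offset for bucketNum, growing the arrays if needed.
  public int createDynamicBucket(int bucketNum) {
    int writerOffset = bucketNum; // offset == bucket number, so no lookup table is needed
    if (finalPaths.length <= writerOffset) {
      finalPaths = Arrays.copyOf(finalPaths, writerOffset + 1);
      outPaths = Arrays.copyOf(outPaths, writerOffset + 1);
      String bucketName = bucketFileName(bucketNum);
      finalPaths[writerOffset] = "final/" + bucketName; // stands in for the real final path
      outPaths[writerOffset] = "tmp/" + bucketName;     // stands in for the task-output temp path
    }
    return writerOffset;
  }

  // Hypothetical stand-in for Utilities.replaceTaskIdFromFilename(taskId, bucketNum).
  private static String bucketFileName(int bucketNum) {
    return String.format("bucket_%05d", bucketNum);
  }

  public static void main(String[] args) {
    DynamicBucketPaths paths = new DynamicBucketPaths();
    // e.g. a delete event whose ROW__ID carries bucket 1 creates bucket_00001 on demand
    int offset = paths.createDynamicBucket(1);
    System.out.println(offset + " -> " + paths.outPaths[offset]);
  }
}

This is why the test expectations above change from a single bucket_00000 in delta_0000002_0000002_0000 and delete_delta_0000002_0000002_0000 to one file per original bucket (bucket_00000 and bucket_00001), and why the ReduceSink operators in the .q.out files gain a Map-reduce partition column derived from the ROW__ID's bucket.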