diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index c2319bb..c08986a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -370,6 +371,20 @@ public void addToStat(String statType, long amount) {
     public Collection<String> getStoredStats() {
       return stat.getStoredStats();
     }
+
+    public int createDynamicBucket(int bucketNum) {
+      // this assumes all paths are bucket names (which means no lookup is needed)
+      int writerOffset = bucketNum;
+      if (updaters.length <= writerOffset) {
+        this.updaters = Arrays.copyOf(updaters, writerOffset + 1);
+        this.outPaths = Arrays.copyOf(outPaths, writerOffset + 1);
+        this.finalPaths = Arrays.copyOf(finalPaths, writerOffset + 1);
+        String bucketName = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
+        this.finalPaths[writerOffset] = new Path(bDynParts ? parent : buildTmpPath(), bucketName);
+        fpaths.outPaths[writerOffset] = new Path(buildTaskOutputTempPath(), bucketName);
+      }
+      return writerOffset;
+    }
   } // class FSPaths
 
   private static final long serialVersionUID = 1L;
@@ -976,31 +991,12 @@ public void process(Object row, int tag) throws HiveException {
               " from data but no mapping in 'bucketMap'." + extraMsg);
           }
           writerOffset = bucketMap.get(bucketNum);
+        } else if (!isBucketed) {
+          writerOffset = fpaths.createDynamicBucket(bucketNum);
         }
         if (fpaths.updaters[writerOffset] == null) {
-          /*data for delete commands always have ROW__ID which implies that the bucket ID
-           * for each row is known. RecordUpdater creates bucket_N file based on 'bucketNum' thus
-           * delete events always land in the proper bucket_N file. This could even handle
-           * cases where multiple writers are writing bucket_N file for the same N in which case
-           * Hive.copyFiles() will make one of them bucket_N_copy_M in the final location. The
-           * reset of acid (read path) doesn't know how to handle copy_N files except for 'original'
-           * files (HIVE-16177)*/
-          int writerId = -1;
-          if(!isBucketed) {
-            assert !multiFileSpray;
-            assert writerOffset == 0;
-            /**For un-bucketed tables, Deletes with ROW__IDs with different 'bucketNum' values can
-             * be written to the same bucketN file.
-             * N in this case is writerId and there is no relationship
-             * between the file name and any property of the data in it. Inserts will be written
-             * to bucketN file such that all {@link RecordIdentifier#getBucketProperty()} indeed
-             * contain writerId=N.
-             * Since taskId is unique (at least per statementId and thus
-             * per [delete_]delta_x_y_stmtId/) there will not be any copy_N files.*/
-            writerId = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId));
-          }
           fpaths.updaters[writerOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
-              jc, conf.getTableInfo(), writerId >= 0 ? writerId : bucketNum, conf,
+              jc, conf.getTableInfo(), bucketNum, conf,
               fpaths.outPaths[writerOffset], rowInspector, reporter, 0);
           if (LOG.isDebugEnabled()) {
             LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 00957ea..aaae560 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6865,6 +6865,12 @@ private Operator genBucketingSortingDest(String dest, Operator input, QB qb,
         partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
       }
     }
+    else {
+      if(updating(dest) || deleting(dest)) {
+        partnCols = getPartitionColsFromBucketColsForUpdateDelete(input, true);
+        enforceBucketing = true;
+      }
+    }
 
     if ((dest_tab.getSortCols() != null) &&
         (dest_tab.getSortCols().size() > 0)) {
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index f071531..cd1b025 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -96,6 +96,13 @@ public void testNoBuckets() throws Exception {
     Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
     Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000001_0000001_0000/bucket_00001"));
 
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
+    rs = runStatementOnDriver("explain update nobuckets set c3 = 17 where c3 in(0,1)");
+    LOG.warn("Query Plan: ");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+
     runStatementOnDriver("update nobuckets set c3 = 17 where c3 in(0,1)");
     rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID");
     LOG.warn("after update");