diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 5d588390bfa00a956f4094310819204371f81122..19f0badb1568dd7bc27cab097f4d75224a2c6e98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1872,7 +1872,7 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf,
     if (fs.exists(tmpPath)) {
       // remove any tmp file or double-committed output files
       ArrayList<String> emptyBuckets =
-          Utilities.removeTempOrDuplicateFiles(fs, tmpPath, dpCtx);
+          Utilities.removeTempOrDuplicateFiles(fs, tmpPath, dpCtx, conf, hconf);
       // create empty buckets if necessary
       if (emptyBuckets.size() > 0) {
         createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
@@ -1941,7 +1941,7 @@ private static void createEmptyBuckets(Configuration hconf, ArrayList<String> pa
    * Remove all temporary files and duplicate (double-committed) files from a given directory.
    */
   public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws IOException {
-    removeTempOrDuplicateFiles(fs, path, null);
+    removeTempOrDuplicateFiles(fs, path, null, null, null);
   }

   /**
@@ -1950,15 +1950,15 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
    * @return a list of path names corresponding to should-be-created empty buckets.
    */
   public static ArrayList<String> removeTempOrDuplicateFiles(FileSystem fs, Path path,
-      DynamicPartitionCtx dpCtx) throws IOException {
+      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
     if (path == null) {
       return null;
     }

     ArrayList<String> result = new ArrayList<String>();
+    HashMap<String, FileStatus> taskIDToFile = null;
     if (dpCtx != null) {
       FileStatus parts[] = HiveStatsUtils.getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
-      HashMap<String, FileStatus> taskIDToFile = null;

       for (int i = 0; i < parts.length; ++i) {
         assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
@@ -1994,8 +1994,24 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I
       }
     } else {
       FileStatus[] items = fs.listStatus(path);
-      removeTempOrDuplicateFiles(items, fs);
+      taskIDToFile = removeTempOrDuplicateFiles(items, fs);
+      if (taskIDToFile != null && conf != null && conf.getTable() != null
+          && conf.getTable().getNumBuckets() > 0
+          && (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEENFORCEBUCKETING))) {
+        // get the missing buckets and generate empty buckets for non-dynamic partition
+        String taskID1 = taskIDToFile.keySet().iterator().next();
+        Path bucketPath = taskIDToFile.values().iterator().next().getPath();
+        for (int j = 0; j < conf.getTable().getNumBuckets(); ++j) {
+          String taskID2 = replaceTaskId(taskID1, j);
+          if (!taskIDToFile.containsKey(taskID2)) {
+            // create empty bucket, file name should be derived from taskID2
+            String path2 = replaceTaskIdFromFilename(bucketPath.toUri().getPath().toString(), j);
+            result.add(path2);
+          }
+        }
+      }
     }
+
     return result;
   }
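Note on the hunk above: when the target table is bucketed and hive.enforce.bucketing is on, the new else-branch detects buckets that received no rows by taking one committed task ID, substituting each bucket number into it via replaceTaskId, and probing the map of committed files; the missing file names are derived with replaceTaskIdFromFilename and returned so createEmptyBuckets can materialize them. What follows is a minimal, self-contained sketch of that derivation, not Hive's actual helpers: MissingBucketSketch and its replaceTaskId are simplified stand-ins that assume task IDs of the form 000000_0, and the warehouse path is invented for illustration.

    import java.util.HashMap;
    import java.util.Map;

    // Simplified illustration of the missing-bucket detection in the patch.
    public class MissingBucketSketch {

      // Stand-in for Utilities.replaceTaskId: substitute the bucket number
      // into the task-ID portion, preserving its zero padding.
      static String replaceTaskId(String taskId, int bucket) {
        int sep = taskId.indexOf('_');
        String prefix = taskId.substring(0, sep);
        return String.format("%0" + prefix.length() + "d", bucket) + taskId.substring(sep);
      }

      public static void main(String[] args) {
        int numBuckets = 2;
        // Pretend only bucket 0 was written because no rows hashed to bucket 1.
        Map<String, String> taskIDToFile = new HashMap<>();
        taskIDToFile.put("000000_0", "/warehouse/bucketoutput1/000000_0");

        String taskID1 = taskIDToFile.keySet().iterator().next();
        String existing = taskIDToFile.get(taskID1);
        for (int j = 0; j < numBuckets; ++j) {
          String taskID2 = replaceTaskId(taskID1, j);
          if (!taskIDToFile.containsKey(taskID2)) {
            // Derive the empty bucket's name from an existing sibling file.
            System.out.println("missing bucket file: " + existing.replace(taskID1, taskID2));
          }
        }
        // Prints: missing bucket file: /warehouse/bucketoutput1/000001_0
      }
    }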
diff --git a/ql/src/test/queries/clientpositive/insertoverwrite_bucket.q b/ql/src/test/queries/clientpositive/insertoverwrite_bucket.q
new file mode 100644
index 0000000000000000000000000000000000000000..d939710ac0961379961d8b2a8a897778bb3177be
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/insertoverwrite_bucket.q
@@ -0,0 +1,28 @@
+CREATE TABLE IF NOT EXISTS bucketinput(
+data string
+)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
+CREATE TABLE IF NOT EXISTS bucketoutput1(
+data string
+)CLUSTERED BY(data)
+INTO 2 BUCKETS
+ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
+CREATE TABLE IF NOT EXISTS bucketoutput2(
+data string
+)CLUSTERED BY(data)
+INTO 2 BUCKETS
+ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
+insert into table bucketinput values ("firstinsert1");
+insert into table bucketinput values ("firstinsert2");
+insert into table bucketinput values ("firstinsert3");
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting=true;
+insert overwrite table bucketoutput1 select * from bucketinput where data like 'first%';
+set hive.auto.convert.sortmerge.join=true;
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+select * from bucketoutput1 a join bucketoutput2 b on (a.data=b.data);
+drop table bucketinput;
+drop table bucketoutput1;
+drop table bucketoutput2;
+
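The test exercises exactly that path: bucketoutput1 is declared with two buckets, but all three inserted rows can hash into the same one, so the insert overwrite commits a single bucket file and, before the patch, the bucket-map/sort-merge join below would be left with fewer files than buckets to line up. Below is a rough model of the bucket assignment, assuming for illustration that the string hash behaves like java.lang.String.hashCode (Hive actually hashes through its ObjectInspectors, so the exact values can differ).

    // Rough model of how rows are assigned to bucket files. Assumption for
    // illustration: the string hash behaves like java.lang.String.hashCode.
    public class BucketAssignmentSketch {
      static int bucketFor(String value, int numBuckets) {
        // Mask the sign bit, then take the remainder, as bucketing schemes
        // conventionally do to map a hash to a non-negative bucket index.
        return (value.hashCode() & Integer.MAX_VALUE) % numBuckets;
      }

      public static void main(String[] args) {
        int numBuckets = 2; // bucketoutput1 is CLUSTERED BY(data) INTO 2 BUCKETS
        for (String v : new String[] {"firstinsert1", "firstinsert2", "firstinsert3"}) {
          System.out.println(v + " -> bucket " + bucketFor(v, numBuckets));
        }
        // If all rows land in one bucket, the overwrite produces a single
        // bucket file, which is the situation the patch repairs.
      }
    }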
diff --git a/ql/src/test/results/clientpositive/insertoverwrite_bucket.q.out b/ql/src/test/results/clientpositive/insertoverwrite_bucket.q.out
new file mode 100644
index 0000000000000000000000000000000000000000..9b7b85df3c7cb437c0631968742850e62b7001f6
--- /dev/null
+++ b/ql/src/test/results/clientpositive/insertoverwrite_bucket.q.out
@@ -0,0 +1,116 @@
+PREHOOK: query: CREATE TABLE IF NOT EXISTS bucketinput(
+data string
+)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@bucketinput
+POSTHOOK: query: CREATE TABLE IF NOT EXISTS bucketinput(
+data string
+)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@bucketinput
+PREHOOK: query: CREATE TABLE IF NOT EXISTS bucketoutput1(
+data string
+)CLUSTERED BY(data)
+INTO 2 BUCKETS
+ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@bucketoutput1
+POSTHOOK: query: CREATE TABLE IF NOT EXISTS bucketoutput1(
+data string
+)CLUSTERED BY(data)
+INTO 2 BUCKETS
+ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@bucketoutput1
+PREHOOK: query: CREATE TABLE IF NOT EXISTS bucketoutput2(
+data string
+)CLUSTERED BY(data)
+INTO 2 BUCKETS
+ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@bucketoutput2
+POSTHOOK: query: CREATE TABLE IF NOT EXISTS bucketoutput2(
+data string
+)CLUSTERED BY(data)
+INTO 2 BUCKETS
+ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@bucketoutput2
+PREHOOK: query: insert into table bucketinput values ("firstinsert1")
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@bucketinput
+POSTHOOK: query: insert into table bucketinput values ("firstinsert1")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@bucketinput
+POSTHOOK: Lineage: bucketinput.data SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+PREHOOK: query: insert into table bucketinput values ("firstinsert2")
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__2
+PREHOOK: Output: default@bucketinput
+POSTHOOK: query: insert into table bucketinput values ("firstinsert2")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__2
+POSTHOOK: Output: default@bucketinput
+POSTHOOK: Lineage: bucketinput.data SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+PREHOOK: query: insert into table bucketinput values ("firstinsert3")
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__3
+PREHOOK: Output: default@bucketinput
+POSTHOOK: query: insert into table bucketinput values ("firstinsert3")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__3
+POSTHOOK: Output: default@bucketinput
+POSTHOOK: Lineage: bucketinput.data SIMPLE [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+PREHOOK: query: insert overwrite table bucketoutput1 select * from bucketinput where data like 'first%'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucketinput
+PREHOOK: Output: default@bucketoutput1
+POSTHOOK: query: insert overwrite table bucketoutput1 select * from bucketinput where data like 'first%'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@bucketinput
+POSTHOOK: Output: default@bucketoutput1
+POSTHOOK: Lineage: bucketoutput1.data SIMPLE [(bucketinput)bucketinput.FieldSchema(name:data, type:string, comment:null), ]
+PREHOOK: query: select * from bucketoutput1 a join bucketoutput2 b on (a.data=b.data)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucketoutput1
+PREHOOK: Input: default@bucketoutput2
+#### A masked pattern was here ####
+POSTHOOK: query: select * from bucketoutput1 a join bucketoutput2 b on (a.data=b.data)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@bucketoutput1
+POSTHOOK: Input: default@bucketoutput2
+#### A masked pattern was here ####
+PREHOOK: query: drop table bucketinput
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@bucketinput
+PREHOOK: Output: default@bucketinput
+POSTHOOK: query: drop table bucketinput
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@bucketinput
+POSTHOOK: Output: default@bucketinput
+PREHOOK: query: drop table bucketoutput1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@bucketoutput1
+PREHOOK: Output: default@bucketoutput1
+POSTHOOK: query: drop table bucketoutput1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@bucketoutput1
+POSTHOOK: Output: default@bucketoutput1
+PREHOOK: query: drop table bucketoutput2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@bucketoutput2
+PREHOOK: Output: default@bucketoutput2
+POSTHOOK: query: drop table bucketoutput2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@bucketoutput2
+POSTHOOK: Output: default@bucketoutput2
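The golden output ends with an empty join result, which presupposes that both bucket files exist on the bucketoutput1 side even though only one received rows. Downstream of the patched method, createEmptyBuckets (first hunk) turns the returned path list into actual files; the sketch below shows the gist of that step under the simplifying assumption that a zero-length file is an acceptable empty bucket, whereas the real method writes through the table's output format so the empty files match the storage format.

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Gist of materializing the empty-bucket paths returned by
    // removeTempOrDuplicateFiles (a simplification of createEmptyBuckets).
    public class CreateEmptyBucketsSketch {
      static void createEmptyBuckets(Configuration conf, List<String> paths)
          throws IOException {
        for (String p : paths) {
          Path path = new Path(p);
          FileSystem fs = path.getFileSystem(conf);
          fs.create(path).close(); // zero-length placeholder for the bucket
        }
      }
    }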