diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 8c948a98cc98d7997c86310d0fe32c61f72cc744..b6f527e241d914b209f20d21f3e0819abd9d5885 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1400,7 +1400,7 @@ public Partition loadPartition(Path loadPath, Table tbl,
     } else {
       newFiles = new ArrayList<Path>();
       FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
-      Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles);
+      Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles, tbl);
     }
     boolean forceCreate = (!holdDDLTime) ? true : false;
@@ -1642,7 +1642,7 @@ public void loadTable(Path loadPath, String tableName, boolean replace,
     FileSystem fs;
     try {
       fs = tbl.getDataLocation().getFileSystem(sessionConf);
-      copyFiles(sessionConf, loadPath, tbl.getPath(), fs, isSrcLocal, isAcid, newFiles);
+      copyFiles(sessionConf, loadPath, tbl.getPath(), fs, isSrcLocal, isAcid, newFiles, tbl);
     } catch (IOException e) {
       throw new HiveException("addFiles: filesystem error in check phase", e);
     }
@@ -2665,7 +2665,7 @@ public static boolean moveFile(HiveConf conf, Path srcf, Path destf,
    * @throws HiveException
    */
   static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
-      FileSystem fs, boolean isSrcLocal, boolean isAcid, List<Path> newFiles) throws HiveException {
+      FileSystem fs, boolean isSrcLocal, boolean isAcid, List<Path> newFiles, Table tlb) throws HiveException {
     boolean inheritPerms = HiveConf.getBoolVar(conf,
         HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     try {
@@ -2701,10 +2701,19 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
     } else {
       // check that source and target paths exist
       List<List<Path[]>> result = checkPaths(conf, fs, srcs, srcFs, destf, false);
+      boolean isEnforceBucketing = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEENFORCEBUCKETING)
+          && tlb.getNumBuckets() > 0;
       // move it, move it
+      int warnCount = 0; // use the variable to avoid too many warnings.
       try {
         for (List<Path[]> sdpairs : result) {
           for (Path[] sdpair : sdpairs) {
+            if (isEnforceBucketing && warnCount == 0 && sdpair[0] != null && sdpair[1] != null
+                && !sdpair[0].getName().equalsIgnoreCase(sdpair[1].getName())) {
+              String msg = "Insert into bucketed table is not supported, it may result in incorrect metadata. Table name: " + tlb.getTableName();
+              SessionState.getConsole().printInfo(
+                  String.format("Warning: %s", msg));
+              ++warnCount;
+            }
             if (!moveFile(conf, sdpair[0], sdpair[1], fs, false, isSrcLocal)) {
               throw new IOException("Cannot move " + sdpair[0] + " to "
                   + sdpair[1]);
diff --git a/ql/src/test/queries/clientpositive/insertinto_nonemptybucket.q b/ql/src/test/queries/clientpositive/insertinto_nonemptybucket.q
new file mode 100644
index 0000000000000000000000000000000000000000..09c8db40c8992a10f405e5c1f5909edc8e659a71
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/insertinto_nonemptybucket.q
@@ -0,0 +1,10 @@
+drop table if exists buckettest1;
+create table if not exists buckettest1 (data int) partitioned by (state string) clustered by (data) into 2 buckets;
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting=true;
+set hive.exec.dynamic.partition = true;
+set hive.exec.dynamic.partition.mode = nonstrict;
+set hive.session.silent = false;
+insert into table buckettest1 partition(state) select key, 'MA' from src where key < 100;
+insert into table buckettest1 partition(state) select key, 'MA' from src where key > 100 and key < 200;
+set hive.session.silent = true;
diff --git a/ql/src/test/results/clientpositive/insertinto_nonemptybucket.q.out b/ql/src/test/results/clientpositive/insertinto_nonemptybucket.q.out
new file mode 100644
index 0000000000000000000000000000000000000000..94be331ac1121a51d67a36d8d8223811bc853b8a
--- /dev/null
+++ b/ql/src/test/results/clientpositive/insertinto_nonemptybucket.q.out
@@ -0,0 +1,77 @@
+PREHOOK: query: drop table if exists buckettest1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists buckettest1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table if not exists buckettest1 (data int) partitioned by (state string) clustered by (data) into 2 buckets
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@buckettest1
+POSTHOOK: query: create table if not exists buckettest1 (data int) partitioned by (state string) clustered by (data) into 2 buckets
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@buckettest1
+PREHOOK: query: insert into table buckettest1 partition(state) select key, 'MA' from src where key < 100
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@buckettest1
+Query ID = ychen_20150610135023_b84f1965-be94-4876-bcab-cf7c22a93576
+Total jobs = 1
+Launching Job 1 out of 1
+Number of reduce tasks determined at compile time: 2
+In order to change the average load for a reducer (in bytes):
+  set hive.exec.reducers.bytes.per.reducer=<number>
+In order to limit the maximum number of reducers:
+  set hive.exec.reducers.max=<number>
+In order to set a constant number of reducers:
+  set mapreduce.job.reduces=<number>
+Job running in-process (local Hadoop)
+2015-06-10 13:50:25,481 Stage-1 map = 100%, reduce = 100%
+#### A masked pattern was here ####
+Loading data to table default.buckettest1 partition (state=null)
+	 Time taken for load dynamic partitions : 186
+	Loading partition {state=MA}
+	 Time taken for adding to write entity : 1
+Partition default.buckettest1{state=MA} stats: [numFiles=2, numRows=84, totalSize=242, rawDataSize=158]
+POSTHOOK: query: insert into table buckettest1 partition(state) select key, 'MA' from src where key < 100
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@buckettest1@state=MA
+POSTHOOK: Lineage: buckettest1 PARTITION(state=MA).data EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+MapReduce Jobs Launched:
+Stage-Stage-1:  HDFS Read: 0 HDFS Write: 0 SUCCESS
+Total MapReduce CPU Time Spent: 0 msec
+OK
+Time taken: 2.071 seconds
+PREHOOK: query: insert into table buckettest1 partition(state) select key, 'MA' from src where key > 100 and key < 200
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@buckettest1
+Query ID = ychen_20150610135025_42365b47-e4cd-4616-b06f-f6f84f270a0b
+Total jobs = 1
+Launching Job 1 out of 1
+Number of reduce tasks determined at compile time: 2
+In order to change the average load for a reducer (in bytes):
+  set hive.exec.reducers.bytes.per.reducer=<number>
+In order to limit the maximum number of reducers:
+  set hive.exec.reducers.max=<number>
+In order to set a constant number of reducers:
+  set mapreduce.job.reduces=<number>
+Job running in-process (local Hadoop)
+2015-06-10 13:50:27,313 Stage-1 map = 100%, reduce = 100%
+#### A masked pattern was here ####
+Loading data to table default.buckettest1 partition (state=null)
+Warning: Insert into bucketed table is not supported, it may result in incorrect metadata. Table name: buckettest1
+	 Time taken for load dynamic partitions : 200
+	Loading partition {state=MA}
+	 Time taken for adding to write entity : 0
+Partition default.buckettest1{state=MA} stats: [numFiles=4, numRows=187, totalSize=654, rawDataSize=467]
+POSTHOOK: query: insert into table buckettest1 partition(state) select key, 'MA' from src where key > 100 and key < 200
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@buckettest1@state=MA
+POSTHOOK: Lineage: buckettest1 PARTITION(state=MA).data EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+MapReduce Jobs Launched:
+Stage-Stage-1:  HDFS Read: 0 HDFS Write: 0 SUCCESS
+Total MapReduce CPU Time Spent: 0 msec
+OK
+Time taken: 1.8 seconds