diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 6651900..7b8d678 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -411,8 +411,6 @@ INSERT_CANNOT_CREATE_TEMP_FILE(10293, "Unable to create temp file for insert values "), ACID_OP_ON_NONACID_TXNMGR(10294, "Attempt to do update or delete using transaction manager that" + " does not support these operations."), - NO_INSERT_OVERWRITE_WITH_ACID(10295, "INSERT OVERWRITE not allowed on table {0} with OutputFormat " + - "that implements AcidOutputFormat while transaction manager that supports ACID is in use", true), VALUES_TABLE_CONSTRUCTOR_NOT_SUPPORTED(10296, "Values clause with table constructor not yet supported"), ACID_OP_ON_NONACID_TABLE(10297, "Attempt to do update or delete on table {0} that does not use " + diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index cc69c7e..0070c68 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -340,7 +340,7 @@ private static RecordUpdater getRecordUpdater(JobConf jc, .isCompressed(conf.getCompressed()) .tableProperties(tableProp) .reporter(reporter) - .writingBase(false) + .writingBase(conf.getInsertOverwrite()) .minimumTransactionId(conf.getTransactionId()) .maximumTransactionId(conf.getTransactionId()) .bucket(bucket) diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index cdf2c40..49e02f1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -421,10 +421,15 @@ Seems much cleaner if each stmt is identified as a particular HiveOperation (whi makes sense everywhere). This however would be problematic for merge...*/ case DDL_EXCLUSIVE: case INSERT_OVERWRITE: - compBuilder.setExclusive(); - compBuilder.setOperationType(DataOperationType.NO_TXN); + t = getTable(output); + if (AcidUtils.isAcidTable(t)) { + compBuilder.setSemiShared(); + compBuilder.setOperationType(DataOperationType.INSERT); + } else { + compBuilder.setExclusive(); + compBuilder.setOperationType(DataOperationType.NO_TXN); + } break; - case INSERT: assert t != null; if(AcidUtils.isAcidTable(t)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 88c73f0..fca213f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -3424,17 +3424,19 @@ private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, for (FileStatus origBucketStat : origBucketStats) { Path origBucketPath = origBucketStat.getPath(); - moveAcidDeltaFiles(AcidUtils.DELTA_PREFIX, AcidUtils.deltaFileFilter, + moveAcidFiles(AcidUtils.DELTA_PREFIX, AcidUtils.deltaFileFilter, fs, dst, origBucketPath, createdDeltaDirs, newFiles); - moveAcidDeltaFiles(AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.deleteEventDeltaDirFilter, + moveAcidFiles(AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.deleteEventDeltaDirFilter, fs, dst,origBucketPath, createdDeltaDirs, newFiles); + moveAcidFiles(AcidUtils.BASE_PREFIX, AcidUtils.baseFileFilter, + fs, dst, origBucketPath, createdDeltaDirs, newFiles); } } } - private static void moveAcidDeltaFiles(String deltaFileType, PathFilter pathFilter, FileSystem fs, - Path dst, Path origBucketPath, Set createdDeltaDirs, - List newFiles) throws HiveException { + private static void moveAcidFiles(String deltaFileType, PathFilter pathFilter, FileSystem fs, + Path dst, Path origBucketPath, Set createdDeltaDirs, + List newFiles) throws HiveException { LOG.debug("Acid move looking for " + deltaFileType + " files in bucket " + origBucketPath); FileStatus[] deltaStats = null; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 9e84a29..b856cdb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6895,8 +6895,10 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) checkAcidConstraints(qb, table_desc, dest_tab); } ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp); + // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old + // deltas and base and leave them up to the cleaner to clean up ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), - dest_tab.getTableName())); + dest_tab.getTableName()) && !destTableIsAcid); ltd.setLbCtx(lbCtx); loadTableWork.add(ltd); } else { @@ -7008,8 +7010,10 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) checkAcidConstraints(qb, table_desc, dest_tab); } ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp); + // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old + // deltas and base and leave them up to the cleaner to clean up ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), - dest_tab.getTableName())); + dest_tab.getTableName()) && !destTableIsAcid); ltd.setLbCtx(lbCtx); loadTableWork.add(ltd); @@ -7239,6 +7243,9 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) AcidUtils.Operation wt = updating(dest) ? AcidUtils.Operation.UPDATE : (deleting(dest) ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT); fileSinkDesc.setWriteType(wt); + if (!qb.getParseInfo().getInsertOverwriteTables().isEmpty()) { + fileSinkDesc.setInsertOverwrite(true); + } acidFileSinks.add(fileSinkDesc); } @@ -7373,11 +7380,6 @@ String fixCtasColumnName(String colName) { // that isn't true. private void checkAcidConstraints(QB qb, TableDesc tableDesc, Table table) throws SemanticException { - String tableName = tableDesc.getTableName(); - if (!qb.getParseInfo().isInsertIntoTable(tableName)) { - LOG.debug("Couldn't find table " + tableName + " in insertIntoTable"); - throw new SemanticException(ErrorMsg.NO_INSERT_OVERWRITE_WITH_ACID, tableName); - } /* LOG.info("Modifying config values for ACID write"); conf.setBoolVar(ConfVars.HIVEOPTREDUCEDEDUPLICATION, true); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 4716adc..fd27f53 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -103,6 +103,8 @@ */ private boolean isUsingThriftJDBCBinarySerDe = false; + private boolean isInsertOverwrite = false; + public FileSinkDesc() { } @@ -509,4 +511,12 @@ public FileSinkOperatorExplainVectorization getFileSinkVectorization() { } return new FileSinkOperatorExplainVectorization(vectorDesc); } + + public void setInsertOverwrite(boolean isInsertOverwrite) { + this.isInsertOverwrite = isInsertOverwrite; + } + + public boolean getInsertOverwrite() { + return isInsertOverwrite; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index c31241a..6d1cdcb 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -376,12 +376,6 @@ public void testDummyTxnManagerOnAcidTable() throws Exception { Assert.assertTrue(cpr.getErrorMessage().contains("This command is not allowed on an ACID table")); useDummyTxnManagerTemporarily(conf); - cpr = driver.compileAndRespond("insert overwrite table T10 select a, b from T11"); - Assert.assertEquals(ErrorMsg.NO_INSERT_OVERWRITE_WITH_ACID.getErrorCode(), cpr.getResponseCode()); - Assert.assertTrue(cpr.getErrorMessage().contains("INSERT OVERWRITE not allowed on table default.t10 with OutputFormat" + - " that implements AcidOutputFormat while transaction manager that supports ACID is in use")); - - useDummyTxnManagerTemporarily(conf); cpr = driver.compileAndRespond("update T10 set a=0 where b=1"); Assert.assertEquals(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getErrorCode(), cpr.getResponseCode()); Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations.")); diff --git ql/src/test/queries/clientpositive/acid_insert_overwrite.q ql/src/test/queries/clientpositive/acid_insert_overwrite.q new file mode 100644 index 0000000..5536ff6 --- /dev/null +++ ql/src/test/queries/clientpositive/acid_insert_overwrite.q @@ -0,0 +1,21 @@ +set hive.mapred.mode=nonstrict; +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; + +create table acidtbl (key char(1), value int) clustered by (value) into 2 buckets stored as orc TBLPROPERTIES ("transactional"="true"); + +insert into table acidtbl values ('a', 1), ('b', 2), ('c', 3); +select * from acidtbl order by key; + +insert overwrite table acidtbl values ('d', 4), ('e', 5), ('f', 6); +select * from acidtbl order by key; + +insert into table acidtbl values ('g', 7), ('h', 8); +select * from acidtbl order by key; + +insert overwrite table acidtbl values ('i', 9), ('j', 10); +select * from acidtbl order by key; + +insert into table acidtbl values ('k', 11); +insert into table acidtbl values ('l', 12); +select * from acidtbl order by key; diff --git ql/src/test/results/clientpositive/acid_insert_overwrite.q.out ql/src/test/results/clientpositive/acid_insert_overwrite.q.out new file mode 100644 index 0000000..d0b2386 --- /dev/null +++ ql/src/test/results/clientpositive/acid_insert_overwrite.q.out @@ -0,0 +1,113 @@ +PREHOOK: query: create table acidtbl (key char(1), value int) clustered by (value) into 2 buckets stored as orc TBLPROPERTIES ("transactional"="true") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@acidtbl +POSTHOOK: query: create table acidtbl (key char(1), value int) clustered by (value) into 2 buckets stored as orc TBLPROPERTIES ("transactional"="true") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@acidtbl +PREHOOK: query: insert into table acidtbl values ('a', 1), ('b', 2), ('c', 3) +PREHOOK: type: QUERY +PREHOOK: Output: default@acidtbl +POSTHOOK: query: insert into table acidtbl values ('a', 1), ('b', 2), ('c', 3) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@acidtbl +POSTHOOK: Lineage: acidtbl.key EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: acidtbl.value EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select * from acidtbl order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: query: select * from acidtbl order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbl +#### A masked pattern was here #### +a 1 +b 2 +c 3 +PREHOOK: query: insert overwrite table acidtbl values ('d', 4), ('e', 5), ('f', 6) +PREHOOK: type: QUERY +PREHOOK: Output: default@acidtbl +POSTHOOK: query: insert overwrite table acidtbl values ('d', 4), ('e', 5), ('f', 6) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@acidtbl +POSTHOOK: Lineage: acidtbl.key EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: acidtbl.value EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select * from acidtbl order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: query: select * from acidtbl order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbl +#### A masked pattern was here #### +d 4 +e 5 +f 6 +PREHOOK: query: insert into table acidtbl values ('g', 7), ('h', 8) +PREHOOK: type: QUERY +PREHOOK: Output: default@acidtbl +POSTHOOK: query: insert into table acidtbl values ('g', 7), ('h', 8) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@acidtbl +POSTHOOK: Lineage: acidtbl.key EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: acidtbl.value EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select * from acidtbl order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: query: select * from acidtbl order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbl +#### A masked pattern was here #### +d 4 +e 5 +f 6 +g 7 +h 8 +PREHOOK: query: insert overwrite table acidtbl values ('i', 9), ('j', 10) +PREHOOK: type: QUERY +PREHOOK: Output: default@acidtbl +POSTHOOK: query: insert overwrite table acidtbl values ('i', 9), ('j', 10) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@acidtbl +POSTHOOK: Lineage: acidtbl.key EXPRESSION [(values__tmp__table__4)values__tmp__table__4.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: acidtbl.value EXPRESSION [(values__tmp__table__4)values__tmp__table__4.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select * from acidtbl order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: query: select * from acidtbl order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbl +#### A masked pattern was here #### +i 9 +j 10 +PREHOOK: query: insert into table acidtbl values ('k', 11) +PREHOOK: type: QUERY +PREHOOK: Output: default@acidtbl +POSTHOOK: query: insert into table acidtbl values ('k', 11) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@acidtbl +POSTHOOK: Lineage: acidtbl.key EXPRESSION [(values__tmp__table__5)values__tmp__table__5.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: acidtbl.value EXPRESSION [(values__tmp__table__5)values__tmp__table__5.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: insert into table acidtbl values ('l', 12) +PREHOOK: type: QUERY +PREHOOK: Output: default@acidtbl +POSTHOOK: query: insert into table acidtbl values ('l', 12) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@acidtbl +POSTHOOK: Lineage: acidtbl.key EXPRESSION [(values__tmp__table__6)values__tmp__table__6.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: acidtbl.value EXPRESSION [(values__tmp__table__6)values__tmp__table__6.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: select * from acidtbl order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@acidtbl +#### A masked pattern was here #### +POSTHOOK: query: select * from acidtbl order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@acidtbl +#### A masked pattern was here #### +i 9 +j 10 +k 11 +l 12