diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index bc4bfdf..1f6d53d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -1577,13 +1577,16 @@ public void getMetaData(QB qb, ReadEntity parentInput) throws SemanticException
       // Disallow INSERT INTO on bucketized tables
       boolean isAcid = isAcidTable(tab);
-      if (qb.getParseInfo().isInsertIntoTable(tab.getDbName(), tab.getTableName()) &&
+      boolean isTableWrittenTo = qb.getParseInfo().isInsertIntoTable(tab.getDbName(), tab.getTableName());
+      if (isTableWrittenTo &&
           tab.getNumBuckets() > 0 && !isAcid) {
         throw new SemanticException(ErrorMsg.INSERT_INTO_BUCKETIZED_TABLE.
             getMsg("Table: " + tab_name));
       }
       // Disallow update and delete on non-acid tables
-      if ((updating() || deleting()) && !isAcid) {
+      if ((updating() || deleting()) && !isAcid && isTableWrittenTo) {
+        //isTableWrittenTo: delete from acidTbl where a in (select id from nonAcidTable)
+        //so only assert this if we are actually writing to this table
         // isAcidTable above also checks for whether we are using an acid compliant
         // transaction manager. But that has already been caught in
         // UpdateDeleteSemanticAnalyzer, so if we are updating or deleting and getting nonAcid
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
new file mode 100644
index 0000000..06d2ca2
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -0,0 +1,189 @@
+package org.apache.hadoop.hive.ql;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.orc.FileDump;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * TODO: this should be merged with TestTxnCommands once that is checked in
+ * specifically the tests; the supporting code here is just a clone of TestTxnCommands
+ */
+public class TestTxnCommands2 {
+  private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+    File.separator + TestTxnCommands2.class.getCanonicalName()
+    + "-" + System.currentTimeMillis()
+  ).getPath().replaceAll("\\\\", "/");
+  private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+  //bucket count for test tables; set it to 1 for easier debugging
+  private static int BUCKET_COUNT = 2;
+  @Rule
+  public TestName testName = new TestName();
+  private HiveConf hiveConf;
+  private Driver d;
+  private static enum Table {
+    ACIDTBL("acidTbl"),
+    NONACIDORCTBL("nonAcidOrcTbl");
+
+    private final String name;
+    @Override
+    public String toString() {
+      return name;
+    }
+    Table(String name) {
+      this.name = name;
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    hiveConf = new HiveConf(this.getClass());
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
+    TxnDbUtil.setConfValues(hiveConf);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING, true);
+    TxnDbUtil.prepDb();
+    File f = new File(TEST_WAREHOUSE_DIR);
+    if (f.exists()) {
+      FileUtil.fullyDelete(f);
+    }
+    if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
+      throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
+    }
+    SessionState.start(new SessionState(hiveConf));
+    d = new Driver(hiveConf);
+    dropTables();
+    runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+  }
+  private void dropTables() throws Exception {
+    for(Table t : Table.values()) {
+      runStatementOnDriver("drop table if exists " + t);
+    }
+  }
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (d != null) {
+        // runStatementOnDriver("set autocommit true");
+        dropTables();
+        d.destroy();
+        d.close();
+        d = null;
+        TxnDbUtil.cleanDb();
+      }
+    } finally {
+      FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
+    }
+  }
+  @Ignore("not needed but useful for testing")
+  @Test
+  public void testNonAcidInsert() throws Exception {
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(2,3)");
+    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+  }
+  @Test
+  public void testDeleteIn() throws Exception {
+    int[][] tableData = {{1,2},{3,2},{5,2},{1,3},{3,3},{5,3}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,7),(3,7)");
+    //todo: once multistatement txns are supported, add a test to run next 2 statements in a single txn
+    runStatementOnDriver("delete from " + Table.ACIDTBL + " where a in(select a from " + Table.NONACIDORCTBL + ")");
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) select a,b from " + Table.NONACIDORCTBL);
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] updatedData = {{1,7},{3,7},{5,2},{5,3}};
+    Assert.assertEquals("Bulk update failed", stringifyValues(updatedData), rs);
+    runStatementOnDriver("update " + Table.ACIDTBL + " set b=19 where b in(select b from " + Table.NONACIDORCTBL + " where a = 3)");
+    List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] updatedData2 = {{1,19},{3,19},{5,2},{5,3}};
+    Assert.assertEquals("Bulk update2 failed", stringifyValues(updatedData2), rs2);
+  }
+
+  /**
+   * takes raw data and turns it into a string as if from Driver.getResults()
+   * sorts rows in dictionary order
+   */
+  private List<String> stringifyValues(int[][] rowsIn) {
+    assert rowsIn.length > 0;
+    int[][] rows = rowsIn.clone();
+    Arrays.sort(rows, new RowComp());
+    List<String> rs = new ArrayList<String>();
+    for(int[] row : rows) {
+      assert row.length > 0;
+      StringBuilder sb = new StringBuilder();
+      for(int value : row) {
+        sb.append(value).append("\t");
+      }
+      sb.setLength(sb.length() - 1);
+      rs.add(sb.toString());
+    }
+    return rs;
+  }
+  private static final class RowComp implements Comparator<int[]> {
+    public int compare(int[] row1, int[] row2) {
+      assert row1 != null && row2 != null && row1.length == row2.length;
+      for(int i = 0; i < row1.length; i++) {
+        int comp = Integer.compare(row1[i], row2[i]);
+        if(comp != 0) {
+          return comp;
+        }
+      }
+      return 0;
+    }
+  }
+  private String makeValuesClause(int[][] rows) {
+    assert rows.length > 0;
+    StringBuilder sb = new StringBuilder("values");
+    for(int[] row : rows) {
+      assert row.length > 0;
+      if(row.length > 1) {
+        sb.append("(");
+      }
+      for(int value : row) {
+        sb.append(value).append(",");
+      }
+      sb.setLength(sb.length() - 1);//remove trailing comma
+      if(row.length > 1) {
+        sb.append(")");
+      }
+      sb.append(",");
+    }
+    sb.setLength(sb.length() - 1);//remove trailing comma
+    return sb.toString();
+  }
+
+  private List<String> runStatementOnDriver(String stmt) throws Exception {
+    CommandProcessorResponse cpr = d.run(stmt);
+    if(cpr.getResponseCode() != 0) {
+      throw new RuntimeException(stmt + " failed: " + cpr);
+    }
+    List<String> rs = new ArrayList<String>();
+    d.getResults(rs);
+    return rs;
+  }
+}
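
For reference, a rough sketch of the SQL-level scenario the SemanticAnalyzer change is meant to allow, mirroring testDeleteIn above (acidTbl is the transactional test table and nonAcidOrcTbl the non-transactional one, as created in setUp(); names come from the test, not a fixed API):

    delete from acidTbl where a in (select a from nonAcidOrcTbl);
    update acidTbl set b = 19 where b in (select b from nonAcidOrcTbl where a = 3);

Previously the (updating() || deleting()) && !isAcid check also fired for nonAcidOrcTbl, even though the subquery only reads it; gating the check on isTableWrittenTo restricts the error to tables that are actually written to.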