diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 7e99008..5de3f1d 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -49,6 +49,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Information about the hive end point (i.e. table or partition) to write to.
@@ -272,11 +273,40 @@ private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi,
       }
       this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
       this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
+      checkEndPoint(endPoint, msClient);
       if (createPart && !endPoint.partitionVals.isEmpty()) {
         createPartitionIfNotExists(endPoint, msClient, conf);
       }
     }
 
+    /**
+     * Checks that the table behind the end point can be streamed into.
+     * Currently this means TBLPROPERTIES ('transactional'='true') must be set.
+     *
+     * @param endPoint end point naming the database and table to check
+     * @param msClient metastore client used to look the table up
+     * @throws InvalidTable if the table cannot be read from the metastore or
+     *         is not marked transactional
+     */
+    private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient)
+        throws InvalidTable {
+      // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
+      Map<String, String> params;
+      try {
+        params = msClient.getTable(endPoint.database, endPoint.table).getParameters();
+      } catch (Exception e) {
+        LOG.warn("Unable to check if Table is transactional. " + endPoint, e);
+        throw new InvalidTable(endPoint.database, endPoint.table, e);
+      }
+      // Validate outside the try block so this InvalidTable is not caught and re-wrapped above.
+      String transactionalProp = (params == null) ? null : params.get("transactional");
+      if (!"true".equalsIgnoreCase(transactionalProp)) {
+        LOG.error("'transactional' property is not set on Table " + endPoint);
+        throw new InvalidTable(endPoint.database, endPoint.table,
+            "'transactional' property is not set on Table");
+      }
+    }
+
     /**
      * Close connection
      */
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
index 903c37e..98ef688 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
@@ -27,4 +27,22 @@ private static String makeMsg(String db, String table) {
   public InvalidTable(String db, String table) {
     super(makeMsg(db,table), null);
   }
+
+  /**
+   * @param db database the table belongs to
+   * @param table name of the offending table
+   * @param msg explanation of why the table is not usable
+   */
+  public InvalidTable(String db, String table, String msg) {
+    super(makeMsg(db, table) + ": " + msg, null);
+  }
+
+  /**
+   * @param db database the table belongs to
+   * @param table name of the offending table
+   * @param inner underlying failure, preserved as the cause
+   */
+  public InvalidTable(String db, String table, Exception inner) {
+    super(makeMsg(db, table) + ": " + inner.getMessage(), inner);
+  }
 }
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 2f6baec..db18414 100644
--- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -386,6 +386,45 @@ public void testStreamBucketingMatchesRegularBucketing() throws Exception {
   }
 
 
+  /**
+   * Connecting to a table that is not marked transactional must fail with
+   * {@link InvalidTable}, both when the property is absent and when it is 'false'.
+   */
+  @Test
+  public void testTableValidation() throws Exception {
+    int bucketCount = 100;
+
+    String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString();
+    String tbl1 = "validation1";
+    String tbl2 = "validation2";
+
+    String tableLoc = "'" + dbUri + Path.SEPARATOR + tbl1 + "'";
+    String tableLoc2 = "'" + dbUri + Path.SEPARATOR + tbl2 + "'";
+
+    runDDL(driver, "create database testBucketing3");
+    runDDL(driver, "use testBucketing3");
+
+    // tbl1: 'transactional' property not set at all
+    runDDL(driver, "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into "
+            + bucketCount + " buckets stored as orc location " + tableLoc);
+
+    // tbl2: 'transactional' property explicitly disabled
+    runDDL(driver, "create table " + tbl2 + " ( key1 string, data string ) clustered by ( key1 ) into "
+            + bucketCount + " buckets stored as orc location " + tableLoc2
+            + " TBLPROPERTIES ('transactional'='false')");
+
+    for (String tbl : new String[] {tbl1, tbl2}) {
+      try {
+        HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", tbl, null);
+        endPt.newConnection(false);
+        Assert.fail("InvalidTable exception was not thrown for table " + tbl);
+      } catch (InvalidTable e) {
+        // expecting this exception
+      }
+    }
+  }
+
+
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
                                 String... records) throws Exception {
     ValidTxnList txns = msClient.getValidTxns();
@@ -1164,7 +1203,8 @@ public static Path createDbAndTable(Driver driver, String databaseName,
             " clustered by ( " + join(bucketCols, ",") + " )" +
             " into " + bucketCount + " buckets " +
             " stored as orc " +
-            " location '" + tableLoc + "'";
+            " location '" + tableLoc + "'" +
+            " TBLPROPERTIES ('transactional'='true') ";
     runDDL(driver, crtTbl);
     if(partNames!=null && partNames.length!=0) {
       return addPartition(driver, tableName, partVals, partNames);
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index abca1ce..e2910dd 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -154,11 +154,12 @@ public void testStatsAfterCompactionPartTbl() throws Exception {
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
       " PARTITIONED BY(bkt INT)" +
       " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
-      " STORED AS ORC", driver);
+      " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
     executeStatementOnDriver("CREATE EXTERNAL TABLE " + tblNameStg + "(a INT, b STRING)" +
       " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'" +
       " STORED AS TEXTFILE" +
-      " LOCATION '" + stagingFolder.newFolder().toURI().getPath() + "'", driver);
+      " LOCATION '" + stagingFolder.newFolder().toURI().getPath() + "'" +
+      " TBLPROPERTIES ('transactional'='true')", driver);
 
     executeStatementOnDriver("load data local inpath '" + BASIC_FILE_NAME +
       "' overwrite into table " + tblNameStg, driver);
@@ -411,7 +412,7 @@ public void minorCompactWhileStreaming() throws Exception {
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
       " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-      " STORED AS ORC", driver);
+      " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
 
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -468,7 +469,7 @@ public void majorCompactWhileStreaming() throws Exception {
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
       " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-      " STORED AS ORC", driver);
+      " STORED AS ORC TBLPROPERTIES ('transactional'='true') ", driver);
 
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -516,7 +517,7 @@ public void minorCompactAfterAbort() throws Exception {
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
       " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-      " STORED AS ORC", driver);
+      " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
 
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -576,7 +577,7 @@ public void majorCompactAfterAbort() throws Exception {
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
       " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-      " STORED AS ORC", driver);
+      " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
 
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);