diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index 7e99008..61db209 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -49,6 +49,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; /** * Information about the hive end point (i.e. table or partition) to write to. @@ -272,11 +273,31 @@ private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi, } this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials(); this.msClient = getMetaStoreClient(endPoint, conf, secureMode); + checkEndPoint(endPoint, msClient); if (createPart && !endPoint.partitionVals.isEmpty()) { createPartitionIfNotExists(endPoint, msClient, conf); } } + private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient) throws InvalidTable { + // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table + try { + Table t = msClient.getTable(endPoint.database, endPoint.table); + Map params = t.getParameters(); + if(params != null) { + String transactionalProp = params.get("transactional"); + if (transactionalProp != null && transactionalProp.equalsIgnoreCase("true")) { + return; + } + } + LOG.error("'transactional' property is not set on Table " + endPoint); + throw new InvalidTable(endPoint.database, endPoint.table); + } catch (Exception e) { + LOG.warn("Unable to check if Table is transactional. " + endPoint, e); + throw new InvalidTable(endPoint.database, endPoint.table); + } + } + /** * Close connection */ diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 720ba12..6078f93 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -392,6 +392,44 @@ public void testStreamBucketingMatchesRegularBucketing() throws Exception { } + @Test + public void testTableValidation() throws Exception { + int bucketCount = 100; + + String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString(); + String tbl1 = "validation1"; + String tbl2 = "validation2"; + + String tableLoc = "'" + dbUri + Path.SEPARATOR + tbl1 + "'"; + String tableLoc2 = "'" + dbUri + Path.SEPARATOR + tbl2 + "'"; + + runDDL(driver, "create database testBucketing3"); + runDDL(driver, "use testBucketing3"); + + runDDL(driver, "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into " + + bucketCount + " buckets stored as orc location " + tableLoc) ; + + runDDL(driver, "create table " + tbl2 + " ( key1 string, data string ) clustered by ( key1 ) into " + + bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='false')") ; + + + try { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation1", null); + endPt.newConnection(false); + Assert.assertTrue("InvalidTable exception was not thrown", false); + } catch (InvalidTable e) { + // expecting this exception + } + try { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation2", null); + endPt.newConnection(false); + Assert.assertTrue("InvalidTable exception was not thrown", false); + } catch (InvalidTable e) { + // expecting this exception + } + } + + private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles, String... records) throws Exception { ValidTxnList txns = msClient.getValidTxns(); @@ -1170,7 +1208,8 @@ public static Path createDbAndTable(Driver driver, String databaseName, " clustered by ( " + join(bucketCols, ",") + " )" + " into " + bucketCount + " buckets " + " stored as orc " + - " location '" + tableLoc + "'"; + " location '" + tableLoc + "'" + + " TBLPROPERTIES ('transactional'='true') "; runDDL(driver, crtTbl); if(partNames!=null && partNames.length!=0) { return addPartition(driver, tableName, partVals, partNames);