diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index a9ab90bca8..5f625945d8 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -878,7 +878,7 @@ private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) thro txnBatch.write("1,Hello streaming".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 3, 12, 1, 1, "{1, Hello streaming}"); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState()); @@ -890,11 +890,11 @@ private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) thro txnBatch.write("2,Welcome to streaming".getBytes()); // data should not be visible - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 3, 12, 1, 1, "{1, Hello streaming}"); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", + checkDataWritten(partLoc, 3, 12, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}"); txnBatch.close(); @@ -946,7 +946,7 @@ private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws E txnBatch.write("1,Hello streaming".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 3, 12, 1, 1, "{1, Hello streaming}"); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState()); @@ -958,11 +958,11 @@ private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws E txnBatch.write("2,Welcome to streaming".getBytes()); // data should not be visible - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 3, 12, 1, 1, "{1, Hello streaming}"); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", + checkDataWritten(partLoc, 3, 12, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}"); txnBatch.close(); @@ -1008,7 +1008,7 @@ public void testTransactionBatchCommit_Json() throws Exception { txnBatch.write(rec1.getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 3, 12, 1, 1, "{1, Hello streaming}"); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState()); @@ -1135,7 +1135,7 @@ public void testTransactionBatchAbortAndCommit() throws Exception { txnBatch.write("2,Welcome to streaming".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", + checkDataWritten(partLoc, 2, 11, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}"); txnBatch.close(); @@ -1154,13 +1154,13 @@ public void testMultipleTransactionBatchCommits() throws Exception { txnBatch.write("1,Hello streaming".getBytes()); txnBatch.commit(); String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg"; - checkDataWritten2(partLoc, 1, 10, 1, validationQuery, false, "1\tHello streaming"); + checkDataWritten2(partLoc, 3, 12, 1, validationQuery, false, "1\tHello streaming"); txnBatch.beginNextTransaction(); txnBatch.write("2,Welcome to streaming".getBytes()); txnBatch.commit(); - checkDataWritten2(partLoc, 1, 10, 1, validationQuery, true, "1\tHello streaming", + checkDataWritten2(partLoc, 3, 12, 1, validationQuery, true, "1\tHello streaming", "2\tWelcome to streaming"); txnBatch.close(); @@ -1171,14 +1171,14 @@ public void testMultipleTransactionBatchCommits() throws Exception { txnBatch.write("3,Hello streaming - once again".getBytes()); txnBatch.commit(); - checkDataWritten2(partLoc, 1, 20, 2, validationQuery, false, "1\tHello streaming", + checkDataWritten2(partLoc, 3, 22, 2, validationQuery, false, "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again"); txnBatch.beginNextTransaction(); txnBatch.write("4,Welcome to streaming - once again".getBytes()); txnBatch.commit(); - checkDataWritten2(partLoc, 1, 20, 2, validationQuery, true, "1\tHello streaming", + checkDataWritten2(partLoc, 3, 22, 2, validationQuery, true, "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again", "4\tWelcome to streaming - once again"); @@ -1215,7 +1215,7 @@ public void testInterleavedTransactionBatchCommits() throws Exception { txnBatch2.commit(); String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg"; - checkDataWritten2(partLoc, 11, 20, 1, + checkDataWritten2(partLoc, 12, 21, 1, validationQuery, true, "3\tHello streaming - once again"); txnBatch1.commit(); @@ -1236,7 +1236,7 @@ public void testInterleavedTransactionBatchCommits() throws Exception { Assert.assertTrue("", logicalLength == actualLength); } } - checkDataWritten2(partLoc, 1, 20, 2, + checkDataWritten2(partLoc, 2, 21, 2, validationQuery, false,"1\tHello streaming", "3\tHello streaming - once again"); txnBatch1.beginNextTransaction(); @@ -1261,19 +1261,19 @@ public void testInterleavedTransactionBatchCommits() throws Exception { Assert.assertTrue("", logicalLength <= actualLength); } } - checkDataWritten2(partLoc, 1, 20, 2, + checkDataWritten2(partLoc, 2, 21, 2, validationQuery, true,"1\tHello streaming", "3\tHello streaming - once again"); txnBatch1.commit(); - checkDataWritten2(partLoc, 1, 20, 2, + checkDataWritten2(partLoc, 2, 21, 2, validationQuery, false, "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again"); txnBatch2.commit(); - checkDataWritten2(partLoc, 1, 20, 2, + checkDataWritten2(partLoc, 2, 21, 2, validationQuery, true, "1\tHello streaming", "2\tWelcome to streaming", "3\tHello streaming - once again", @@ -2101,7 +2101,7 @@ public static void dropDB(IMetaStoreClient client, String databaseName) { ///////// -------- UTILS ------- ///////// // returns Path of the partition created (if any) else Path of table - public static Path createDbAndTable(IDriver driver, String databaseName, + private static Path createDbAndTable(IDriver driver, String databaseName, String tableName, List partVals, String[] colNames, String[] colTypes, String[] bucketCols, @@ -2147,7 +2147,7 @@ private static Path getPartitionPath(IDriver driver, String tableName, String pa private static String getTableColumnsStr(String[] colNames, String[] colTypes) { StringBuilder sb = new StringBuilder(); for (int i=0; i < colNames.length; ++i) { - sb.append(colNames[i] + " " + colTypes[i]); + sb.append(colNames[i]).append(" ").append(colTypes[i]); if (i partVals) { StringBuilder sb = new StringBuilder(); for (int i=0; i < partVals.size(); ++i) { - sb.append(partNames[i] + " = '" + partVals.get(i) + "'"); + sb.append(partNames[i]).append(" = '").append(partVals.get(i)).append("'"); if(i < partVals.size()-1) { sb.append(","); } @@ -2217,7 +2217,7 @@ private static boolean runDDL(IDriver driver, String sql) throws QueryFailedExce } - public static ArrayList queryTable(IDriver driver, String query) throws IOException { + private static ArrayList queryTable(IDriver driver, String query) throws IOException { CommandProcessorResponse cpr = driver.run(query); if(cpr.getResponseCode() != 0) { throw new RuntimeException(query + " failed: " + cpr);