commit 779487b9a720e9d2ea2c457ba367afa89d9b5cf4
Author: Andrew Sherman
Date:   Tue Aug 29 14:05:25 2017 -0700

    HIVE-17635: Add unit tests to CompactionTxnHandler and use PreparedStatements for queries

    Add a unit test which exercises CompactionTxnHandler.markFailed() and change it to use PreparedStatement.
    Add a test for checkFailedCompactions() and change it to use PreparedStatement.
    Add a unit test which exercises purgeCompactionHistory().
    Add buildQueryWithINClauseStrings(), which is suitable for building IN clauses for a PreparedStatement.
    Add test code to TestTxnUtils to tickle the code in TxnUtils.buildQueryWithINClauseStrings() so that it produces multiple queries.
    Change markCleaned() to use a PreparedStatement.

diff --git beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java
index 7c65fe1e9b96a1d9be772d3cd1d11d4e964599c3..bb463ad205eb54a962d9a77dc032e2cb9fd7936b 100644
--- beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java
+++ beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java
@@ -669,27 +669,31 @@ boolean validateSequences(Connection conn) throws HiveMetaException {
     for (String seqName : seqNameToTable.keySet()) {
       String tableName = seqNameToTable.get(seqName).getLeft();
       String tableKey = seqNameToTable.get(seqName).getRight();
+      String fullSequenceName = "org.apache.hadoop.hive.metastore.model." + seqName;
       String seqQuery = needsQuotedIdentifier ?
-          ("select t.\"NEXT_VAL\" from \"SEQUENCE_TABLE\" t WHERE t.\"SEQUENCE_NAME\"='org.apache.hadoop.hive.metastore.model." + seqName + "' order by t.\"SEQUENCE_NAME\" ")
-          : ("select t.NEXT_VAL from SEQUENCE_TABLE t WHERE t.SEQUENCE_NAME='org.apache.hadoop.hive.metastore.model." + seqName + "' order by t.SEQUENCE_NAME ");
+          ("select t.\"NEXT_VAL\" from \"SEQUENCE_TABLE\" t WHERE t.\"SEQUENCE_NAME\"=? order by t.\"SEQUENCE_NAME\" ")
+          : ("select t.NEXT_VAL from SEQUENCE_TABLE t WHERE t.SEQUENCE_NAME=? order by t.SEQUENCE_NAME ");
       String maxIdQuery = needsQuotedIdentifier ?
           ("select max(\"" + tableKey + "\") from \"" + tableName + "\"")
           : ("select max(" + tableKey + ") from " + tableName);
-      ResultSet res = stmt.executeQuery(maxIdQuery);
-      if (res.next()) {
-        long maxId = res.getLong(1);
-        if (maxId > 0) {
-          ResultSet resSeq = stmt.executeQuery(seqQuery);
-          if (!resSeq.next()) {
-            isValid = false;
-            System.err.println("Missing SEQUENCE_NAME " + seqName + " from SEQUENCE_TABLE");
-          } else if (resSeq.getLong(1) < maxId) {
-            isValid = false;
-            System.err.println("NEXT_VAL for " + seqName + " in SEQUENCE_TABLE < max(" + tableKey + ") in " + tableName);
-          }
-        }
+      ResultSet res = stmt.executeQuery(maxIdQuery);
+      if (res.next()) {
+        long maxId = res.getLong(1);
+        if (maxId > 0) {
+          PreparedStatement pStmt = conn.prepareStatement(seqQuery);
+          pStmt.setString(1, fullSequenceName);
+          ResultSet resSeq = pStmt.executeQuery();
+          if (!resSeq.next()) {
+            isValid = false;
+            System.err.println("Missing SEQUENCE_NAME " + seqName + " from SEQUENCE_TABLE");
+          } else if (resSeq.getLong(1) < maxId) {
+            isValid = false;
+            System.err.println("NEXT_VAL for " + seqName + " in SEQUENCE_TABLE < max(" +
+                tableKey + ") in " + tableName);
+          }
+        }
       }
     }
     System.out.println((isValid ? "Succeeded" :"Failed") + " in sequence number validation for SEQUENCE_TABLE.");
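The hunk above shows the pattern this patch applies throughout: a value that used to be concatenated into the SQL string becomes a bound PreparedStatement parameter. A minimal, self-contained sketch of that pattern, not part of the patch itself (the helper class and method names are illustrative):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    class SequenceLookupSketch {
      /** Returns NEXT_VAL for the given sequence name, or -1 if no row exists. */
      static long nextVal(Connection conn, String seqName) throws SQLException {
        String fullSequenceName = "org.apache.hadoop.hive.metastore.model." + seqName;
        String sql = "select t.NEXT_VAL from SEQUENCE_TABLE t"
            + " where t.SEQUENCE_NAME = ? order by t.SEQUENCE_NAME";
        try (PreparedStatement pStmt = conn.prepareStatement(sql)) {
          pStmt.setString(1, fullSequenceName); // bound as a parameter, never spliced into the SQL text
          try (ResultSet rs = pStmt.executeQuery()) {
            return rs.next() ? rs.getLong(1) : -1L;
          }
        }
      }
    }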
"Succeeded" :"Failed") + " in sequence number validation for SEQUENCE_TABLE."); diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 96005b4388bbfcbd46570cd285595b06104e52b0..34a1600a44f843a6cefb963faa4d6db033d7a385 100644 --- ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -49,6 +49,7 @@ import java.util.TreeSet; import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; import static junit.framework.Assert.assertNotNull; import static junit.framework.Assert.assertNull; import static junit.framework.Assert.assertTrue; @@ -64,6 +65,10 @@ public TestCompactionTxnHandler() throws Exception { TxnDbUtil.setConfValues(conf); + // Set config so that TxnUtils.buildQueryWithINClauseStrings() will + // produce multiple queries + conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 1); + conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 10); tearDown(); } @@ -224,6 +229,64 @@ public void testMarkCleaned() throws Exception { } @Test + public void testMarkFailed() throws Exception { + CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + assertEquals(0, txnHandler.findReadyToClean().size()); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + assertNotNull(ci); + + assertEquals(0, txnHandler.findReadyToClean().size()); + txnHandler.markFailed(ci); + assertNull(txnHandler.findNextToCompact("fred")); + boolean failedCheck = txnHandler.checkFailedCompactions(ci); + assertFalse(failedCheck); + try { + // The first call to markFailed() should have removed the record from + // COMPACTION_QUEUE, so a repeated call should fail + txnHandler.markFailed(ci); + fail("The first call to markFailed() must have failed as this call did " + + "not throw the expected exception"); + } catch (IllegalStateException e) { + // This is expected + assertTrue(e.getMessage().contains("No record with CQ_ID=")); + } + + // There are not enough failed compactions yet so checkFailedCompactions() should return false. + // But note that any sql error will also result in a return of false. + assertFalse(txnHandler.checkFailedCompactions(ci)); + + // Add more failed compactions so that the total is exactly COMPACTOR_INITIATOR_FAILED_THRESHOLD + for (int i = 1 ; i < conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); i++) { + addFailedCompaction("foo", "bar", CompactionType.MINOR, "ds=today"); + } + // Now checkFailedCompactions() will return true + assertTrue(txnHandler.checkFailedCompactions(ci)); + + // Now add enough failed compactions to ensure purgeCompactionHistory() will attempt delete; + // HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED is enough for this. + // But we also want enough to tickle the code in TxnUtils.buildQueryWithINClauseStrings() + // so that it produces multiple queries. For that we need at least 290. 
diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index e676b91a705723feb5f1cba1323d67c1559b6096..c1ffb87122bc4ee12f45ac04527e5014abd1a875 100644
--- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -334,13 +334,13 @@ public void markCompacted(CompactionInfo info) throws MetaException {
   public void markCleaned(CompactionInfo info) throws MetaException {
     try {
       Connection dbConn = null;
-      Statement stmt = null;
       PreparedStatement pStmt = null;
       ResultSet rs = null;
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id);
+        pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
+        pStmt.setLong(1, info.id);
+        rs = pStmt.executeQuery();
         if(rs.next()) {
           info = CompactionInfo.loadFullFromCompactionQueue(rs);
         }
@@ -348,9 +348,11 @@ public void markCleaned(CompactionInfo info) throws MetaException {
           throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE");
         }
         close(rs);
-        String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
+        String s = "delete from COMPACTION_QUEUE where cq_id = ?";
+        pStmt = dbConn.prepareStatement(s);
+        pStmt.setLong(1, info.id);
         LOG.debug("Going to execute update <" + s + ">");
-        int updCount = stmt.executeUpdate(s);
+        int updCount = pStmt.executeUpdate();
         if (updCount != 1) {
           LOG.error("Unable to delete compaction record: " + info + ". Update count=" + updCount);
           LOG.debug("Going to rollback");
and " + + "ctc_table = ?"; + if (info.partName != null) { + s += " and ctc_partition = ?"; + } + if(info.highestTxnId != 0) { + s += " and ctc_txnid <= ?"; + } + pStmt = dbConn.prepareStatement(s); + int paramCount = 1; + pStmt.setString(paramCount++, info.dbname); + pStmt.setString(paramCount++, info.tableName); if (info.partName != null) { - s += " and ctc_partition = '" + info.partName + "'"; + pStmt.setString(paramCount++, info.partName); } if(info.highestTxnId != 0) { - s += " and ctc_txnid <= " + info.highestTxnId; + pStmt.setLong(paramCount++, info.highestTxnId); } LOG.debug("Going to execute update <" + s + ">"); - if (stmt.executeUpdate(s) < 1) { + if (pStmt.executeUpdate() < 1) { LOG.error("Expected to remove at least one row from completed_txn_components when " + "marking compaction entry as clean!"); } s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + - TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" + - info.tableName + "'" + (info.highestTxnId == 0 ? "" : " and txn_id <= " + info.highestTxnId); - if (info.partName != null) s += " and tc_partition = '" + info.partName + "'"; + TXN_ABORTED + "' and tc_database = ? and tc_table = ?"; + if (info.highestTxnId != 0) s += " and txn_id <= ?"; + if (info.partName != null) s += " and tc_partition = ?"; + + pStmt = dbConn.prepareStatement(s); + paramCount = 1; + pStmt.setString(paramCount++, info.dbname); + pStmt.setString(paramCount++, info.tableName); + if(info.highestTxnId != 0) { + pStmt.setLong(paramCount++, info.highestTxnId); + } + if (info.partName != null) { + pStmt.setString(paramCount++, info.partName); + } + LOG.debug("Going to execute update <" + s + ">"); - rs = stmt.executeQuery(s); + rs = pStmt.executeQuery(); List txnids = new ArrayList<>(); - while (rs.next()) txnids.add(rs.getLong(1)); + List questions = new ArrayList<>(); + while (rs.next()) { + long id = rs.getLong(1); + txnids.add(id); + questions.add("?"); + } // Remove entries from txn_components, as there may be aborted txn components if (txnids.size() > 0) { List queries = new ArrayList<>(); @@ -397,21 +426,34 @@ public void markCleaned(CompactionInfo info) throws MetaException { prefix.append("delete from TXN_COMPONENTS where "); //because 1 txn may include different partitions/tables even in auto commit mode - suffix.append(" and tc_database = "); - suffix.append(quoteString(info.dbname)); - suffix.append(" and tc_table = "); - suffix.append(quoteString(info.tableName)); + suffix.append(" and tc_database = ?"); + suffix.append(" and tc_table = ?"); if (info.partName != null) { - suffix.append(" and tc_partition = "); - suffix.append(quoteString(info.partName)); + suffix.append(" and tc_partition = ?"); } // Populate the complete query with provided prefix and suffix - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false); + List counts = TxnUtils + .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "tc_txnid", + true, false); + int totalCount = 0; + for (int i = 0; i < queries.size(); i++) { + String query = queries.get(i); + int insertCount = counts.get(i); - for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); - int rc = stmt.executeUpdate(query); + pStmt = dbConn.prepareStatement(query); + for (int j = 0; j < insertCount; j++) { + pStmt.setLong(j + 1, txnids.get(totalCount + j)); + } + totalCount += insertCount; + paramCount = insertCount + 1; + pStmt.setString(paramCount++, 
@@ -599,34 +640,38 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException {
   @RetrySemantics.ReadOnly
   public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException {
     Connection dbConn = null;
-    Statement stmt = null;
+    PreparedStatement pStmt = null;
     ResultSet rs = null;
     try {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         String quote = getIdentifierQuoteString(dbConn);
-        stmt = dbConn.createStatement();
         StringBuilder bldr = new StringBuilder();
         bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote)
           .append(" FROM ")
          .append(quote).append((ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS"))
          .append(quote)
          .append(" WHERE ")
-          .append(quote).append("DB_NAME").append(quote).append(" = '").append(ci.dbname)
-          .append("' AND ").append(quote).append("TABLE_NAME").append(quote)
-          .append(" = '").append(ci.tableName).append("'");
+          .append(quote).append("DB_NAME").append(quote).append(" = ?")
+          .append(" AND ").append(quote).append("TABLE_NAME").append(quote)
+          .append(" = ?");
         if (ci.partName != null) {
-          bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = '")
-            .append(ci.partName).append("'");
+          bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = ?");
         }
         String s = bldr.toString();
+        pStmt = dbConn.prepareStatement(s);
+        pStmt.setString(1, ci.dbname);
+        pStmt.setString(2, ci.tableName);
+        if (ci.partName != null) {
+          pStmt.setString(3, ci.partName);
+        }
 
      /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS")
       + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'"
      + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");*/
         LOG.debug("Going to execute <" + s + ">");
-        rs = stmt.executeQuery(s);
+        rs = pStmt.executeQuery();
         List<String> columns = new ArrayList<>();
         while (rs.next()) {
           columns.add(rs.getString(1));
@@ -642,7 +687,7 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException {
         throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
       } finally {
-        close(rs, stmt, dbConn);
+        close(rs, pStmt, dbConn);
       }
     } catch (RetryException ex) {
       return findColumnsWithStats(ci);
@@ -725,6 +770,7 @@ private void checkForDeletion(List<Long> deleteSet, CompactionInfo ci, Retention
   public void purgeCompactionHistory() throws MetaException {
     Connection dbConn = null;
     Statement stmt = null;
+    PreparedStatement pStmt = null;
     ResultSet rs = null;
     List<Long> deleteSet = new ArrayList<>();
     RetentionCounters rc = null;
@@ -764,11 +810,22 @@ public void purgeCompactionHistory() throws MetaException {
         prefix.append("delete from COMPLETED_COMPACTIONS where ");
         suffix.append("");
 
-        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false);
-
-        for (String query : queries) {
+        List<String> questions = new ArrayList<>(deleteSet.size());
+        for (int i = 0; i < deleteSet.size(); i++) {
+          questions.add("?");
+        }
+        List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "cc_id", false, false);
+        int totalCount = 0;
+        for (int i = 0; i < queries.size(); i++) {
+          String query = queries.get(i);
+          long insertCount = counts.get(i);
           LOG.debug("Going to execute update <" + query + ">");
-          int count = stmt.executeUpdate(query);
+          pStmt = dbConn.prepareStatement(query);
+          for (int j = 0; j < insertCount; j++) {
+            pStmt.setLong(j + 1, deleteSet.get(totalCount + j));
+          }
+          totalCount += insertCount;
+          int count = pStmt.executeUpdate();
           LOG.debug("Removed " + count + " records from COMPLETED_COMPACTIONS");
         }
         dbConn.commit();
@@ -779,6 +836,7 @@ public void purgeCompactionHistory() throws MetaException {
           StringUtils.stringifyException(e));
       } finally {
         close(rs, stmt, dbConn);
+        closeStmt(pStmt);
       }
     } catch (RetryException ex) {
       purgeCompactionHistory();
"" : " AND PARTITION_NAME='" + ci.partName + "'");*/ LOG.debug("Going to execute <" + s + ">"); - rs = stmt.executeQuery(s); + rs = pStmt.executeQuery(); List columns = new ArrayList<>(); while (rs.next()) { columns.add(rs.getString(1)); @@ -642,7 +687,7 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - close(rs, stmt, dbConn); + close(rs, pStmt, dbConn); } } catch (RetryException ex) { return findColumnsWithStats(ci); @@ -725,6 +770,7 @@ private void checkForDeletion(List deleteSet, CompactionInfo ci, Retention public void purgeCompactionHistory() throws MetaException { Connection dbConn = null; Statement stmt = null; + PreparedStatement pStmt = null; ResultSet rs = null; List deleteSet = new ArrayList<>(); RetentionCounters rc = null; @@ -764,11 +810,22 @@ public void purgeCompactionHistory() throws MetaException { prefix.append("delete from COMPLETED_COMPACTIONS where "); suffix.append(""); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false); - - for (String query : queries) { + List questions = new ArrayList<>(deleteSet.size()); + for (int i = 0; i < deleteSet.size(); i++) { + questions.add("?"); + } + List counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "cc_id", false, false); + int totalCount = 0; + for (int i = 0; i < queries.size(); i++) { + String query = queries.get(i); + long insertCount = counts.get(i); LOG.debug("Going to execute update <" + query + ">"); - int count = stmt.executeUpdate(query); + pStmt = dbConn.prepareStatement(query); + for (int j = 0; j < insertCount; j++) { + pStmt.setLong(j + 1, deleteSet.get(totalCount + j)); + } + totalCount += insertCount; + int count = pStmt.executeUpdate(); LOG.debug("Removed " + count + " records from COMPLETED_COMPACTIONS"); } dbConn.commit(); @@ -779,6 +836,7 @@ public void purgeCompactionHistory() throws MetaException { StringUtils.stringifyException(e)); } finally { close(rs, stmt, dbConn); + closeStmt(pStmt); } } catch (RetryException ex) { purgeCompactionHistory(); @@ -813,17 +871,22 @@ private int getFailedCompactionRetention() { @RetrySemantics.ReadOnly public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { Connection dbConn = null; - Statement stmt = null; + PreparedStatement pStmt = null; ResultSet rs = null; try { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select CC_STATE from COMPLETED_COMPACTIONS where " + - "CC_DATABASE = " + quoteString(ci.dbname) + " and " + - "CC_TABLE = " + quoteString(ci.tableName) + - (ci.partName != null ? "and CC_PARTITION = " + quoteString(ci.partName) : "") + + pStmt = dbConn.prepareStatement("select CC_STATE from COMPLETED_COMPACTIONS where " + + "CC_DATABASE = ? and " + + "CC_TABLE = ? " + + (ci.partName != null ? "and CC_PARTITION = ?" 
: "") + " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc"); + pStmt.setString(1, ci.dbname); + pStmt.setString(2, ci.tableName); + if (ci.partName != null) { + pStmt.setString(3, ci.partName); + } + rs = pStmt.executeQuery(); int numFailed = 0; int numTotal = 0; int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); @@ -838,14 +901,14 @@ public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { return numFailed == failedThreshold; } catch (SQLException e) { - LOG.error("Unable to delete from compaction queue " + e.getMessage()); + LOG.error("Unable to check for failed compactions " + e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")"); LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e)); return false;//weren't able to check } finally { - close(rs, stmt, dbConn); + close(rs, pStmt, dbConn); } } catch (RetryException e) { return checkFailedCompactions(ci); @@ -869,12 +932,16 @@ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this sho try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id); + pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?"); + pStmt.setLong(1, ci.id); + rs = pStmt.executeQuery(); if(rs.next()) { ci = CompactionInfo.loadFullFromCompactionQueue(rs); - String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id; + String s = "delete from COMPACTION_QUEUE where cq_id = ?"; + pStmt = dbConn.prepareStatement(s); + pStmt.setLong(1, ci.id); LOG.debug("Going to execute update <" + s + ">"); - int updCnt = stmt.executeUpdate(s); + int updCnt = pStmt.executeUpdate(); } else { if(ci.id > 0) { @@ -897,6 +964,7 @@ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this sho ci.state = FAILED_STATE; } close(rs, stmt, null); + closeStmt(pStmt); pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn)); diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 2eb967eda42abe9c764273ca47bc278678794e1b..cf40d3c9f3ad61263fa693f3717875d3068d6ca9 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; import java.util.List; @@ -172,8 +173,9 @@ public static boolean isAcidTable(Table table) { * @param addParens IN: add a pair of parenthesis outside the IN 
@@ -181,6 +183,47 @@ public static void buildQueryWithINClause(Configuration conf,
                                             String inColumn,
                                             boolean addParens,
                                             boolean notIn) {
+    List<String> inListStrings = new ArrayList<>(inList.size());
+    for (Long aLong : inList) {
+      inListStrings.add(aLong.toString());
+    }
+    return buildQueryWithINClauseStrings(conf, queries, prefix, suffix,
+        inListStrings, inColumn, addParens, notIn);
+
+  }
+  /**
+   * Build a query (or queries if one query is too big) with one or multiple 'IN' or 'NOT IN'
+   * clauses from the given input parameters. Note that multiple queries are only generated for
+   * the 'IN' case; for 'NOT IN' clauses, multiple queries would change the semantics of the
+   * intended query. E.g., assume the input "inList" parameter is [5, 6] and the
+   * DIRECT_SQL_MAX_QUERY_LENGTH configuration parameter only allows one value per 'NOT IN'
+   * clause. Then the sequence 'delete from T where a not in (5)' followed by
+   * 'delete from T where a not in (6)' is not equal to 'delete from T where a not in (5, 6)'.
+   *
+   * Note that this method currently supports only a single column for the
+   * IN/NOT IN clauses, and that it only covers the OR-based composite 'IN' clause and
+   * the AND-based composite 'NOT IN' clause.
+   * For example, for the 'IN' clause case, the method will build a query with OR,
+   * e.g., "id in (1,2,3) OR id in (4,5,6)".
+   * For the 'NOT IN' case, the NOT IN list is broken into multiple 'NOT IN' clauses connected by AND.
+   *
+   * Note that, in this method, "a composite 'IN' clause" is defined as "a list of multiple 'IN'
+   * clauses in a query".
+   *
+   * @param queries OUT: Array of query strings
+   * @param prefix IN: Part of the query that comes before the IN list
+   * @param suffix IN: Part of the query that comes after the IN list
+   * @param inList IN: the list with IN list values
+   * @param inColumn IN: single column name of the IN list operator
+   * @param addParens IN: add a pair of parentheses outside the IN lists
+   *                  e.g. "(id in (1,2,3) OR id in (4,5,6))"
+   * @param notIn IN: is this for building a 'NOT IN' composite clause?
+   * @return OUT: a list of the count of IN list values that are in each of the corresponding queries
+   */
+  public static List<Integer> buildQueryWithINClauseStrings(Configuration conf, List<String> queries, StringBuilder prefix,
+      StringBuilder suffix, List<String> inList, String inColumn, boolean addParens, boolean notIn) {
     // Get configuration parameters
     int maxQueryLength = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH);
     int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
@@ -203,6 +246,8 @@ public static void buildQueryWithINClause(Configuration conf,
     StringBuilder newInclausePrefix =
       new StringBuilder(notIn ? " and " + inColumn + " not in (" :
                                 " or " + inColumn + " in (");
+    List<Integer> ret = new ArrayList<>();
+    int currentCount = 0;
 
     // Loop over the given inList elements.
     while( cursor4InListArray < inListSize || !nextItemNeeded) {
@@ -257,9 +302,11 @@ public static void buildQueryWithINClause(Configuration conf,
         buf.append(suffix);
         queries.add(buf.toString());
+        ret.add(currentCount);
 
         // Prepare a new query string.
         buf.setLength(0);
+        currentCount = 0;
         cursor4queryOfInClauses = cursor4InClauseElements = 0;
         querySize = 0;
         newInclausePrefixJustAppended = false;
@@ -276,6 +323,7 @@ public static void buildQueryWithINClause(Configuration conf,
         cursor4InClauseElements = 0;
       } else {
         buf.append(nextValue.toString()).append(",");
+        currentCount++;
         nextItemNeeded = true;
         newInclausePrefixJustAppended = false;
         // increment cursor for elements per 'IN'/'NOT IN' clause.
@@ -293,6 +341,8 @@ public static void buildQueryWithINClause(Configuration conf,
     }
     buf.append(suffix);
     queries.add(buf.toString());
+    ret.add(currentCount);
+    return ret;
   }
 
   /*
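The new return value pairs each generated query with the number of IN-list values it consumed, which is exactly what a caller needs in order to bind the right slice of values before executing it. A hedged usage sketch (the Configuration setup is elided, and the invariants in the comments follow from the code above):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    class CountsUsageSketch {
      static void demo(Configuration conf) {
        List<String> queries = new ArrayList<>();
        List<String> questions = new ArrayList<>();
        for (int i = 0; i < 300; i++) {
          questions.add("?"); // one placeholder per value to be bound later
        }
        StringBuilder prefix = new StringBuilder("delete from COMPLETED_COMPACTIONS where ");
        StringBuilder suffix = new StringBuilder();
        List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(
            conf, queries, prefix, suffix, questions, "cc_id", false, false);
        // counts.size() == queries.size(), and the entries sum to questions.size();
        // counts.get(i) placeholders must be bound before executing queries.get(i).
      }
    }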
diff --git standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
index 7dd268f3ca69b1417fd5f4e25a8dca9257c9bb25..0384f8b64d9bf7d912ecf8c2cb0ed393be50783d 100644
--- standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
+++ standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
@@ -45,6 +45,7 @@ public TestTxnUtils() throws Exception {
   @Test
   public void testBuildQueryWithINClause() throws Exception {
     List<String> queries = new ArrayList<>();
+    List<Integer> ret;
 
     StringBuilder prefix = new StringBuilder();
     StringBuilder suffix = new StringBuilder();
@@ -61,16 +62,21 @@ public void testBuildQueryWithINClause() throws Exception {
     for (long i = 1; i <= 189; i++) {
       inList.add(i);
     }
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(1, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
+    Assert.assertEquals(189L, ret.get(0).longValue());
     runAgainstDerby(queries);
 
     // Case 2 - Max in list members: 10; Max query string length: 1KB
     //   The first query has 2 full batches, and the second query only has 1 batch which only contains 1 member
     queries.clear();
     inList.add((long)190);
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(2, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
+    Assert.assertEquals(189L, ret.get(0).longValue());
+    Assert.assertEquals(1L, ret.get(1).longValue());
     runAgainstDerby(queries);
 
     // Case 3.1 - Max in list members: 1000, Max query string length: 1KB, and exact 1000 members in a single IN clause
@@ -80,16 +86,19 @@ public void testBuildQueryWithINClause() throws Exception {
     for (long i = 191; i <= 1000; i++) {
       inList.add(i);
     }
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(5, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
+    Assert.assertEquals(267L, ret.get(0).longValue());
     runAgainstDerby(queries);
 
     // Case 3.2 - Max in list members: 1000, Max query string length: 10KB, and exact 1000 members in a single IN clause
     MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH, 10);
     MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 1000);
     queries.clear();
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(1, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
     runAgainstDerby(queries);
 
     // Case 3.3 - Now with 2000 entries, try the above settings
@@ -98,19 +107,25 @@ public void testBuildQueryWithINClause() throws Exception {
     for (long i = 1001; i <= 2000; i++) {
       inList.add(i);
     }
     MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH, 1);
     queries.clear();
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(10, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
+    Assert.assertEquals(267L, ret.get(0).longValue());
+    Assert.assertEquals(240L, ret.get(1).longValue());
     runAgainstDerby(queries);
 
     MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH, 10);
     queries.clear();
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(1, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
+    Assert.assertEquals(2000L, ret.get(0).longValue());
     runAgainstDerby(queries);
 
     // Case 4 - NOT IN list
     queries.clear();
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
     Assert.assertEquals(1, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
     runAgainstDerby(queries);
 
     // Case 5 - Max in list members: 1000; Max query string length: 10KB
@@ -118,16 +133,21 @@ public void testBuildQueryWithINClause() throws Exception {
     for (long i = 2001; i <= 4321; i++) {
       inList.add(i);
     }
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(3, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
     runAgainstDerby(queries);
 
     // Case 6 - No parentheses
     queries.clear();
     suffix.setLength(0);
     suffix.append("");
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
     Assert.assertEquals(3, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
+    Assert.assertEquals(2255L, ret.get(0).longValue());
+    Assert.assertEquals(2033L, ret.get(1).longValue());
+    Assert.assertEquals(33L, ret.get(2).longValue());
     runAgainstDerby(queries);
   }
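A closing illustration of the 'NOT IN' caveat documented in TxnUtils above: splitting a NOT IN list across separate statements deletes more than intended, which is why the method only splits the 'IN' case and keeps AND-joined 'NOT IN' clauses inside a single statement. A sketch of the set logic, with plain Java lists standing in for table rows:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class NotInSemanticsSketch {
      public static void main(String[] args) {
        // Intended single statement: delete from T where a not in (5, 6)
        List<Integer> rows = new ArrayList<>(Arrays.asList(4, 5, 6, 7));
        rows.removeIf(a -> a != 5 && a != 6);
        System.out.println(rows); // [5, 6] survive

        // Split into two statements: not in (5), then not in (6)
        List<Integer> rows2 = new ArrayList<>(Arrays.asList(4, 5, 6, 7));
        rows2.removeIf(a -> a != 5); // keeps only the row with a = 5
        rows2.removeIf(a -> a != 6); // then deletes that row as well
        System.out.println(rows2); // [] -- everything was deleted
      }
    }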