commit 8f8bce02142a98434f6e6ca7b3bd88dedcc52fbe Author: Andrew Sherman Date: Tue Aug 29 14:05:25 2017 -0700 HIVE-17635: Add unit tests to CompactionTxnHandler and use PreparedStatements for queries Add a unit test which exercises CompactionTxnHandler.markFailed() and change it to use PreparedStatement. Add test for checkFailedCompactions() and change it to use PreparedStatement Add a unit test which exercises purgeCompactionHistory(). Add buildQueryWithINClauseStrings() which is suitable for building in clauses for PreparedStatement Add test code to TestTxnUtils to tickle code in TxnUtils.buildQueryWithINClauseStrings() so that it produces multiple queries. Change markCleaned() to use PreparedStatement diff --git beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java index 84963af10ec13979a7b3976be434efbc21cf2382..25991329396d5dbad034bfe9bbd83bf7cd11e9b5 100644 --- beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java +++ beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java @@ -669,27 +669,31 @@ boolean validateSequences(Connection conn) throws HiveMetaException { for (String seqName : seqNameToTable.keySet()) { String tableName = seqNameToTable.get(seqName).getLeft(); String tableKey = seqNameToTable.get(seqName).getRight(); + String fullSequenceName = "org.apache.hadoop.hive.metastore.model." + seqName; String seqQuery = needsQuotedIdentifier ? - ("select t.\"NEXT_VAL\" from \"SEQUENCE_TABLE\" t WHERE t.\"SEQUENCE_NAME\"='org.apache.hadoop.hive.metastore.model." + seqName + "' order by t.\"SEQUENCE_NAME\" ") - : ("select t.NEXT_VAL from SEQUENCE_TABLE t WHERE t.SEQUENCE_NAME='org.apache.hadoop.hive.metastore.model." + seqName + "' order by t.SEQUENCE_NAME "); + ("select t.\"NEXT_VAL\" from \"SEQUENCE_TABLE\" t WHERE t.\"SEQUENCE_NAME\"=? order by t.\"SEQUENCE_NAME\" ") + : ("select t.NEXT_VAL from SEQUENCE_TABLE t WHERE t.SEQUENCE_NAME=? 
order by t.SEQUENCE_NAME "); String maxIdQuery = needsQuotedIdentifier ? ("select max(\"" + tableKey + "\") from \"" + tableName + "\"") : ("select max(" + tableKey + ") from " + tableName); - ResultSet res = stmt.executeQuery(maxIdQuery); - if (res.next()) { - long maxId = res.getLong(1); - if (maxId > 0) { - ResultSet resSeq = stmt.executeQuery(seqQuery); - if (!resSeq.next()) { - isValid = false; - System.err.println("Missing SEQUENCE_NAME " + seqName + " from SEQUENCE_TABLE"); - } else if (resSeq.getLong(1) < maxId) { - isValid = false; - System.err.println("NEXT_VAL for " + seqName + " in SEQUENCE_TABLE < max("+ tableKey + ") in " + tableName); - } - } + ResultSet res = stmt.executeQuery(maxIdQuery); + if (res.next()) { + long maxId = res.getLong(1); + if (maxId > 0) { + PreparedStatement pStmt = conn.prepareStatement(seqQuery); + pStmt.setString(1, fullSequenceName); + ResultSet resSeq = pStmt.executeQuery(); + if (!resSeq.next()) { + isValid = false; + System.err.println("Missing SEQUENCE_NAME " + seqName + " from SEQUENCE_TABLE"); + } else if (resSeq.getLong(1) < maxId) { + isValid = false; + System.err.println("NEXT_VAL for " + seqName + " in SEQUENCE_TABLE < max(" + + tableKey + ") in " + tableName); + } } + } } System.out.println((isValid ? 
"Succeeded" :"Failed") + " in sequence number validation for SEQUENCE_TABLE."); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 60839faa352cbf959041a455e9e780dfca0afdc3..d3cf68e26c1d2581b5e46ed6da022b1ef490954b 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -333,13 +333,13 @@ public void markCompacted(CompactionInfo info) throws MetaException { public void markCleaned(CompactionInfo info) throws MetaException { try { Connection dbConn = null; - Statement stmt = null; PreparedStatement pStmt = null; ResultSet rs = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id); + pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?"); + pStmt.setLong(1, info.id); + rs = pStmt.executeQuery(); if(rs.next()) { info = CompactionInfo.loadFullFromCompactionQueue(rs); } @@ -347,9 +347,11 @@ public void markCleaned(CompactionInfo info) throws MetaException { throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE"); } close(rs); - String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id; + String s = "delete from COMPACTION_QUEUE where cq_id = ?"; + pStmt = dbConn.prepareStatement(s); + pStmt.setLong(1, info.id); LOG.debug("Going to execute update <" + s + ">"); - int updCount = 
stmt.executeUpdate(s); + int updCount = pStmt.executeUpdate(); if (updCount != 1) { LOG.error("Unable to delete compaction record: " + info + ". Update count=" + updCount); LOG.debug("Going to rollback"); @@ -363,28 +365,55 @@ public void markCleaned(CompactionInfo info) throws MetaException { // Remove entries from completed_txn_components as well, so we don't start looking there // again but only up to the highest txn ID include in this compaction job. //highestTxnId will be NULL in upgrade scenarios - s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + info.dbname + "' and " + - "ctc_table = '" + info.tableName + "'"; + s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? and " + + "ctc_table = ?"; + if (info.partName != null) { + s += " and ctc_partition = ?"; + } + if(info.highestTxnId != 0) { + s += " and ctc_txnid <= ?"; + } + pStmt = dbConn.prepareStatement(s); + int paramCount = 1; + pStmt.setString(paramCount++, info.dbname); + pStmt.setString(paramCount++, info.tableName); if (info.partName != null) { - s += " and ctc_partition = '" + info.partName + "'"; + pStmt.setString(paramCount++, info.partName); } if(info.highestTxnId != 0) { - s += " and ctc_txnid <= " + info.highestTxnId; + pStmt.setLong(paramCount++, info.highestTxnId); } LOG.debug("Going to execute update <" + s + ">"); - if (stmt.executeUpdate(s) < 1) { + if (pStmt.executeUpdate() < 1) { LOG.error("Expected to remove at least one row from completed_txn_components when " + "marking compaction entry as clean!"); } s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + - TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" + - info.tableName + "'" + (info.highestTxnId == 0 ? "" : " and txn_id <= " + info.highestTxnId); - if (info.partName != null) s += " and tc_partition = '" + info.partName + "'"; + TXN_ABORTED + "' and tc_database = ? 
and tc_table = ?"; + if (info.highestTxnId != 0) s += " and txn_id <= ?"; + if (info.partName != null) s += " and tc_partition = ?"; + + pStmt = dbConn.prepareStatement(s); + paramCount = 1; + pStmt.setString(paramCount++, info.dbname); + pStmt.setString(paramCount++, info.tableName); + if(info.highestTxnId != 0) { + pStmt.setLong(paramCount++, info.highestTxnId); + } + if (info.partName != null) { + pStmt.setString(paramCount++, info.partName); + } + LOG.debug("Going to execute update <" + s + ">"); - rs = stmt.executeQuery(s); + rs = pStmt.executeQuery(); List txnids = new ArrayList<>(); - while (rs.next()) txnids.add(rs.getLong(1)); + List questions = new ArrayList<>(); + while (rs.next()) { + long id = rs.getLong(1); + txnids.add(id); + questions.add("?"); + } // Remove entries from txn_components, as there may be aborted txn components if (txnids.size() > 0) { List queries = new ArrayList(); @@ -396,21 +425,35 @@ public void markCleaned(CompactionInfo info) throws MetaException { prefix.append("delete from TXN_COMPONENTS where "); //because 1 txn may include different partitions/tables even in auto commit mode - suffix.append(" and tc_database = "); - suffix.append(quoteString(info.dbname)); - suffix.append(" and tc_table = "); - suffix.append(quoteString(info.tableName)); + suffix.append(" and tc_database = ?"); + suffix.append(" and tc_table = ?"); if (info.partName != null) { - suffix.append(" and tc_partition = "); - suffix.append(quoteString(info.partName)); + suffix.append(" and tc_partition = ?"); } // Populate the complete query with provided prefix and suffix - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false); + List counts = TxnUtils + .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "tc_txnid", + true, false); + int totalCount = 0; + for (int i = 0; i < queries.size(); i++) { + String query = queries.get(i); + int insertCount = counts.get(i); - for (String query : queries) { 
LOG.debug("Going to execute update <" + query + ">"); - int rc = stmt.executeUpdate(query); + System.out.println("query = " + query); + pStmt = dbConn.prepareStatement(query); + for (int j = 0; j < insertCount; j++) { + pStmt.setLong(j + 1, txnids.get(totalCount + j)); + } + totalCount += insertCount; + paramCount = insertCount + 1; + pStmt.setString(paramCount++, info.dbname); + pStmt.setString(paramCount++, info.tableName); + if (info.partName != null) { + pStmt.setString(paramCount++, info.partName); + } + int rc = pStmt.executeUpdate(); LOG.debug("Removed " + rc + " records from txn_components"); // Don't bother cleaning from the txns table. A separate call will do that. We don't @@ -429,8 +472,7 @@ public void markCleaned(CompactionInfo info) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeStmt(pStmt); - close(rs, stmt, dbConn); + close(rs, pStmt, dbConn); } } catch (RetryException e) { markCleaned(info); @@ -598,35 +640,39 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { @RetrySemantics.ReadOnly public List findColumnsWithStats(CompactionInfo ci) throws MetaException { Connection dbConn = null; - Statement stmt = null; + PreparedStatement pStmt = null; ResultSet rs = null; try { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); String quote = getIdentifierQuoteString(dbConn); - stmt = dbConn.createStatement(); StringBuilder bldr = new StringBuilder(); bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote) .append(" FROM ") .append(quote).append((ci.partName == null ? 
"TAB_COL_STATS" : "PART_COL_STATS")) .append(quote) .append(" WHERE ") - .append(quote).append("DB_NAME").append(quote).append(" = '").append(ci.dbname) - .append("' AND ").append(quote).append("TABLE_NAME").append(quote) - .append(" = '").append(ci.tableName).append("'"); + .append(quote).append("DB_NAME").append(quote).append(" = ?") + .append(" AND ").append(quote).append("TABLE_NAME").append(quote) + .append(" = ?"); if (ci.partName != null) { - bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = '") - .append(ci.partName).append("'"); + bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = ?"); } String s = bldr.toString(); + pStmt = dbConn.prepareStatement(s); + pStmt.setString(1, ci.dbname); + pStmt.setString(2, ci.tableName); + if (ci.partName != null) { + pStmt.setString(3, ci.partName); + } /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS") + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'" + (ci.partName == null ? 
"" : " AND PARTITION_NAME='" + ci.partName + "'");*/ LOG.debug("Going to execute <" + s + ">"); - rs = stmt.executeQuery(s); - List columns = new ArrayList(); + rs = pStmt.executeQuery(); + List columns = new ArrayList<>(); while (rs.next()) { columns.add(rs.getString(1)); } @@ -641,7 +687,7 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - close(rs, stmt, dbConn); + close(rs, pStmt, dbConn); } } catch (RetryException ex) { return findColumnsWithStats(ci); @@ -724,6 +770,7 @@ private void checkForDeletion(List deleteSet, CompactionInfo ci, Retention public void purgeCompactionHistory() throws MetaException { Connection dbConn = null; Statement stmt = null; + PreparedStatement pStmt = null; ResultSet rs = null; List deleteSet = new ArrayList<>(); RetentionCounters rc = null; @@ -763,11 +810,23 @@ public void purgeCompactionHistory() throws MetaException { prefix.append("delete from COMPLETED_COMPACTIONS where "); suffix.append(""); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false); + List questions = new ArrayList<>(deleteSet.size()); + for (int i = 0; i < deleteSet.size(); i++) { + questions.add("?"); + } + List counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "cc_id", false, false); + int totalCount = 0; + for (int i = 0; i < queries.size(); i++) { + String query = queries.get(i); + long insertCount = counts.get(i); - for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); - int count = stmt.executeUpdate(query); + pStmt = dbConn.prepareStatement(query); + for (int j = 0; j < insertCount; j++) { + pStmt.setLong(j + 1, deleteSet.get(totalCount + j)); + } + totalCount += insertCount; + int count = pStmt.executeUpdate(); LOG.debug("Removed " + count + " records from COMPLETED_COMPACTIONS"); } 
dbConn.commit(); @@ -778,6 +837,7 @@ public void purgeCompactionHistory() throws MetaException { StringUtils.stringifyException(e)); } finally { close(rs, stmt, dbConn); + closeStmt(pStmt); } } catch (RetryException ex) { purgeCompactionHistory(); @@ -812,17 +872,22 @@ private int getFailedCompactionRetention() { @RetrySemantics.ReadOnly public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { Connection dbConn = null; - Statement stmt = null; + PreparedStatement pStmt = null; ResultSet rs = null; try { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select CC_STATE from COMPLETED_COMPACTIONS where " + - "CC_DATABASE = " + quoteString(ci.dbname) + " and " + - "CC_TABLE = " + quoteString(ci.tableName) + - (ci.partName != null ? "and CC_PARTITION = " + quoteString(ci.partName) : "") + + pStmt = dbConn.prepareStatement("select CC_STATE from COMPLETED_COMPACTIONS where " + + "CC_DATABASE = ? and " + + "CC_TABLE = ? " + + (ci.partName != null ? "and CC_PARTITION = ?" 
: "") + " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc"); + pStmt.setString(1, ci.dbname); + pStmt.setString(2, ci.tableName); + if (ci.partName != null) { + pStmt.setString(3, ci.partName); + } + rs = pStmt.executeQuery(); int numFailed = 0; int numTotal = 0; int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); @@ -834,17 +899,18 @@ public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { numFailed--; } } + return numFailed == failedThreshold; } catch (SQLException e) { - LOG.error("Unable to delete from compaction queue " + e.getMessage()); + LOG.error("Unable to check for failed compactions " + e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")"); LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e)); return false;//weren't able to check } finally { - close(rs, stmt, dbConn); + close(rs, pStmt, dbConn); } } catch (RetryException e) { return checkFailedCompactions(ci); @@ -868,12 +934,16 @@ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this sho try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id); + pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?"); + pStmt.setLong(1, ci.id); + rs = pStmt.executeQuery(); if(rs.next()) { ci = CompactionInfo.loadFullFromCompactionQueue(rs); - String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id; + String s = 
"delete from COMPACTION_QUEUE where cq_id = ?"; + pStmt = dbConn.prepareStatement(s); + pStmt.setLong(1, ci.id); LOG.debug("Going to execute update <" + s + ">"); - int updCnt = stmt.executeUpdate(s); + int updCnt = pStmt.executeUpdate(); } else { if(ci.id > 0) { @@ -896,6 +966,7 @@ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this sho ci.state = FAILED_STATE; } close(rs, stmt, null); + closeStmt(pStmt); pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn)); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 30b155f3b3311fed6cd79e46a5b2abcee9927d91..ac5d11838d886e2fe600e9442a527ff8462b74f5 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; import java.util.List; @@ -154,13 +155,41 @@ public static boolean isAcidTable(Table table) { * @param addParens add a pair of parenthesis outside the IN lists * e.g. 
( id in (1,2,3) OR id in (4,5,6) ) * @param notIn clause to be broken up is NOT IN + * @return a list of the count of IN list values that are in each of the corresponding queries */ - public static void buildQueryWithINClause(HiveConf conf, List queries, StringBuilder prefix, + public static List buildQueryWithINClause(HiveConf conf, List queries, StringBuilder prefix, StringBuilder suffix, List inList, String inColumn, boolean addParens, boolean notIn) { + List inListStrings = new ArrayList<>(inList.size()); + for (Long aLong : inList) { + inListStrings.add(aLong.toString()); + } + return buildQueryWithINClauseStrings(conf, queries, prefix, suffix, + inListStrings, inColumn, addParens, notIn); + } + + + /** + * Build a query (or queries if one query is too big) with specified "prefix" and "suffix", + * while populating the IN list into multiple OR clauses, e.g. id in (1,2,3) OR id in (4,5,6) + * For NOT IN case, NOT IN list is broken into multiple AND clauses. + * @param queries array of complete query strings + * @param prefix part of the query that comes before IN list + * @param suffix part of the query that comes after IN list + * @param inList the list containing IN list values + * @param inColumn column name of IN list operator + * @param addParens add a pair of parenthesis outside the IN lists + * e.g. 
( id in (1,2,3) OR id in (4,5,6) ) + * @param notIn clause to be broken up is NOT IN + * @return a list of the count of IN list values that are in each of the corresponding queries + */ + public static List buildQueryWithINClauseStrings(HiveConf conf, List queries, StringBuilder prefix, + StringBuilder suffix, List inList, + String inColumn, boolean addParens, boolean notIn) { if (inList == null || inList.size() == 0) { throw new IllegalArgumentException("The IN list is empty!"); } + int batchSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE); int numWholeBatches = inList.size() / batchSize; StringBuilder buf = new StringBuilder(); @@ -174,6 +203,8 @@ public static void buildQueryWithINClause(HiveConf conf, List queries, S } else { buf.append(" in ("); } + List ret = new ArrayList<>(numWholeBatches); + int currentCount = 0; for (int i = 0; i <= numWholeBatches; i++) { if (i * batchSize == inList.size()) { @@ -188,9 +219,11 @@ public static void buildQueryWithINClause(HiveConf conf, List queries, S } buf.append(suffix); queries.add(buf.toString()); + ret.add(currentCount); // Prepare a new query string buf.setLength(0); + currentCount = 0; } if (i > 0) { @@ -221,6 +254,7 @@ public static void buildQueryWithINClause(HiveConf conf, List queries, S for (int j = i * batchSize; j < (i + 1) * batchSize && j < inList.size(); j++) { buf.append(inList.get(j)).append(","); + currentCount++; } buf.setCharAt(buf.length() - 1, ')'); } @@ -230,6 +264,8 @@ public static void buildQueryWithINClause(HiveConf conf, List queries, S } buf.append(suffix); queries.add(buf.toString()); + ret.add(currentCount); + return ret; } /** Estimate if the size of a string will exceed certain limit */ diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java index 1497c00e5dc77c02e53767b014a23e5fd8cb5b29..927948d57b54855ab1c387fbe768808331e8cfba 100644 --- 
metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java @@ -43,6 +43,7 @@ public TestTxnUtils() throws Exception { @Test public void testBuildQueryWithINClause() throws Exception { List queries = new ArrayList(); + List ret; StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); @@ -59,16 +60,21 @@ public void testBuildQueryWithINClause() throws Exception { for (long i = 1; i <= 200; i++) { inList.add(i); } - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); Assert.assertEquals(1, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); + Assert.assertEquals(200L, ret.get(0).longValue()); runAgainstDerby(queries); // Case 2 - Max in list members: 10; Max query string length: 1KB // The first query has 2 full batches, and the second query only has 1 batch which only contains 1 member queries.clear(); inList.add((long)201); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); Assert.assertEquals(2, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); + Assert.assertEquals(200L, ret.get(0).longValue()); + Assert.assertEquals(1L, ret.get(1).longValue()); runAgainstDerby(queries); // Case 3.1 - Max in list members: 1000, Max query string length: 1KB, and exact 1000 members in a single IN clause @@ -78,16 +84,19 @@ public void testBuildQueryWithINClause() throws Exception { for (long i = 202; i <= 1000; i++) { inList.add(i); } - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); 
Assert.assertEquals(1, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); + Assert.assertEquals(1000L, ret.get(0).longValue()); runAgainstDerby(queries); // Case 3.2 - Max in list members: 1000, Max query string length: 10KB, and exact 1000 members in a single IN clause conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 10); conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 1000); queries.clear(); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); Assert.assertEquals(1, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); runAgainstDerby(queries); // Case 3.3 - Now with 2000 entries, try the above settings @@ -96,13 +105,18 @@ public void testBuildQueryWithINClause() throws Exception { } conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 1); queries.clear(); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); Assert.assertEquals(2, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); + Assert.assertEquals(1000L, ret.get(0).longValue()); + Assert.assertEquals(1000L, ret.get(1).longValue()); runAgainstDerby(queries); conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 10); queries.clear(); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); Assert.assertEquals(1, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); + Assert.assertEquals(2000L, ret.get(0).longValue()); runAgainstDerby(queries); // Case 4 - Max in list members: 1000; Max query string length: 10KB @@ -110,22 +124,28 
@@ public void testBuildQueryWithINClause() throws Exception { for (long i = 2001; i <= 4321; i++) { inList.add(i); } - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); Assert.assertEquals(3, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); runAgainstDerby(queries); // Case 5 - NOT IN list queries.clear(); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true); Assert.assertEquals(3, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); runAgainstDerby(queries); // Case 6 - No parenthesis queries.clear(); suffix.setLength(0); suffix.append(""); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false); Assert.assertEquals(3, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); + Assert.assertEquals(2000L, ret.get(0).longValue()); + Assert.assertEquals(2000L, ret.get(1).longValue()); + Assert.assertEquals(321L, ret.get(2).longValue()); runAgainstDerby(queries); } diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index f8ae86bea3fe78374c0e0487d66c661f4f0d78ff..bb0fcae8e6d5723c934190d6b50911223c05a1c9 100644 --- ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -49,6 +49,7 @@ import java.util.TreeSet; import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; import static junit.framework.Assert.assertNotNull; import static 
junit.framework.Assert.assertNull; import static junit.framework.Assert.assertTrue; @@ -64,6 +65,10 @@ public TestCompactionTxnHandler() throws Exception { TxnDbUtil.setConfValues(conf); + // Set config so that TxnUtils.buildQueryWithINClauseStrings() will + // produce multiple queries + conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 1); + conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 10); tearDown(); } @@ -224,6 +229,64 @@ public void testMarkCleaned() throws Exception { } @Test + public void testMarkFailed() throws Exception { + CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + assertEquals(0, txnHandler.findReadyToClean().size()); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + assertNotNull(ci); + + assertEquals(0, txnHandler.findReadyToClean().size()); + txnHandler.markFailed(ci); + assertNull(txnHandler.findNextToCompact("fred")); + boolean failedCheck = txnHandler.checkFailedCompactions(ci); + assertFalse(failedCheck); + try { + // The first call to markFailed() should have removed the record from + // COMPACTION_QUEUE, so a repeated call should fail + txnHandler.markFailed(ci); + fail("The first call to markFailed() must have failed as this call did " + + "not throw the expected exception"); + } catch (IllegalStateException e) { + // This is expected + assertTrue(e.getMessage().contains("No record with CQ_ID=")); + } + + // There are not enough failed compactions yet so checkFailedCompactions() should return false. + // But note that any sql error will also result in a return of false. 
+ assertFalse(txnHandler.checkFailedCompactions(ci)); + + // Add more failed compactions so that the total is exactly COMPACTOR_INITIATOR_FAILED_THRESHOLD + for (int i = 1 ; i < conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); i++) { + addFailedCompaction("foo", "bar", CompactionType.MINOR, "ds=today"); + } + // Now checkFailedCompactions() will return true + assertTrue(txnHandler.checkFailedCompactions(ci)); + + // Now add enough failed compactions to ensure purgeCompactionHistory() will attempt delete; + // HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED is enough for this. + // But we also want enough to tickle the code in TxnUtils.buildQueryWithINClauseStrings() + // so that it produces multiple queries. For that we need at least 290. + for (int i = 0 ; i < 300; i++) { + addFailedCompaction("foo", "bar", CompactionType.MINOR, "ds=today"); + } + txnHandler.purgeCompactionHistory(); + } + + private void addFailedCompaction(String dbName, String tableName, CompactionType type, + String partitionName) throws MetaException { + CompactionRequest rqst; + CompactionInfo ci; + rqst = new CompactionRequest(dbName, tableName, type); + rqst.setPartitionname(partitionName); + txnHandler.compact(rqst); + ci = txnHandler.findNextToCompact("fred"); + assertNotNull(ci); + txnHandler.markFailed(ci); + } + + @Test public void testRevokeFromLocalWorkers() throws Exception { CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); txnHandler.compact(rqst);