diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 328a65c..44ee5c6 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -122,8 +122,9 @@ public void setRunAs(long cq_id, String user) throws MetaException {
       stmt = dbConn.createStatement();
       String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
       LOG.debug("Going to execute update <" + s + ">");
-      if (stmt.executeUpdate(s) != 1) {
-        LOG.error("Unable to update compaction record");
+      int updCnt = stmt.executeUpdate(s);
+      if (updCnt != 1) {
+        LOG.error("Unable to set cq_run_as=" + user + " for compaction record with cq_id=" + cq_id + ". updCnt=" + updCnt);
         LOG.debug("Going to rollback");
         dbConn.rollback();
       }
@@ -182,8 +183,10 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException {
         s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
           "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id;
         LOG.debug("Going to execute update <" + s + ">");
-        if (stmt.executeUpdate(s) != 1) {
-          LOG.error("Unable to update compaction record");
+        int updCount = stmt.executeUpdate(s);
+        if (updCount != 1) {
+          LOG.error("Unable to set cq_state=" + WORKING_STATE + " for compaction record: " +
+            info + ". updCnt=" + updCount);
           LOG.debug("Going to rollback");
           dbConn.rollback();
         }
@@ -221,8 +224,9 @@ public void markCompacted(CompactionInfo info) throws MetaException {
       String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " +
         "cq_worker_id = null where cq_id = " + info.id;
       LOG.debug("Going to execute update <" + s + ">");
-      if (stmt.executeUpdate(s) != 1) {
-        LOG.error("Unable to update compaction record");
+      int updCnt = stmt.executeUpdate(s);
+      if (updCnt != 1) {
+        LOG.error("Unable to set cq_state=" + READY_FOR_CLEANING + " for compaction record: " + info + ". updCnt=" + updCnt);
         LOG.debug("Going to rollback");
         dbConn.rollback();
       }
@@ -298,6 +302,17 @@ public void markCompacted(CompactionInfo info) throws MetaException {
   /**
    * This will remove an entry from the queue after
    * it has been compacted.
+   *
+   * todo: possibly a problem?  The Worker starts with the DB in state X (w.r.t. this
+   * partition), and while it works, more txns may commit against the partition it is
+   * compacting.  This call then deletes state up to X and everything since, yet delta
+   * files created between the start of compaction and cleaning will not be compacted
+   * until more transactions happen.  So ideally this should only delete up to the
+   * TXN_ID that was compacted (i.e. the HWM in the Worker?).  Then this could also
+   * run at READ_COMMITTED.
+   *
+   * Also, because this method is used when a Worker fails, we prevent future compactions
+   * from running until more data is written to the table or compaction is invoked explicitly.
    * @param info info on the compaction entry to remove
    */
   public void markCleaned(CompactionInfo info) throws MetaException {
@@ -309,8 +324,9 @@ public void markCleaned(CompactionInfo info) throws MetaException {
       stmt = dbConn.createStatement();
       String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
       LOG.debug("Going to execute update <" + s + ">");
-      if (stmt.executeUpdate(s) != 1) {
-        LOG.error("Unable to delete compaction record");
+      int updCount = stmt.executeUpdate(s);
+      if (updCount != 1) {
+        LOG.error("Unable to delete compaction record: " + info + ". Update count=" + updCount);
         LOG.debug("Going to rollback");
         dbConn.rollback();
       }
@@ -348,7 +364,7 @@ public void markCleaned(CompactionInfo info) throws MetaException {
         else buf.append(", ");
         buf.append(id);
       }
-
+      // because one txn may include different partitions/tables even in auto-commit mode
       buf.append(") and tc_database = '");
       buf.append(info.dbname);
       buf.append("' and tc_table = '");
@@ -415,7 +431,7 @@ public void cleanEmptyAbortedTxns() throws MetaException {
       String bufStr = buf.toString();
       LOG.debug("Going to execute update <" + bufStr + ">");
       int rc = stmt.executeUpdate(bufStr);
-      LOG.debug("Removed " + rc + " records from txns");
+      LOG.info("Removed " + rc + " empty aborted transactions: " + txnids + " from TXNS");
       LOG.debug("Going to commit");
       dbConn.commit();
     }
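A possible shape for the bounded cleanup the todo above describes, sketched against the TXN_COMPONENTS delete that markCleaned already builds. This is illustrative only: highestTxnId is a hypothetical value the Worker would have to record at compaction time (e.g. the high-water mark of the ValidTxnList it compacted against), and neither CompactionInfo nor COMPACTION_QUEUE carries such a field in this patch. The method assumes the surrounding CompactionTxnHandler context (java.sql.Connection/PreparedStatement imports, LOG, CompactionInfo).

// Sketch, not part of this patch: bound the TXN_COMPONENTS cleanup by the
// txn high-water mark the Worker actually compacted, so state from txns that
// committed after compaction started survives for the next compaction.
private void removeTxnComponents(Connection dbConn, CompactionInfo info, long highestTxnId)
    throws SQLException {
  String s = "delete from TXN_COMPONENTS where tc_database = ? and tc_table = ?" +
    (info.partName != null ? " and tc_partition = ?" : "") +
    " and tc_txnid <= ?";  // hypothetical bound; today's code deletes all matching rows
  PreparedStatement pstmt = dbConn.prepareStatement(s);
  try {
    int i = 1;
    pstmt.setString(i++, info.dbname);
    pstmt.setString(i++, info.tableName);
    if (info.partName != null) {
      pstmt.setString(i++, info.partName);
    }
    pstmt.setLong(i, highestTxnId);
    int removed = pstmt.executeUpdate();
    LOG.debug("Removed " + removed + " records from TXN_COMPONENTS up to txnid " + highestTxnId);
  } finally {
    pstmt.close();
  }
}

With the delete bounded this way, markCleaned could plausibly run at READ_COMMITTED as the todo suggests, since it would no longer touch rows created after the compaction's high-water mark.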
isAliveCounter=" + count); } catch(Throwable t) { LOG.fatal("Serious error in " + Thread.currentThread().getName() + ": " + t.getMessage(), t); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 16d2c81..622bf54 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -212,7 +212,7 @@ private void clean(CompactionInfo ci) throws MetaException { if (runJobAsSelf(ci.runAs)) { removeFiles(location, txnList); } else { - LOG.info("Cleaning as user " + ci.runAs); + LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName()); UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs, UserGroupInformation.getLoginUser()); ugi.doAs(new PrivilegedExceptionAction() { @@ -245,6 +245,7 @@ private void removeFiles(String location, ValidTxnList txnList) throws IOExcepti ", that hardly seems right."); return; } + LOG.info("About to remove " + filesToDelete.size() + " obsolete directories from " + location); FileSystem fs = filesToDelete.get(0).getFileSystem(conf); for (Path dead : filesToDelete) { diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index 38cd95e..c956f58 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -119,8 +119,8 @@ protected Partition resolvePartition(CompactionInfo ci) throws Exception { throw e; } if (parts.size() != 1) { - LOG.error(ci.getFullPartitionName() + " does not refer to a single partition"); - throw new MetaException("Too many partitions"); + LOG.error(ci.getFullPartitionName() + " does not refer to a single partition. 
" + parts); + throw new MetaException("Too many partitions for : " + ci.getFullPartitionName()); } return parts.get(0); } else { @@ -179,8 +179,9 @@ public Object run() throws Exception { return wrapper.get(0); } } - LOG.error("Unable to stat file as either current user or table owner, giving up"); - throw new IOException("Unable to stat file"); + LOG.error("Unable to stat file " + p + " as either current user(" + UserGroupInformation.getLoginUser() + + ") or table owner(" + t.getOwner() + "), giving up"); + throw new IOException("Unable to stat file: " + p); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 9bf725d..f265311 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -99,7 +99,7 @@ public void run() { // check if no compaction set for this table if (noAutoCompactSet(t)) { - LOG.info("Table " + tableName(t) + " marked true so we will not compact it."); + LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT + "=true so we will not compact it."); continue; } @@ -297,11 +297,10 @@ private long sumDirSize(FileSystem fs, Path dir) throws IOException { } private void requestCompaction(CompactionInfo ci, String runAs, CompactionType type) throws MetaException { - String s = "Requesting " + type.toString() + " compaction for " + ci.getFullPartitionName(); - LOG.info(s); CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName, type); if (ci.partName != null) rqst.setPartitionname(ci.partName); rqst.setRunas(runAs); + LOG.info("Requesting compaction: " + rqst); txnHandler.compact(rqst); }