diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 7913295380..1dfc91f2fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -282,6 +282,7 @@ private CompactionType checkForCompaction(final CompactionInfo ci,
     }
 
     if (runJobAsSelf(runAs)) {
+      ci.runAs = runAs;
       return determineCompactionType(ci, writeIds, sd, tblproperties);
     } else {
       LOG.info("Going to initiate as user " + runAs + " for " + ci.getFullPartitionName());
@@ -353,6 +354,16 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi
         HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) :
         Float.parseFloat(deltaPctProp);
     boolean bigEnough = (float)deltaSize/(float)baseSize > deltaPctThreshold;
+    boolean multiBase = dir.getObsolete().stream()
+        .filter(path -> path.getName().startsWith(AcidUtils.BASE_PREFIX)).count() >= 1;
+    if ((deltaSize == 0 && dir.getObsolete().size() > 0) && multiBase) {
+      try {
+        txnHandler.requestCleanup(ci);
+        return null;
+      } catch (MetaException e) {
+        LOG.error("Unable to request cleanup for " + ci.getFullTableName(), e);
+      }
+    }
     if (LOG.isDebugEnabled()) {
       StringBuilder msg = new StringBuilder("delta size: ");
       msg.append(deltaSize);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 89ddccbbda..fe8f09acd2 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -5363,6 +5363,122 @@ private void acquireTxnLock(Statement stmt, boolean shared) throws SQLException,
     LOG.debug("TXN lock locked by {} in mode {}", quoteString(TxnHandler.hostname), shared);
   }
 
+
+  @Override
+  @RetrySemantics.Idempotent
+  public void requestCleanup(CompactionInfo ci) throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      PreparedStatement pst = null;
+      TxnStore.MutexAPI.LockHandle handle = null;
+      try {
+        lockInternal();
+        /**
+         * MUTEX_KEY.CompactionScheduler lock ensures that there is only 1 entry in
+         * Initiated/Working state for any resource. This ensures that we don't run concurrent
+         * compactions for any resource.
+         */
+        handle = getMutexAPI().acquireLock(MUTEX_KEY.CompactionScheduler.name());
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+
+        long id = generateCompactionQueueId(stmt);
+
+        List<String> params = new ArrayList<>();
+        StringBuilder sb = new StringBuilder("select cq_id, cq_state from COMPACTION_QUEUE where").
+            append(" cq_state IN(").append(quoteChar(INITIATED_STATE)).
+            append(",").append(quoteChar(WORKING_STATE)).
+            append(") AND cq_database=?").
+            append(" AND cq_table=?").append(" AND ");
+        params.add(ci.dbname);
+        params.add(ci.tableName);
+        if (ci.partName == null) {
+          sb.append("cq_partition is null");
+        } else {
+          sb.append("cq_partition=?");
+          params.add(ci.partName);
+        }
+
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(), params);
+        LOG.debug("Going to execute query <" + sb.toString() + ">");
+        ResultSet rs = pst.executeQuery();
+        if (rs.next()) {
+          long enqueuedId = rs.getLong(1);
+          String state = compactorStateToResponse(rs.getString(2).charAt(0));
+          LOG.info("Ignoring request to clean up " + ci.dbname + "/" + ci.tableName +
+              "/" + ci.partName + " since it is already " + quoteString(state) +
+              " with id=" + enqueuedId);
+          return;
+        }
+        close(rs);
+        closeStmt(pst);
+        params.clear();
+        StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " +
+            "cq_table, ");
+        String partName = ci.partName;
+        if (partName != null) {
+          buf.append("cq_partition, ");
+        }
+        buf.append("cq_state, cq_type");
+        if (ci.properties != null) {
+          buf.append(", cq_tblproperties");
+        }
+        if (ci.runAs != null) {
+          buf.append(", cq_run_as");
+        }
+        buf.append(") values (");
+        buf.append(id);
+        buf.append(", ?");
+        buf.append(", ?");
+        buf.append(", ");
+        params.add(ci.dbname);
+        params.add(ci.tableName);
+        if (partName != null) {
+          buf.append("?, '");
+          params.add(partName);
+        } else {
+          buf.append("'");
+        }
+        buf.append(READY_FOR_CLEANING);
+        buf.append("', '");
+        buf.append(MAJOR_TYPE);
+        buf.append("'");
+        if (ci.properties != null) {
+          buf.append(", ?");
+          params.add(ci.properties);
+        }
+        if (ci.runAs != null) {
+          buf.append(", ?");
+          params.add(ci.runAs);
+        }
+        buf.append(")");
+        String s = buf.toString();
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+        LOG.debug("Going to execute update <" + s + ">");
+        pst.executeUpdate();
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "requestCleanup(" + ci + ")");
+        throw new MetaException("Unable to select from transaction database " +
+            StringUtils.stringifyException(e));
+      } finally {
+        closeStmt(pst);
+        closeStmt(stmt);
+        closeDbConn(dbConn);
+        if (handle != null) {
+          handle.releaseLocks();
+        }
+        unlockInternal();
+      }
+    } catch (RetryException e) {
+      requestCleanup(ci);
+    }
+  }
+
   private static final class LockHandleImpl implements LockHandle {
     private final Connection dbConn;
     private final Statement stmt;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 3e441b559f..76fb1134a7 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -523,4 +523,13 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old
    */
   @RetrySemantics.Idempotent
   long findMinOpenTxnIdForCleaner() throws MetaException;
+
+  /**
+   * Request an explicit cleanup of a table or partition that is left with only
+   * obsolete base files and no deltas, so the Cleaner can remove them.
+   * @param ci compaction info identifying the table/partition to clean up
+   * @throws MetaException
+   */
+  @RetrySemantics.Idempotent
+  void requestCleanup(CompactionInfo ci) throws MetaException;
 }