diff --git metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index a07c695..e9df1e1 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -1482,7 +1482,7 @@ CompactionResponse compact2(String dbname, String tableName, String partitionName,
     Map<String, String> tblproperties) throws TException;
 
   /**
-   * Get a list of all current compactions.
+   * Get a list of all compactions.
    * @return List of all current compactions. This includes compactions waiting to happen,
    * in progress, and finished but waiting to clean the existing files.
    * @throws TException
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index a4d9183..0b0df85 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -48,15 +48,15 @@
 @InterfaceStability.Evolving
 public interface TxnStore {
 
-  public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock,
+  enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock,
     WriteSetCleaner, CompactionScheduler}
   // Compactor states (Should really be enum)
-  static final public String INITIATED_RESPONSE = "initiated";
-  static final public String WORKING_RESPONSE = "working";
-  static final public String CLEANING_RESPONSE = "ready for cleaning";
-  static final public String FAILED_RESPONSE = "failed";
-  static final public String SUCCEEDED_RESPONSE = "succeeded";
-  static final public String ATTEMPTED_RESPONSE = "attempted";
+  String INITIATED_RESPONSE = "initiated";
+  String WORKING_RESPONSE = "working";
+  String CLEANING_RESPONSE = "ready for cleaning";
+  String FAILED_RESPONSE = "failed";
+  String SUCCEEDED_RESPONSE = "succeeded";
+  String ATTEMPTED_RESPONSE = "attempted";
 
   public static final int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 50000;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index c11ba97..01d980e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -94,6 +94,7 @@
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -1875,6 +1876,42 @@ private int compact(Hive db, AlterTableSimpleDesc desc) throws HiveException {
       console.printInfo("Compaction already enqueued with id " + resp.getId() +
         "; State is " + resp.getState());
     }
+    if(desc.isBlocking() && resp.isAccepted()) {
+      StringBuilder progressDots = new StringBuilder();
+      long waitTimeMs = 1000;
+      wait: while (true) {
+        //double wait time until 5min
+        waitTimeMs = waitTimeMs*2;
+        waitTimeMs = waitTimeMs < 5*60*1000 ? waitTimeMs : 5*60*1000;
+        try {
+          Thread.sleep(waitTimeMs);
+        }
+        catch(InterruptedException ex) {
+          console.printInfo("Interrupted while waiting for compaction with id=" + resp.getId());
+          break;
+        }
+        //this could be expensive when there are a lot of compactions....
+        //todo: update to search by ID once HIVE-13353 is done
+        ShowCompactResponse allCompactions = db.showCompactions();
+        for(ShowCompactResponseElement compaction : allCompactions.getCompacts()) {
+          if (resp.getId() != compaction.getId()) {
+            continue;
+          }
+          switch (compaction.getState()) {
+            case TxnStore.WORKING_RESPONSE:
+            case TxnStore.INITIATED_RESPONSE:
+              //still working
+              console.printInfo(progressDots.toString());
+              progressDots.append(".");
+              continue wait;
+            default:
+              //done
+              console.printInfo("Compaction with id " + resp.getId() + " finished with status: " + compaction.getState());
+              break wait;
+          }
+        }
+      }
+    }
     return 0;
   }
@@ -2812,7 +2849,7 @@ private int showLocksNewFormat(ShowLocksDesc showLocks, HiveLockManager lm)
   }
 
   private int showCompactions(Hive db, ShowCompactionsDesc desc) throws HiveException {
-    // Call the metastore to get the currently queued and running compactions.
+    // Call the metastore to get the status of all known compactions (completed get purged eventually)
     ShowCompactResponse rsp = db.showCompactions();
 
     // Write the results into the file
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index ce435f9..f175663 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -516,7 +516,7 @@ public void analyzeInternal(ASTNode input) throws SemanticException {
       analyzeCacheMetadata(ast);
       break;
     default:
-      throw new SemanticException("Unsupported command.");
+      throw new SemanticException("Unsupported command: " + ast);
     }
     if (fetchTask != null && !rootTasks.isEmpty()) {
       rootTasks.get(rootTasks.size() - 1).setFetchSource(true);
     }
@@ -1748,13 +1748,22 @@ private void analyzeAlterTableCompact(ASTNode ast, String tableName,
     LinkedHashMap<String, String> newPartSpec = null;
     if (partSpec != null) newPartSpec = new LinkedHashMap<String, String>(partSpec);
 
-    AlterTableSimpleDesc desc = new AlterTableSimpleDesc(
-      tableName, newPartSpec, type);
+    HashMap<String, String> mapProp = null;
+    boolean isBlocking = false;
 
-    if (ast.getChildCount() > 1) {
-      HashMap<String, String> mapProp = getProps((ASTNode) (ast.getChild(1)).getChild(0));
-      desc.setProps(mapProp);
+    for(int i = 0; i < ast.getChildCount(); i++) {
+      switch(ast.getChild(i).getType()) {
+        case HiveParser.TOK_TABLEPROPERTIES:
+          mapProp = getProps((ASTNode) (ast.getChild(i)).getChild(0));
+          break;
+        case HiveParser.TOK_BLOCKING:
+          isBlocking = true;
+          break;
+      }
     }
+    AlterTableSimpleDesc desc = new AlterTableSimpleDesc(
+      tableName, newPartSpec, type, isBlocking);
+    desc.setProps(mapProp);
 
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc), conf));
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
index ad61f83..2df5e3f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
@@ -346,6 +346,7 @@ KW_SUMMARY: 'SUMMARY';
 KW_OPERATOR: 'OPERATOR';
 KW_EXPRESSION: 'EXPRESSION';
 KW_DETAIL: 'DETAIL';
+KW_WAIT: 'WAIT';
 
 // Operators
 // NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.
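The blocking branch added to DDLTask above boils down to poll-with-backoff against showCompactions(). For reference, a minimal standalone sketch of that pattern, using only APIs that appear in this patch; the class name CompactionWaiter, the method name waitForCompaction, and its parameters are illustrative, not part of the patch:

    import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;
    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    class CompactionWaiter {
      /**
       * Polls until the compaction with the given id leaves the "initiated"/"working"
       * states; the wait doubles each round and is capped at 5 minutes, as in the patch.
       */
      static String waitForCompaction(Hive db, long id) throws HiveException, InterruptedException {
        long waitTimeMs = 1000;
        while (true) {
          waitTimeMs = Math.min(waitTimeMs * 2, 5 * 60 * 1000);
          Thread.sleep(waitTimeMs);
          for (ShowCompactResponseElement e : db.showCompactions().getCompacts()) {
            if (e.getId() == id && !TxnStore.INITIATED_RESPONSE.equals(e.getState())
                && !TxnStore.WORKING_RESPONSE.equals(e.getState())) {
              return e.getState(); // e.g. "ready for cleaning", "succeeded", "failed"
            }
          }
        }
      }
    }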
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 61778f6..37817ce 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -396,6 +396,7 @@ TOK_SUMMARY;
 TOK_OPERATOR;
 TOK_EXPRESSION;
 TOK_DETAIL;
+TOK_BLOCKING;
 }
@@ -559,6 +560,9 @@ import org.apache.hadoop.hive.conf.HiveConf;
     xlateMap.put("KW_NORELY", "NORELY");
     xlateMap.put("KW_ABORT", "ABORT");
     xlateMap.put("KW_TRANSACTIONS", "TRANSACTIONS");
+    xlateMap.put("KW_COMPACTIONS", "COMPACTIONS");
+    xlateMap.put("KW_COMPACT", "COMPACT");
+    xlateMap.put("KW_WAIT", "WAIT");
 
     // Operators
     xlateMap.put("DOT", ".");
@@ -1446,11 +1450,16 @@ alterStatementSuffixBucketNum
     -> ^(TOK_ALTERTABLE_BUCKETS $num)
     ;
 
+blocking
+    : KW_AND KW_WAIT
+    -> TOK_BLOCKING
+    ;
+
 alterStatementSuffixCompact
 @init { msgs.push("compaction request"); }
 @after { msgs.pop(); }
-    : KW_COMPACT compactType=StringLiteral (KW_WITH KW_OVERWRITE KW_TBLPROPERTIES tableProperties)?
-    -> ^(TOK_ALTERTABLE_COMPACT $compactType tableProperties?)
+    : KW_COMPACT compactType=StringLiteral blocking? (KW_WITH KW_OVERWRITE KW_TBLPROPERTIES tableProperties)?
+    -> ^(TOK_ALTERTABLE_COMPACT $compactType blocking? tableProperties?)
     ;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index 57fa0f4..0de1596 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -789,6 +789,7 @@ nonReserved
     | KW_OPERATOR
     | KW_EXPRESSION
     | KW_DETAIL
+    | KW_WAIT
     ;
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java
index 2ae70bb..e76bbb5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java
@@ -31,6 +31,7 @@
   private String tableName;
   private LinkedHashMap<String, String> partSpec;
   private String compactionType;
+  private boolean isBlocking = false;
 
   AlterTableTypes type;
   private Map<String, String> props;
@@ -61,11 +62,12 @@ public AlterTableSimpleDesc(String tableName,
    * @param compactionType currently supported values: 'major' and 'minor'
    */
   public AlterTableSimpleDesc(String tableName,
-      LinkedHashMap<String, String> partSpec, String compactionType) {
+      LinkedHashMap<String, String> partSpec, String compactionType, boolean isBlocking) {
     type = AlterTableTypes.COMPACT;
     this.compactionType = compactionType;
     this.tableName = tableName;
     this.partSpec = partSpec;
+    this.isBlocking = isBlocking;
   }
 
   public String getTableName() {
@@ -100,6 +102,13 @@ public String getCompactionType() {
     return compactionType;
   }
 
+  /**
+   * Whether this compaction request should block until the compaction completes.
+   */
+  public boolean isBlocking() {
+    return isBlocking;
+  }
+
   public Map<String, String> getProps() {
     return props;
   }
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index d2ade88..b9df674 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -54,6 +54,8 @@
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -873,4 +875,27 @@ public void testBadOnClause() throws Exception {
       "No columns from target table 'trgt' found in ON clause " +
       "'`sub`.`a` = `target`.`a`' of MERGE statement."));
   }
+
+  /**
+   * Writing UTs that need multiple threads is challenging since Derby chokes on concurrent access.
+   * This tests that "AND WAIT" actually blocks and responds to interrupt.
+   * @throws Exception
+   */
+  @Test
+  public void testCompactionBlocking() throws Exception {
+    Timer cancelCompact = new Timer("CancelCompactionTimer", false);
+    final Thread threadToInterrupt = Thread.currentThread();
+    cancelCompact.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        threadToInterrupt.interrupt();
+      }
+    }, 5000);
+    long start = System.currentTimeMillis();
+    runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'major' AND WAIT");
+    //no Worker so it stays in "initiated" state
+    //w/o AND WAIT the above alter table returns almost immediately; the test checks that more than
+    //2 seconds pass, i.e. that the command in Driver actually blocks before cancel is fired
+    Assert.assertTrue(System.currentTimeMillis() > start + 2000);
+  }
 }
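For reference, the syntax this patch enables looks like the following; the table name, partition spec, and the specific tblproperty shown are illustrative only:

    ALTER TABLE acid_tbl COMPACT 'major' AND WAIT;
    ALTER TABLE acid_tbl PARTITION (p = '1') COMPACT 'minor' AND WAIT
      WITH OVERWRITE TBLPROPERTIES ('compactor.mapreduce.map.memory.mb' = '2048');

Without AND WAIT the statement returns as soon as the compaction request is enqueued; with it, the new DDLTask loop blocks until the request leaves the "initiated"/"working" states or the waiting thread is interrupted.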