diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 2023292f4e..a96cf1e731 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -240,6 +240,7 @@ public Object run() throws Exception { ci.errorMessage = e.getMessage(); msc.markFailed(CompactionInfo.compactionInfoToStruct(ci)); msc.abortTxns(Collections.singletonList(compactorTxnId)); + compactorTxnId = NOT_SET; } } catch (TException | IOException t) { LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + @@ -248,6 +249,7 @@ public Object run() throws Exception { if (msc != null && ci != null) { ci.errorMessage = t.getMessage(); msc.markFailed(CompactionInfo.compactionInfoToStruct(ci)); + compactorTxnId = NOT_SET; } } catch (TException e) { LOG.error("Caught an exception while trying to mark compaction {} as failed: {}", ci, e); @@ -265,8 +267,9 @@ public Object run() throws Exception { } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " + StringUtils.stringifyException(t)); + compactorTxnId = NOT_SET; } finally { - commitTxn(compactorTxnId); + commitTxnIfSet(compactorTxnId); if (heartbeater != null) { heartbeater.cancel(); } @@ -284,7 +287,7 @@ public Object run() throws Exception { } while (!stop.get()); } - private void commitTxn(long compactorTxnId) { + private void commitTxnIfSet(long compactorTxnId) { if (compactorTxnId != NOT_SET) { try { if (msc != null) {