diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java index 575be39451..992b9017e3 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.backup.regionserver; import java.util.HashMap; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; @@ -92,10 +94,14 @@ public class LogRollBackupSubprocedure extends Subprocedure { LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum + " highest: " + highest + " on " + rss.getServerName()); - ((HRegionServer) rss).getWalRoller().requestRollAll(); + Future f = ((HRegionServer) rss).getWalRoller().requestRollAll(); long start = EnvironmentEdgeManager.currentTime(); - while (!((HRegionServer) rss).getWalRoller().walRollFinished()) { - Thread.sleep(20); + try { + f.get(); + } catch (ExecutionException ex) { + // Shouldn't currently happen, but could be a valid path (roll failed). + LOG.error("Log rolling failed", ex); + throw ex; } LOG.debug("log roll took " + (EnvironmentEdgeManager.currentTime() - start)); LOG.info("After roll log in backup subprocedure, current log number: " + fsWAL.getFilenum() diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 05a8fdf778..caf68a6198 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.regionserver; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.IdentityHashMap; import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; @@ -41,6 +43,8 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.SettableFuture; + /** * Runs periodically to determine if the WAL should be rolled. @@ -55,12 +59,22 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti @VisibleForTesting public class LogRoller extends HasThread implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class); - private final ReentrantLock rollLock = new ReentrantLock(); - private final AtomicBoolean rollLog = new AtomicBoolean(false); - private final ConcurrentHashMap walNeedsRoll = new ConcurrentHashMap<>(); + + private final Object rollLogRequestLock = new Object(); + // Admin/procedure requests to roll all WALs. + private ArrayDeque> allWalsNeedToRollRequests = new ArrayDeque<>(); + // Requests to roll individual WALs. + // Note: currently WALs requesting a roll via walsNeedToRoll treat replaceWriter call from + // rollWriter as a callback; we could make callbacks explicit, making the threading model + // of AsyncWAL/etc a bit clearer to a reader, but that would require a larger refactoring. + private IdentityHashMap walsNeedToRoll = new IdentityHashMap<>(); + + // All the WALs in the system. + private final ConcurrentHashMap wals = new ConcurrentHashMap<>(); + private final Server server; protected final RegionServerServices services; - private volatile long lastrolltime = System.currentTimeMillis(); + private volatile long lastrolltime = System.nanoTime(); // Period to roll log. private final long rollperiod; private final int threadWakeFrequency; @@ -70,29 +84,28 @@ public class LogRoller extends HasThread implements Closeable { private volatile boolean running = true; public void addWAL(final WAL wal) { - if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) { + if (null == wals.putIfAbsent(wal, Boolean.TRUE)) { wal.registerWALActionsListener(new WALActionsListener() { @Override public void logRollRequested(boolean lowReplicas) { - walNeedsRoll.put(wal, Boolean.TRUE); // TODO logs will contend with each other here, replace with e.g. DelayedQueue - synchronized(rollLog) { - rollLog.set(true); - rollLog.notifyAll(); + // if contention is observed empirically + synchronized (rollLogRequestLock) { + walsNeedToRoll.put(wal, Boolean.TRUE); + rollLogRequestLock.notifyAll(); } } }); } } - public void requestRollAll() { - for (WAL wal : walNeedsRoll.keySet()) { - walNeedsRoll.put(wal, Boolean.TRUE); - } - synchronized(rollLog) { - rollLog.set(true); - rollLog.notifyAll(); + public Future requestRollAll() { + SettableFuture result = SettableFuture.create(); + synchronized (rollLogRequestLock) { + allWalsNeedToRollRequests.add(result); + rollLogRequestLock.notifyAll(); } + return result; } /** @param server */ @@ -110,9 +123,9 @@ public class LogRoller extends HasThread implements Closeable { @Override public void interrupt() { - // Wake up if we are waiting on rollLog. For tests. - synchronized (rollLog) { - this.rollLog.notify(); + // Wake up if we are waiting on anyWalNeedsRoll. For tests. + synchronized(rollLogRequestLock) { + rollLogRequestLock.notifyAll(); } super.interrupt(); } @@ -120,11 +133,10 @@ public class LogRoller extends HasThread implements Closeable { /** * we need to check low replication in period, see HBASE-18132 */ - void checkLowReplication(long now) { + void checkLowReplication() { try { - for (Entry entry : walNeedsRoll.entrySet()) { - WAL wal = entry.getKey(); - boolean needRollAlready = entry.getValue(); + for (WAL wal : wals.keySet()) { + boolean needRollAlready = walsNeedToRoll.containsKey(wal); // Best-effort; no lock. if (needRollAlready || !(wal instanceof AbstractFSWAL)) { continue; } @@ -137,10 +149,10 @@ public class LogRoller extends HasThread implements Closeable { private void abort(String reason, Throwable cause) { // close all WALs before calling abort on RS. - // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we - // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it - // is already broken. - for (WAL wal : walNeedsRoll.keySet()) { + // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if + // we failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that + // it is already broken. + for (WAL wal : wals.keySet()) { // shutdown rather than close here since we are going to abort the RS and the wals need to be // split when recovery try { @@ -155,41 +167,53 @@ public class LogRoller extends HasThread implements Closeable { @Override public void run() { while (running) { - long now = System.currentTimeMillis(); - checkLowReplication(now); + long now = System.nanoTime(); + checkLowReplication(); boolean periodic = false; - if (!rollLog.get()) { - periodic = (now - this.lastrolltime) > this.rollperiod; - if (!periodic) { - synchronized (rollLog) { + // TODO: what is this lock for? LogRoller is a thread, there's only one of it, + // and this is the only usage. Keeping it for now. + ArrayList> rollRequests = null; + try { + Collection currentWalsToRoll = null; + // Unset the flags atomically, so that if another roll is requested after writer is + // replaced but before rollWriter is done (e.g. during archiving), we don't lose it. + synchronized (rollLogRequestLock) { + if (!allWalsNeedToRollRequests.isEmpty()) { + // Administrator, or some procedure wants to roll all WALs. + currentWalsToRoll = wals.keySet(); + rollRequests = new ArrayList<>(allWalsNeedToRollRequests); + allWalsNeedToRollRequests.clear(); + } else if (!walsNeedToRoll.isEmpty()) { + // Some WALs want to be rolled (e.g. based on size). + currentWalsToRoll = new ArrayList<>(walsNeedToRoll.keySet()); + } else if ((now - this.lastrolltime) > TimeUnit.MILLISECONDS.toNanos(this.rollperiod)) { + // Roll all WALs due to a timeout since the last roll. + currentWalsToRoll = wals.keySet(); + periodic = true; + } + walsNeedToRoll.clear(); // Clear current requests, no matter how we roll. + + if (currentWalsToRoll == null) { // Nothing to do; wait. try { - if (!rollLog.get()) { - rollLog.wait(this.threadWakeFrequency); - } + rollLogRequestLock.wait(this.threadWakeFrequency); } catch (InterruptedException e) { // Fall through } + continue; } - continue; } - // Time for periodic roll - LOG.debug("Wal roll period {} ms elapsed", this.rollperiod); - } else { - LOG.debug("WAL roll requested"); - } - rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH - try { + + if (periodic) { + LOG.debug("Wal roll period {} ms elapsed", this.rollperiod); + } else { + LOG.debug("WAL roll requested"); + } + this.lastrolltime = now; - for (Iterator> iter = walNeedsRoll.entrySet().iterator(); iter - .hasNext();) { - Entry entry = iter.next(); - final WAL wal = entry.getKey(); - // Force the roll if the logroll.period is elapsed or if a roll was requested. + for (WAL wal : currentWalsToRoll) { // The returned value is an array of actual region names. try { - final byte[][] regionsToFlush = - wal.rollWriter(periodic || entry.getValue().booleanValue()); - walNeedsRoll.put(wal, Boolean.FALSE); + final byte[][] regionsToFlush = wal.rollWriter(true); if (regionsToFlush != null) { for (byte[] r : regionsToFlush) { scheduleFlush(r); @@ -197,6 +221,12 @@ public class LogRoller extends HasThread implements Closeable { } } catch (WALClosedException e) { LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e); + wals.remove(wal); + } + } + if (rollRequests != null) { + for (Iterator> iter = rollRequests.iterator(); iter.hasNext(); ) { + iter.next().set(null); // Notify the caller that the roll of all logs is done. iter.remove(); } } @@ -212,10 +242,12 @@ public class LogRoller extends HasThread implements Closeable { LOG.error("Log rolling failed", ex); abort("Log rolling failed", ex); } finally { - try { - rollLog.set(false); - } finally { - rollLock.unlock(); + // The old semantics of requestRollAll are to wait forever, so put these back on error. + if (rollRequests != null && !rollRequests.isEmpty()) { + synchronized (rollLogRequestLock) { + this.allWalsNeedToRollRequests.addAll(rollRequests); + // No need to notify, we'll see them again in the next iteration. + } } } } @@ -244,25 +276,15 @@ public class LogRoller extends HasThread implements Closeable { } /** + * Test method - doesn't do any syncronization so it can be racy. * @return true if all WAL roll finished */ + @VisibleForTesting public boolean walRollFinished() { - for (boolean needRoll : walNeedsRoll.values()) { - if (needRoll) { - return false; - } - } - return true; + return walsNeedToRoll.isEmpty() && allWalsNeedToRollRequests.isEmpty(); } - /** - * Wait until all wals have been rolled after calling {@link #requestRollAll()}. - */ - public void waitUntilWalRollFinished() throws InterruptedException { - while (!walRollFinished()) { - Thread.sleep(100); - } - } + @Override public void close() { @@ -271,7 +293,7 @@ public class LogRoller extends HasThread implements Closeable { } @VisibleForTesting - Map getWalNeedsRoll() { - return this.walNeedsRoll; + Collection getWals() { + return this.wals.keySet(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index df84dcf57f..ce4aedc28a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2293,6 +2293,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @throws ServiceException */ @Override + @SuppressWarnings("FutureReturnValueIgnored") // This API is fire-and-forget. public RollWALWriterResponse rollWALWriter(final RpcController controller, final RollWALWriterRequest request) throws ServiceException { try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java index d01b130503..896413bb78 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.LogRoller; @@ -202,14 +204,18 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { // data inconsistency. So here we need to roll the wal, and let the ReplicationSource // track the new wal file, and throw the old wal files away. LogRoller roller = rs.getWalRoller(); - roller.requestRollAll(); + Future future = roller.requestRollAll(); try { - roller.waitUntilWalRollFinished(); + future.get(); } catch (InterruptedException e) { // reset the interrupted flag Thread.currentThread().interrupt(); throw (IOException) new InterruptedIOException( "Interrupted while waiting for wal roll finish").initCause(e); + } catch (ExecutionException e) { + // This shouldn't currently happen; however, it may be valid for + // the WAL roll to fail in future. + throw new IOException("Failed to roll WALs", e); } } SyncReplicationState oldState = peer.getSyncReplicationState(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java index 7892d4478f..53fb324df6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java @@ -66,7 +66,7 @@ public class TestLogRoller { HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); Configuration conf = rs.getConfiguration(); LogRoller logRoller = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWalRoller(); - int originalSize = logRoller.getWalNeedsRoll().size(); + int originalSize = logRoller.getWals().size(); FSHLog wal1 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(), AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), conf); logRoller.addWAL(wal1); @@ -77,19 +77,19 @@ public class TestLogRoller { AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), conf); logRoller.addWAL(wal3); - assertEquals(originalSize + 3, logRoller.getWalNeedsRoll().size()); - assertTrue(logRoller.getWalNeedsRoll().containsKey(wal1)); + assertEquals(originalSize + 3, logRoller.getWals().size()); + assertTrue(logRoller.getWals().contains(wal1)); wal1.close(); Thread.sleep(2 * logRollPeriod); - assertEquals(originalSize + 2, logRoller.getWalNeedsRoll().size()); - assertFalse(logRoller.getWalNeedsRoll().containsKey(wal1)); + assertEquals(originalSize + 2, logRoller.getWals().size()); + assertFalse(logRoller.getWals().contains(wal1)); wal2.close(); wal3.close(); Thread.sleep(2 * logRollPeriod); - assertEquals(originalSize, logRoller.getWalNeedsRoll().size()); + assertEquals(originalSize, logRoller.getWals().size()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java index 1b98518728..62329c632f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java @@ -167,6 +167,7 @@ public class SerialReplicationTestBase { for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) { t.getRegionServer().getWalRoller().requestRollAll(); } + // TODO: change to use futures UTIL.waitFor(30000, new ExplainingPredicate() { @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java index 672564965e..7147d97164 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java @@ -77,8 +77,7 @@ public class TestSerialSyncReplication extends SyncReplicationTestBase { // will not replay this wal when transiting to DA. for (RegionServerThread t : UTIL1.getMiniHBaseCluster().getRegionServerThreads()) { LogRoller roller = t.getRegionServer().getWalRoller(); - roller.requestRollAll(); - roller.waitUntilWalRollFinished(); + roller.requestRollAll().get(); } waitUntilDeleted(UTIL2, remoteWAL);