diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java index 575be39451..992b9017e3 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.backup.regionserver; import java.util.HashMap; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; @@ -92,10 +94,14 @@ public class LogRollBackupSubprocedure extends Subprocedure { LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum + " highest: " + highest + " on " + rss.getServerName()); - ((HRegionServer) rss).getWalRoller().requestRollAll(); + Future f = ((HRegionServer) rss).getWalRoller().requestRollAll(); long start = EnvironmentEdgeManager.currentTime(); - while (!((HRegionServer) rss).getWalRoller().walRollFinished()) { - Thread.sleep(20); + try { + f.get(); + } catch (ExecutionException ex) { + // Shouldn't currently happen, but could be a valid path (roll failed). + LOG.error("Log rolling failed", ex); + throw ex; } LOG.debug("log roll took " + (EnvironmentEdgeManager.currentTime() - start)); LOG.info("After roll log in backup subprocedure, current log number: " + fsWAL.getFilenum() diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 05a8fdf778..9206d71b4e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -20,12 +20,13 @@ package org.apache.hadoop.hbase.regionserver; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayDeque;import java.util.ArrayList;import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; @@ -40,7 +41,10 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.SettableFuture; + /** * Runs periodically to determine if the WAL should be rolled. @@ -56,11 +60,22 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti public class LogRoller extends HasThread implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class); private final ReentrantLock rollLock = new ReentrantLock(); - private final AtomicBoolean rollLog = new AtomicBoolean(false); + + private final Object rollLogRequestLock = new Object(); + private boolean anyWalNeedsRoll = false; + // Admin/procedure requests to roll all WALs. + private ArrayDeque> allWalsNeedToRollRequests = new ArrayDeque<>(); + // Note: this map serves a dual purpose... first, it's used in places that merely want to + // iterate all the WALs and presumably don't care about parallel changes; CHM sync + // is used for that. It's also used as a per-WAL equivalent of anyWalNeedsRoll; that + // (i.e. any code changing values) is synchronized via rollLogRequestLock to avoid + // an ABA (sortof) problem where we may lose multiple valid requests. + // TODO: perhaps split into two structures? private final ConcurrentHashMap walNeedsRoll = new ConcurrentHashMap<>(); + private final Server server; protected final RegionServerServices services; - private volatile long lastrolltime = System.currentTimeMillis(); + private volatile long lastrolltime = System.nanoTime(); // Period to roll log. private final long rollperiod; private final int threadWakeFrequency; @@ -70,29 +85,30 @@ public class LogRoller extends HasThread implements Closeable { private volatile boolean running = true; public void addWAL(final WAL wal) { + // TODO: potentially races with remove in run()? esp. the put-true below. if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) { wal.registerWALActionsListener(new WALActionsListener() { @Override public void logRollRequested(boolean lowReplicas) { - walNeedsRoll.put(wal, Boolean.TRUE); // TODO logs will contend with each other here, replace with e.g. DelayedQueue - synchronized(rollLog) { - rollLog.set(true); - rollLog.notifyAll(); + // if contention is observed empirically + synchronized (rollLogRequestLock) { + anyWalNeedsRoll = true; + walNeedsRoll.put(wal, Boolean.TRUE); + rollLogRequestLock.notifyAll(); } } }); } } - public void requestRollAll() { - for (WAL wal : walNeedsRoll.keySet()) { - walNeedsRoll.put(wal, Boolean.TRUE); - } - synchronized(rollLog) { - rollLog.set(true); - rollLog.notifyAll(); + public Future requestRollAll() { + SettableFuture result = SettableFuture.create(); + synchronized (rollLogRequestLock) { + allWalsNeedToRollRequests.add(result); + rollLogRequestLock.notifyAll(); } + return result; } /** @param server */ @@ -110,9 +126,9 @@ public class LogRoller extends HasThread implements Closeable { @Override public void interrupt() { - // Wake up if we are waiting on rollLog. For tests. - synchronized (rollLog) { - this.rollLog.notify(); + // Wake up if we are waiting on anyWalNeedsRoll. For tests. + synchronized(rollLogRequestLock) { + rollLogRequestLock.notifyAll(); } super.interrupt(); } @@ -120,11 +136,11 @@ public class LogRoller extends HasThread implements Closeable { /** * we need to check low replication in period, see HBASE-18132 */ - void checkLowReplication(long now) { + void checkLowReplication() { try { for (Entry entry : walNeedsRoll.entrySet()) { WAL wal = entry.getKey(); - boolean needRollAlready = entry.getValue(); + boolean needRollAlready = entry.getValue(); // Best-effort; no lock. if (needRollAlready || !(wal instanceof AbstractFSWAL)) { continue; } @@ -137,9 +153,9 @@ public class LogRoller extends HasThread implements Closeable { private void abort(String reason, Throwable cause) { // close all WALs before calling abort on RS. - // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we - // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it - // is already broken. + // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if + // we failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that + // it is already broken. for (WAL wal : walNeedsRoll.keySet()) { // shutdown rather than close here since we are going to abort the RS and the wals need to be // split when recovery @@ -155,41 +171,63 @@ public class LogRoller extends HasThread implements Closeable { @Override public void run() { while (running) { - long now = System.currentTimeMillis(); - checkLowReplication(now); + long now = System.nanoTime(); + checkLowReplication(); boolean periodic = false; - if (!rollLog.get()) { - periodic = (now - this.lastrolltime) > this.rollperiod; - if (!periodic) { - synchronized (rollLog) { - try { - if (!rollLog.get()) { - rollLog.wait(this.threadWakeFrequency); + // TODO: what is this lock for? LogRoller is a thread, there's only one of it, + // and this is the only usage. Keeping it for now. + rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH + ReentrantLock lockTaken = rollLock; + ArrayList> rollRequests = null; + try { + Collection walsToRoll = null; + synchronized (rollLogRequestLock) { + // Unset the flags atomically, so that if another roll is requested after writer is + // replaced but before rollWriter is done (e.g. during archiving), we don't lose it. + if (!allWalsNeedToRollRequests.isEmpty()) { + // Administrator, or some procedure wants to roll all WALs. + rollRequests = new ArrayList<>(allWalsNeedToRollRequests); + allWalsNeedToRollRequests.clear(); + walsToRoll = prepareToRollAllWalsUnderReqLock(); + } else if (anyWalNeedsRoll) { + // Some WALs want to be rolled (e.g. based on size). + anyWalNeedsRoll = false; + walsToRoll = new ArrayList<>(); + for (Entry e : walNeedsRoll.entrySet()) { + // Only flush WALs where a flush was requested; not others. Unset flags under lock. + if (e.getValue()) { + walsToRoll.add(e.getKey()); + e.setValue(Boolean.FALSE); } + } + } else if ((now - this.lastrolltime) > TimeUnit.MILLISECONDS.toNanos(this.rollperiod)) { + // Roll all WALs due to a timeout since the last roll. + periodic = true; + walsToRoll = prepareToRollAllWalsUnderReqLock(); + } else { + // Nothing to do; release the global lock and wait. + lockTaken.unlock(); + lockTaken = null; + try { + rollLogRequestLock.wait(this.threadWakeFrequency); } catch (InterruptedException e) { // Fall through } + continue; } - continue; } - // Time for periodic roll - LOG.debug("Wal roll period {} ms elapsed", this.rollperiod); - } else { - LOG.debug("WAL roll requested"); - } - rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH - try { + + if (periodic) { + LOG.debug("Wal roll period {} ms elapsed", this.rollperiod); + } else { + LOG.debug("WAL roll requested"); + } + this.lastrolltime = now; - for (Iterator> iter = walNeedsRoll.entrySet().iterator(); iter - .hasNext();) { - Entry entry = iter.next(); - final WAL wal = entry.getKey(); - // Force the roll if the logroll.period is elapsed or if a roll was requested. + for (WAL wal : walsToRoll) { // The returned value is an array of actual region names. try { - final byte[][] regionsToFlush = - wal.rollWriter(periodic || entry.getValue().booleanValue()); - walNeedsRoll.put(wal, Boolean.FALSE); + final byte[][] regionsToFlush = wal.rollWriter(true); if (regionsToFlush != null) { for (byte[] r : regionsToFlush) { scheduleFlush(r); @@ -197,6 +235,12 @@ public class LogRoller extends HasThread implements Closeable { } } catch (WALClosedException e) { LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e); + walNeedsRoll.remove(wal); // TODO: potentially races with puts + } + } + if (rollRequests != null) { + for (Iterator> iter = rollRequests.iterator(); iter.hasNext(); ) { + iter.next().set(null); // Notify the caller that the roll of all logs is done. iter.remove(); } } @@ -212,16 +256,32 @@ public class LogRoller extends HasThread implements Closeable { LOG.error("Log rolling failed", ex); abort("Log rolling failed", ex); } finally { - try { - rollLog.set(false); - } finally { - rollLock.unlock(); + if (lockTaken != null) { + lockTaken.unlock(); } + // The old semantics of requestRollAll are to wait forever, so put these back on error. + if (rollRequests != null && !rollRequests.isEmpty()) { + synchronized (rollLogRequestLock) { + this.allWalsNeedToRollRequests.addAll(rollRequests); + // No need to notify, we'll see them again in the next iteration. + } + } + } } LOG.info("LogRoller exiting."); } + private Collection prepareToRollAllWalsUnderReqLock() { + anyWalNeedsRoll = false; + // We are rolling all WALs, so just unset outstanding roll requests; + // we don't care about precision w.r.t. the WAL set. + for (Entry e : walNeedsRoll.entrySet()) { + e.setValue(Boolean.FALSE); + } + return walNeedsRoll.keySet(); + } + /** * @param encodedRegionName Encoded name of region to flush. */ @@ -244,8 +304,10 @@ public class LogRoller extends HasThread implements Closeable { } /** + * Test method - doesn't do any syncronization so it can be racy. * @return true if all WAL roll finished */ + @VisibleForTesting public boolean walRollFinished() { for (boolean needRoll : walNeedsRoll.values()) { if (needRoll) { @@ -255,14 +317,7 @@ public class LogRoller extends HasThread implements Closeable { return true; } - /** - * Wait until all wals have been rolled after calling {@link #requestRollAll()}. - */ - public void waitUntilWalRollFinished() throws InterruptedException { - while (!walRollFinished()) { - Thread.sleep(100); - } - } + @Override public void close() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index df84dcf57f..ce4aedc28a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2293,6 +2293,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @throws ServiceException */ @Override + @SuppressWarnings("FutureReturnValueIgnored") // This API is fire-and-forget. public RollWALWriterResponse rollWALWriter(final RpcController controller, final RollWALWriterRequest request) throws ServiceException { try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java index d01b130503..896413bb78 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.LogRoller; @@ -202,14 +204,18 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { // data inconsistency. So here we need to roll the wal, and let the ReplicationSource // track the new wal file, and throw the old wal files away. LogRoller roller = rs.getWalRoller(); - roller.requestRollAll(); + Future future = roller.requestRollAll(); try { - roller.waitUntilWalRollFinished(); + future.get(); } catch (InterruptedException e) { // reset the interrupted flag Thread.currentThread().interrupt(); throw (IOException) new InterruptedIOException( "Interrupted while waiting for wal roll finish").initCause(e); + } catch (ExecutionException e) { + // This shouldn't currently happen; however, it may be valid for + // the WAL roll to fail in future. + throw new IOException("Failed to roll WALs", e); } } SyncReplicationState oldState = peer.getSyncReplicationState(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java index 1b98518728..62329c632f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java @@ -167,6 +167,7 @@ public class SerialReplicationTestBase { for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) { t.getRegionServer().getWalRoller().requestRollAll(); } + // TODO: change to use futures UTIL.waitFor(30000, new ExplainingPredicate() { @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java index 672564965e..7147d97164 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java @@ -77,8 +77,7 @@ public class TestSerialSyncReplication extends SyncReplicationTestBase { // will not replay this wal when transiting to DA. for (RegionServerThread t : UTIL1.getMiniHBaseCluster().getRegionServerThreads()) { LogRoller roller = t.getRegionServer().getWalRoller(); - roller.requestRollAll(); - roller.waitUntilWalRollFinished(); + roller.requestRollAll().get(); } waitUntilDeleted(UTIL2, remoteWAL);