diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java index 575be39451..992b9017e3 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.backup.regionserver; import java.util.HashMap; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; @@ -92,10 +94,14 @@ public class LogRollBackupSubprocedure extends Subprocedure { LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum + " highest: " + highest + " on " + rss.getServerName()); - ((HRegionServer) rss).getWalRoller().requestRollAll(); + Future<Void> f = ((HRegionServer) rss).getWalRoller().requestRollAll(); long start = EnvironmentEdgeManager.currentTime(); - while (!((HRegionServer) rss).getWalRoller().walRollFinished()) { - Thread.sleep(20); + try { + f.get(); + } catch (ExecutionException ex) { + // Shouldn't currently happen, but could be a valid path (roll failed). 
+ LOG.error("Log rolling failed", ex); + throw ex; } LOG.debug("log roll took " + (EnvironmentEdgeManager.currentTime() - start)); LOG.info("After roll log in backup subprocedure, current log number: " + fsWAL.getFilenum() diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 05a8fdf778..2600a11814 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -20,14 +20,16 @@ package org.apache.hadoop.hbase.regionserver; import java.io.Closeable; import java.io.IOException; +import java.util.Collection; import java.util.Iterator; +import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.Future; import java.util.concurrent.locks.ReentrantLock; -import org.apache.hadoop.hbase.HConstants; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.SettableFuture; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; @@ -56,8 +58,19 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti public class LogRoller extends HasThread implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class); private final ReentrantLock rollLock = new ReentrantLock(); - private final AtomicBoolean rollLog = new AtomicBoolean(false); - private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>(); + + private final Object rollLogRequestLock = new Object(); + private boolean anyWalNeedsRoll = false; + // Admin/procedure requests to roll all WALs. 
+ private LinkedList<SettableFuture<Void>> allWalsNeedToRollRequests = new LinkedList<>(); + // Note: this map serves a dual purpose... first, it's used in places that merely want to + // iterate all the WALs and presumably don't care about parallel changes; CHM sync + // is used for that. It's also used as a per-WAL equivalent of anyWalNeedsRoll; that + // (i.e. any code changing values) is synchronized via rollLogRequestLock to avoid + // an ABA (sortof) problem where we may lose multiple valid requests. + // TODO: perhaps split into two structures? + private final ConcurrentHashMap<WAL, Boolean> walToForceRoll = new ConcurrentHashMap<>(); + private final Server server; protected final RegionServerServices services; private volatile long lastrolltime = System.currentTimeMillis(); @@ -70,29 +83,30 @@ public class LogRoller extends HasThread implements Closeable { private volatile boolean running = true; public void addWAL(final WAL wal) { - if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) { + // TODO: potentially races with remove in run()? esp. the put-true below. + if (null == walToForceRoll.putIfAbsent(wal, Boolean.FALSE)) { wal.registerWALActionsListener(new WALActionsListener() { @Override public void logRollRequested(boolean lowReplicas) { - walNeedsRoll.put(wal, Boolean.TRUE); // TODO logs will contend with each other here, replace with e.g. 
DelayedQueue - synchronized(rollLog) { - rollLog.set(true); - rollLog.notifyAll(); + // ...if contention is observed empirically + synchronized (rollLogRequestLock) { + anyWalNeedsRoll = true; + walToForceRoll.put(wal, Boolean.TRUE); + rollLogRequestLock.notifyAll(); } } }); } } - public void requestRollAll() { - for (WAL wal : walNeedsRoll.keySet()) { - walNeedsRoll.put(wal, Boolean.TRUE); - } - synchronized(rollLog) { - rollLog.set(true); - rollLog.notifyAll(); + public Future<Void> requestRollAll() { + SettableFuture<Void> result = SettableFuture.create(); + synchronized (rollLogRequestLock) { + allWalsNeedToRollRequests.add(result); + rollLogRequestLock.notifyAll(); } + return result; } /** @param server */ @@ -110,9 +124,9 @@ public class LogRoller extends HasThread implements Closeable { @Override public void interrupt() { - // Wake up if we are waiting on rollLog. For tests. - synchronized (rollLog) { - this.rollLog.notify(); + // Wake up if we are waiting on anyWalNeedsRoll. For tests. + synchronized(rollLogRequestLock) { + rollLogRequestLock.notifyAll(); } super.interrupt(); } @@ -122,9 +136,9 @@ */ void checkLowReplication(long now) { try { - for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) { + for (Entry<WAL, Boolean> entry : walToForceRoll.entrySet()) { WAL wal = entry.getKey(); - boolean needRollAlready = entry.getValue(); + boolean needRollAlready = entry.getValue(); // Best-effort; no lock. if (needRollAlready || !(wal instanceof AbstractFSWAL)) { continue; } @@ -140,7 +154,7 @@ // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it // is already broken. 
- for (WAL wal : walNeedsRoll.keySet()) { + for (WAL wal : walToForceRoll.keySet()) { // shutdown rather than close here since we are going to abort the RS and the wals need to be // split when recovery try { @@ -155,48 +169,63 @@ @Override public void run() { while (running) { - long now = System.currentTimeMillis(); + long now = System.currentTimeMillis(); // TODO: use nanoTime; not reliable checkLowReplication(now); boolean periodic = false; - if (!rollLog.get()) { - periodic = (now - this.lastrolltime) > this.rollperiod; - if (!periodic) { - synchronized (rollLog) { + // TODO: what is this lock for? LogRoller is a thread, there's only one of it, + // and this is the only usage. Keeping it for now. + rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH + ReentrantLock lockTaken = rollLock; + LinkedList<SettableFuture<Void>> rollRequests = null; + try { + // TODO: why do we need to roll the non-requested ones? seems unnecessary + Collection<WAL> walsToForceRoll = null, walsToNoForceRoll = null; + synchronized (rollLogRequestLock) { + // Unset the flags atomically, so that if another roll is requested after writer is + // replaced but before rollWriter is done (e.g. during archiving), we don't lose it. + if (!allWalsNeedToRollRequests.isEmpty()) { + // Administrator, or some procedure wants to roll all WALs. + rollRequests = new LinkedList<>(allWalsNeedToRollRequests); + allWalsNeedToRollRequests.clear(); + walsToForceRoll = prepareToRollAllWalsUnderReqLock(); + } else if (anyWalNeedsRoll) { + // Some WALs want to be rolled (e.g. based on size). + anyWalNeedsRoll = false; + walsToForceRoll = new LinkedList<>(); + walsToNoForceRoll = new LinkedList<>(); + for (Entry<WAL, Boolean> e : walToForceRoll.entrySet()) { + // Separate WALs into those where flush was requested, and those where it wasn't. Unset flags under lock. + (e.getValue() ? 
walsToForceRoll : walsToNoForceRoll).add(e.getKey()); + e.setValue(Boolean.FALSE); + } + } else if (periodic = ((now - this.lastrolltime) > this.rollperiod)) { + // Roll all WALs due to a timeout since the last roll. + walsToForceRoll = prepareToRollAllWalsUnderReqLock(); + } else { + // Nothing to do; release the global lock and wait. + lockTaken.unlock(); + lockTaken = null; try { - if (!rollLog.get()) { - rollLog.wait(this.threadWakeFrequency); - } + rollLogRequestLock.wait(this.threadWakeFrequency); } catch (InterruptedException e) { // Fall through } + continue; } - continue; } - // Time for periodic roll - LOG.debug("Wal roll period {} ms elapsed", this.rollperiod); - } else { - LOG.debug("WAL roll requested"); - } - rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH - try { + + if (periodic) { + LOG.debug("Wal roll period {} ms elapsed", this.rollperiod); + } else { + LOG.debug("WAL roll requested"); + } + this.lastrolltime = now; - for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter - .hasNext();) { - Entry<WAL, Boolean> entry = iter.next(); - final WAL wal = entry.getKey(); - // Force the roll if the logroll.period is elapsed or if a roll was requested. - // The returned value is an array of actual region names. - try { - final byte[][] regionsToFlush = - wal.rollWriter(periodic || entry.getValue().booleanValue()); - walNeedsRoll.put(wal, Boolean.FALSE); - if (regionsToFlush != null) { - for (byte[] r : regionsToFlush) { - scheduleFlush(r); - } - } - } catch (WALClosedException e) { - LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e); + rollWals(walsToForceRoll, true); + rollWals(walsToNoForceRoll, false); + if (rollRequests != null) { + for (Iterator<SettableFuture<Void>> iter = rollRequests.iterator(); iter.hasNext();) { + iter.next().set(null); // Notify the caller that the roll of all logs is done. 
iter.remove(); } } @@ -212,16 +241,50 @@ public class LogRoller extends HasThread implements Closeable { LOG.error("Log rolling failed", ex); abort("Log rolling failed", ex); } finally { - try { - rollLog.set(false); - } finally { - rollLock.unlock(); + if (lockTaken != null) { + lockTaken.unlock(); + } + // The old semantics of requestRollAll are to wait forever, so put these back on error. + if (rollRequests != null && !rollRequests.isEmpty()) { + synchronized (rollLogRequestLock) { + this.allWalsNeedToRollRequests.addAll(rollRequests); + // No need to notify, we'll see them again in the next iteration. + } } + } } LOG.info("LogRoller exiting."); } + private Collection<WAL> prepareToRollAllWalsUnderReqLock() { + anyWalNeedsRoll = false; + // We are rolling all WALs, so just unset outstanding roll requests; + // we don't care about precision w.r.t. the WAL set. + for (Entry<WAL, Boolean> e : walToForceRoll.entrySet()) { + e.setValue(Boolean.FALSE); + } + return walToForceRoll.keySet(); + } + + private void rollWals(Collection<WAL> wals, boolean force) throws IOException { + if (wals == null) return; + for (WAL wal : wals) { + // The returned value is an array of actual region names. + try { + final byte[][] regionsToFlush = wal.rollWriter(force); + if (regionsToFlush != null) { + for (byte[] r : regionsToFlush) { + scheduleFlush(r); + } + } + } catch (WALClosedException e) { + LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e); + walToForceRoll.remove(wal); // TODO: potentially races with puts + } + } + } + /** * @param encodedRegionName Encoded name of region to flush. */ @@ -244,10 +307,12 @@ } /** + * Test method - doesn't do any synchronization so it can be racy. 
* @return true if all WAL roll finished */ + @VisibleForTesting public boolean walRollFinished() { - for (boolean needRoll : walNeedsRoll.values()) { + for (boolean needRoll : walToForceRoll.values()) { if (needRoll) { return false; } @@ -255,14 +320,7 @@ return true; } - /** - * Wait until all wals have been rolled after calling {@link #requestRollAll()}. - */ - public void waitUntilWalRollFinished() throws InterruptedException { - while (!walRollFinished()) { - Thread.sleep(100); - } - } + @Override public void close() { @@ -272,6 +330,6 @@ @VisibleForTesting Map<WAL, Boolean> getWalNeedsRoll() { - return this.walNeedsRoll; + return this.walToForceRoll; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java index d01b130503..c6e518a2e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.LogRoller; @@ -202,14 +204,17 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { // data inconsistency. So here we need to roll the wal, and let the ReplicationSource // track the new wal file, and throw the old wal files away. 
LogRoller roller = rs.getWalRoller(); - roller.requestRollAll(); + Future<Void> future = roller.requestRollAll(); try { - roller.waitUntilWalRollFinished(); + future.get(); } catch (InterruptedException e) { // reset the interrupted flag Thread.currentThread().interrupt(); throw (IOException) new InterruptedIOException( "Interrupted while waiting for wal roll finish").initCause(e); + } catch (ExecutionException e) { + // This shouldn't currently happen; however it may be valid for the WAL roll to fail in future. + throw new IOException("Failed to roll WALs", e); } } SyncReplicationState oldState = peer.getSyncReplicationState(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java index 1b98518728..62329c632f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java @@ -167,6 +167,7 @@ public class SerialReplicationTestBase { for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) { t.getRegionServer().getWalRoller().requestRollAll(); } + // TODO: change to use futures UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java index 672564965e..7147d97164 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java @@ -77,8 +77,7 @@ public class TestSerialSyncReplication extends SyncReplicationTestBase { // will not replay this wal when transiting to DA. 
for (RegionServerThread t : UTIL1.getMiniHBaseCluster().getRegionServerThreads()) { LogRoller roller = t.getRegionServer().getWalRoller(); - roller.requestRollAll(); - roller.waitUntilWalRollFinished(); + roller.requestRollAll().get(); } waitUntilDeleted(UTIL2, remoteWAL);