From 28211c28f841b073a20bb372c1244182fe002b95 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Mon, 8 Oct 2018 14:52:53 +0800 Subject: [PATCH] HBASE-21254 Need to find a way to limit the number of proc wal files --- .../hbase/procedure2/ProcedureExecutor.java | 90 +++++++++++++------ .../hbase/procedure2/store/BitSetNode.java | 23 +++++ .../procedure2/store/ProcedureStore.java | 26 +++++- .../procedure2/store/ProcedureStoreBase.java | 22 ++--- .../store/ProcedureStoreTracker.java | 13 ++- .../store/wal/WALProcedureStore.java | 16 ++-- .../apache/hadoop/hbase/master/HMaster.java | 9 +- .../master/procedure/MasterProcedureEnv.java | 21 ----- .../master/assignment/MockMasterServices.java | 9 +- 9 files changed, 155 insertions(+), 74 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 85d7e0b33c..6df29ed305 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.procedure2.Procedure.LockState; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -347,6 +349,8 @@ public class ProcedureExecutor { */ private final ProcedureScheduler scheduler; + private final ConcurrentLinkedQueue forceUpdateProcIds = new ConcurrentLinkedQueue<>(); + private final AtomicLong lastProcId = new AtomicLong(-1); private final AtomicLong workerId = new AtomicLong(0); private final AtomicInteger activeExecutorCount = new AtomicInteger(0); @@ -377,7 +381,15 @@ public class ProcedureExecutor { this.conf = conf; this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET); refreshConfiguration(conf); + store.registerListener(new ProcedureStoreListener() { + @Override + public void forceUpdate(long[] procIds) { + for (long procId: procIds) { + forceUpdateProcIds.add(procId); + } + } + }); } private void load(final boolean abortOnCorruption) throws IOException { @@ -1057,7 +1069,7 @@ public class ProcedureExecutor { // Now, the procedure is not finished, and no one can execute it since we take the lock now // And we can be sure that its ancestor is not running too, since their child has not // finished yet - Procedure current = procedure; + Procedure current = procedure; while (current != null) { LOG.debug("Bypassing {}", current); current.bypass(); @@ -1989,37 +2001,65 @@ public class ProcedureExecutor { scheduler.signalAll(); } + private boolean tryForceUpdateProcedure() throws IOException { + Long procIdLong = forceUpdateProcIds.poll(); + if (procIdLong == null) { + return false; + } + long procId = procIdLong.longValue(); + IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId); + try { + Procedure proc = procedures.get(procId); + if (proc == null || proc.isFinished()) { + return false; + } + store.update(proc); + } finally { + procExecutionLock.releaseLockEntry(lockEntry); + } + return true; + } + + private boolean tryExecuteProcedure() throws IOException { + @SuppressWarnings("unchecked") + Procedure proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); + if (proc == null) { + return false; + } + this.activeProcedure = proc; + int activeCount = activeExecutorCount.incrementAndGet(); + int runningCount = store.setRunningProcedureCount(activeCount); + LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), + runningCount, activeCount); + executionStartTime.set(EnvironmentEdgeManager.currentTime()); + IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); + try { + executeProcedure(proc); + } catch (AssertionError e) { + LOG.info("ASSERT pid=" + proc.getProcId(), e); + throw e; + } finally { + procExecutionLock.releaseLockEntry(lockEntry); + activeCount = activeExecutorCount.decrementAndGet(); + runningCount = store.setRunningProcedureCount(activeCount); + LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), + runningCount, activeCount); + this.activeProcedure = null; + executionStartTime.set(Long.MAX_VALUE); + } + return true; + } + @Override public void run() { long lastUpdate = EnvironmentEdgeManager.currentTime(); try { while (isRunning() && keepAlive(lastUpdate)) { - @SuppressWarnings("unchecked") - Procedure proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); - if (proc == null) { - continue; + if (tryForceUpdateProcedure()) { + lastUpdate = EnvironmentEdgeManager.currentTime(); } - this.activeProcedure = proc; - int activeCount = activeExecutorCount.incrementAndGet(); - int runningCount = store.setRunningProcedureCount(activeCount); - LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), - runningCount, activeCount); - executionStartTime.set(EnvironmentEdgeManager.currentTime()); - IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); - try { - executeProcedure(proc); - } catch (AssertionError e) { - LOG.info("ASSERT pid=" + proc.getProcId(), e); - throw e; - } finally { - procExecutionLock.releaseLockEntry(lockEntry); - activeCount = activeExecutorCount.decrementAndGet(); - runningCount = store.setRunningProcedureCount(activeCount); - LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), - runningCount, activeCount); - this.activeProcedure = null; + if (tryExecuteProcedure()) { lastUpdate = EnvironmentEdgeManager.currentTime(); - executionStartTime.set(Long.MAX_VALUE); } } } catch (Throwable t) { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java index b76c026d01..d14a69324a 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.procedure2.store; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState; import org.apache.yetus.audience.InterfaceAudience; @@ -196,6 +198,27 @@ class BitSetNode { return true; } + /** + * @return all the active procedure ids in this bit set. + */ + public long[] getActiveProcIds() { + List procIds = new ArrayList<>(); + for (int wordIndex = 0; wordIndex < modified.length; wordIndex++) { + if (deleted[wordIndex] == WORD_MASK || modified[wordIndex] == 0) { + // This should be the common case, where most procedures has been deleted. + continue; + } + long baseProcId = getStart() + (wordIndex << ADDRESS_BITS_PER_WORD); + for (int i = 0; i < (1 << ADDRESS_BITS_PER_WORD); i++) { + long mask = 1 << i; + if ((deleted[wordIndex] & mask) == 0 && (modified[wordIndex] & mask) == 1) { + procIds.add(baseProcId + i); + } + } + } + return procIds.stream().mapToLong(Long::longValue).toArray(); + } + /** * @return true, if there are no active procedures in this BitSetNode, else false. */ diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 8063b125ba..0599acfcc8 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -34,19 +34,37 @@ import org.apache.hadoop.hbase.procedure2.Procedure; public interface ProcedureStore { /** * Store listener interface. + *

* The main process should register a listener and respond to the store events. */ public interface ProcedureStoreListener { + /** * triggered when the store sync is completed. */ - void postSync(); + default void postSync() { + } + + /** + * triggered when the store is not able to write out data. the main process should abort. + */ + default void abortProcess() { + } /** - * triggered when the store is not able to write out data. - * the main process should abort. + * Suggest that the upper layer should update the state of some procedures. Ignore this call + * will not effect correctness but performance. + *

+ * For a WAL based ProcedureStore implementation, if all the procedures stored in a WAL file + * have been deleted, or updated later in another WAL file, then we can delete the WAL file. If + * there are old procedures in a WAL file which are never deleted or updated, then we can not + * delete the WAL file and this will cause we hold lots of WAL file and slow down the master + * restarts. So here we introduce this method to tell the upper layer that please update the + * states of these procedures so that we can delete the old WAL file. + * @param procIds the id for the procedures */ - void abortProcess(); + default void forceUpdate(long[] procIds) { + } } /** diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java index 90da9331f0..b1a8d3d1a2 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java @@ -15,12 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2.store; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.yetus.audience.InterfaceAudience; /** @@ -58,19 +56,15 @@ public abstract class ProcedureStoreBase implements ProcedureStore { return listeners.remove(listener); } - protected void sendPostSyncSignal() { - if (!this.listeners.isEmpty()) { - for (ProcedureStoreListener listener : this.listeners) { - listener.postSync(); - } - } + protected final void sendPostSyncSignal() { + listeners.forEach(ProcedureStoreListener::postSync); + } + + protected final void sendAbortProcessSignal() { + listeners.forEach(ProcedureStoreListener::abortProcess); } - protected void sendAbortProcessSignal() { - if (!this.listeners.isEmpty()) { - for (ProcedureStoreListener listener : this.listeners) { - listener.abortProcess(); - } - } + protected final void sendForceUpdateSignal(long[] procIds) { + listeners.forEach(l -> l.forceUpdate(procIds)); } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 361419ab48..f98c766d77 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -23,9 +23,10 @@ import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; - +import java.util.stream.LongStream; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; /** @@ -335,6 +336,16 @@ public class ProcedureStoreTracker { return true; } + /** + * Will be used when there are too many proc wal files. We will rewrite the states of the active + * procedures in the oldest proc wal file so that we can delete it. + * @return all the active procedure ids in this tracker. + */ + public long[] getAllActiveProcIds() { + return map.values().stream().map(BitSetNode::getActiveProcIds).filter(p -> p.length > 0) + .flatMapToLong(LongStream::of).toArray(); + } + /** * Clears the list of updated procedure ids. This doesn't affect global list of active * procedure ids. diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 00dfe85c1c..47fdd3a2ff 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -496,7 +496,9 @@ public class WALProcedureStore extends ProcedureStoreBase { private void tryCleanupLogsOnLoad() { // nothing to cleanup. - if (logs.size() <= 1) return; + if (logs.size() <= 1) { + return; + } // the config says to not cleanup wals on load. if (!conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, @@ -1041,7 +1043,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @VisibleForTesting - boolean rollWriter(final long logId) throws IOException { + boolean rollWriter(long logId) throws IOException { assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId; assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked(); @@ -1059,10 +1061,10 @@ public class WALProcedureStore extends ProcedureStoreBase { try { newStream = CommonFSUtils.createForWal(fs, newLogFile, false); } catch (FileAlreadyExistsException e) { - LOG.error("Log file with id=" + logId + " already exists", e); + LOG.error("Log file with id={} already exists", logId, e); return false; } catch (RemoteException re) { - LOG.warn("failed to create log file with id=" + logId, re); + LOG.warn("failed to create log file with id={}", logId, re); return false; } // After we create the stream but before we attempt to use it at all @@ -1099,9 +1101,9 @@ public class WALProcedureStore extends ProcedureStoreBase { if (logs.size() == 2) { buildHoldingCleanupTracker(); } else if (logs.size() > walCountWarnThreshold) { - LOG.warn("procedure WALs count=" + logs.size() + - " above the warning threshold " + walCountWarnThreshold + - ". check running procedures to see if something is stuck."); + LOG.warn("procedure WALs count={} above the warning threshold {}. check running procedures" + + " to see if something is stuck.", logs.size(), walCountWarnThreshold); + sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds()); } LOG.info("Rolled new Procedure Store WAL, id={}", logId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 8ae8be35ed..d653b7df79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -161,6 +161,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import org.apache.hadoop.hbase.quotas.MasterQuotasObserver; @@ -1480,7 +1481,13 @@ public class HMaster extends HRegionServer implements MasterServices { MasterProcedureEnv procEnv = new MasterProcedureEnv(this); procedureStore = new WALProcedureStore(conf, new MasterProcedureEnv.WALStoreLeaseRecovery(this)); - procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); + procedureStore.registerListener(new ProcedureStoreListener() { + + @Override + public void abortProcess() { + abort("The Procedure Store lost the lease", null); + } + }); MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler(); procedureExecutor = new ProcedureExecutor<>(conf, procEnv, procedureStore, procedureScheduler); configurationManager.registerObserver(procEnv); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 2c01b166b4..cd402344c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; @@ -73,26 +72,6 @@ public class MasterProcedureEnv implements ConfigurationObserver { } } - @InterfaceAudience.Private - public static class MasterProcedureStoreListener - implements ProcedureStore.ProcedureStoreListener { - private final MasterServices master; - - public MasterProcedureStoreListener(final MasterServices master) { - this.master = master; - } - - @Override - public void postSync() { - // no-op - } - - @Override - public void abortProcess() { - master.abort("The Procedure Store lost the lease", null); - } - } - private final RSProcedureDispatcher remoteDispatcher; private final MasterProcedureScheduler procSched; private final MasterServices master; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java index 320687756b..c0dc72c39a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.zookeeper.KeeperException; @@ -220,7 +221,13 @@ public class MockMasterServices extends MockNoopMasterServices { throws IOException { final Configuration conf = getConfiguration(); this.procedureStore = new NoopProcedureStore(); - this.procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); + this.procedureStore.registerListener(new ProcedureStoreListener() { + + @Override + public void abortProcess() { + abort("The Procedure Store lost the lease", null); + } + }); this.procedureEnv = new MasterProcedureEnv(this, remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this)); -- 2.17.1