diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java index b7ea47e..fca2eac 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java @@ -224,10 +224,10 @@ public class ProcedureInfo implements Cloneable { procProto.getOwner(), procProto.getState(), procProto.hasParentId() ? procProto.getParentId() : -1, - procProto.getState() == ProcedureState.ROLLEDBACK ? procProto.getException() : null, + procProto.hasException() ? procProto.getException() : null, procProto.getLastUpdate(), procProto.getStartTime(), - procProto.getState() == ProcedureState.FINISHED ? procProto.getResult().toByteArray() : null); + procProto.hasResult() ? procProto.getResult().toByteArray() : null); } /** diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 11073c6..74d28d7 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -785,8 +785,7 @@ public class ProcedureExecutor { */ private void execLoop() { while (isRunning()) { - Long procId = runnables.poll(); - Procedure proc = procId != null ? 
procedures.get(procId) : null; + Procedure proc = runnables.poll(); if (proc == null) continue; try { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java deleted file mode 100644 index 242ae86..0000000 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2; - -import java.util.Map; - -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.ConcurrentSkipListMap; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -/** - * This class is a container of queues that allows to select a queue - * in a round robin fashion, considering priority of the queue. - * - * the quantum is just how many poll() will return the same object. - * e.g. if quantum is 1 and you have A and B as object you'll get: A B A B - * e.g. 
if quantum is 2 and you have A and B as object you'll get: A A B B A A B B - * then the object priority is just a priority * quantum - * - * Example: - * - three queues (A, B, C) with priorities (1, 1, 2) - * - The first poll() will return A - * - The second poll() will return B - * - The third and forth poll() will return C - * - and so on again and again. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class ProcedureFairRunQueues { - private ConcurrentSkipListMap objMap = - new ConcurrentSkipListMap(); - - private final ReentrantLock lock = new ReentrantLock(); - private final int quantum; - - private Map.Entry current = null; - private int currentQuantum = 0; - - public interface FairObject { - boolean isAvailable(); - int getPriority(); - } - - /** - * @param quantum how many poll() will return the same object. - */ - public ProcedureFairRunQueues(final int quantum) { - this.quantum = quantum; - } - - public TQueue get(final TKey key) { - return objMap.get(key); - } - - public TQueue add(final TKey key, final TQueue queue) { - TQueue oldq = objMap.putIfAbsent(key, queue); - return oldq != null ? 
oldq : queue; - } - - public TQueue remove(final TKey key) { - TQueue queue = objMap.get(key); - if (queue != null) { - lock.lock(); - try { - queue = objMap.remove(key); - if (current != null && queue == current.getValue()) { - currentQuantum = 0; - current = null; - } - } finally { - lock.unlock(); - } - } - return queue; - } - - public void clear() { - lock.lock(); - try { - currentQuantum = 0; - current = null; - objMap.clear(); - } finally { - lock.unlock(); - } - } - - /** - * @return the next available item if present - */ - public TQueue poll() { - lock.lock(); - try { - TQueue queue; - if (currentQuantum == 0) { - if (nextObject() == null) { - // nothing here - return null; - } - - queue = current.getValue(); - currentQuantum = calculateQuantum(queue) - 1; - } else { - currentQuantum--; - queue = current.getValue(); - } - - if (!queue.isAvailable()) { - Map.Entry last = current; - // Try the next one - do { - if (nextObject() == null) - return null; - } while (current.getValue() != last.getValue() && !current.getValue().isAvailable()); - - queue = current.getValue(); - currentQuantum = calculateQuantum(queue) - 1; - } - - return queue; - } finally { - lock.unlock(); - } - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append('{'); - for (Map.Entry entry: objMap.entrySet()) { - builder.append(entry.getKey()); - builder.append(':'); - builder.append(entry.getValue()); - } - builder.append('}'); - return builder.toString(); - } - - private Map.Entry nextObject() { - Map.Entry next = null; - - // If we have already a key, try the next one - if (current != null) { - next = objMap.higherEntry(current.getKey()); - } - - // if there is no higher key, go back to the first - current = (next != null) ? 
next : objMap.firstEntry(); - return current; - } - - private int calculateQuantum(final TQueue fairObject) { - // TODO - return Math.max(1, fairObject.getPriority() * quantum); - } -} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java index 2d7ba39..65df692 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java @@ -55,9 +55,9 @@ public interface ProcedureRunnableSet { /** * Fetch one Procedure from the queue - * @return the Procedure ID to execute, or null if nothing present. + * @return the Procedure to execute, or null if nothing present. */ - Long poll(); + Procedure poll(); /** * In case the class is blocking on poll() waiting for items to be added, diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java index 7b17fb2..d23680d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Evolving public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { - private final Deque runnables = new ArrayDeque(); + private final Deque runnables = new ArrayDeque(); private final ReentrantLock lock = new ReentrantLock(); private final Condition waitCond = lock.newCondition(); @@ -40,7 +40,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { public void addFront(final Procedure proc) { lock.lock(); try { - 
runnables.addFirst(proc.getProcId()); + runnables.addFirst(proc); waitCond.signal(); } finally { lock.unlock(); @@ -51,7 +51,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { public void addBack(final Procedure proc) { lock.lock(); try { - runnables.addLast(proc.getProcId()); + runnables.addLast(proc); waitCond.signal(); } finally { lock.unlock(); @@ -65,7 +65,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { @Override @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") - public Long poll() { + public Procedure poll() { lock.lock(); try { if (runnables.isEmpty()) { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java deleted file mode 100644 index e36a295..0000000 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.procedure2; - -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.testclassification.MasterTests; - -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import static org.junit.Assert.assertEquals; - -@Category({MasterTests.class, SmallTests.class}) -public class TestProcedureFairRunQueues { - private static class TestRunQueue implements ProcedureFairRunQueues.FairObject { - private final int priority; - private final String name; - - private boolean available = true; - - public TestRunQueue(String name, int priority) { - this.name = name; - this.priority = priority; - } - - @Override - public String toString() { - return name; - } - - private void setAvailable(boolean available) { - this.available = available; - } - - @Override - public boolean isAvailable() { - return available; - } - - @Override - public int getPriority() { - return priority; - } - } - - @Test - public void testEmptyFairQueues() throws Exception { - ProcedureFairRunQueues fairq - = new ProcedureFairRunQueues(1); - for (int i = 0; i < 3; ++i) { - assertEquals(null, fairq.poll()); - } - } - - @Test - public void testFairQueues() throws Exception { - ProcedureFairRunQueues fairq - = new ProcedureFairRunQueues(1); - TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1)); - TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1)); - TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2)); - - for (int i = 0; i < 3; ++i) { - assertEquals(a, fairq.poll()); - assertEquals(b, fairq.poll()); - assertEquals(m, fairq.poll()); - assertEquals(m, fairq.poll()); - } - } - - @Test - public void testFairQueuesNotAvailable() throws Exception { - ProcedureFairRunQueues fairq - = new ProcedureFairRunQueues(1); - TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1)); - TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1)); - TestRunQueue m = fairq.add("M", new 
TestRunQueue("M", 2)); - - // m is not available - m.setAvailable(false); - for (int i = 0; i < 3; ++i) { - assertEquals(a, fairq.poll()); - assertEquals(b, fairq.poll()); - } - - // m is available - m.setAvailable(true); - for (int i = 0; i < 3; ++i) { - assertEquals(m, fairq.poll()); - assertEquals(m, fairq.poll()); - assertEquals(a, fairq.poll()); - assertEquals(b, fairq.poll()); - } - - // b is not available - b.setAvailable(false); - for (int i = 0; i < 3; ++i) { - assertEquals(m, fairq.poll()); - assertEquals(m, fairq.poll()); - assertEquals(a, fairq.poll()); - } - - assertEquals(m, fairq.poll()); - m.setAvailable(false); - // m should be fetched next, but is no longer available - assertEquals(a, fairq.poll()); - assertEquals(a, fairq.poll()); - b.setAvailable(true); - for (int i = 0; i < 3; ++i) { - assertEquals(b, fairq.poll()); - assertEquals(a, fairq.poll()); - } - } - - @Test - public void testFairQueuesDelete() throws Exception { - ProcedureFairRunQueues fairq - = new ProcedureFairRunQueues(1); - TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1)); - TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1)); - TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2)); - - // Fetch A and then remove it - assertEquals(a, fairq.poll()); - assertEquals(a, fairq.remove("A")); - - // Fetch B and then remove it - assertEquals(b, fairq.poll()); - assertEquals(b, fairq.remove("B")); - - // Fetch M and then remove it - assertEquals(m, fairq.poll()); - assertEquals(m, fairq.remove("M")); - - // nothing left - assertEquals(null, fairq.poll()); - } -} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 2431681..5bf9798 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -103,6 +103,7 @@ import 
org.apache.hadoop.hbase.master.procedure.DisableTableProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; @@ -277,14 +278,15 @@ public class HMaster extends HRegionServer implements MasterServices { // flag set after we complete initialization once active, // it is not private since it's used in unit tests - volatile boolean initialized = false; + private final ProcedureEvent initialized = new ProcedureEvent("master initialized"); // flag set after master services are started, // initialization may have not completed yet. volatile boolean serviceStarted = false; // flag set after we complete assignMeta. - private volatile boolean serverCrashProcessingEnabled = false; + private final ProcedureEvent serverCrashProcessingEnabled = + new ProcedureEvent("server crash processing"); LoadBalancer balancer; private RegionNormalizer normalizer; @@ -781,8 +783,10 @@ public class HMaster extends HRegionServer implements MasterServices { status.markComplete("Initialization successful"); LOG.info("Master has completed initialization"); configurationManager.registerObserver(this.balancer); + // Set master as 'initialized'. - initialized = true; + setInitialized(true); + // assign the meta replicas Set EMPTY_SET = new HashSet(); int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM, @@ -976,8 +980,8 @@ public class HMaster extends HRegionServer implements MasterServices { // servers. 
This is required so that if meta is assigning to a server which dies after // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be // stuck here waiting forever if waitForMeta is specified. - if (!serverCrashProcessingEnabled) { - serverCrashProcessingEnabled = true; + if (!isServerCrashProcessingEnabled()) { + setServerCrashProcessingEnabled(true); this.serverManager.processQueuedDeadServers(); } @@ -1207,7 +1211,7 @@ public class HMaster extends HRegionServer implements MasterServices { public boolean balance(boolean force) throws IOException { // if master not initialized, don't run balancer. - if (!this.initialized) { + if (!isInitialized()) { LOG.debug("Master has not been initialized, don't run balancer."); return false; } @@ -1308,7 +1312,7 @@ public class HMaster extends HRegionServer implements MasterServices { * is globally disabled) */ public boolean normalizeRegions() throws IOException { - if (!this.initialized) { + if (!isInitialized()) { LOG.debug("Master has not been initialized, don't run region normalizer."); return false; } @@ -1615,7 +1619,7 @@ public class HMaster extends HRegionServer implements MasterServices { } } - private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd) + private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd) throws IOException { // FIFO compaction has some requirements // Actually FCP ignores periodic major compactions @@ -1672,7 +1676,7 @@ public class HMaster extends HRegionServer implements MasterServices { } } } - + // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled. 
private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey, String message, Exception cause) throws IOException { @@ -2300,6 +2304,15 @@ public class HMaster extends HRegionServer implements MasterServices { */ @Override public boolean isInitialized() { + return initialized.isReady(); + } + + @VisibleForTesting + public void setInitialized(boolean isInitialized) { + procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized); + } + + public ProcedureEvent getInitializedEvent() { return initialized; } @@ -2310,12 +2323,16 @@ public class HMaster extends HRegionServer implements MasterServices { */ @Override public boolean isServerCrashProcessingEnabled() { - return this.serverCrashProcessingEnabled; + return serverCrashProcessingEnabled.isReady(); } @VisibleForTesting public void setServerCrashProcessingEnabled(final boolean b) { - this.serverCrashProcessingEnabled = b; + procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b); + } + + public ProcedureEvent getServerCrashProcessingEnabledEvent() { + return serverCrashProcessingEnabled; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java index 58da1d1..b57540b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import 
org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -185,10 +184,8 @@ public class AddColumnFamilyProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - tableName, - EventType.C_M_ADD_FAMILY.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "add family"); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java index f934737..87b411e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java @@ -205,7 +205,9 @@ public class CreateNamespaceProcedure return true; } - return false; + if (env.waitInitialized(this)) { + return false; + } } return getTableNamespaceManager(env).acquireExclusiveLock(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java index 7b48f3b..d786bb3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java @@ -266,7 +266,7 @@ public class CreateTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized() && !getTableName().isSystemTable()) { + if (!getTableName().isSystemTable() && env.waitInitialized(this)) { return false; } return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "create table"); diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java index 5781ae6..7e135f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -202,10 +201,8 @@ public class DeleteColumnFamilyProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - tableName, - EventType.C_M_DELETE_FAMILY.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "delete family"); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java index baef112..0c43c57 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java @@ -200,7 +200,7 @@ public class DeleteTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; + if (env.waitInitialized(this)) 
return false; return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "delete table"); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java index 716897f..fcc1b7b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.constraint.ConstraintException; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.BulkAssigner; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; @@ -215,10 +214,8 @@ public class DisableTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - tableName, - EventType.C_M_DISABLE_TABLE.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "disable table"); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java index bc1fc0f..d24d94b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.TableNotDisabledException; import 
org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.BulkAssigner; import org.apache.hadoop.hbase.master.GeneralBulkAssigner; @@ -235,10 +234,8 @@ public class EnableTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - tableName, - EventType.C_M_ENABLE_TABLE.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "enable table"); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 6700b63..090b8cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.security.User; @@ -85,12 +87,12 @@ public class MasterProcedureEnv { } } - private final MasterProcedureQueue procQueue; + private final MasterProcedureScheduler procSched; private final MasterServices master; public 
MasterProcedureEnv(final MasterServices master) { this.master = master; - this.procQueue = new MasterProcedureQueue(master.getConfiguration(), + this.procSched = new MasterProcedureScheduler(master.getConfiguration(), master.getTableLockManager()); } @@ -114,8 +116,8 @@ public class MasterProcedureEnv { return master.getMasterCoprocessorHost(); } - public MasterProcedureQueue getProcedureQueue() { - return procQueue; + public MasterProcedureScheduler getProcedureQueue() { + return procSched; } public boolean isRunning() { @@ -125,4 +127,28 @@ public class MasterProcedureEnv { public boolean isInitialized() { return master.isInitialized(); } + + public boolean waitInitialized(Procedure proc) { + return procSched.waitEvent(((HMaster)master).getInitializedEvent(), proc); + } + + public boolean waitServerCrashProcessingEnabled(Procedure proc) { + return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc); + } + + public void wake(ProcedureEvent event) { + procSched.wake(event); + } + + public void suspend(ProcedureEvent event) { + procSched.suspend(event); + } + + public void setEventReady(ProcedureEvent event, boolean isReady) { + if (isReady) { + procSched.wake(event); + } else { + procSched.suspend(event); + } + } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java deleted file mode 100644 index c4c7747..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java +++ /dev/null @@ -1,578 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.master.procedure; - -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureFairRunQueues; -import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet; -import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.master.TableLockManager.TableLock; -import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; - -/** - * ProcedureRunnableSet for the Master Procedures. - * This RunnableSet tries to provide to the ProcedureExecutor procedures - * that can be executed without having to wait on a lock. 
- * Most of the master operations can be executed concurrently, if they - * are operating on different tables (e.g. two create table can be performed - * at the same, time assuming table A and table B) or against two different servers; say - * two servers that crashed at about the same time. - * - *

Each procedure should implement an interface providing information for this queue. - * for example table related procedures should implement TableProcedureInterface. - * each procedure will be pushed in its own queue, and based on the operation type - * we may take smarter decision. e.g. we can abort all the operations preceding - * a delete table, or similar. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class MasterProcedureQueue implements ProcedureRunnableSet { - private static final Log LOG = LogFactory.getLog(MasterProcedureQueue.class); - - // Two queues to ensure that server procedures run ahead of table precedures always. - private final ProcedureFairRunQueues tableFairQ; - /** - * Rely on basic fair q. ServerCrashProcedure will yield if meta is not assigned. This way, the - * server that was carrying meta should rise to the top of the queue (this is how it used to - * work when we had handlers and ServerShutdownHandler ran). TODO: special handling of servers - * that were carrying system tables on crash; do I need to have these servers have priority? - * - *

Apart from the special-casing of meta and system tables, fairq is what we want - */ - private final ProcedureFairRunQueues serverFairQ; - - private final ReentrantLock lock = new ReentrantLock(); - private final Condition waitCond = lock.newCondition(); - private final TableLockManager lockManager; - - private final int metaTablePriority; - private final int userTablePriority; - private final int sysTablePriority; - private static final int DEFAULT_SERVER_PRIORITY = 1; - - /** - * Keeps count across server and table queues. - */ - private int queueSize; - - public MasterProcedureQueue(final Configuration conf, final TableLockManager lockManager) { - this.tableFairQ = new ProcedureFairRunQueues(1); - this.serverFairQ = new ProcedureFairRunQueues(1); - this.lockManager = lockManager; - - // TODO: should this be part of the HTD? - metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3); - sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2); - userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1); - } - - @Override - public void addFront(final Procedure proc) { - lock.lock(); - try { - getRunQueueOrCreate(proc).addFront(proc); - queueSize++; - waitCond.signal(); - } finally { - lock.unlock(); - } - } - - @Override - public void addBack(final Procedure proc) { - lock.lock(); - try { - getRunQueueOrCreate(proc).addBack(proc); - queueSize++; - waitCond.signal(); - } finally { - lock.unlock(); - } - } - - @Override - public void yield(final Procedure proc) { - addBack(proc); - } - - @Override - @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") - public Long poll() { - Long pollResult = null; - lock.lock(); - try { - if (queueSize == 0) { - waitCond.await(); - if (queueSize == 0) { - return null; - } - } - // For now, let server handling have precedence over table handling; presumption is that it - // is more important handling crashed servers than it 
is running the - // enabling/disabling tables, etc. - pollResult = doPoll(serverFairQ.poll()); - if (pollResult == null) { - pollResult = doPoll(tableFairQ.poll()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - lock.unlock(); - } - return pollResult; - } - - private Long doPoll(final RunQueue rq) { - if (rq == null || !rq.isAvailable()) return null; - this.queueSize--; - return rq.poll(); - } - - @Override - public void signalAll() { - lock.lock(); - try { - waitCond.signalAll(); - } finally { - lock.unlock(); - } - } - - @Override - public void clear() { - lock.lock(); - try { - serverFairQ.clear(); - tableFairQ.clear(); - queueSize = 0; - } finally { - lock.unlock(); - } - } - - @Override - public int size() { - lock.lock(); - try { - return queueSize; - } finally { - lock.unlock(); - } - } - - @Override - public String toString() { - lock.lock(); - try { - return "MasterProcedureQueue size=" + queueSize + ": tableFairQ: " + tableFairQ + - ", serverFairQ: " + serverFairQ; - } finally { - lock.unlock(); - } - } - - @Override - public void completionCleanup(Procedure proc) { - if (proc instanceof TableProcedureInterface) { - TableProcedureInterface iProcTable = (TableProcedureInterface)proc; - boolean tableDeleted; - if (proc.hasException()) { - IOException procEx = proc.getException().unwrapRemoteException(); - if (iProcTable.getTableOperationType() == TableOperationType.CREATE) { - // create failed because the table already exist - tableDeleted = !(procEx instanceof TableExistsException); - } else { - // the operation failed because the table does not exist - tableDeleted = (procEx instanceof TableNotFoundException); - } - } else { - // the table was deleted - tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE); - } - if (tableDeleted) { - markTableAsDeleted(iProcTable.getTableName()); - } - } - // No cleanup for ServerProcedureInterface types, yet. 
- } - - private RunQueue getRunQueueOrCreate(final Procedure proc) { - if (proc instanceof TableProcedureInterface) { - final TableName table = ((TableProcedureInterface)proc).getTableName(); - return getRunQueueOrCreate(table); - } - if (proc instanceof ServerProcedureInterface) { - return getRunQueueOrCreate((ServerProcedureInterface)proc); - } - // TODO: at the moment we only have Table and Server procedures - // if you are implementing a non-table/non-server procedure, you have two options: create - // a group for all the non-table/non-server procedures or try to find a key for your - // non-table/non-server procedures and implement something similar to the TableRunQueue. - throw new UnsupportedOperationException("RQs for non-table procedures are not implemented yet"); - } - - private TableRunQueue getRunQueueOrCreate(final TableName table) { - final TableRunQueue queue = getRunQueue(table); - if (queue != null) return queue; - return (TableRunQueue)tableFairQ.add(table, createTableRunQueue(table)); - } - - private ServerRunQueue getRunQueueOrCreate(final ServerProcedureInterface spi) { - final ServerRunQueue queue = getRunQueue(spi.getServerName()); - if (queue != null) return queue; - return (ServerRunQueue)serverFairQ.add(spi.getServerName(), createServerRunQueue(spi)); - } - - private TableRunQueue createTableRunQueue(final TableName table) { - int priority = userTablePriority; - if (table.equals(TableName.META_TABLE_NAME)) { - priority = metaTablePriority; - } else if (table.isSystemTable()) { - priority = sysTablePriority; - } - return new TableRunQueue(priority); - } - - private ServerRunQueue createServerRunQueue(final ServerProcedureInterface spi) { - return new ServerRunQueue(DEFAULT_SERVER_PRIORITY); - } - - private TableRunQueue getRunQueue(final TableName table) { - return (TableRunQueue)tableFairQ.get(table); - } - - private ServerRunQueue getRunQueue(final ServerName sn) { - return (ServerRunQueue)serverFairQ.get(sn); - } - - /** - * Try to 
acquire the write lock on the specified table. - * other operations in the table-queue will be executed after the lock is released. - * @param table Table to lock - * @param purpose Human readable reason for locking the table - * @return true if we were able to acquire the lock on the table, otherwise false. - */ - public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) { - return getRunQueueOrCreate(table).tryExclusiveLock(lockManager, table, purpose); - } - - /** - * Release the write lock taken with tryAcquireTableWrite() - * @param table the name of the table that has the write lock - */ - public void releaseTableExclusiveLock(final TableName table) { - getRunQueue(table).releaseExclusiveLock(lockManager, table); - } - - /** - * Try to acquire the read lock on the specified table. - * other read operations in the table-queue may be executed concurrently, - * otherwise they have to wait until all the read-locks are released. - * @param table Table to lock - * @param purpose Human readable reason for locking the table - * @return true if we were able to acquire the lock on the table, otherwise false. - */ - public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) { - return getRunQueueOrCreate(table).trySharedLock(lockManager, table, purpose); - } - - /** - * Release the read lock taken with tryAcquireTableRead() - * @param table the name of the table that has the read lock - */ - public void releaseTableSharedLock(final TableName table) { - getRunQueue(table).releaseSharedLock(lockManager, table); - } - - /** - * Try to acquire the write lock on the specified server. - * @see #releaseServerExclusiveLock(ServerProcedureInterface) - * @param spi Server to lock - * @return true if we were able to acquire the lock on the server, otherwise false. 
- */ - public boolean tryAcquireServerExclusiveLock(final ServerProcedureInterface spi) { - return getRunQueueOrCreate(spi).tryExclusiveLock(); - } - - /** - * Release the write lock - * @see #tryAcquireServerExclusiveLock(ServerProcedureInterface) - * @param spi the server that has the write lock - */ - public void releaseServerExclusiveLock(final ServerProcedureInterface spi) { - getRunQueue(spi.getServerName()).releaseExclusiveLock(); - } - - /** - * Try to acquire the read lock on the specified server. - * @see #releaseServerSharedLock(ServerProcedureInterface) - * @param spi Server to lock - * @return true if we were able to acquire the lock on the server, otherwise false. - */ - public boolean tryAcquireServerSharedLock(final ServerProcedureInterface spi) { - return getRunQueueOrCreate(spi).trySharedLock(); - } - - /** - * Release the read lock taken - * @see #tryAcquireServerSharedLock(ServerProcedureInterface) - * @param spi the server that has the read lock - */ - public void releaseServerSharedLock(final ServerProcedureInterface spi) { - getRunQueue(spi.getServerName()).releaseSharedLock(); - } - - /** - * Tries to remove the queue and the table-lock of the specified table. - * If there are new operations pending (e.g. a new create), - * the remove will not be performed. - * @param table the name of the table that should be marked as deleted - * @return true if deletion succeeded, false otherwise meaning that there are - * other new operations pending for that table (e.g. a new create). 
- */ - protected boolean markTableAsDeleted(final TableName table) { - TableRunQueue queue = getRunQueue(table); - if (queue != null) { - lock.lock(); - try { - if (queue.isEmpty() && queue.acquireDeleteLock()) { - tableFairQ.remove(table); - - // Remove the table lock - try { - lockManager.tableDeleted(table); - } catch (IOException e) { - LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical - } - } else { - // TODO: If there are no create, we can drop all the other ops - return false; - } - } finally { - lock.unlock(); - } - } - return true; - } - - private interface RunQueue extends ProcedureFairRunQueues.FairObject { - void addFront(Procedure proc); - void addBack(Procedure proc); - Long poll(); - boolean acquireDeleteLock(); - } - - /** - * Base abstract class for RunQueue implementations. - * Be careful honoring synchronizations in subclasses. In here we protect access but if you are - * acting on a state found in here, be sure dependent code keeps synchronization. - * Implements basic in-memory read/write locking mechanism to prevent procedure steps being run - * in parallel. - */ - private static abstract class AbstractRunQueue implements RunQueue { - // All modification of runnables happens with #lock held. 
- private final Deque runnables = new ArrayDeque(); - private final int priority; - private boolean exclusiveLock = false; - private int sharedLock = 0; - - public AbstractRunQueue(int priority) { - this.priority = priority; - } - - boolean isEmpty() { - return this.runnables.isEmpty(); - } - - @Override - public boolean isAvailable() { - synchronized (this) { - return !exclusiveLock && !runnables.isEmpty(); - } - } - - @Override - public int getPriority() { - return this.priority; - } - - @Override - public void addFront(Procedure proc) { - this.runnables.addFirst(proc.getProcId()); - } - - @Override - public void addBack(Procedure proc) { - this.runnables.addLast(proc.getProcId()); - } - - @Override - public Long poll() { - return this.runnables.poll(); - } - - @Override - public synchronized boolean acquireDeleteLock() { - return tryExclusiveLock(); - } - - public synchronized boolean isLocked() { - return isExclusiveLock() || sharedLock > 0; - } - - public synchronized boolean isExclusiveLock() { - return this.exclusiveLock; - } - - public synchronized boolean trySharedLock() { - if (isExclusiveLock()) return false; - sharedLock++; - return true; - } - - public synchronized void releaseSharedLock() { - sharedLock--; - } - - /** - * @return True if only one instance of a shared lock outstanding. - */ - synchronized boolean isSingleSharedLock() { - return sharedLock == 1; - } - - public synchronized boolean tryExclusiveLock() { - if (isLocked()) return false; - exclusiveLock = true; - return true; - } - - public synchronized void releaseExclusiveLock() { - exclusiveLock = false; - } - - @Override - public String toString() { - return this.runnables.toString(); - } - } - - /** - * Run Queue for Server procedures. - */ - private static class ServerRunQueue extends AbstractRunQueue { - public ServerRunQueue(int priority) { - super(priority); - } - } - - /** - * Run Queue for a Table. 
It contains a read-write lock that is used by the - * MasterProcedureQueue to decide if we should fetch an item from this queue - * or skip to another one which will be able to run without waiting for locks. - */ - private static class TableRunQueue extends AbstractRunQueue { - private TableLock tableLock = null; - - public TableRunQueue(int priority) { - super(priority); - } - - // TODO: Improve run-queue push with TableProcedureInterface.getType() - // we can take smart decisions based on the type of the operation (e.g. create/delete) - @Override - public void addBack(final Procedure proc) { - super.addBack(proc); - } - - public synchronized boolean trySharedLock(final TableLockManager lockManager, - final TableName tableName, final String purpose) { - if (isExclusiveLock()) return false; - - // Take zk-read-lock - tableLock = lockManager.readLock(tableName, purpose); - try { - tableLock.acquire(); - } catch (IOException e) { - LOG.error("failed acquire read lock on " + tableName, e); - tableLock = null; - return false; - } - trySharedLock(); - return true; - } - - public synchronized void releaseSharedLock(final TableLockManager lockManager, - final TableName tableName) { - releaseTableLock(lockManager, isSingleSharedLock()); - releaseSharedLock(); - } - - public synchronized boolean tryExclusiveLock(final TableLockManager lockManager, - final TableName tableName, final String purpose) { - if (isLocked()) return false; - // Take zk-write-lock - tableLock = lockManager.writeLock(tableName, purpose); - try { - tableLock.acquire(); - } catch (IOException e) { - LOG.error("failed acquire write lock on " + tableName, e); - tableLock = null; - return false; - } - tryExclusiveLock(); - return true; - } - - public synchronized void releaseExclusiveLock(final TableLockManager lockManager, - final TableName tableName) { - releaseTableLock(lockManager, true); - releaseExclusiveLock(); - } - - private void releaseTableLock(final TableLockManager lockManager, boolean reset) 
{ - for (int i = 0; i < 3; ++i) { - try { - tableLock.release(); - if (reset) { - tableLock = null; - } - break; - } catch (IOException e) { - LOG.warn("Could not release the table write-lock", e); - } - } - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java new file mode 100644 index 0000000..9a3714f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -0,0 +1,1241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet; + +/** + * ProcedureRunnableSet for the Master Procedures. + * This RunnableSet tries to provide to the ProcedureExecutor procedures + * that can be executed without having to wait on a lock. + * Most of the master operations can be executed concurrently, if they + * are operating on different tables (e.g. two create table can be performed + * at the same, time assuming table A and table B) or against two different servers; say + * two servers that crashed at about the same time. + * + *

Each procedure should implement an interface providing information for this queue. + * for example table related procedures should implement TableProcedureInterface. + * each procedure will be pushed in its own queue, and based on the operation type + * we may take smarter decision. e.g. we can abort all the operations preceding + * a delete table, or similar. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MasterProcedureScheduler implements ProcedureRunnableSet { + private static final Log LOG = LogFactory.getLog(MasterProcedureScheduler.class); + + private final TableLockManager lockManager; + private final ReentrantLock schedLock = new ReentrantLock(); + private final Condition schedWaitCond = schedLock.newCondition(); + + private final FairQueue serverRunQueue = new FairQueue(); + private final FairQueue tableRunQueue = new FairQueue(); + private int queueSize = 0; + + private final Object[] serverBuckets = new Object[128]; + private Queue namespaceMap = null; + private Queue tableMap = null; + + private final int metaTablePriority; + private final int userTablePriority; + private final int sysTablePriority; + + // TODO: metrics + private long pollCalls = 0; + private long nullPollCalls = 0; + + public MasterProcedureScheduler(final Configuration conf, final TableLockManager lockManager) { + this.lockManager = lockManager; + + // TODO: should this be part of the HTD? 
+ metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3); + sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2); + userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1); + } + + @Override + public void addFront(Procedure proc) { + doAdd(proc, true); + } + + @Override + public void addBack(Procedure proc) { + doAdd(proc, false); + } + + @Override + public void yield(final Procedure proc) { + doAdd(proc, isTableProcedure(proc)); + } + + private void doAdd(final Procedure proc, final boolean addFront) { + schedLock.lock(); + try { + if (isTableProcedure(proc)) { + doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); + } else if (isServerProcedure(proc)) { + doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront); + } else { + // TODO: at the moment we only have Table and Server procedures + // if you are implementing a non-table/non-server procedure, you have two options: create + // a group for all the non-table/non-server procedures or try to find a key for your + // non-table/non-server procedures and implement something similar to the TableRunQueue. 
+ throw new UnsupportedOperationException( + "RQs for non-table/non-server procedures are not implemented yet"); + } + schedWaitCond.signal(); + } finally { + schedLock.unlock(); + } + } + + private > void doAdd(final FairQueue fairq, + final Queue queue, final Procedure proc, final boolean addFront) { + queue.add(proc, addFront); + if (!(queue.isSuspended() || queue.hasExclusiveLock())) { + if (queue.size() == 1 && !IterableList.isLinked(queue)) { + fairq.add(queue); + } + queueSize++; + } + } + + @Override + public Procedure poll() { + return poll(-1); + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") + Procedure poll(long waitNsec) { + Procedure pollResult = null; + schedLock.lock(); + try { + if (queueSize == 0) { + if (waitNsec < 0) { + schedWaitCond.await(); + } else { + schedWaitCond.awaitNanos(waitNsec); + } + if (queueSize == 0) { + return null; + } + } + + // For now, let server handling have precedence over table handling; presumption is that it + // is more important handling crashed servers than it is running the + // enabling/disabling tables, etc. + pollResult = doPoll(serverRunQueue); + if (pollResult == null) { + pollResult = doPoll(tableRunQueue); + } + + // update metrics + pollCalls++; + nullPollCalls += (pollResult == null) ? 
1 : 0; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + schedLock.unlock(); + } + return pollResult; + } + + private > Procedure doPoll(final FairQueue fairq) { + Queue rq = fairq.poll(); + if (rq == null || !rq.isAvailable()) { + return null; + } + + assert !rq.isSuspended() : "rq=" + rq + " is suspended"; + Procedure pollResult = rq.poll(); + this.queueSize--; + if (rq.isEmpty() || rq.requireExclusiveLock(pollResult)) { + removeFromRunQueue(fairq, rq); + } + return pollResult; + } + + @Override + public void clear() { + // NOTE: USED ONLY FOR TESTING + schedLock.lock(); + try { + // Remove Servers + for (int i = 0; i < serverBuckets.length; ++i) { + clear((ServerQueue)serverBuckets[i], serverRunQueue); + serverBuckets[i] = null; + } + + // Remove Tables + clear(tableMap, tableRunQueue); + tableMap = null; + + assert queueSize == 0 : "expected queue size to be 0, got " + queueSize; + } finally { + schedLock.unlock(); + } + } + + private > void clear(Queue treeMap, FairQueue fairq) { + while (treeMap != null) { + Queue node = AvlTree.getFirst(treeMap); + assert !node.isSuspended() : "can't clear suspended " + node.getKey(); + treeMap = AvlTree.remove(treeMap, node.getKey()); + removeFromRunQueue(fairq, node); + } + } + + @Override + public void signalAll() { + schedLock.lock(); + try { + schedWaitCond.signalAll(); + } finally { + schedLock.unlock(); + } + } + + @Override + public int size() { + schedLock.lock(); + try { + return queueSize; + } finally { + schedLock.unlock(); + } + } + + @Override + public void completionCleanup(Procedure proc) { + if (proc instanceof TableProcedureInterface) { + TableProcedureInterface iProcTable = (TableProcedureInterface)proc; + boolean tableDeleted; + if (proc.hasException()) { + IOException procEx = proc.getException().unwrapRemoteException(); + if (iProcTable.getTableOperationType() == TableOperationType.CREATE) { + // create failed because the table already exist + tableDeleted = 
!(procEx instanceof TableExistsException); + } else { + // the operation failed because the table does not exist + tableDeleted = (procEx instanceof TableNotFoundException); + } + } else { + // the table was deleted + tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE); + } + if (tableDeleted) { + markTableAsDeleted(iProcTable.getTableName()); + return; + } + } else { + // No cleanup for ServerProcedureInterface types, yet. + return; + } + } + + private > void addToRunQueue(FairQueue fairq, Queue queue) { + if (IterableList.isLinked(queue)) return; + if (!queue.isEmpty()) { + fairq.add(queue); + queueSize += queue.size(); + } + } + + private > void removeFromRunQueue(FairQueue fairq, Queue queue) { + if (!IterableList.isLinked(queue)) return; + fairq.remove(queue); + queueSize -= queue.size(); + } + + // ============================================================================ + // TODO: Metrics + // ============================================================================ + public long getPollCalls() { + return pollCalls; + } + + public long getNullPollCalls() { + return nullPollCalls; + } + + // ============================================================================ + // Event Helpers + // ============================================================================ + public boolean waitEvent(ProcedureEvent event, Procedure procedure) { + return waitEvent(event, procedure, false); + } + + public boolean waitEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) { + synchronized (event) { + if (event.isReady()) { + return false; + } + + // TODO: Suspend single procedure not implemented yet, fallback to suspending the queue + if (!suspendQueue) suspendQueue = true; + + if (isTableProcedure(procedure)) { + suspendTableQueue(event, getTableName(procedure)); + } else if (isServerProcedure(procedure)) { + suspendServerQueue(event, getServerName(procedure)); + } else { + // TODO: at the moment we only have Table and 
Server procedures + // if you are implementing a non-table/non-server procedure, you have two options: create + // a group for all the non-table/non-server procedures or try to find a key for your + // non-table/non-server procedures and implement something similar to the TableRunQueue. + throw new UnsupportedOperationException( + "RQs for non-table/non-server procedures are not implemented yet"); + } + } + return true; + } + + private void suspendTableQueue(ProcedureEvent event, TableName tableName) { + schedLock.lock(); + try { + TableQueue queue = getTableQueue(tableName); + if (!queue.setSuspended(true)) return; + + if (LOG.isDebugEnabled()) { + LOG.debug("Suspend table queue " + tableName); + } + removeFromRunQueue(tableRunQueue, queue); + event.suspendTableQueue(queue); + } finally { + schedLock.unlock(); + } + } + + private void suspendServerQueue(ProcedureEvent event, ServerName serverName) { + schedLock.lock(); + try { + // TODO: This will change once we have the new AM + ServerQueue queue = getServerQueue(serverName); + if (!queue.setSuspended(true)) return; + + if (LOG.isDebugEnabled()) { + LOG.debug("Suspend server queue " + serverName); + } + removeFromRunQueue(serverRunQueue, queue); + event.suspendServerQueue(queue); + } finally { + schedLock.unlock(); + } + } + + public void suspend(ProcedureEvent event) { + synchronized (event) { + event.setReady(false); + if (LOG.isDebugEnabled()) { + LOG.debug("Suspend event " + event); + } + } + } + + public void wake(ProcedureEvent event) { + synchronized (event) { + event.setReady(true); + if (LOG.isDebugEnabled()) { + LOG.debug("Wake event " + event); + } + + schedLock.lock(); + try { + while (event.hasWaitingTables()) { + Queue queue = event.popWaitingTable(); + addToRunQueue(tableRunQueue, queue); + } + // TODO: This will change once we have the new AM + while (event.hasWaitingServers()) { + Queue queue = event.popWaitingServer(); + addToRunQueue(serverRunQueue, queue); + } + + if (queueSize > 1) { + 
schedWaitCond.signalAll(); + } else if (queueSize > 0) { + schedWaitCond.signal(); + } + } finally { + schedLock.unlock(); + } + } + } + + public static class ProcedureEvent { + private final String description; + + private Queue waitingServers = null; + private Queue waitingTables = null; + private boolean ready = false; + + public ProcedureEvent(String description) { + this.description = description; + } + + public synchronized boolean isReady() { + return ready; + } + + private synchronized void setReady(boolean isReady) { + this.ready = isReady; + } + + private void suspendTableQueue(Queue queue) { + waitingTables = IterableList.append(waitingTables, queue); + } + + private void suspendServerQueue(Queue queue) { + waitingServers = IterableList.append(waitingServers, queue); + } + + private boolean hasWaitingTables() { + return waitingTables != null; + } + + private Queue popWaitingTable() { + Queue node = waitingTables; + waitingTables = IterableList.remove(waitingTables, node); + node.setSuspended(false); + return node; + } + + private boolean hasWaitingServers() { + return waitingServers != null; + } + + private Queue popWaitingServer() { + Queue node = waitingServers; + waitingServers = IterableList.remove(waitingServers, node); + node.setSuspended(false); + return node; + } + + @Override + public String toString() { + return String.format("ProcedureEvent(%s)", description); + } + } + + // ============================================================================ + // Table Queue Lookup Helpers + // ============================================================================ + private TableQueue getTableQueueWithLock(TableName tableName) { + schedLock.lock(); + try { + return getTableQueue(tableName); + } finally { + schedLock.unlock(); + } + } + + private TableQueue getTableQueue(TableName tableName) { + Queue node = AvlTree.get(tableMap, tableName); + if (node != null) return (TableQueue)node; + + node = new TableQueue(tableName, 
getTablePriority(tableName)); + tableMap = AvlTree.insert(tableMap, node); + return (TableQueue)node; + } + + private void removeTableQueue(TableName tableName) { + tableMap = AvlTree.remove(tableMap, tableName); + } + + private int getTablePriority(TableName tableName) { + if (tableName.equals(TableName.META_TABLE_NAME)) { + return metaTablePriority; + } else if (tableName.isSystemTable()) { + return sysTablePriority; + } + return userTablePriority; + } + + private static boolean isTableProcedure(Procedure proc) { + return proc instanceof TableProcedureInterface; + } + + private static TableName getTableName(Procedure proc) { + return ((TableProcedureInterface)proc).getTableName(); + } + + // ============================================================================ + // Server Queue Lookup Helpers + // ============================================================================ + private ServerQueue getServerQueueWithLock(ServerName serverName) { + schedLock.lock(); + try { + return getServerQueue(serverName); + } finally { + schedLock.unlock(); + } + } + + private ServerQueue getServerQueue(ServerName serverName) { + int index = getBucketIndex(serverBuckets, serverName.hashCode()); + Queue root = getTreeRoot(serverBuckets, index); + Queue node = AvlTree.get(root, serverName); + if (node != null) return (ServerQueue)node; + + node = new ServerQueue(serverName); + serverBuckets[index] = AvlTree.insert(root, node); + return (ServerQueue)node; + } + + private void removeServerQueue(ServerName serverName) { + int index = getBucketIndex(serverBuckets, serverName.hashCode()); + serverBuckets[index] = AvlTree.remove((ServerQueue)serverBuckets[index], serverName); + } + + @SuppressWarnings("unchecked") + private static > Queue getTreeRoot(Object[] buckets, int index) { + return (Queue) buckets[index]; + } + + private static int getBucketIndex(Object[] buckets, int hashCode) { + return Math.abs(hashCode) % buckets.length; + } + + private static boolean 
isServerProcedure(Procedure proc) { + return proc instanceof ServerProcedureInterface; + } + + private static ServerName getServerName(Procedure proc) { + return ((ServerProcedureInterface)proc).getServerName(); + } + + // ============================================================================ + // Table and Server Queue Implementation + // ============================================================================ + public static class ServerQueue extends QueueImpl { + public ServerQueue(ServerName serverName) { + super(serverName); + } + + public boolean requireExclusiveLock(Procedure proc) { + ServerProcedureInterface spi = (ServerProcedureInterface)proc; + switch (spi.getServerOperationType()) { + case CRASH_HANDLER: + return true; + default: + break; + } + throw new UnsupportedOperationException("unexpected type " + spi.getServerOperationType()); + } + } + + public static class TableQueue extends QueueImpl { + private TableLock tableLock = null; + + public TableQueue(TableName tableName, int priority) { + super(tableName, priority); + } + + // TODO: We can abort pending/in-progress operation if the new call is + // something like drop table. We can Override addBack(), + // check the type and abort all the in-flight procedurs. 
+ private boolean canAbortPendingOperations(Procedure proc) { + TableProcedureInterface tpi = (TableProcedureInterface)proc; + switch (tpi.getTableOperationType()) { + case DELETE: + return true; + default: + return false; + } + } + + public boolean requireExclusiveLock(Procedure proc) { + TableProcedureInterface tpi = (TableProcedureInterface)proc; + switch (tpi.getTableOperationType()) { + case CREATE: + case DELETE: + case DISABLE: + case EDIT: + case ENABLE: + return true; + case READ: + return false; + default: + break; + } + throw new UnsupportedOperationException("unexpected type " + tpi.getTableOperationType()); + } + + private synchronized boolean trySharedLock(final TableLockManager lockManager, + final String purpose) { + if (hasExclusiveLock()) return false; + + // Take zk-read-lock + TableName tableName = getKey(); + tableLock = lockManager.readLock(tableName, purpose); + try { + tableLock.acquire(); + } catch (IOException e) { + LOG.error("failed acquire read lock on " + tableName, e); + tableLock = null; + return false; + } + + trySharedLock(); + return true; + } + + private synchronized void releaseSharedLock(final TableLockManager lockManager) { + releaseTableLock(lockManager, isSingleSharedLock()); + releaseSharedLock(); + } + + private synchronized boolean tryZkExclusiveLock(final TableLockManager lockManager, + final String purpose) { + // Take zk-write-lock + TableName tableName = getKey(); + tableLock = lockManager.writeLock(tableName, purpose); + try { + tableLock.acquire(); + } catch (IOException e) { + LOG.error("failed acquire write lock on " + tableName, e); + tableLock = null; + return false; + } + return true; + } + + private synchronized void releaseZkExclusiveLock(final TableLockManager lockManager) { + releaseTableLock(lockManager, true); + } + + private void releaseTableLock(final TableLockManager lockManager, boolean reset) { + for (int i = 0; i < 3; ++i) { + try { + tableLock.release(); + if (reset) { + tableLock = null; + } + 
break; + } catch (IOException e) { + LOG.warn("Could not release the table write-lock", e); + } + } + } + } + + // ============================================================================ + // Locking Helpers + // ============================================================================ + /** + * Try to acquire the exclusive lock on the specified table. + * Other operations in the table-queue will be executed after the lock is released. + * @param table Table to lock + * @param purpose Human readable reason for locking the table + * @return true if we were able to acquire the lock on the table, otherwise false. + */ + public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) { + schedLock.lock(); + TableQueue queue = getTableQueue(table); + boolean hasXLock = queue.tryExclusiveLock(); + if (!hasXLock) { + schedLock.unlock(); + return false; + } + + removeFromRunQueue(tableRunQueue, queue); + schedLock.unlock(); + + // Zk lock is expensive... + hasXLock = queue.tryZkExclusiveLock(lockManager, purpose); + if (!hasXLock) { + schedLock.lock(); + queue.releaseExclusiveLock(); + addToRunQueue(tableRunQueue, queue); + schedLock.unlock(); + } + return hasXLock; + } + + /** + * Release the exclusive lock taken with tryAcquireTableExclusiveLock() + * @param table the name of the table that has the exclusive lock + */ + public void releaseTableExclusiveLock(final TableName table) { + schedLock.lock(); + TableQueue queue = getTableQueue(table); + schedLock.unlock(); + + // Zk lock is expensive... + queue.releaseZkExclusiveLock(lockManager); + + schedLock.lock(); + queue.releaseExclusiveLock(); + addToRunQueue(tableRunQueue, queue); + schedLock.unlock(); + } + + /** + * Try to acquire the shared lock on the specified table. 
+ * other "read" operations in the table-queue may be executed concurrently. + * @param table Table to lock + * @param purpose Human readable reason for locking the table + * @return true if we were able to acquire the lock on the table, otherwise false. + */ + public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) { + return getTableQueueWithLock(table).trySharedLock(lockManager, purpose); + } + + /** + * Release the shared lock taken with tryAcquireTableSharedLock() + * @param table the name of the table that has the shared lock + */ + public void releaseTableSharedLock(final TableName table) { + getTableQueueWithLock(table).releaseSharedLock(lockManager); + } + + /** + * Tries to remove the queue and the table-lock of the specified table. + * If there are new operations pending (e.g. a new create), + * the remove will not be performed. + * @param table the name of the table that should be marked as deleted + * @return true if deletion succeeded, false otherwise meaning that there are + * other new operations pending for that table (e.g. a new create). 
+ */ + protected boolean markTableAsDeleted(final TableName table) { + final ReentrantLock l = schedLock; + l.lock(); + try { + TableQueue queue = getTableQueue(table); + if (queue == null) return true; + + if (queue.isEmpty() && queue.acquireDeleteLock()) { + // remove the table from the run-queue and the map + if (IterableList.isLinked(queue)) { + tableRunQueue.remove(queue); + } + + // Remove the table lock + try { + lockManager.tableDeleted(table); + } catch (IOException e) { + LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical + } + + removeTableQueue(table); + } else { + // TODO: If there are no create, we can drop all the other ops + return false; + } + } finally { + l.unlock(); + } + return true; + } + + // ============================================================================ + // Server Locking Helpers + // ============================================================================ + /** + * Try to acquire the exclusive lock on the specified server. + * @see #releaseServerExclusiveLock(ServerName) + * @param serverName Server to lock + * @return true if we were able to acquire the lock on the server, otherwise false. + */ + public boolean tryAcquireServerExclusiveLock(final ServerName serverName) { + schedLock.lock(); + try { + ServerQueue queue = getServerQueue(serverName); + if (queue.tryExclusiveLock()) { + removeFromRunQueue(serverRunQueue, queue); + return true; + } + } finally { + schedLock.unlock(); + } + return false; + } + + /** + * Release the exclusive lock + * @see #tryAcquireServerExclusiveLock(ServerName) + * @param serverName the server that has the exclusive lock + */ + public void releaseServerExclusiveLock(final ServerName serverName) { + schedLock.lock(); + try { + ServerQueue queue = getServerQueue(serverName); + queue.releaseExclusiveLock(); + addToRunQueue(serverRunQueue, queue); + } finally { + schedLock.unlock(); + } + } + + /** + * Try to acquire the shared lock on the specified server. 
+ * @see #releaseServerSharedLock(ServerName) + * @param serverName Server to lock + * @return true if we were able to acquire the lock on the server, otherwise false. + */ + public boolean tryAcquireServerSharedLock(final ServerName serverName) { + return getServerQueueWithLock(serverName).trySharedLock(); + } + + /** + * Release the shared lock taken + * @see #tryAcquireServerSharedLock(ServerName) + * @param serverName the server that has the shared lock + */ + public void releaseServerSharedLock(final ServerName serverName) { + getServerQueueWithLock(serverName).releaseSharedLock(); + } + + // ============================================================================ + // Generic Helpers + // ============================================================================ + private static interface QueueInterface { + boolean isAvailable(); + boolean isEmpty(); + int size(); + void add(Procedure proc, boolean addFront); + boolean requireExclusiveLock(Procedure proc); + Procedure poll(); + + boolean isSuspended(); + } + + private static abstract class Queue> implements QueueInterface { + private Queue avlRight = null; + private Queue avlLeft = null; + private int avlHeight = 1; + + private Queue iterNext = null; + private Queue iterPrev = null; + private boolean suspended = false; + + private boolean exclusiveLock = false; + private int sharedLock = 0; + + private final TKey key; + private final int priority; + + public Queue(TKey key) { + this(key, 1); + } + + public Queue(TKey key, int priority) { + this.key = key; + this.priority = priority; + } + + protected TKey getKey() { + return key; + } + + protected int getPriority() { + return priority; + } + + /** + * True if the queue is not in the run-queue and it is owned by an event. 
+ */ + public boolean isSuspended() { + return suspended; + } + + protected boolean setSuspended(boolean isSuspended) { + if (this.suspended == isSuspended) return false; + this.suspended = isSuspended; + return true; + } + + // ====================================================================== + // Read/Write Locking helpers + // ====================================================================== + public synchronized boolean isLocked() { + return hasExclusiveLock() || sharedLock > 0; + } + + public synchronized boolean hasExclusiveLock() { + return this.exclusiveLock; + } + + public synchronized boolean trySharedLock() { + if (hasExclusiveLock()) return false; + sharedLock++; + return true; + } + + public synchronized void releaseSharedLock() { + sharedLock--; + } + + protected synchronized boolean isSingleSharedLock() { + return sharedLock == 1; + } + + public synchronized boolean tryExclusiveLock() { + if (isLocked()) return false; + exclusiveLock = true; + return true; + } + + public synchronized void releaseExclusiveLock() { + exclusiveLock = false; + } + + public synchronized boolean acquireDeleteLock() { + return tryExclusiveLock(); + } + + // This should go away when we have the new AM and its events + // and we move xlock to the lock-event-queue. 
+ public synchronized boolean isAvailable() { + return !exclusiveLock && !isEmpty(); + } + + // ====================================================================== + // Generic Helpers + // ====================================================================== + public int compareKey(TKey cmpKey) { + return key.compareTo(cmpKey); + } + + public int compareTo(Queue other) { + return compareKey(other.key); + } + + @Override + public String toString() { + return String.format("%s(%s)", getClass().getSimpleName(), key); + } + } + + // ====================================================================== + // Helper Data Structures + // ====================================================================== + private static abstract class QueueImpl> extends Queue { + private final ArrayDeque runnables = new ArrayDeque(); + + public QueueImpl(TKey key) { + super(key); + } + + public QueueImpl(TKey key, int priority) { + super(key, priority); + } + + public void add(final Procedure proc, final boolean addToFront) { + if (addToFront) { + addFront(proc); + } else { + addBack(proc); + } + } + + protected void addFront(final Procedure proc) { + runnables.addFirst(proc); + } + + protected void addBack(final Procedure proc) { + runnables.addLast(proc); + } + + @Override + public Procedure poll() { + return runnables.poll(); + } + + @Override + public boolean isEmpty() { + return runnables.isEmpty(); + } + + public int size() { + return runnables.size(); + } + } + + private static class FairQueue> { + private final int quantum; + + private Queue currentQueue = null; + private Queue queueHead = null; + private int currentQuantum = 0; + + public FairQueue() { + this(1); + } + + public FairQueue(int quantum) { + this.quantum = quantum; + } + + public void add(Queue queue) { + queueHead = IterableList.append(queueHead, queue); + if (currentQueue == null) setNextQueue(queueHead); + } + + public void remove(Queue queue) { + Queue nextQueue = queue.iterNext; + queueHead = 
IterableList.remove(queueHead, queue); + if (currentQueue == queue) { + setNextQueue(queueHead != null ? nextQueue : null); + } + } + + public Queue poll() { + if (currentQuantum == 0) { + if (!nextQueue()) { + return null; // nothing here + } + currentQuantum = calculateQuantum(currentQueue) - 1; + } else { + currentQuantum--; + } + + // This should go away when we have the new AM and its events + if (!currentQueue.isAvailable()) { + Queue lastQueue = currentQueue; + do { + if (!nextQueue()) + return null; + } while (currentQueue != lastQueue && !currentQueue.isAvailable()); + + currentQuantum = calculateQuantum(currentQueue) - 1; + } + return currentQueue; + } + + private boolean nextQueue() { + if (currentQueue == null) return false; + currentQueue = currentQueue.iterNext; + return currentQueue != null; + } + + private void setNextQueue(Queue queue) { + currentQueue = queue; + if (queue != null) { + currentQuantum = calculateQuantum(currentQueue); + } else { + currentQuantum = 0; + } + } + + private int calculateQuantum(final Queue queue) { + return Math.max(1, queue.getPriority() * quantum); // TODO + } + } + + private static class AvlTree { + public static > Queue get(Queue root, T key) { + while (root != null) { + int cmp = root.compareKey(key); + if (cmp > 0) { + root = root.avlLeft; + } else if (cmp < 0) { + root = root.avlRight; + } else { + return root; + } + } + return null; + } + + public static > Queue getFirst(Queue root) { + if (root != null) { + while (root.avlLeft != null) { + root = root.avlLeft; + } + } + return root; + } + + public static > Queue getLast(Queue root) { + if (root != null) { + while (root.avlRight != null) { + root = root.avlRight; + } + } + return root; + } + + public static > Queue insert(Queue root, Queue node) { + if (root == null) return node; + if (node.compareTo(root) < 0) { + root.avlLeft = insert(root.avlLeft, node); + } else { + root.avlRight = insert(root.avlRight, node); + } + return balance(root); + } + + private 
static > Queue removeMin(Queue p) { + if (p.avlLeft == null) + return p.avlRight; + p.avlLeft = removeMin(p.avlLeft); + return balance(p); + } + + public static > Queue remove(Queue root, T key) { + if (root == null) return null; + + int cmp = root.compareKey(key); + if (cmp == 0) { + Queue q = root.avlLeft; + Queue r = root.avlRight; + if (r == null) return q; + Queue min = getFirst(r); + min.avlRight = removeMin(r); + min.avlLeft = q; + return balance(min); + } else if (cmp > 0) { + root.avlLeft = remove(root.avlLeft, key); + } else /* if (cmp < 0) */ { + root.avlRight = remove(root.avlRight, key); + } + return balance(root); + } + + private static > Queue balance(Queue p) { + fixHeight(p); + int balance = balanceFactor(p); + if (balance == 2) { + if (balanceFactor(p.avlRight) < 0) { + p.avlRight = rotateRight(p.avlRight); + } + return rotateLeft(p); + } else if (balance == -2) { + if (balanceFactor(p.avlLeft) > 0) { + p.avlLeft = rotateLeft(p.avlLeft); + } + return rotateRight(p); + } + return p; + } + + private static > Queue rotateRight(Queue p) { + Queue q = p.avlLeft; + p.avlLeft = q.avlRight; + q.avlRight = p; + fixHeight(p); + fixHeight(q); + return q; + } + + private static > Queue rotateLeft(Queue q) { + Queue p = q.avlRight; + q.avlRight = p.avlLeft; + p.avlLeft = q; + fixHeight(q); + fixHeight(p); + return p; + } + + private static > void fixHeight(Queue node) { + int heightLeft = height(node.avlLeft); + int heightRight = height(node.avlRight); + node.avlHeight = 1 + Math.max(heightLeft, heightRight); + } + + private static > int height(Queue node) { + return node != null ? 
node.avlHeight : 0; + } + + private static > int balanceFactor(Queue node) { + return height(node.avlRight) - height(node.avlLeft); + } + } + + private static class IterableList { + public static > Queue prepend(Queue head, Queue node) { + assert !isLinked(node) : node + " is already linked"; + if (head != null) { + Queue tail = head.iterPrev; + tail.iterNext = node; + head.iterPrev = node; + node.iterNext = head; + node.iterPrev = tail; + } else { + node.iterNext = node; + node.iterPrev = node; + } + return node; + } + + public static > Queue append(Queue head, Queue node) { + assert !isLinked(node) : node + " is already linked"; + if (head != null) { + Queue tail = head.iterPrev; + tail.iterNext = node; + node.iterNext = head; + node.iterPrev = tail; + head.iterPrev = node; + return head; + } + node.iterNext = node; + node.iterPrev = node; + return node; + } + + public static > Queue appendList(Queue head, Queue otherHead) { + if (head == null) return otherHead; + if (otherHead == null) return head; + + Queue tail = head.iterPrev; + Queue otherTail = otherHead.iterPrev; + tail.iterNext = otherHead; + otherHead.iterPrev = tail; + otherTail.iterNext = head; + head.iterPrev = otherTail; + return head; + } + + private static > Queue remove(Queue head, Queue node) { + assert isLinked(node) : node + " is not linked"; + if (node != node.iterNext) { + node.iterPrev.iterNext = node.iterNext; + node.iterNext.iterPrev = node.iterPrev; + head = (head == node) ? 
node.iterNext : head; + } else { + head = null; + } + node.iterNext = null; + node.iterPrev = null; + return head; + } + + private static > boolean isLinked(Queue node) { + return node.iterPrev != null && node.iterNext != null; + } + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java index b858e0c..3a30527 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -182,10 +181,8 @@ public class ModifyColumnFamilyProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - tableName, - EventType.C_M_MODIFY_FAMILY.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "modify family"); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java index a6300dd..6663e46 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos; @@ -215,10 +214,8 @@ public class ModifyTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - getTableName(), - EventType.C_M_MODIFY_TABLE.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "modify table"); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index bdcd89c..970c9c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -553,13 +553,13 @@ implements ServerProcedureInterface { @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.getMasterServices().isServerCrashProcessingEnabled()) return false; - return env.getProcedureQueue().tryAcquireServerExclusiveLock(this); + if (env.waitServerCrashProcessingEnabled(this)) return false; + return env.getProcedureQueue().tryAcquireServerExclusiveLock(getServerName()); } @Override protected void releaseLock(final MasterProcedureEnv env) { - env.getProcedureQueue().releaseServerExclusiveLock(this); + 
env.getProcedureQueue().releaseServerExclusiveLock(getServerName()); } @Override @@ -751,6 +751,11 @@ implements ServerProcedureInterface { return this.carryingMeta; } + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.CRASH_HANDLER; + } + /** * For this procedure, yield at end of each successful flow step so that all crashed servers * can make progress rather than do the default which has each procedure running to completion diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java index 5b0c45f..b5c24ff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java @@ -28,6 +28,10 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Evolving public interface ServerProcedureInterface { + public enum ServerOperationType { + CRASH_HANDLER + }; + /** * @return Name of this server instance. */ @@ -37,4 +41,12 @@ public interface ServerProcedureInterface { * @return True if this server has an hbase:meta table region. */ boolean hasMetaTableRegion(); -} \ No newline at end of file + + /** + * Given an operation type we can take decisions about what to do with pending operations. + * e.g. if we get a crash handler and we have some assignment operation pending + * we can abort those operations. + * @return the operation type that the procedure is executing. 
+ */ + ServerOperationType getServerOperationType(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java index 2e39b80..0d17bf6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java @@ -182,7 +182,7 @@ public class TruncateTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; + if (env.waitInitialized(this)) return false; return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "truncate table"); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java index 4d0093c..99e0e3f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java @@ -128,14 +128,14 @@ public class TestMaster { MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); HMaster m = cluster.getMaster(); try { - m.initialized = false; // fake it, set back later + m.setInitialized(false); // fake it, set back later HRegionInfo meta = HRegionInfo.FIRST_META_REGIONINFO; m.move(meta.getEncodedNameAsBytes(), null); fail("Region should not be moved since master is not initialized"); } catch (IOException ioe) { assertTrue(ioe instanceof PleaseHoldException); } finally { - m.initialized = true; + m.setInitialized(true); } } @@ -172,13 +172,13 @@ public class TestMaster { try { List tableRegions = admin.getTableRegions(tableName); - master.initialized = false; // fake it, set back later + master.setInitialized(false); // fake it, set back later 
admin.move(tableRegions.get(0).getEncodedNameAsBytes(), null); fail("Region should not be moved since master is not initialized"); } catch (IOException ioe) { assertTrue(StringUtils.stringifyException(ioe).contains("PleaseHoldException")); } finally { - master.initialized = true; + master.setInitialized(true); TEST_UTIL.deleteTable(tableName); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java index 398a898..cafee7a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java @@ -306,7 +306,7 @@ public class TestMasterNoCluster { try { // Wait till master is initialized. - while (!master.initialized) Threads.sleep(10); + while (!master.isInitialized()) Threads.sleep(10); LOG.info("Master is initialized"); assertFalse("The dead server should not be pulled in", diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java new file mode 100644 index 0000000..af8d6ba --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@Category({MasterTests.class, MediumTests.class}) +public class TestMasterProcedureEvents { + private static final Log LOG = LogFactory.getLog(TestCreateTableProcedure.class); + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static long nonceGroup = HConstants.NO_NONCE; + private static long nonce = HConstants.NO_NONCE; + + 
private static void setupConf(Configuration conf) { + conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8); + conf.setBoolean("hbase.procedure.store.wal.use.hsync", false); + } + + @BeforeClass + public static void setupCluster() throws Exception { + setupConf(UTIL.getConfiguration()); + UTIL.startMiniCluster(3); + } + + @AfterClass + public static void cleanupTest() throws Exception { + try { + UTIL.shutdownMiniCluster(); + } catch (Exception e) { + LOG.warn("failure shutting down cluster", e); + } + } + + @Test + public void testMasterInitializedEvent() throws Exception { + TableName tableName = TableName.valueOf("testMasterInitializedEvent"); + HMaster master = UTIL.getMiniHBaseCluster().getMaster(); + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue(); + + HRegionInfo hri = new HRegionInfo(tableName); + HTableDescriptor htd = new HTableDescriptor(tableName); + HColumnDescriptor hcd = new HColumnDescriptor("f"); + htd.addFamily(hcd); + + while (!master.isInitialized()) Thread.sleep(250); + master.setInitialized(false); // fake it, set back later + + CreateTableProcedure proc = new CreateTableProcedure( + procExec.getEnvironment(), htd, new HRegionInfo[] { hri }); + + long pollCalls = procSched.getPollCalls(); + long nullPollCalls = procSched.getNullPollCalls(); + + long procId = procExec.submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE); + for (int i = 0; i < 10; ++i) { + Thread.sleep(100); + assertEquals(pollCalls + 1, procSched.getPollCalls()); + assertEquals(nullPollCalls, procSched.getNullPollCalls()); + } + + master.setInitialized(true); + ProcedureTestingUtility.waitProcedure(procExec, procId); + + assertEquals(pollCalls + 2, procSched.getPollCalls()); + assertEquals(nullPollCalls, procSched.getNullPollCalls()); + } + + @Test + public void testServerCrashProcedureEvent() throws Exception { + TableName tableName = 
TableName.valueOf("testServerCrashProcedureEventTb"); + HMaster master = UTIL.getMiniHBaseCluster().getMaster(); + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue(); + + while (!master.isServerCrashProcessingEnabled() || !master.isInitialized() || + master.getAssignmentManager().getRegionStates().isRegionsInTransition()) { + Thread.sleep(25); + } + + UTIL.createTable(tableName, HBaseTestingUtility.COLUMNS[0]); + try (Table t = UTIL.getConnection().getTable(tableName)) { + // Load the table with a bit of data so some logs to split and some edits in each region. + UTIL.loadTable(t, HBaseTestingUtility.COLUMNS[0]); + } + + master.setServerCrashProcessingEnabled(false); // fake it, set back later + + long pollCalls = procSched.getPollCalls(); + long nullPollCalls = procSched.getNullPollCalls(); + + // Kill a server. Master will notice but do nothing other than add it to list of dead servers. + HRegionServer hrs = getServerWithRegions(); + boolean carryingMeta = master.getAssignmentManager().isCarryingMeta(hrs.getServerName()); + UTIL.getHBaseCluster().killRegionServer(hrs.getServerName()); + hrs.join(); + + // Wait until the expiration of the server has arrived at the master. We won't process it + // by queuing a ServerCrashProcedure because we have disabled crash processing... but wait + // here so ServerManager gets notice and adds expired server to appropriate queues. + while (!master.getServerManager().isServerDead(hrs.getServerName())) Thread.sleep(10); + + // Do some of the master processing of dead servers so when SCP runs, it has expected 'state'. 
+ master.getServerManager().moveFromOnelineToDeadServers(hrs.getServerName()); + + long procId = procExec.submitProcedure( + new ServerCrashProcedure(hrs.getServerName(), true, carryingMeta)); + + for (int i = 0; i < 10; ++i) { + Thread.sleep(100); + assertEquals(pollCalls + 1, procSched.getPollCalls()); + assertEquals(nullPollCalls, procSched.getNullPollCalls()); + } + + // Now, reenable processing else we can't get a lock on the ServerCrashProcedure. + master.setServerCrashProcessingEnabled(true); + ProcedureTestingUtility.waitProcedure(procExec, procId); + + LOG.debug("server crash processing poll calls: " + procSched.getPollCalls()); + assertTrue(procSched.getPollCalls() >= (pollCalls + 2)); + assertEquals(nullPollCalls, procSched.getNullPollCalls()); + + UTIL.deleteTable(tableName); + } + + private HRegionServer getServerWithRegions() { + for (int i = 0; i < 3; ++i) { + HRegionServer hrs = UTIL.getHBaseCluster().getRegionServer(i); + if (hrs.getNumberOfOnlineRegions() > 0) { + return hrs; + } + } + return null; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java deleted file mode 100644 index 7e6e356..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java +++ /dev/null @@ -1,484 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.master.procedure; - - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.testclassification.MasterTests; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -@Category({MasterTests.class, SmallTests.class}) -public class TestMasterProcedureQueue { - private static final Log LOG = LogFactory.getLog(TestMasterProcedureQueue.class); - - private MasterProcedureQueue queue; - private Configuration conf; - - @Before - public void setUp() throws IOException { - conf = HBaseConfiguration.create(); - queue = new MasterProcedureQueue(conf, new TableLockManager.NullTableLockManager()); 
- } - - @After - public void tearDown() throws IOException { - assertEquals(0, queue.size()); - } - - @Test - public void testConcurrentCreateDelete() throws Exception { - final MasterProcedureQueue procQueue = queue; - final TableName table = TableName.valueOf("testtb"); - final AtomicBoolean running = new AtomicBoolean(true); - final AtomicBoolean failure = new AtomicBoolean(false); - Thread createThread = new Thread() { - @Override - public void run() { - try { - while (running.get() && !failure.get()) { - if (procQueue.tryAcquireTableExclusiveLock(table, "create")) { - procQueue.releaseTableExclusiveLock(table); - } - } - } catch (Throwable e) { - LOG.error("create failed", e); - failure.set(true); - } - } - }; - - Thread deleteThread = new Thread() { - @Override - public void run() { - try { - while (running.get() && !failure.get()) { - if (procQueue.tryAcquireTableExclusiveLock(table, "delete")) { - procQueue.releaseTableExclusiveLock(table); - } - procQueue.markTableAsDeleted(table); - } - } catch (Throwable e) { - LOG.error("delete failed", e); - failure.set(true); - } - } - }; - - createThread.start(); - deleteThread.start(); - for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) { - Thread.sleep(100); - } - running.set(false); - createThread.join(); - deleteThread.join(); - assertEquals(false, failure.get()); - } - - /** - * Verify simple create/insert/fetch/delete of the table queue. 
- */ - @Test - public void testSimpleTableOpsQueues() throws Exception { - final int NUM_TABLES = 10; - final int NUM_ITEMS = 10; - - int count = 0; - for (int i = 1; i <= NUM_TABLES; ++i) { - TableName tableName = TableName.valueOf(String.format("test-%04d", i)); - // insert items - for (int j = 1; j <= NUM_ITEMS; ++j) { - queue.addBack(new TestTableProcedure(i * 1000 + j, tableName, - TableProcedureInterface.TableOperationType.EDIT)); - assertEquals(++count, queue.size()); - } - } - assertEquals(NUM_TABLES * NUM_ITEMS, queue.size()); - - for (int j = 1; j <= NUM_ITEMS; ++j) { - for (int i = 1; i <= NUM_TABLES; ++i) { - Long procId = queue.poll(); - assertEquals(--count, queue.size()); - assertEquals(i * 1000 + j, procId.longValue()); - } - } - assertEquals(0, queue.size()); - - for (int i = 1; i <= NUM_TABLES; ++i) { - TableName tableName = TableName.valueOf(String.format("test-%04d", i)); - // complete the table deletion - assertTrue(queue.markTableAsDeleted(tableName)); - } - } - - /** - * Check that the table queue is not deletable until every procedure - * in-progress is completed (this is a special case for write-locks). 
- */ - @Test - public void testCreateDeleteTableOperationsWithWriteLock() throws Exception { - TableName tableName = TableName.valueOf("testtb"); - - queue.addBack(new TestTableProcedure(1, tableName, - TableProcedureInterface.TableOperationType.EDIT)); - - // table can't be deleted because one item is in the queue - assertFalse(queue.markTableAsDeleted(tableName)); - - // fetch item and take a lock - assertEquals(1, queue.poll().longValue()); - // take the xlock - assertTrue(queue.tryAcquireTableExclusiveLock(tableName, "write")); - // table can't be deleted because we have the lock - assertEquals(0, queue.size()); - assertFalse(queue.markTableAsDeleted(tableName)); - // release the xlock - queue.releaseTableExclusiveLock(tableName); - // complete the table deletion - assertTrue(queue.markTableAsDeleted(tableName)); - } - - /** - * Check that the table queue is not deletable until every procedure - * in-progress is completed (this is a special case for read-locks). - */ - @Test - public void testCreateDeleteTableOperationsWithReadLock() throws Exception { - final TableName tableName = TableName.valueOf("testtb"); - final int nitems = 2; - - for (int i = 1; i <= nitems; ++i) { - queue.addBack(new TestTableProcedure(i, tableName, - TableProcedureInterface.TableOperationType.READ)); - } - - // table can't be deleted because one item is in the queue - assertFalse(queue.markTableAsDeleted(tableName)); - - for (int i = 1; i <= nitems; ++i) { - // fetch item and take a lock - assertEquals(i, queue.poll().longValue()); - // take the rlock - assertTrue(queue.tryAcquireTableSharedLock(tableName, "read " + i)); - // table can't be deleted because we have locks and/or items in the queue - assertFalse(queue.markTableAsDeleted(tableName)); - } - - for (int i = 1; i <= nitems; ++i) { - // table can't be deleted because we have locks - assertFalse(queue.markTableAsDeleted(tableName)); - // release the rlock - queue.releaseTableSharedLock(tableName); - } - - // there are no items 
and no lock in the queeu - assertEquals(0, queue.size()); - // complete the table deletion - assertTrue(queue.markTableAsDeleted(tableName)); - } - - /** - * Verify the correct logic of RWLocks on the queue - */ - @Test - public void testVerifyRwLocks() throws Exception { - TableName tableName = TableName.valueOf("testtb"); - queue.addBack(new TestTableProcedure(1, tableName, - TableProcedureInterface.TableOperationType.EDIT)); - queue.addBack(new TestTableProcedure(2, tableName, - TableProcedureInterface.TableOperationType.READ)); - queue.addBack(new TestTableProcedure(3, tableName, - TableProcedureInterface.TableOperationType.EDIT)); - queue.addBack(new TestTableProcedure(4, tableName, - TableProcedureInterface.TableOperationType.READ)); - queue.addBack(new TestTableProcedure(5, tableName, - TableProcedureInterface.TableOperationType.READ)); - - // Fetch the 1st item and take the write lock - Long procId = queue.poll(); - assertEquals(1, procId.longValue()); - assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); - - // Fetch the 2nd item and verify that the lock can't be acquired - assertEquals(null, queue.poll()); - - // Release the write lock and acquire the read lock - queue.releaseTableExclusiveLock(tableName); - - // Fetch the 2nd item and take the read lock - procId = queue.poll(); - assertEquals(2, procId.longValue()); - assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); - - // Fetch the 3rd item and verify that the lock can't be acquired - procId = queue.poll(); - assertEquals(3, procId.longValue()); - assertEquals(false, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); - - // release the rdlock of item 2 and take the wrlock for the 3d item - queue.releaseTableSharedLock(tableName); - assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); - - // Fetch 4th item and verify that the lock can't be acquired - assertEquals(null, queue.poll()); - - // 
Release the write lock and acquire the read lock - queue.releaseTableExclusiveLock(tableName); - - // Fetch the 4th item and take the read lock - procId = queue.poll(); - assertEquals(4, procId.longValue()); - assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); - - // Fetch the 4th item and take the read lock - procId = queue.poll(); - assertEquals(5, procId.longValue()); - assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); - - // Release 4th and 5th read-lock - queue.releaseTableSharedLock(tableName); - queue.releaseTableSharedLock(tableName); - - // remove table queue - assertEquals(0, queue.size()); - assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName)); - } - - /** - * Verify that "write" operations for a single table are serialized, - * but different tables can be executed in parallel. - */ - @Test(timeout=90000) - public void testConcurrentWriteOps() throws Exception { - final TestTableProcSet procSet = new TestTableProcSet(queue); - - final int NUM_ITEMS = 10; - final int NUM_TABLES = 4; - final AtomicInteger opsCount = new AtomicInteger(0); - for (int i = 0; i < NUM_TABLES; ++i) { - TableName tableName = TableName.valueOf(String.format("testtb-%04d", i)); - for (int j = 1; j < NUM_ITEMS; ++j) { - procSet.addBack(new TestTableProcedure(i * 100 + j, tableName, - TableProcedureInterface.TableOperationType.EDIT)); - opsCount.incrementAndGet(); - } - } - assertEquals(opsCount.get(), queue.size()); - - final Thread[] threads = new Thread[NUM_TABLES * 2]; - final HashSet concurrentTables = new HashSet(); - final ArrayList failures = new ArrayList(); - final AtomicInteger concurrentCount = new AtomicInteger(0); - for (int i = 0; i < threads.length; ++i) { - threads[i] = new Thread() { - @Override - public void run() { - while (opsCount.get() > 0) { - try { - TableProcedureInterface proc = procSet.acquire(); - if (proc == null) { - queue.signalAll(); - if (opsCount.get() > 0) { - 
continue; - } - break; - } - synchronized (concurrentTables) { - assertTrue("unexpected concurrency on " + proc.getTableName(), - concurrentTables.add(proc.getTableName())); - } - assertTrue(opsCount.decrementAndGet() >= 0); - try { - long procId = ((Procedure)proc).getProcId(); - TableName tableId = proc.getTableName(); - int concurrent = concurrentCount.incrementAndGet(); - assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES, - concurrent >= 1 && concurrent <= NUM_TABLES); - LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent); - Thread.sleep(2000); - concurrent = concurrentCount.decrementAndGet(); - LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent); - assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES); - } finally { - synchronized (concurrentTables) { - assertTrue(concurrentTables.remove(proc.getTableName())); - } - procSet.release(proc); - } - } catch (Throwable e) { - LOG.error("Failed " + e.getMessage(), e); - synchronized (failures) { - failures.add(e.getMessage()); - } - } finally { - queue.signalAll(); - } - } - } - }; - threads[i].start(); - } - for (int i = 0; i < threads.length; ++i) { - threads[i].join(); - } - assertTrue(failures.toString(), failures.isEmpty()); - assertEquals(0, opsCount.get()); - assertEquals(0, queue.size()); - - for (int i = 1; i <= NUM_TABLES; ++i) { - TableName table = TableName.valueOf(String.format("testtb-%04d", i)); - assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table)); - } - } - - public static class TestTableProcSet { - private final MasterProcedureQueue queue; - private Map procsMap = - new ConcurrentHashMap(); - - public TestTableProcSet(final MasterProcedureQueue queue) { - this.queue = queue; - } - - public void addBack(TableProcedureInterface tableProc) { - Procedure proc = (Procedure)tableProc; - procsMap.put(proc.getProcId(), tableProc); - queue.addBack(proc); - } - - public 
void addFront(TableProcedureInterface tableProc) { - Procedure proc = (Procedure)tableProc; - procsMap.put(proc.getProcId(), tableProc); - queue.addFront(proc); - } - - public TableProcedureInterface acquire() { - TableProcedureInterface proc = null; - boolean avail = false; - while (!avail) { - Long procId = queue.poll(); - proc = procId != null ? procsMap.remove(procId) : null; - if (proc == null) break; - switch (proc.getTableOperationType()) { - case CREATE: - case DELETE: - case EDIT: - avail = queue.tryAcquireTableExclusiveLock(proc.getTableName(), - "op="+ proc.getTableOperationType()); - break; - case READ: - avail = queue.tryAcquireTableSharedLock(proc.getTableName(), - "op="+ proc.getTableOperationType()); - break; - } - if (!avail) { - addFront(proc); - LOG.debug("yield procId=" + procId); - } - } - return proc; - } - - public void release(TableProcedureInterface proc) { - switch (proc.getTableOperationType()) { - case CREATE: - case DELETE: - case EDIT: - queue.releaseTableExclusiveLock(proc.getTableName()); - break; - case READ: - queue.releaseTableSharedLock(proc.getTableName()); - break; - } - } - } - - public static class TestTableProcedure extends Procedure - implements TableProcedureInterface { - private final TableOperationType opType; - private final TableName tableName; - - public TestTableProcedure() { - throw new UnsupportedOperationException("recovery should not be triggered here"); - } - - public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) { - this.tableName = tableName; - this.opType = opType; - setProcId(procId); - } - - @Override - public TableName getTableName() { - return tableName; - } - - @Override - public TableOperationType getTableOperationType() { - return opType; - } - - @Override - protected Procedure[] execute(Void env) { - return null; - } - - @Override - protected void rollback(Void env) { - throw new UnsupportedOperationException(); - } - - @Override - protected boolean abort(Void env) { 
- throw new UnsupportedOperationException(); - } - - @Override - protected void serializeStateData(final OutputStream stream) throws IOException {} - - @Override - protected void deserializeStateData(final InputStream stream) throws IOException {} - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java new file mode 100644 index 0000000..106b9fa --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -0,0 +1,489 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({MasterTests.class, SmallTests.class}) +public class TestMasterProcedureScheduler { + private static final Log LOG = LogFactory.getLog(TestMasterProcedureScheduler.class); + + private MasterProcedureScheduler queue; + private Configuration conf; + + @Before + public void setUp() throws IOException { + conf = HBaseConfiguration.create(); + queue = new MasterProcedureScheduler(conf, new TableLockManager.NullTableLockManager()); + } + + @After + public void tearDown() throws IOException { + assertEquals(0, queue.size()); + } + + @Test + public void testConcurrentCreateDelete() throws Exception { + final MasterProcedureScheduler procQueue = queue; + final TableName table = TableName.valueOf("testtb"); + final AtomicBoolean running = new AtomicBoolean(true); + final AtomicBoolean failure = new AtomicBoolean(false); + Thread 
createThread = new Thread() { + @Override + public void run() { + try { + while (running.get() && !failure.get()) { + if (procQueue.tryAcquireTableExclusiveLock(table, "create")) { + procQueue.releaseTableExclusiveLock(table); + } + } + } catch (Throwable e) { + LOG.error("create failed", e); + failure.set(true); + } + } + }; + + Thread deleteThread = new Thread() { + @Override + public void run() { + try { + while (running.get() && !failure.get()) { + if (procQueue.tryAcquireTableExclusiveLock(table, "delete")) { + procQueue.releaseTableExclusiveLock(table); + } + procQueue.markTableAsDeleted(table); + } + } catch (Throwable e) { + LOG.error("delete failed", e); + failure.set(true); + } + } + }; + + createThread.start(); + deleteThread.start(); + for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) { + Thread.sleep(100); + } + running.set(false); + createThread.join(); + deleteThread.join(); + assertEquals(false, failure.get()); + } + + /** + * Verify simple create/insert/fetch/delete of the table queue. 
+ */ + @Test + public void testSimpleTableOpsQueues() throws Exception { + final int NUM_TABLES = 10; + final int NUM_ITEMS = 10; + + int count = 0; + for (int i = 1; i <= NUM_TABLES; ++i) { + TableName tableName = TableName.valueOf(String.format("test-%04d", i)); + // insert items + for (int j = 1; j <= NUM_ITEMS; ++j) { + queue.addBack(new TestTableProcedure(i * 1000 + j, tableName, + TableProcedureInterface.TableOperationType.EDIT)); + assertEquals(++count, queue.size()); + } + } + assertEquals(NUM_TABLES * NUM_ITEMS, queue.size()); + + for (int j = 1; j <= NUM_ITEMS; ++j) { + for (int i = 1; i <= NUM_TABLES; ++i) { + Procedure proc = queue.poll(); + assertTrue(proc != null); + TableName tableName = ((TestTableProcedure)proc).getTableName(); + queue.tryAcquireTableExclusiveLock(tableName, "test"); + queue.releaseTableExclusiveLock(tableName); + queue.completionCleanup(proc); + assertEquals(--count, queue.size()); + assertEquals(i * 1000 + j, proc.getProcId()); + } + } + assertEquals(0, queue.size()); + + for (int i = 1; i <= NUM_TABLES; ++i) { + TableName tableName = TableName.valueOf(String.format("test-%04d", i)); + // complete the table deletion + assertTrue(queue.markTableAsDeleted(tableName)); + } + } + + /** + * Check that the table queue is not deletable until every procedure + * in-progress is completed (this is a special case for write-locks). 
+ */ + @Test + public void testCreateDeleteTableOperationsWithWriteLock() throws Exception { + TableName tableName = TableName.valueOf("testtb"); + + queue.addBack(new TestTableProcedure(1, tableName, + TableProcedureInterface.TableOperationType.EDIT)); + + // table can't be deleted because one item is in the queue + assertFalse(queue.markTableAsDeleted(tableName)); + + // fetch item and take a lock + assertEquals(1, queue.poll().getProcId()); + // take the xlock + assertTrue(queue.tryAcquireTableExclusiveLock(tableName, "write")); + // table can't be deleted because we have the lock + assertEquals(0, queue.size()); + assertFalse(queue.markTableAsDeleted(tableName)); + // release the xlock + queue.releaseTableExclusiveLock(tableName); + // complete the table deletion + assertTrue(queue.markTableAsDeleted(tableName)); + } + + /** + * Check that the table queue is not deletable until every procedure + * in-progress is completed (this is a special case for read-locks). + */ + @Test + public void testCreateDeleteTableOperationsWithReadLock() throws Exception { + final TableName tableName = TableName.valueOf("testtb"); + final int nitems = 2; + + for (int i = 1; i <= nitems; ++i) { + queue.addBack(new TestTableProcedure(i, tableName, + TableProcedureInterface.TableOperationType.READ)); + } + + // table can't be deleted because one item is in the queue + assertFalse(queue.markTableAsDeleted(tableName)); + + for (int i = 1; i <= nitems; ++i) { + // fetch item and take a lock + assertEquals(i, queue.poll().getProcId()); + // take the rlock + assertTrue(queue.tryAcquireTableSharedLock(tableName, "read " + i)); + // table can't be deleted because we have locks and/or items in the queue + assertFalse(queue.markTableAsDeleted(tableName)); + } + + for (int i = 1; i <= nitems; ++i) { + // table can't be deleted because we have locks + assertFalse(queue.markTableAsDeleted(tableName)); + // release the rlock + queue.releaseTableSharedLock(tableName); + } + + // there are no items 
and no lock in the queue + assertEquals(0, queue.size()); + // complete the table deletion + assertTrue(queue.markTableAsDeleted(tableName)); + } + + /** + * Verify the correct logic of RWLocks on the queue + */ + @Test + public void testVerifyRwLocks() throws Exception { + TableName tableName = TableName.valueOf("testtb"); + queue.addBack(new TestTableProcedure(1, tableName, + TableProcedureInterface.TableOperationType.EDIT)); + queue.addBack(new TestTableProcedure(2, tableName, + TableProcedureInterface.TableOperationType.READ)); + queue.addBack(new TestTableProcedure(3, tableName, + TableProcedureInterface.TableOperationType.EDIT)); + queue.addBack(new TestTableProcedure(4, tableName, + TableProcedureInterface.TableOperationType.READ)); + queue.addBack(new TestTableProcedure(5, tableName, + TableProcedureInterface.TableOperationType.READ)); + + // Fetch the 1st item and take the write lock + long procId = queue.poll().getProcId(); + assertEquals(1, procId); + assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); + + // Fetch the 2nd item and verify that the lock can't be acquired + assertEquals(null, queue.poll(0)); + + // Release the write lock and acquire the read lock + queue.releaseTableExclusiveLock(tableName); + + // Fetch the 2nd item and take the read lock + procId = queue.poll().getProcId(); + assertEquals(2, procId); + assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); + + // Fetch the 3rd item and verify that the lock can't be acquired + procId = queue.poll().getProcId(); + assertEquals(3, procId); + assertEquals(false, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); + + // release the rdlock of item 2 and take the wrlock for the 3d item + queue.releaseTableSharedLock(tableName); + assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); + + // Fetch 4th item and verify that the lock can't be acquired + assertEquals(null, queue.poll(0)); + + 
// Release the write lock and acquire the read lock + queue.releaseTableExclusiveLock(tableName); + + // Fetch the 4th item and take the read lock + procId = queue.poll().getProcId(); + assertEquals(4, procId); + assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); + + // Fetch the 5th item and take the read lock + procId = queue.poll().getProcId(); + assertEquals(5, procId); + assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); + + // Release 4th and 5th read-lock + queue.releaseTableSharedLock(tableName); + queue.releaseTableSharedLock(tableName); + + // remove table queue + assertEquals(0, queue.size()); + assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName)); + } + + /** + * Verify that "write" operations for a single table are serialized, + * but different tables can be executed in parallel. + */ + @Test(timeout=90000) + public void testConcurrentWriteOps() throws Exception { + final TestTableProcSet procSet = new TestTableProcSet(queue); + + final int NUM_ITEMS = 10; + final int NUM_TABLES = 4; + final AtomicInteger opsCount = new AtomicInteger(0); + for (int i = 0; i < NUM_TABLES; ++i) { + TableName tableName = TableName.valueOf(String.format("testtb-%04d", i)); + for (int j = 1; j < NUM_ITEMS; ++j) { + procSet.addBack(new TestTableProcedure(i * 100 + j, tableName, + TableProcedureInterface.TableOperationType.EDIT)); + opsCount.incrementAndGet(); + } + } + assertEquals(opsCount.get(), queue.size()); + + final Thread[] threads = new Thread[NUM_TABLES * 2]; + final HashSet concurrentTables = new HashSet(); + final ArrayList failures = new ArrayList(); + final AtomicInteger concurrentCount = new AtomicInteger(0); + for (int i = 0; i < threads.length; ++i) { + threads[i] = new Thread() { + @Override + public void run() { + while (opsCount.get() > 0) { + try { + TableProcedureInterface proc = procSet.acquire(); + if (proc == null) { + queue.signalAll(); + if (opsCount.get() > 0) { + 
continue; + } + break; + } + synchronized (concurrentTables) { + assertTrue("unexpected concurrency on " + proc.getTableName(), + concurrentTables.add(proc.getTableName())); + } + assertTrue(opsCount.decrementAndGet() >= 0); + try { + long procId = ((Procedure)proc).getProcId(); + TableName tableId = proc.getTableName(); + int concurrent = concurrentCount.incrementAndGet(); + assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES, + concurrent >= 1 && concurrent <= NUM_TABLES); + LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent); + Thread.sleep(2000); + concurrent = concurrentCount.decrementAndGet(); + LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent); + assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES); + } finally { + synchronized (concurrentTables) { + assertTrue(concurrentTables.remove(proc.getTableName())); + } + procSet.release(proc); + } + } catch (Throwable e) { + LOG.error("Failed " + e.getMessage(), e); + synchronized (failures) { + failures.add(e.getMessage()); + } + } finally { + queue.signalAll(); + } + } + } + }; + threads[i].start(); + } + for (int i = 0; i < threads.length; ++i) { + threads[i].join(); + } + assertTrue(failures.toString(), failures.isEmpty()); + assertEquals(0, opsCount.get()); + assertEquals(0, queue.size()); + + for (int i = 1; i <= NUM_TABLES; ++i) { + TableName table = TableName.valueOf(String.format("testtb-%04d", i)); + assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table)); + } + } + + public static class TestTableProcSet { + private final MasterProcedureScheduler queue; + private Map procsMap = + new ConcurrentHashMap(); + + public TestTableProcSet(final MasterProcedureScheduler queue) { + this.queue = queue; + } + + public void addBack(TableProcedureInterface tableProc) { + Procedure proc = (Procedure)tableProc; + procsMap.put(proc.getProcId(), tableProc); + queue.addBack(proc); + } + + 
public void addFront(TableProcedureInterface tableProc) { + Procedure proc = (Procedure)tableProc; + procsMap.put(proc.getProcId(), tableProc); + queue.addFront(proc); + } + + public TableProcedureInterface acquire() { + TableProcedureInterface proc = null; + boolean avail = false; + while (!avail) { + Procedure xProc = queue.poll(); + proc = xProc != null ? procsMap.remove(xProc.getProcId()) : null; + if (proc == null) break; + switch (proc.getTableOperationType()) { + case CREATE: + case DELETE: + case EDIT: + avail = queue.tryAcquireTableExclusiveLock(proc.getTableName(), + "op="+ proc.getTableOperationType()); + break; + case READ: + avail = queue.tryAcquireTableSharedLock(proc.getTableName(), + "op="+ proc.getTableOperationType()); + break; + } + if (!avail) { + addFront(proc); + LOG.debug("yield procId=" + proc); + } + } + return proc; + } + + public void release(TableProcedureInterface proc) { + switch (proc.getTableOperationType()) { + case CREATE: + case DELETE: + case EDIT: + queue.releaseTableExclusiveLock(proc.getTableName()); + break; + case READ: + queue.releaseTableSharedLock(proc.getTableName()); + break; + } + } + } + + public static class TestTableProcedure extends Procedure + implements TableProcedureInterface { + private final TableOperationType opType; + private final TableName tableName; + + public TestTableProcedure() { + throw new UnsupportedOperationException("recovery should not be triggered here"); + } + + public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) { + this.tableName = tableName; + this.opType = opType; + setProcId(procId); + } + + @Override + public TableName getTableName() { + return tableName; + } + + @Override + public TableOperationType getTableOperationType() { + return opType; + } + + @Override + protected Procedure[] execute(Void env) { + return null; + } + + @Override + protected void rollback(Void env) { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean 
abort(Void env) { + throw new UnsupportedOperationException(); + } + + @Override + protected void serializeStateData(final OutputStream stream) throws IOException {} + + @Override + protected void deserializeStateData(final InputStream stream) throws IOException {} + } +}