diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index f9032c4..7cc0954 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -24,10 +24,12 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.HashSet; +import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; @@ -470,12 +471,7 @@ public class ProcedureExecutor { // Initialize procedures executor for (int i = 0; i < numThreads; ++i) { - threads[i] = new Thread("ProcedureExecutor-" + i) { - @Override - public void run() { - execLoop(); - } - }; + threads[i] = new Executor("ProcedureExecutor-" + i); } // Initialize procedures timeout handler (this is the +1 thread) @@ -580,6 +576,10 @@ public class ProcedureExecutor { return submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE); } + public long submitProcedure(final Procedure proc, final boolean addToFront) { + return submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE, addToFront); + } + /** * Add a new root-procedure to the executor. * @param proc the new procedure to execute. @@ -591,6 +591,14 @@ public class ProcedureExecutor { final Procedure proc, final long nonceGroup, final long nonce) { + return submitProcedure(proc, nonceGroup, nonce, false); + } + + public long submitProcedure( + final Procedure proc, + final long nonceGroup, + final long nonce, + final boolean addToFront) { Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); Preconditions.checkArgument(isRunning()); Preconditions.checkArgument(lastProcId.get() >= 0); @@ -639,7 +647,11 @@ public class ProcedureExecutor { assert !procedures.containsKey(currentProcId); procedures.put(currentProcId, proc); sendProcedureAddedNotification(currentProcId); - runnables.addBack(proc); + if (addToFront) { + runnables.addFront(proc); + } else { + runnables.addBack(proc); + } return currentProcId; } @@ -739,6 +751,97 @@ public class ProcedureExecutor { return runnables; } + public Iterable<Procedure> getActiveProcedures() { + return new ActiveProcedureIterator(); + } + + private class Executor extends Thread { + private final ReentrantLock lock = new ReentrantLock(); + private Procedure activeProcedure; + + public Executor(final String name) { + super(name); + } + + @Override + public void run() { + while (isRunning()) { + lock.lock(); + try { + Long procId = runnables.poll(); + activeProcedure = procId != null ? 
procedures.get(procId) : null; + if (activeProcedure == null) continue; + } finally { + lock.unlock(); + } + + try { + activeExecutorCount.incrementAndGet(); + execLoop(activeProcedure); + } finally { + activeExecutorCount.decrementAndGet(); + } + } + } + + public Procedure getActiveProcedure() { + if (lock.tryLock()) { + Procedure proc = activeProcedure; + lock.unlock(); + return proc; + } + return null; + } + } + + private class ActiveProcedureIterator implements Iterator<Procedure>, Iterable<Procedure> { + private Procedure nextProc = null; + private int index = 0; + + public ActiveProcedureIterator() { + } + + @Override + public Iterator<Procedure> iterator() { + return this; + } + + @Override + public boolean hasNext() { + if (nextProc == null) { + nextProc = fetchNext(); + } + return nextProc != null; + } + + @Override + public Procedure next() { + if (nextProc == null) { + nextProc = fetchNext(); + if (nextProc == null) { + throw new NoSuchElementException(); + } + } + Procedure proc = nextProc; + nextProc = null; + return proc; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + private Procedure fetchNext() { + Procedure proc = null; + // the last slot in threads[] is the timeout handler (the +1 thread), not an Executor + for (; index < (threads.length - 1) && proc == null; ++index) { + proc = ((Executor)threads[index]).getActiveProcedure(); + } + return proc; + } + } + /** * Execution loop (N threads) * while the executor is in a running state, diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java index 242ae86..de05a5c 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.procedure2; import java.util.Map; +import java.util.NavigableSet; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.ConcurrentSkipListMap; @@ -66,6 +67,10 @@ public class ProcedureFairRunQueues + public NavigableSet<TKey> keySet() { + return objMap.keySet(); + } + public TQueue get(final TKey key) { return objMap.get(key); } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 34774ed..e7d08ba 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -127,6 +127,16 @@ public class ProcedureTestingUtility { } } + public static boolean isProcedureActive(ProcedureExecutor procExecutor, + long procId) { + for (Procedure proc: procExecutor.getActiveProcedures()) { + if (proc.getProcId() == procId) { + return true; + } + } + return false; + } + public static long submitAndWait(ProcedureExecutor procExecutor, Procedure proc) { return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE); } @@ -145,6 +155,12 @@ public class ProcedureTestingUtility { } } + public static void waitProcedureActive(ProcedureExecutor procExecutor, long procId) { + while (!isProcedureActive(procExecutor, procId) && procExecutor.isRunning()) { + Threads.sleepWithoutInterrupt(250); + } + } + public static void waitNoProcedureRunning(ProcedureExecutor procExecutor) { int stableRuns = 0; while (stableRuns < 10) { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutorMetrics.java 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutorMetrics.java new file mode 100644 index 0000000..f183059 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutorMetrics.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; +import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import org.junit.After; +import org.junit.Before; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({MasterTests.class, SmallTests.class}) +public class TestProcedureExecutorMetrics { + private static final Log LOG = LogFactory.getLog(TestProcedureExecutorMetrics.class); + + private static final int PROCEDURE_EXECUTOR_SLOTS = 4; + private static final Procedure NULL_PROC = null; + + private ProcedureExecutor procExecutor; + private ProcedureStore procStore; + + private HBaseCommonTestingUtility htu; + + @Before + public void setUp() throws IOException { + htu = new HBaseCommonTestingUtility(); + procStore = new NoopProcedureStore(); + procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore); + procStore.start(PROCEDURE_EXECUTOR_SLOTS); + procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + } + + @After + public void tearDown() throws IOException { + procExecutor.stop(); + procStore.stop(false); + } + + @Test(timeout=60000) + public void testRunningProcedures() throws Exception { + assertEquals(0, procExecutor.getActiveExecutorCount()); + assertEquals(0, countActiveProcedures()); + + TestProcedure[] procs = new TestProcedure[PROCEDURE_EXECUTOR_SLOTS * 2]; + for (int i = 0; i < procs.length; ++i) { + procs[i] = new TestProcedure(); + } + + // submit all the 
procedures, only the first N are running + for (int i = 0; i < procs.length; ++i) { + long procId = procExecutor.submitProcedure(procs[i]); + if (i < PROCEDURE_EXECUTOR_SLOTS) { + ProcedureTestingUtility.waitProcedureActive(procExecutor, procId); + } + + int expectedActive = Math.min(i + 1, PROCEDURE_EXECUTOR_SLOTS); + assertEquals(expectedActive, procExecutor.getActiveExecutorCount()); + assertExpectedProcedures(procs, 0, expectedActive); + } + + // start 'stopping' the procedures, and check which ones are running + for (int i = 0; i < procs.length; ++i) { + // stop the procedure + procs[i].latchCountDown(); + ProcedureTestingUtility.waitProcedure(procExecutor, procs[i].getProcId()); + + int expectedActive = Math.min(procs.length - i - 1, PROCEDURE_EXECUTOR_SLOTS); + if (expectedActive == 0) break; + + // make sure we have the last one expected running + long procId = procs[i + expectedActive].getProcId(); + ProcedureTestingUtility.waitProcedureActive(procExecutor, procId); + + assertEquals(expectedActive, procExecutor.getActiveExecutorCount()); + assertExpectedProcedures(procs, i + 1, expectedActive); + } + + assertEquals(0, procExecutor.getActiveExecutorCount()); + assertEquals(0, countActiveProcedures()); + } + + private int countActiveProcedures() { + int count = 0; + for (Procedure proc: procExecutor.getActiveProcedures()) { + count++; + } + return count; + } + + private void assertExpectedProcedures(TestProcedure[] procs, int offset, int count) { + int found = 0; + for (Procedure proc: procExecutor.getActiveProcedures()) { + boolean match = false; + for (int i = offset; i < (offset + count); ++i) { + if (proc == procs[i]) { + match = true; + break; + } + } + assertTrue("unexpected procedure " + proc, match); + found++; + } + assertEquals(count, found); + } + + private static class TestProcedure extends Procedure<Void> { + private CountDownLatch latch; + + public TestProcedure() { + setLatch(1); + } + + public void setLatch(int count) { + latch = new CountDownLatch(count); + } + + public void latchCountDown() { + latch.countDown(); + } + + @Override + protected Procedure[] execute(Void env) throws InterruptedException { + latch.await(); + return null; + } + + @Override + protected void rollback(Void env) { } + + @Override + protected boolean abort(Void env) { return false; } + + @Override + protected void serializeStateData(final OutputStream stream) throws IOException { } + + @Override + protected void deserializeStateData(final InputStream stream) throws IOException { } + } +} \ No newline at end of file
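Reviewer note: getActiveProcedures() returns a best-effort snapshot assembled by try-locking each Executor slot, so a procedure whose worker is mid-poll can be missed on a given pass; waitProcedureActive() above compensates by polling every 250ms. A minimal caller-side sketch of the intended consumption (the helper below is illustrative, not part of the patch):

    // Illustrative helper: count active procedures of a given type via the
    // best-effort snapshot. Executor.getActiveProcedure() returns null when
    // tryLock() fails, so callers that need certainty must retry in a loop.
    static int countActiveOfType(final ProcedureExecutor executor, final Class<?> type) {
      int count = 0;
      for (Procedure proc : executor.getActiveProcedures()) {
        if (type.isInstance(proc)) {
          count++;
        }
      }
      return count;
    }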
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index f7f98fe..1a5b67a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -70,8 +70,10 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper; import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer; -import org.apache.hadoop.hbase.master.handler.DisableTableHandler; -import org.apache.hadoop.hbase.master.handler.EnableTableHandler; +import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure; +import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; +import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.quotas.RegionStateListener; @@ -1581,7 +1583,7 @@ } } // assign all the replicas that were not recorded in the meta - assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server)); + assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, server)); } /** @@ -1645,7 +1647,7 @@ // maybe because it crashed. PairOfSameType p = MetaTableAccessor.getMergeRegions(result); if (p.getFirst() != null && p.getSecond() != null) { - int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst(). + int numReplicas = server.getTableDescriptors().get(p.getFirst(). getTable()).getRegionReplication(); for (HRegionInfo merge : p) { for (int i = 1; i < numReplicas; i++) { @@ -1710,16 +1712,33 @@ */ private void recoverTableInDisablingState() throws KeeperException, IOException { + // complete in-progress disable operations + ProcedureSyncWait.waitForProceduresToComplete( + server.getMasterProcedureExecutor(), DisableTableProcedure.class, + new ProcedureSyncWait.ProcedureEditor() { + @Override + public void editProcedure(final Procedure proc) { + assert proc instanceof DisableTableProcedure; + ((DisableTableProcedure)proc).setSkipInitWait(true); + } + }); + + // Recover by calling 'disable'. This should happen only when migrating + // from a non proc-v2 to a proc-v2 master. Set<TableName> disablingTables = - tableStateManager.getTablesInStates(TableState.State.DISABLING); + tableStateManager.getTablesInStates(TableState.State.DISABLING); if (disablingTables.size() != 0) { for (TableName tableName : disablingTables) { - // Recover by calling DisableTableHandler - LOG.info("The table " + tableName - + " is in DISABLING state. Hence recovering by moving the table" - + " to DISABLED state."); - new DisableTableHandler(this.server, tableName, - this, tableLockManager, true).prepare().process(); + LOG.info("The table " + tableName + " is in DISABLING state. " + + "Unable to find a pending DisableTableProcedure, starting a new one."); + try { + ProcedureSyncWait.submitAndWaitProcedure(server.getMasterProcedureExecutor(), + new DisableTableProcedure(server.getMasterProcedureExecutor().getEnvironment(), + tableName), true); + } catch (TableNotFoundException e) { + LOG.warn("Table " + tableName + " not found in hbase:meta to recover."); + continue; + } } } }
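Reviewer note: the ProcedureEditor callback above is a pre-run hook applied while the recovery code still owns the procedure; it exists so the procedure can be told, before it is scheduled, that it must not wait for master initialization. With Java 8 the same call would collapse to a lambda (illustrative only; this branch predates lambda support, hence the anonymous class):

    // Illustrative Java 8 form of the recovery call above.
    ProcedureSyncWait.waitForProceduresToComplete(
        server.getMasterProcedureExecutor(), DisableTableProcedure.class,
        proc -> ((DisableTableProcedure) proc).setSkipInitWait(true));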
@@ -1732,27 +1751,34 @@ * @throws org.apache.hadoop.hbase.TableNotFoundException * @throws IOException */ - private void recoverTableInEnablingState() - throws KeeperException, IOException { - Set<TableName> enablingTables = tableStateManager. - getTablesInStates(TableState.State.ENABLING); + private void recoverTableInEnablingState() throws KeeperException, IOException { + // complete in-progress enable operations + ProcedureSyncWait.waitForProceduresToComplete( + server.getMasterProcedureExecutor(), EnableTableProcedure.class, + new ProcedureSyncWait.ProcedureEditor() { + @Override + public void editProcedure(final Procedure proc) { + assert proc instanceof EnableTableProcedure; + ((EnableTableProcedure)proc).setSkipInitWait(true); + } + }); + + // Recover by calling 'enable'. This should happen only when migrating + // from a non proc-v2 to a proc-v2 master. + Set<TableName> enablingTables = + tableStateManager.getTablesInStates(TableState.State.ENABLING); if (enablingTables.size() != 0) { for (TableName tableName : enablingTables) { - // Recover by calling EnableTableHandler - LOG.info("The table " + tableName - + " is in ENABLING state. Hence recovering by moving the table" - + " to ENABLED state."); - // enableTable in sync way during master startup, - // no need to invoke coprocessor - EnableTableHandler eth = new EnableTableHandler(this.server, tableName, - this, tableLockManager, true); + LOG.info("The table " + tableName + " is in ENABLING state. " + + "Unable to find a pending EnableTableProcedure, starting a new one."); try { - eth.prepare(); + ProcedureSyncWait.submitAndWaitProcedure(server.getMasterProcedureExecutor(), + new EnableTableProcedure(server.getMasterProcedureExecutor().getEnvironment(), + tableName), true); } catch (TableNotFoundException e) { LOG.warn("Table " + tableName + " not found in hbase:meta to recover."); continue; } - eth.process(); } } } @@ -2627,7 +2653,7 @@ } int numReplicas = 1; try { - numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()). + numReplicas = server.getTableDescriptors().get(mergedHri.getTable()). getRegionReplication(); } catch (IOException e) { LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() + @@ -2656,7 +2682,7 @@ // the replica1s of daughters will be on the same machine int numReplicas = 1; try { - numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()). + numReplicas = server.getTableDescriptors().get(parentHri.getTable()). getRegionReplication(); } catch (IOException e) { LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java deleted file mode 100644 index d34f25e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java +++ /dev/null @@ -1,239 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.handler; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.ExecutorService; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.TableNotEnabledException; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.constraint.ConstraintException; -import org.apache.hadoop.hbase.executor.EventHandler; -import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.BulkAssigner; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.MasterCoprocessorHost; -import org.apache.hadoop.hbase.master.RegionState; -import org.apache.hadoop.hbase.master.RegionStates; -import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.master.TableLockManager.TableLock; -import org.apache.htrace.Trace; - -/** - * Handler to run disable of a table. - */ -@InterfaceAudience.Private -public class DisableTableHandler extends EventHandler { - private static final Log LOG = LogFactory.getLog(DisableTableHandler.class); - private final TableName tableName; - private final AssignmentManager assignmentManager; - private final TableLockManager tableLockManager; - private final boolean skipTableStateCheck; - private TableLock tableLock; - - public DisableTableHandler(Server server, TableName tableName, - AssignmentManager assignmentManager, TableLockManager tableLockManager, - boolean skipTableStateCheck) { - super(server, EventType.C_M_DISABLE_TABLE); - this.tableName = tableName; - this.assignmentManager = assignmentManager; - this.tableLockManager = tableLockManager; - this.skipTableStateCheck = skipTableStateCheck; - } - - public DisableTableHandler prepare() - throws TableNotFoundException, TableNotEnabledException, IOException { - if(tableName.equals(TableName.META_TABLE_NAME)) { - throw new ConstraintException("Cannot disable catalog table"); - } - //acquire the table write lock, blocking - this.tableLock = this.tableLockManager.writeLock(tableName, - EventType.C_M_DISABLE_TABLE.toString()); - this.tableLock.acquire(); - - boolean success = false; - try { - // Check if table exists - if (!MetaTableAccessor.tableExists(this.server.getConnection(), tableName)) { - throw new TableNotFoundException(tableName); - } - - // There could be multiple client requests trying to disable or enable - // the table at the same time. Ensure only the first request is honored - // After that, no other requests can be accepted until the table reaches - // DISABLED or ENABLED. - //TODO: reevaluate this since we have table locks now - if (!skipTableStateCheck) { - TableState.State state = this.assignmentManager. 
- getTableStateManager().setTableStateIfInStates( - this.tableName, TableState.State.DISABLING, - TableState.State.ENABLED); - if (state!=null) { - LOG.info("Table " + tableName + " isn't enabled;is "+state.name()+"; skipping disable"); - throw new TableNotEnabledException(this.tableName+" state is "+state.name()); - } - } - success = true; - } finally { - if (!success) { - releaseTableLock(); - } - } - - return this; - } - - @Override - public String toString() { - String name = "UnknownServerName"; - if(server != null && server.getServerName() != null) { - name = server.getServerName().toString(); - } - return getClass().getSimpleName() + "-" + name + "-" + getSeqid() + "-" + - tableName; - } - - @Override - public void process() { - try { - LOG.info("Attempting to disable table " + this.tableName); - MasterCoprocessorHost cpHost = ((HMaster) this.server) - .getMasterCoprocessorHost(); - if (cpHost != null) { - cpHost.preDisableTableHandler(this.tableName); - } - handleDisableTable(); - if (cpHost != null) { - cpHost.postDisableTableHandler(this.tableName); - } - } catch (IOException e) { - LOG.error("Error trying to disable table " + this.tableName, e); - } finally { - releaseTableLock(); - } - } - - private void releaseTableLock() { - if (this.tableLock != null) { - try { - this.tableLock.release(); - } catch (IOException ex) { - LOG.warn("Could not release the table lock", ex); - } - } - } - - private void handleDisableTable() throws IOException { - // Set table disabling flag up in zk. - this.assignmentManager.getTableStateManager().setTableState(this.tableName, - TableState.State.DISABLING); - boolean done = false; - while (true) { - // Get list of online regions that are of this table. Regions that are - // already closed will not be included in this list; i.e. the returned - // list is not ALL regions in a table, its all online regions according - // to the in-memory state on this master. - final List regions = this.assignmentManager - .getRegionStates().getRegionsOfTable(tableName); - if (regions.size() == 0) { - done = true; - break; - } - LOG.info("Offlining " + regions.size() + " regions."); - BulkDisabler bd = new BulkDisabler(this.server, regions); - try { - if (bd.bulkAssign()) { - done = true; - break; - } - } catch (InterruptedException e) { - LOG.warn("Disable was interrupted"); - // Preserve the interrupt. - Thread.currentThread().interrupt(); - break; - } - } - // Flip the table to disabled if success. - if (done) this.assignmentManager.getTableStateManager().setTableState(this.tableName, - TableState.State.DISABLED); - LOG.info("Disabled table, " + this.tableName + ", is done=" + done); - } - - /** - * Run bulk disable. 
- */ - class BulkDisabler extends BulkAssigner { - private final List regions; - - BulkDisabler(final Server server, final List regions) { - super(server); - this.regions = regions; - } - - @Override - protected void populatePool(ExecutorService pool) { - RegionStates regionStates = assignmentManager.getRegionStates(); - for (HRegionInfo region: regions) { - if (regionStates.isRegionInTransition(region) - && !regionStates.isRegionInState(region, RegionState.State.FAILED_CLOSE)) { - continue; - } - final HRegionInfo hri = region; - pool.execute(Trace.wrap("DisableTableHandler.BulkDisabler",new Runnable() { - public void run() { - assignmentManager.unassign(hri); - } - })); - } - } - - @Override - protected boolean waitUntilDone(long timeout) - throws InterruptedException { - long startTime = System.currentTimeMillis(); - long remaining = timeout; - List regions = null; - long lastLogTime = startTime; - while (!server.isStopped() && remaining > 0) { - Thread.sleep(waitingTimeForEvents); - regions = assignmentManager.getRegionStates().getRegionsOfTable(tableName); - long now = System.currentTimeMillis(); - // Don't log more than once every ten seconds. Its obnoxious. And only log table regions - // if we are waiting a while for them to go down... - if (LOG.isDebugEnabled() && ((now - lastLogTime) > 10000)) { - lastLogTime = now; - LOG.debug("Disable waiting until done; " + remaining + " ms remaining; " + regions); - } - if (regions.isEmpty()) break; - remaining = timeout - (now - startTime); - } - return regions != null && regions.isEmpty(); - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java deleted file mode 100644 index fd3d4c7..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java +++ /dev/null @@ -1,265 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.master.handler; - -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotDisabledException; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.executor.EventHandler; -import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.BulkAssigner; -import org.apache.hadoop.hbase.master.GeneralBulkAssigner; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.MasterCoprocessorHost; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.RegionStates; -import org.apache.hadoop.hbase.master.ServerManager; -import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.master.TableLockManager.TableLock; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; - -/** - * Handler to run enable of a table. - */ -@InterfaceAudience.Private -public class EnableTableHandler extends EventHandler { - private static final Log LOG = LogFactory.getLog(EnableTableHandler.class); - private final TableName tableName; - private final AssignmentManager assignmentManager; - private final TableLockManager tableLockManager; - private boolean skipTableStateCheck = false; - private TableLock tableLock; - private MasterServices services; - - public EnableTableHandler(Server server, TableName tableName, - AssignmentManager assignmentManager, TableLockManager tableLockManager, - boolean skipTableStateCheck) { - super(server, EventType.C_M_ENABLE_TABLE); - this.tableName = tableName; - this.assignmentManager = assignmentManager; - this.tableLockManager = tableLockManager; - this.skipTableStateCheck = skipTableStateCheck; - } - - public EnableTableHandler(MasterServices services, TableName tableName, - AssignmentManager assignmentManager, - TableLockManager tableLockManager, boolean skipTableStateCheck) { - this((Server)services, tableName, assignmentManager, tableLockManager, - skipTableStateCheck); - this.services = services; - } - - public EnableTableHandler prepare() - throws TableNotFoundException, TableNotDisabledException, IOException { - //acquire the table write lock, blocking - this.tableLock = this.tableLockManager.writeLock(tableName, - EventType.C_M_ENABLE_TABLE.toString()); - this.tableLock.acquire(); - - boolean success = false; - try { - // Check if table exists - if (!MetaTableAccessor.tableExists(this.server.getConnection(), tableName)) { - throw new TableNotFoundException(tableName); - } - - // There could be multiple client requests trying to disable or enable - // the table at the same time. Ensure only the first request is honored - // After that, no other requests can be accepted until the table reaches - // DISABLED or ENABLED. 
- if (!skipTableStateCheck) { - TableState.State state = this.assignmentManager - .getTableStateManager().setTableStateIfInStates( - this.tableName, TableState.State.ENABLING, - TableState.State.DISABLED); - if (state!=null) { - LOG.info("Table " + tableName + " isn't disabled;is "+state.name()+"; skipping enable"); - throw new TableNotDisabledException(this.tableName+" state is "+state.name()); - } - } - success = true; - } finally { - if (!success) { - releaseTableLock(); - } - } - return this; - } - - @Override - public String toString() { - String name = "UnknownServerName"; - if(server != null && server.getServerName() != null) { - name = server.getServerName().toString(); - } - return getClass().getSimpleName() + "-" + name + "-" + getSeqid() + "-" + - tableName; - } - - @Override - public void process() { - try { - LOG.info("Attempting to enable the table " + this.tableName); - MasterCoprocessorHost cpHost = ((HMaster) this.server) - .getMasterCoprocessorHost(); - if (cpHost != null) { - cpHost.preEnableTableHandler(this.tableName); - } - handleEnableTable(); - if (cpHost != null) { - cpHost.postEnableTableHandler(this.tableName); - } - } catch (IOException | InterruptedException e) { - LOG.error("Error trying to enable the table " + this.tableName, e); - } finally { - releaseTableLock(); - } - } - - private void releaseTableLock() { - if (this.tableLock != null) { - try { - this.tableLock.release(); - } catch (IOException ex) { - LOG.warn("Could not release the table lock", ex); - } - } - } - - private void handleEnableTable() throws IOException, - InterruptedException { - // I could check table is disabling and if so, not enable but require - // that user first finish disabling but that might be obnoxious. - - this.assignmentManager.getTableStateManager().setTableState(this.tableName, - TableState.State.ENABLING); - boolean done = false; - ServerManager serverManager = ((HMaster)this.server).getServerManager(); - // Get the regions of this table. We're done when all listed - // tables are onlined. 
- List<Pair<HRegionInfo, ServerName>> tableRegionsAndLocations; - if (TableName.META_TABLE_NAME.equals(tableName)) { - tableRegionsAndLocations = new MetaTableLocator().getMetaRegionsAndLocations( - server.getZooKeeper()); - } else { - tableRegionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations( - server.getConnection(), tableName, true); - } - - int countOfRegionsInTable = tableRegionsAndLocations.size(); - Map<HRegionInfo, ServerName> regionsToAssign = - regionsToAssignWithServerName(tableRegionsAndLocations); - if (services != null) { - // need to potentially create some regions for the replicas - List<HRegionInfo> unrecordedReplicas = AssignmentManager.replicaRegionsNotRecordedInMeta( - new HashSet<HRegionInfo>(regionsToAssign.keySet()), services); - Map<ServerName, List<HRegionInfo>> srvToUnassignedRegs = - this.assignmentManager.getBalancer().roundRobinAssignment(unrecordedReplicas, - serverManager.getOnlineServersList()); - if (srvToUnassignedRegs != null) { - for (Map.Entry<ServerName, List<HRegionInfo>> entry : srvToUnassignedRegs.entrySet()) { - for (HRegionInfo h : entry.getValue()) { - regionsToAssign.put(h, entry.getKey()); - } - } - } - } - int regionsCount = regionsToAssign.size(); - if (regionsCount == 0) { - done = true; - } - LOG.info("Table '" + this.tableName + "' has " + countOfRegionsInTable - + " regions, of which " + regionsCount + " are offline."); - List<ServerName> onlineServers = serverManager.createDestinationServersList(); - Map<ServerName, List<HRegionInfo>> bulkPlan = - this.assignmentManager.getBalancer().retainAssignment(regionsToAssign, onlineServers); - if (bulkPlan != null) { - LOG.info("Bulk assigning " + regionsCount + " region(s) across " + bulkPlan.size() - + " server(s), retainAssignment=true"); - - BulkAssigner ba = - new GeneralBulkAssigner(this.server, bulkPlan, this.assignmentManager, true); - try { - if (ba.bulkAssign()) { - done = true; - } - } catch (InterruptedException e) { - LOG.warn("Enable operation was interrupted when enabling table '" - + this.tableName + "'"); - // Preserve the interrupt. - Thread.currentThread().interrupt(); - } - } else { - LOG.info("Balancer was unable to find suitable servers for table " + tableName - + ", leaving unassigned"); - done = true; - } - if (done) { - // Flip the table to enabled. - this.assignmentManager.getTableStateManager().setTableState( - this.tableName, TableState.State.ENABLED); - LOG.info("Table '" + this.tableName - + "' was successfully enabled. Status: done=" + done); - } else { - LOG.warn("Table '" + this.tableName - + "' wasn't successfully enabled. Status: done=" + done); - } - } - - /** - * @param regionsInMeta - * @return List of regions neither in transition nor assigned. - * @throws IOException - */ - private Map<HRegionInfo, ServerName> regionsToAssignWithServerName( - final List<Pair<HRegionInfo, ServerName>> regionsInMeta) throws IOException { - Map<HRegionInfo, ServerName> regionsToAssign = - new HashMap<HRegionInfo, ServerName>(regionsInMeta.size()); - RegionStates regionStates = this.assignmentManager.getRegionStates(); - for (Pair<HRegionInfo, ServerName> regionLocation : regionsInMeta) { - HRegionInfo hri = regionLocation.getFirst(); - ServerName sn = regionLocation.getSecond(); - if (regionStates.isRegionOffline(hri)) { - regionsToAssign.put(hri, sn); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping assign for the region " + hri + " during enable table " - + hri.getTable() + " because its already in tranition or assigned."); - } - } - } - return regionsToAssign; - } -}
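Reviewer note: the two procedure diffs that follow carry the flag the recovery code sets. The essential change is a one-line gate in acquireLock(); a minimal sketch of the pattern (the lock helper name is invented for illustration):

    // Sketch of the init gate used by both procedures below: a procedure
    // normally parks until the master is initialized, but recovery-submitted
    // procedures set skipInitWait because they run during initialization and
    // waiting here would deadlock master startup.
    protected boolean acquireLock(final MasterProcedureEnv env) {
      if (!skipInitWait && !env.isInitialized()) {
        return false; // not ready yet; the scheduler retries later
      }
      return tryAcquireTableExclusiveLock(); // illustrative helper
    }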
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java index 351751e3..74e2510 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java @@ -63,8 +63,9 @@ private final ProcedurePrepareLatch syncLatch; private TableName tableName; - private boolean skipTableStateCheck; private UserGroupInformation user; + private boolean skipTableStateCheck; + private boolean skipInitWait; private Boolean traceEnabled = null; @@ -76,6 +77,20 @@ public DisableTableProcedure() { syncLatch = null; + skipInitWait = false; + } + + /** + * Constructor used by AM + * @param env MasterProcedureEnv + * @param tableName the table to operate on + * @throws IOException + */ + public DisableTableProcedure( + final MasterProcedureEnv env, + final TableName tableName) throws IOException { + this(env, tableName, true, null); + skipInitWait = true; } /** @@ -117,6 +132,15 @@ // Note: the member syncLatch could be null if we are in failover or recovery scenario. // This is ok for backward compatible, as 1.0 client would not able to peek at procedure. 
this.syncLatch = syncLatch; + this.skipInitWait = false; + } + + /** + * used by the AM to force disable on startup + */ + @InterfaceAudience.Private + public void setSkipInitWait(boolean skipInitWait) { + this.skipInitWait = skipInitWait; } @Override @@ -214,7 +238,7 @@ @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; + if (!skipInitWait && !env.isInitialized()) return false; return env.getProcedureQueue().tryAcquireTableExclusiveLock( tableName, EventType.C_M_DISABLE_TABLE.toString()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java index ef13d07..e03471e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java @@ -67,13 +67,28 @@ private final ProcedurePrepareLatch syncLatch; private TableName tableName; - private boolean skipTableStateCheck; private UserGroupInformation user; + private boolean skipTableStateCheck; + private boolean skipInitWait; private Boolean traceEnabled = null; public EnableTableProcedure() { syncLatch = null; + skipInitWait = false; + } + + /** + * Constructor used by AM + * @param env MasterProcedureEnv + * @param tableName the table to operate on + * @throws IOException + */ + public EnableTableProcedure( + final MasterProcedureEnv env, + final TableName tableName) throws IOException { + this(env, tableName, true, null); + this.skipInitWait = true; } /** @@ -115,6 +130,15 @@ // Note: the member syncLatch could be null if we are in failover or recovery scenario. // This is ok for backward compatible, as 1.0 client would not able to peek at procedure. 
this.syncLatch = syncLatch; + this.skipInitWait = false; + } + + /** + * used by the AM to force enable on startup + */ + @InterfaceAudience.Private + public void setSkipInitWait(boolean skipInitWait) { + this.skipInitWait = skipInitWait; } @Override @@ -234,7 +258,7 @@ @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; + if (!skipInitWait && !env.isInitialized()) return false; return env.getProcedureQueue().tryAcquireTableExclusiveLock( tableName, EventType.C_M_ENABLE_TABLE.toString());
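Reviewer note: the queue-side support comes next. pollIfMatch() removes the head of a table's run queue only when the caller's predicate accepts it, so recovery can claim a pending disable/enable procedure without stealing unrelated work. A hedged usage sketch (variable names illustrative):

    // Claim the head procedure for a table only if it is the type being
    // recovered; otherwise the queue is left untouched for normal dispatch.
    Long procId = masterQueue.pollIfMatch(tableName, new MasterProcedureQueue.Predicate() {
      @Override
      public boolean evaluate(long procId) {
        return procExec.getProcedure(procId) instanceof DisableTableProcedure;
      }
    });
    if (procId != null) {
      // the caller now owns the procedure: edit it, then wait for completion
    }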
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java index c4c7747..d5fbd5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; import java.util.ArrayDeque; import java.util.Deque; +import java.util.NavigableSet; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -159,6 +160,34 @@ public class MasterProcedureQueue implements ProcedureRunnableSet { return rq.poll(); } + @InterfaceAudience.Private + public interface Predicate { + boolean evaluate(long procId); + } + + public Long pollIfMatch(final TableName table, final Predicate predicate) { + lock.lock(); + try { + TableRunQueue queue = getRunQueue(table); + if (queue != null && !queue.isEmpty() && predicate.evaluate(queue.peek())) { + return doPoll(queue); + } + return null; + } finally { + lock.unlock(); + } + } + + public Long pop(final TableName table) { + lock.lock(); + try { + TableRunQueue queue = getRunQueue(table); + return queue != null ? doPoll(queue) : null; + } finally { + lock.unlock(); + } + } + @Override public void signalAll() { lock.lock(); @@ -264,6 +293,10 @@ public class MasterProcedureQueue implements ProcedureRunnableSet { return new TableRunQueue(priority); } + public NavigableSet<TableName> getTables() { + return tableFairQ.keySet(); + } + private ServerRunQueue createServerRunQueue(final ServerProcedureInterface spi) { return new ServerRunQueue(DEFAULT_SERVER_PRIORITY); } @@ -391,6 +424,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet { void addBack(Procedure proc); Long poll(); boolean acquireDeleteLock(); + Long peek(); } /** @@ -447,6 +481,11 @@ public class MasterProcedureQueue implements ProcedureRunnableSet { return tryExclusiveLock(); } + @Override + public Long peek() { + return this.runnables.peek(); + } + public synchronized boolean isLocked() { return isExclusiveLock() || sharedLock > 0; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java index 1eb0073..b22e66a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java @@ -65,9 +65,19 @@ public final class ProcedureSyncWait { T evaluate() throws IOException; } + @InterfaceAudience.Private + public interface ProcedureEditor { + void editProcedure(final Procedure proc); + } + public static byte[] submitAndWaitProcedure(ProcedureExecutor procExec, final Procedure proc) throws IOException { - long procId = procExec.submitProcedure(proc); + return submitAndWaitProcedure(procExec, proc, false); + } + + public static byte[] submitAndWaitProcedure(ProcedureExecutor procExec, + final Procedure proc, final boolean addToFront) throws IOException { + long procId = procExec.submitProcedure(proc, addToFront); return waitForProcedureToComplete(procExec, procId); } @@ -94,6 +104,44 @@ public final class ProcedureSyncWait { } } + public static void waitForProceduresToComplete(ProcedureExecutor procExec, + final Class<? extends Procedure> procType, final ProcedureEditor procEditor) + throws IOException { + for (TableName table: procExec.getEnvironment().getProcedureQueue().getTables()) { + waitForProcedureToComplete(procExec, table, procType, procEditor); + } + } + + public static byte[] waitForProcedureToComplete( + final ProcedureExecutor procExec, + final TableName table, final Class<? extends Procedure> procType, + final ProcedureEditor procEditor) throws IOException { + MasterProcedureEnv env = procExec.getEnvironment(); + // look if the procedure is at the head of the queue + Long procId = env.getProcedureQueue().pollIfMatch(table, new MasterProcedureQueue.Predicate() { + @Override + public boolean evaluate(long procId) { + Procedure proc = procExec.getProcedure(procId); + return proc != null && procType.isAssignableFrom(proc.getClass()); + } + }); + if (procId != null) { + if (procEditor != null) { + Procedure proc = procExec.getProcedure(procId); + procEditor.editProcedure(proc); + } + return waitForProcedureToComplete(procExec, procId); + } + // look into the active procedures + for (Procedure proc: procExec.getActiveProcedures()) { + if (procType.isAssignableFrom(proc.getClass())) { + if (procEditor != null) procEditor.editProcedure(proc); + return waitForProcedureToComplete(procExec, proc.getProcId()); + } + } + return null; + } + public static <T> T waitFor(MasterProcedureEnv env, String purpose, Predicate<T> predicate) throws 
IOException { final Configuration conf = env.getMasterConfiguration(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestEnableTableHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestEnableTableHandler.java deleted file mode 100644 index f5c8b90..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestEnableTableHandler.java +++ /dev/null @@ -1,262 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.handler; - -import java.util.ArrayList; -import java.util.List; -import java.io.IOException; -import java.util.concurrent.CountDownLatch; - -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.testclassification.MasterTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.JVMClusterUtil; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -@Category({ MasterTests.class, MediumTests.class }) -public class TestEnableTableHandler { - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final Log LOG = LogFactory.getLog(TestEnableTableHandler.class); - private static final byte[] FAMILYNAME = Bytes.toBytes("fam"); - - @Before - public void setUp() throws 
Exception { - TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, - MasterSyncObserver.class.getName()); - TEST_UTIL.startMiniCluster(1); - } - - @After - public void tearDown() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - @Test(timeout = 300000) - public void testEnableTableWithNoRegionServers() throws Exception { - final TableName tableName = TableName.valueOf("testEnableTableWithNoRegionServers"); - final MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); - final HMaster m = cluster.getMaster(); - final HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); - final HTableDescriptor desc = new HTableDescriptor(tableName); - desc.addFamily(new HColumnDescriptor(FAMILYNAME)); - admin.createTable(desc); - admin.disableTable(tableName); - TEST_UTIL.waitTableDisabled(tableName.getName()); - - admin.enableTable(tableName); - TEST_UTIL.waitTableEnabled(tableName); - // disable once more - admin.disableTable(tableName); - - TEST_UTIL.waitUntilNoRegionsInTransition(60000); - // now stop region servers - JVMClusterUtil.RegionServerThread rs = cluster.getRegionServerThreads().get(0); - rs.getRegionServer().stop("stop"); - cluster.waitForRegionServerToStop(rs.getRegionServer().getServerName(), 10000); - - LOG.debug("Now enabling table " + tableName); - - admin.enableTable(tableName); - assertTrue(admin.isTableEnabled(tableName)); - - JVMClusterUtil.RegionServerThread rs2 = cluster.startRegionServer(); - cluster.waitForRegionServerToStart(rs2.getRegionServer().getServerName().getHostname(), - rs2.getRegionServer().getServerName().getPort(), 60000); - - List regions = TEST_UTIL.getHBaseAdmin().getTableRegions(tableName); - assertEquals(1, regions.size()); - for (HRegionInfo region : regions) { - TEST_UTIL.getHBaseAdmin().assign(region.getEncodedNameAsBytes()); - } - LOG.debug("Waiting for table assigned " + tableName); - TEST_UTIL.waitUntilAllRegionsAssigned(tableName); - List onlineRegions = admin.getOnlineRegions( - rs2.getRegionServer().getServerName()); - ArrayList tableRegions = filterTableRegions(tableName, onlineRegions); - assertEquals(1, tableRegions.size()); - } - - private ArrayList filterTableRegions(final TableName tableName, - List onlineRegions) { - return Lists.newArrayList(Iterables.filter(onlineRegions, new Predicate() { - @Override - public boolean apply(HRegionInfo input) { - return input.getTable().equals(tableName); - } - })); - } - - /** - * We were only clearing rows that had a hregioninfo column in hbase:meta. Mangled rows that - * were missing the hregioninfo because of error were being left behind messing up any - * subsequent table made with the same name. HBASE-12980 - * @throws IOException - * @throws InterruptedException - */ - @Test(timeout=60000) - public void testDeleteForSureClearsAllTableRowsFromMeta() - throws IOException, InterruptedException { - final TableName tableName = TableName.valueOf("testDeleteForSureClearsAllTableRowsFromMeta"); - final HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); - final HTableDescriptor desc = new HTableDescriptor(tableName); - desc.addFamily(new HColumnDescriptor(FAMILYNAME)); - try { - createTable(TEST_UTIL, desc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); - } catch (Exception e) { - e.printStackTrace(); - fail("Got an exception while creating " + tableName); - } - // Now I have a nice table, mangle it by removing the HConstants.REGIONINFO_QUALIFIER_STR - // content from a few of the rows. 
- try (Table metaTable = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { - try (ResultScanner scanner = - metaTable.getScanner(MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) { - for (Result result : scanner) { - // Just delete one row. - Delete d = new Delete(result.getRow()); - d.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); - LOG.info("Mangled: " + d); - metaTable.delete(d); - break; - } - } - admin.disableTable(tableName); - TEST_UTIL.waitTableDisabled(tableName.getName()); - // Rely on the coprocessor based latch to make the operation synchronous. - try { - deleteTable(TEST_UTIL, tableName); - } catch (Exception e) { - e.printStackTrace(); - fail("Got an exception while deleting " + tableName); - } - int rowCount = 0; - try (ResultScanner scanner = - metaTable.getScanner(MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) { - for (Result result : scanner) { - LOG.info("Found when none expected: " + result); - rowCount++; - } - } - assertEquals(0, rowCount); - } - } - - public static class MasterSyncObserver extends BaseMasterObserver { - volatile CountDownLatch tableCreationLatch = null; - volatile CountDownLatch tableDeletionLatch = null; - - @Override - public void postCreateTableHandler(final ObserverContext ctx, - HTableDescriptor desc, HRegionInfo[] regions) throws IOException { - // the AccessController test, some times calls only and directly the postCreateTableHandler() - if (tableCreationLatch != null) { - tableCreationLatch.countDown(); - } - } - - @Override - public void postDeleteTableHandler(final ObserverContext ctx, - TableName tableName) - throws IOException { - // the AccessController test, some times calls only and directly the postDeleteTableHandler() - if (tableDeletionLatch != null) { - tableDeletionLatch.countDown(); - } - } - } - - public static void createTable(HBaseTestingUtility testUtil, HTableDescriptor htd, - byte [][] splitKeys) - throws Exception { - createTable(testUtil, testUtil.getHBaseAdmin(), htd, splitKeys); - } - - public static void createTable(HBaseTestingUtility testUtil, HBaseAdmin admin, - HTableDescriptor htd, byte [][] splitKeys) - throws Exception { - // NOTE: We need a latch because admin is not sync, - // so the postOp coprocessor method may be called after the admin operation returned. - MasterSyncObserver observer = (MasterSyncObserver)testUtil.getHBaseCluster().getMaster() - .getMasterCoprocessorHost().findCoprocessor(MasterSyncObserver.class.getName()); - observer.tableCreationLatch = new CountDownLatch(1); - if (splitKeys != null) { - admin.createTable(htd, splitKeys); - } else { - admin.createTable(htd); - } - observer.tableCreationLatch.await(); - observer.tableCreationLatch = null; - testUtil.waitUntilAllRegionsAssigned(htd.getTableName()); - } - - public static void deleteTable(HBaseTestingUtility testUtil, TableName tableName) - throws Exception { - deleteTable(testUtil, testUtil.getHBaseAdmin(), tableName); - } - - public static void deleteTable(HBaseTestingUtility testUtil, HBaseAdmin admin, - TableName tableName) - throws Exception { - // NOTE: We need a latch because admin is not sync, - // so the postOp coprocessor method may be called after the admin operation returned. 
- MasterSyncObserver observer = (MasterSyncObserver)testUtil.getHBaseCluster().getMaster() - .getMasterCoprocessorHost().findCoprocessor(MasterSyncObserver.class.getName()); - observer.tableDeletionLatch = new CountDownLatch(1); - try { - admin.disableTable(tableName); - } catch (Exception e) { - LOG.debug("Table: " + tableName + " already disabled, so just deleting it."); - } - admin.deleteTable(tableName); - observer.tableDeletionLatch.await(); - observer.tableDeletionLatch = null; - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java index c8d3a62..d6cd0bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java @@ -453,6 +453,7 @@ public class TestMasterFailoverWithProcedures { LOG.info("Trigger master failover"); masterFailover(testUtil); + LOG.info("Backup master is up"); procExec = testUtil.getHBaseCluster().getMaster().getMasterProcedureExecutor(); ProcedureTestingUtility.waitProcedure(procExec, procId); @@ -478,14 +479,21 @@ public class TestMasterFailoverWithProcedures { final HMaster oldMaster) throws Exception { MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster(); + int count = 0; HMaster newMaster = cluster.getMaster(); while (newMaster == null || newMaster == oldMaster) { Thread.sleep(250); + if (++count % 10 == 0) { + LOG.debug("waiting for backup master to be online"); + } newMaster = cluster.getMaster(); } while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) { Thread.sleep(250); + if (++count % 10 == 0) { + LOG.debug("waiting for the new master to become active and initialized"); + } } }
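Reviewer note: taken as a whole, the recovery path leans on the new addToFront submission so a recovery procedure jumps ahead of client-submitted work. A hedged end-to-end sketch using the APIs added in this patch:

    // Illustrative: submit a recovery procedure at the front of the run queue
    // and block until it completes; ProcedureSyncWait.submitAndWaitProcedure(
    // procExec, proc, true) added above wraps exactly this sequence.
    long procId = procExec.submitProcedure(proc, true); // addToFront = true
    byte[] result = ProcedureSyncWait.waitForProcedureToComplete(procExec, procId);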