diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 91ae16e..a4215f9 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -24,10 +24,12 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.HashSet;
+import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
@@ -447,16 +448,11 @@ public class ProcedureExecutor {
 
     // Initialize procedures executor
     for (int i = 0; i < numThreads; ++i) {
-      threads[i] = new Thread("ProcedureExecutorThread-" + i) {
-        @Override
-        public void run() {
-          execLoop();
-        }
-      };
+      threads[i] = new Executor("ProcedureExecutorThread-" + i);
     }
 
     // Initialize procedures timeout handler (this is the +1 thread)
-    threads[numThreads] = new Thread("ProcedureExecutorTimeoutThread") {
+    threads[numThreads] = new Executor("ProcedureExecutorTimeoutThread") {
       @Override
       public void run() {
         timeoutLoop();
@@ -552,6 +548,16 @@ public class ProcedureExecutor {
    * @return the procedure id, that can be used to monitor the operation
    */
   public long submitProcedure(final Procedure proc) {
+    return submitProcedure(proc, false);
+  }
+
+  /**
+   * Add a new procedure to the executor.
+   * @param proc the procedure to execute
+   * @param addToFront true if the procedure should be added to the front of the runnable queue
+   * @return the procedure id, that can be used to monitor the operation
+   */
+  public long submitProcedure(final Procedure proc, boolean addToFront) {
     Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
     Preconditions.checkArgument(isRunning());
     Preconditions.checkArgument(lastProcId.get() >= 0);
@@ -574,7 +574,11 @@ public class ProcedureExecutor {
     assert !procedures.containsKey(proc.getProcId());
     procedures.put(proc.getProcId(), proc);
     sendProcedureAddedNotification(proc.getProcId());
-    runnables.addBack(proc);
+    if (addToFront) {
+      runnables.addFront(proc);
+    } else {
+      runnables.addBack(proc);
+    }
     return proc.getProcId();
   }
 
@@ -659,26 +663,117 @@ public class ProcedureExecutor {
     return runnables;
   }
 
+  /**
+   * @return an iterable over the procedures currently executed by the worker threads.
+   */
+  public Iterable<Procedure> getActiveProcedures() {
+    return new ActiveProcedureIterator();
+  }
+
   /**
    * Execution loop (N threads)
    * while the executor is in a running state,
    * fetch a procedure from the runnables queue and start the execution.
    */
-  private void execLoop() {
-    while (isRunning()) {
-      Long procId = runnables.poll();
-      Procedure proc = procId != null ? procedures.get(procId) : null;
-      if (proc == null) continue;
+  private class Executor extends Thread {
+    private final ReentrantLock lock = new ReentrantLock();
+    private Procedure activeProcedure;
 
-      try {
-        activeExecutorCount.incrementAndGet();
-        execLoop(proc);
-      } finally {
-        activeExecutorCount.decrementAndGet();
+    public Executor(final String name) {
+      super(name);
+    }
+
+    @Override
+    public void run() {
+      while (isRunning()) {
+        // The lock is held while picking the next procedure, so observers
+        // never see a procedure that is not actually being executed.
+        lock.lock();
+        try {
+          Long procId = runnables.poll();
+          activeProcedure = procId != null ? procedures.get(procId) : null;
+          if (activeProcedure == null) continue;
+        } finally {
+          lock.unlock();
+        }
+
+        try {
+          activeExecutorCount.incrementAndGet();
+          execLoop(activeProcedure);
+        } finally {
+          activeExecutorCount.decrementAndGet();
+          // Clear the slot under lock, so getActiveProcedure() does not
+          // report a completed procedure as still running.
+          lock.lock();
+          try {
+            activeProcedure = null;
+          } finally {
+            lock.unlock();
+          }
+        }
+      }
+    }
+
+    public Procedure getActiveProcedure() {
+      // tryLock() instead of lock(): callers must never block behind a
+      // worker that is holding the lock while polling the run queue.
+      if (lock.tryLock()) {
+        try {
+          return activeProcedure;
+        } finally {
+          lock.unlock();
+        }
+      }
+      return null;
+    }
+  }
+
+  private class ActiveProcedureIterator implements Iterator<Procedure>, Iterable<Procedure> {
+    private Procedure nextProc = null;
+    private int index = 0;
+
+    public ActiveProcedureIterator() {
+    }
+
+    @Override
+    public Iterator<Procedure> iterator() {
+      return this;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (nextProc == null) {
+        nextProc = fetchNext();
+      }
+      return nextProc != null;
+    }
+
+    @Override
+    public Procedure next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      // Consume the fetched procedure, so repeated next() calls advance.
+      Procedure proc = nextProc;
+      nextProc = null;
+      return proc;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+    private Procedure fetchNext() {
+      Procedure proc = null;
+      // threads[threads.length - 1] is the timeout thread; skip it.
+      for (; index < (threads.length - 1) && proc == null; ++index) {
+        proc = ((Executor)threads[index]).getActiveProcedure();
+      }
+      return proc;
+    }
+  }
+
   private void execLoop(Procedure proc) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Trying to start the execution of " + proc);
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index ddea9d2..63a1df8 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -109,6 +109,16 @@ public class ProcedureTestingUtility {
     ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
   }
 
+  public static boolean isProcedureActive(ProcedureExecutor procExecutor,
+      long procId) {
+    for (Procedure proc: procExecutor.getActiveProcedures()) {
+      if (proc.getProcId() == procId) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   public static long submitAndWait(ProcedureExecutor procExecutor, Procedure proc) {
     long procId = procExecutor.submitProcedure(proc);
     waitProcedure(procExecutor, procId);
@@ -121,6 +131,12 @@ public class ProcedureTestingUtility {
     }
   }
 
+  public static void waitProcedureActive(ProcedureExecutor procExecutor, long procId) {
+    while (!isProcedureActive(procExecutor, procId) && procExecutor.isRunning()) {
+      Threads.sleepWithoutInterrupt(250);
+    }
+  }
+
   public static void waitNoProcedureRunning(ProcedureExecutor procExecutor) {
     int stableRuns = 0;
     while (stableRuns < 10) {
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutorMetrics.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutorMetrics.java
new file mode 100644
index 0000000..f183059
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutorMetrics.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureExecutorMetrics {
+  private static final Log LOG = LogFactory.getLog(TestProcedureExecutorMetrics.class);
+
+  private static final int PROCEDURE_EXECUTOR_SLOTS = 4;
+  private static final Procedure NULL_PROC = null;
+
+  private ProcedureExecutor<Void> procExecutor;
+  private ProcedureStore procStore;
+
+  private HBaseCommonTestingUtility htu;
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+    procStore = new NoopProcedureStore();
+    procExecutor = new ProcedureExecutor<Void>(htu.getConfiguration(), null, procStore);
+    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
+    procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    procExecutor.stop();
+    procStore.stop(false);
+  }
+
+  @Test(timeout=60000)
+  public void testRunningProcedures() throws Exception {
+    assertEquals(0, procExecutor.getActiveExecutorCount());
+    assertEquals(0, countActiveProcedures());
+
+    TestProcedure[] procs = new TestProcedure[PROCEDURE_EXECUTOR_SLOTS * 2];
+    for (int i = 0; i < procs.length; ++i) {
+      procs[i] = new TestProcedure();
+    }
+
+    // submit all the procedures; only the first N are running
+    for (int i = 0; i < procs.length; ++i) {
+      long procId = procExecutor.submitProcedure(procs[i]);
+      if (i < PROCEDURE_EXECUTOR_SLOTS) {
+        ProcedureTestingUtility.waitProcedureActive(procExecutor, procId);
+      }
+
+      int expectedActive = Math.min(i + 1, PROCEDURE_EXECUTOR_SLOTS);
+      assertEquals(expectedActive, procExecutor.getActiveExecutorCount());
+      assertExpectedProcedures(procs, 0, expectedActive);
+    }
+
+    // start 'stopping' the procedures, and check which ones are running
+    for (int i = 0; i < procs.length; ++i) {
+      // stop the procedure
+      procs[i].latchCountDown();
+      ProcedureTestingUtility.waitProcedure(procExecutor, procs[i].getProcId());
+
+      int expectedActive = Math.min(procs.length - i - 1, PROCEDURE_EXECUTOR_SLOTS);
+      if (expectedActive == 0) break;
+
+      // make sure we have the last one expected running
+      long procId = procs[i + expectedActive].getProcId();
+      ProcedureTestingUtility.waitProcedureActive(procExecutor, procId);
+
+      assertEquals(expectedActive, procExecutor.getActiveExecutorCount());
+      assertExpectedProcedures(procs, i + 1, expectedActive);
+    }
+
+    assertEquals(0, procExecutor.getActiveExecutorCount());
+    assertEquals(0, countActiveProcedures());
+  }
+
+  private int countActiveProcedures() {
+    int count = 0;
+    for (Procedure proc: procExecutor.getActiveProcedures()) {
+      count++;
+    }
+    return count;
+  }
+
+  private void assertExpectedProcedures(TestProcedure[] procs, int offset, int count) {
+    int found = 0;
+    for (Procedure proc: procExecutor.getActiveProcedures()) {
+      boolean match = false;
+      for (int i = offset; i < (offset + count); ++i) {
+        if (proc == procs[i]) {
+          match = true;
+          break;
+        }
+      }
+      assertTrue("unexpected procedure " + proc, match);
+      found++;
+    }
+    assertEquals(count, found);
+  }
+
+  private static class TestProcedure extends Procedure<Void> {
+    private CountDownLatch latch;
+
+    public TestProcedure() {
+      setLatch(1);
+    }
+
+    public void setLatch(int count) {
+      latch = new CountDownLatch(count);
+    }
+
+    public void latchCountDown() {
+      latch.countDown();
+    }
+
+    @Override
+    protected Procedure[] execute(Void env) throws InterruptedException {
+      latch.await();
+      return null;
+    }
+
+    @Override
+    protected void rollback(Void env) { }
+
+    @Override
+    protected boolean abort(Void env) { return false; }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws IOException { }
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) throws IOException { }
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 34db4e4..5c6ee78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -70,8 +70,9 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
 import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
-import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
-import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
+import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.quotas.RegionStateListener;
@@ -393,7 +394,7 @@ public class AssignmentManager {
    * @throws IOException
    * @throws KeeperException
    * @throws InterruptedException
-   * @throws CoordinatedStateException 
+   * @throws CoordinatedStateException
    */
  void joinCluster() throws IOException,
          KeeperException, InterruptedException, CoordinatedStateException {
@@ -1574,7 +1575,7 @@ public class AssignmentManager {
       }
     }
     // assign all the replicas that were not recorded in the meta
-    assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
+    assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, server));
   }
 
   /**
@@ -1638,7 +1639,7 @@ public class AssignmentManager {
         // maybe because it crashed.
         PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
         if (p.getFirst() != null && p.getSecond() != null) {
-          int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
+          int numReplicas = server.getTableDescriptors().get(p.getFirst().
               getTable()).getRegionReplication();
           for (HRegionInfo merge : p) {
             for (int i = 1; i < numReplicas; i++) {
@@ -1711,8 +1712,22 @@ public class AssignmentManager {
           LOG.info("The table " + tableName
               + " is in DISABLING state.  Hence recovering by moving the table"
              + " to DISABLED state.");
-          new DisableTableHandler(this.server, tableName,
-              this, tableLockManager, true).prepare().process();
+          try {
+            ProcedureSyncWait.waitForProcedureToComplete(server.getMasterProcedureExecutor(),
+              tableName, DisableTableProcedure.class);
+            if (tableStateManager.isTableState(tableName, TableState.State.DISABLING)) {
+              // Recover by running a new 'disable'. This should happen only when migrating
+              // from a non proc-v2 to a proc-v2 master.
+              LOG.warn("Unable to find a pending DisableTableProcedure for table " + tableName +
+                ". Starting a new Disable operation.");
+              ProcedureSyncWait.submitAndWaitProcedure(server.getMasterProcedureExecutor(),
+                new DisableTableProcedure(server.getMasterProcedureExecutor().getEnvironment(),
+                  tableName, true), true);
+            }
+          } catch (TableNotFoundException e) {
+            LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
+            continue;
+          }
         }
       }
     }
@@ -1735,17 +1750,22 @@ public class AssignmentManager {
           LOG.info("The table " + tableName
              + " is in ENABLING state.  Hence recovering by moving the table"
             + " to ENABLED state.");
-          // enableTable in sync way during master startup,
-          // no need to invoke coprocessor
-          EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
-              this, tableLockManager, true);
           try {
-            eth.prepare();
+            ProcedureSyncWait.waitForProcedureToComplete(server.getMasterProcedureExecutor(),
+              tableName, EnableTableProcedure.class);
+            if (tableStateManager.isTableState(tableName, TableState.State.ENABLING)) {
+              // Recover by running a new 'enable'. This should happen only when migrating
+              // from a non proc-v2 to a proc-v2 master.
+              LOG.warn("Unable to find a pending EnableTableProcedure for table " + tableName +
+                ". Starting a new Enable operation.");
+              ProcedureSyncWait.submitAndWaitProcedure(server.getMasterProcedureExecutor(),
+                new EnableTableProcedure(server.getMasterProcedureExecutor().getEnvironment(),
+                  tableName, true), true);
+            }
           } catch (TableNotFoundException e) {
             LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
             continue;
           }
-          eth.process();
         }
       }
     }
@@ -2620,7 +2640,7 @@ public class AssignmentManager {
     }
     int numReplicas = 1;
     try {
-      numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
+      numReplicas = server.getTableDescriptors().get(mergedHri.getTable()).
getRegionReplication(); } catch (IOException e) { LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() + @@ -2649,7 +2669,7 @@ public class AssignmentManager { // the replica1s of daughters will be on the same machine int numReplicas = 1; try { - numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()). + numReplicas = server.getTableDescriptors().get(parentHri.getTable()). getRegionReplication(); } catch (IOException e) { LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java deleted file mode 100644 index ee77419..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java +++ /dev/null @@ -1,237 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.handler; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.ExecutorService; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.TableNotEnabledException; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.constraint.ConstraintException; -import org.apache.hadoop.hbase.executor.EventHandler; -import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.BulkAssigner; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.MasterCoprocessorHost; -import org.apache.hadoop.hbase.master.RegionState; -import org.apache.hadoop.hbase.master.RegionStates; -import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.master.TableLockManager.TableLock; -import org.apache.htrace.Trace; - -/** - * Handler to run disable of a table. 
- */ -@InterfaceAudience.Private -public class DisableTableHandler extends EventHandler { - private static final Log LOG = LogFactory.getLog(DisableTableHandler.class); - private final TableName tableName; - private final AssignmentManager assignmentManager; - private final TableLockManager tableLockManager; - private final boolean skipTableStateCheck; - private TableLock tableLock; - - public DisableTableHandler(Server server, TableName tableName, - AssignmentManager assignmentManager, TableLockManager tableLockManager, - boolean skipTableStateCheck) { - super(server, EventType.C_M_DISABLE_TABLE); - this.tableName = tableName; - this.assignmentManager = assignmentManager; - this.tableLockManager = tableLockManager; - this.skipTableStateCheck = skipTableStateCheck; - } - - public DisableTableHandler prepare() - throws TableNotFoundException, TableNotEnabledException, IOException { - if(tableName.equals(TableName.META_TABLE_NAME)) { - throw new ConstraintException("Cannot disable catalog table"); - } - //acquire the table write lock, blocking - this.tableLock = this.tableLockManager.writeLock(tableName, - EventType.C_M_DISABLE_TABLE.toString()); - this.tableLock.acquire(); - - boolean success = false; - try { - // Check if table exists - if (!MetaTableAccessor.tableExists(this.server.getConnection(), tableName)) { - throw new TableNotFoundException(tableName); - } - - // There could be multiple client requests trying to disable or enable - // the table at the same time. Ensure only the first request is honored - // After that, no other requests can be accepted until the table reaches - // DISABLED or ENABLED. - //TODO: reevaluate this since we have table locks now - if (!skipTableStateCheck) { - if (!this.assignmentManager.getTableStateManager().setTableStateIfInStates( - this.tableName, TableState.State.DISABLING, - TableState.State.ENABLED)) { - LOG.info("Table " + tableName + " isn't enabled; skipping disable"); - throw new TableNotEnabledException(this.tableName); - } - } - success = true; - } finally { - if (!success) { - releaseTableLock(); - } - } - - return this; - } - - @Override - public String toString() { - String name = "UnknownServerName"; - if(server != null && server.getServerName() != null) { - name = server.getServerName().toString(); - } - return getClass().getSimpleName() + "-" + name + "-" + getSeqid() + "-" + - tableName; - } - - @Override - public void process() { - try { - LOG.info("Attempting to disable table " + this.tableName); - MasterCoprocessorHost cpHost = ((HMaster) this.server) - .getMasterCoprocessorHost(); - if (cpHost != null) { - cpHost.preDisableTableHandler(this.tableName); - } - handleDisableTable(); - if (cpHost != null) { - cpHost.postDisableTableHandler(this.tableName); - } - } catch (IOException e) { - LOG.error("Error trying to disable table " + this.tableName, e); - } finally { - releaseTableLock(); - } - } - - private void releaseTableLock() { - if (this.tableLock != null) { - try { - this.tableLock.release(); - } catch (IOException ex) { - LOG.warn("Could not release the table lock", ex); - } - } - } - - private void handleDisableTable() throws IOException { - // Set table disabling flag up in zk. - this.assignmentManager.getTableStateManager().setTableState(this.tableName, - TableState.State.DISABLING); - boolean done = false; - while (true) { - // Get list of online regions that are of this table. Regions that are - // already closed will not be included in this list; i.e. 
the returned - // list is not ALL regions in a table, its all online regions according - // to the in-memory state on this master. - final List regions = this.assignmentManager - .getRegionStates().getRegionsOfTable(tableName); - if (regions.size() == 0) { - done = true; - break; - } - LOG.info("Offlining " + regions.size() + " regions."); - BulkDisabler bd = new BulkDisabler(this.server, regions); - try { - if (bd.bulkAssign()) { - done = true; - break; - } - } catch (InterruptedException e) { - LOG.warn("Disable was interrupted"); - // Preserve the interrupt. - Thread.currentThread().interrupt(); - break; - } - } - // Flip the table to disabled if success. - if (done) this.assignmentManager.getTableStateManager().setTableState(this.tableName, - TableState.State.DISABLED); - LOG.info("Disabled table, " + this.tableName + ", is done=" + done); - } - - /** - * Run bulk disable. - */ - class BulkDisabler extends BulkAssigner { - private final List regions; - - BulkDisabler(final Server server, final List regions) { - super(server); - this.regions = regions; - } - - @Override - protected void populatePool(ExecutorService pool) { - RegionStates regionStates = assignmentManager.getRegionStates(); - for (HRegionInfo region: regions) { - if (regionStates.isRegionInTransition(region) - && !regionStates.isRegionInState(region, RegionState.State.FAILED_CLOSE)) { - continue; - } - final HRegionInfo hri = region; - pool.execute(Trace.wrap("DisableTableHandler.BulkDisabler",new Runnable() { - public void run() { - assignmentManager.unassign(hri); - } - })); - } - } - - @Override - protected boolean waitUntilDone(long timeout) - throws InterruptedException { - long startTime = System.currentTimeMillis(); - long remaining = timeout; - List regions = null; - long lastLogTime = startTime; - while (!server.isStopped() && remaining > 0) { - Thread.sleep(waitingTimeForEvents); - regions = assignmentManager.getRegionStates().getRegionsOfTable(tableName); - long now = System.currentTimeMillis(); - // Don't log more than once every ten seconds. Its obnoxious. And only log table regions - // if we are waiting a while for them to go down... - if (LOG.isDebugEnabled() && ((now - lastLogTime) > 10000)) { - lastLogTime = now; - LOG.debug("Disable waiting until done; " + remaining + " ms remaining; " + regions); - } - if (regions.isEmpty()) break; - remaining = timeout - (now - startTime); - } - return regions != null && regions.isEmpty(); - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java deleted file mode 100644 index c7145fd..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java +++ /dev/null @@ -1,263 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.handler; - -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotDisabledException; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.executor.EventHandler; -import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.BulkAssigner; -import org.apache.hadoop.hbase.master.GeneralBulkAssigner; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.MasterCoprocessorHost; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.RegionStates; -import org.apache.hadoop.hbase.master.ServerManager; -import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.master.TableLockManager.TableLock; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; - -/** - * Handler to run enable of a table. - */ -@InterfaceAudience.Private -public class EnableTableHandler extends EventHandler { - private static final Log LOG = LogFactory.getLog(EnableTableHandler.class); - private final TableName tableName; - private final AssignmentManager assignmentManager; - private final TableLockManager tableLockManager; - private boolean skipTableStateCheck = false; - private TableLock tableLock; - private MasterServices services; - - public EnableTableHandler(Server server, TableName tableName, - AssignmentManager assignmentManager, TableLockManager tableLockManager, - boolean skipTableStateCheck) { - super(server, EventType.C_M_ENABLE_TABLE); - this.tableName = tableName; - this.assignmentManager = assignmentManager; - this.tableLockManager = tableLockManager; - this.skipTableStateCheck = skipTableStateCheck; - } - - public EnableTableHandler(MasterServices services, TableName tableName, - AssignmentManager assignmentManager, - TableLockManager tableLockManager, boolean skipTableStateCheck) { - this((Server)services, tableName, assignmentManager, tableLockManager, - skipTableStateCheck); - this.services = services; - } - - public EnableTableHandler prepare() - throws TableNotFoundException, TableNotDisabledException, IOException { - //acquire the table write lock, blocking - this.tableLock = this.tableLockManager.writeLock(tableName, - EventType.C_M_ENABLE_TABLE.toString()); - this.tableLock.acquire(); - - boolean success = false; - try { - // Check if table exists - if (!MetaTableAccessor.tableExists(this.server.getConnection(), tableName)) { - throw new TableNotFoundException(tableName); - } - - // There could be multiple client requests trying to disable or enable - // the table at the same time. Ensure only the first request is honored - // After that, no other requests can be accepted until the table reaches - // DISABLED or ENABLED. 
- if (!skipTableStateCheck) { - if (!this.assignmentManager.getTableStateManager().setTableStateIfInStates( - this.tableName, TableState.State.ENABLING, - TableState.State.DISABLED)) { - LOG.info("Table " + tableName + " isn't disabled; skipping enable"); - throw new TableNotDisabledException(this.tableName); - } - } - success = true; - } finally { - if (!success) { - releaseTableLock(); - } - } - return this; - } - - @Override - public String toString() { - String name = "UnknownServerName"; - if(server != null && server.getServerName() != null) { - name = server.getServerName().toString(); - } - return getClass().getSimpleName() + "-" + name + "-" + getSeqid() + "-" + - tableName; - } - - @Override - public void process() { - try { - LOG.info("Attempting to enable the table " + this.tableName); - MasterCoprocessorHost cpHost = ((HMaster) this.server) - .getMasterCoprocessorHost(); - if (cpHost != null) { - cpHost.preEnableTableHandler(this.tableName); - } - handleEnableTable(); - if (cpHost != null) { - cpHost.postEnableTableHandler(this.tableName); - } - } catch (IOException | InterruptedException e) { - LOG.error("Error trying to enable the table " + this.tableName, e); - } finally { - releaseTableLock(); - } - } - - private void releaseTableLock() { - if (this.tableLock != null) { - try { - this.tableLock.release(); - } catch (IOException ex) { - LOG.warn("Could not release the table lock", ex); - } - } - } - - private void handleEnableTable() throws IOException, - InterruptedException { - // I could check table is disabling and if so, not enable but require - // that user first finish disabling but that might be obnoxious. - - this.assignmentManager.getTableStateManager().setTableState(this.tableName, - TableState.State.ENABLING); - boolean done = false; - ServerManager serverManager = ((HMaster)this.server).getServerManager(); - // Get the regions of this table. We're done when all listed - // tables are onlined. 
-    List<Pair<HRegionInfo, ServerName>> tableRegionsAndLocations;
-    if (TableName.META_TABLE_NAME.equals(tableName)) {
-      tableRegionsAndLocations = new MetaTableLocator().getMetaRegionsAndLocations(
-        server.getZooKeeper());
-    } else {
-      tableRegionsAndLocations = MetaTableAccessor.getTableRegionsAndLocations(
-        server.getConnection(), tableName, true);
-    }
-
-    int countOfRegionsInTable = tableRegionsAndLocations.size();
-    Map<HRegionInfo, ServerName> regionsToAssign =
-        regionsToAssignWithServerName(tableRegionsAndLocations);
-    if (services != null) {
-      // need to potentially create some regions for the replicas
-      List<HRegionInfo> unrecordedReplicas = AssignmentManager.replicaRegionsNotRecordedInMeta(
-          new HashSet<HRegionInfo>(regionsToAssign.keySet()), services);
-      Map<ServerName, List<HRegionInfo>> srvToUnassignedRegs =
-          this.assignmentManager.getBalancer().roundRobinAssignment(unrecordedReplicas,
-              serverManager.getOnlineServersList());
-      if (srvToUnassignedRegs != null) {
-        for (Map.Entry<ServerName, List<HRegionInfo>> entry : srvToUnassignedRegs.entrySet()) {
-          for (HRegionInfo h : entry.getValue()) {
-            regionsToAssign.put(h, entry.getKey());
-          }
-        }
-      }
-    }
-    int regionsCount = regionsToAssign.size();
-    if (regionsCount == 0) {
-      done = true;
-    }
-    LOG.info("Table '" + this.tableName + "' has " + countOfRegionsInTable
-      + " regions, of which " + regionsCount + " are offline.");
-    List<ServerName> onlineServers = serverManager.createDestinationServersList();
-    Map<ServerName, List<HRegionInfo>> bulkPlan =
-        this.assignmentManager.getBalancer().retainAssignment(regionsToAssign, onlineServers);
-    if (bulkPlan != null) {
-      LOG.info("Bulk assigning " + regionsCount + " region(s) across " + bulkPlan.size()
-        + " server(s), retainAssignment=true");
-
-      BulkAssigner ba =
-          new GeneralBulkAssigner(this.server, bulkPlan, this.assignmentManager, true);
-      try {
-        if (ba.bulkAssign()) {
-          done = true;
-        }
-      } catch (InterruptedException e) {
-        LOG.warn("Enable operation was interrupted when enabling table '"
-          + this.tableName + "'");
-        // Preserve the interrupt.
-        Thread.currentThread().interrupt();
-      }
-    } else {
-      LOG.info("Balancer was unable to find suitable servers for table " + tableName
-        + ", leaving unassigned");
-      done = true;
-    }
-    if (done) {
-      // Flip the table to enabled.
-      this.assignmentManager.getTableStateManager().setTableState(
-        this.tableName, TableState.State.ENABLED);
-      LOG.info("Table '" + this.tableName
-        + "' was successfully enabled. Status: done=" + done);
-    } else {
-      LOG.warn("Table '" + this.tableName
-        + "' wasn't successfully enabled. Status: done=" + done);
-    }
-  }
-
-  /**
-   * @param regionsInMeta
-   * @return List of regions neither in transition nor assigned.
-   * @throws IOException
-   */
-  private Map<HRegionInfo, ServerName> regionsToAssignWithServerName(
-      final List<Pair<HRegionInfo, ServerName>> regionsInMeta) throws IOException {
-    Map<HRegionInfo, ServerName> regionsToAssign =
-        new HashMap<HRegionInfo, ServerName>(regionsInMeta.size());
-    RegionStates regionStates = this.assignmentManager.getRegionStates();
-    for (Pair<HRegionInfo, ServerName> regionLocation : regionsInMeta) {
-      HRegionInfo hri = regionLocation.getFirst();
-      ServerName sn = regionLocation.getSecond();
-      if (regionStates.isRegionOffline(hri)) {
-        regionsToAssign.put(hri, sn);
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipping assign for the region " + hri + " during enable table "
-              + hri.getTable() + " because its already in tranition or assigned.");
-        }
-      }
-    }
-    return regionsToAssign;
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
index af9eecf..ad83978 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
@@ -67,7 +67,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
    * server that was carrying meta should rise to the top of the queue (this is how it used to
    * work when we had handlers and ServerShutdownHandler ran). TODO: special handling of servers
    * that were carrying system tables on crash; do I need to have these servers have priority?
-   * 
+   *
   * Apart from the special-casing of meta and system tables, fairq is what we want
   */
  private final ProcedureFairRunQueues serverFairQ;
@@ -159,6 +159,16 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
     return rq.poll();
   }
 
+  public Long peek(final TableName table) {
+    lock.lock();
+    try {
+      TableRunQueue queue = getRunQueue(table);
+      return queue != null ? queue.peek() : null;
+    } finally {
+      lock.unlock();
+    }
+  }
+
   @Override
   public void signalAll() {
     lock.lock();
@@ -390,6 +400,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
     void addFront(Procedure proc);
     void addBack(Procedure proc);
     Long poll();
+    Long peek();
     boolean isLocked();
   }
 
@@ -443,6 +454,11 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
     }
 
     @Override
+    public Long peek() {
+      return this.runnables.peek();
+    }
+
+    @Override
     public synchronized boolean isLocked() {
       return isExclusiveLock() || sharedLock > 0;
     }
@@ -477,7 +493,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
     public synchronized void releaseExclusiveLock() {
       exclusiveLock = false;
     }
-    
+
     @Override
     public String toString() {
       return this.runnables.toString();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
index 1eb0073..df17a55 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
@@ -67,7 +67,12 @@ public final class ProcedureSyncWait {
 
   public static byte[] submitAndWaitProcedure(ProcedureExecutor procExec,
       final Procedure proc) throws IOException {
-    long procId = procExec.submitProcedure(proc);
+    return submitAndWaitProcedure(procExec, proc, false);
+  }
+
+  public static byte[] submitAndWaitProcedure(ProcedureExecutor procExec,
+      final Procedure proc, final boolean addToFront) throws IOException {
+    long procId = procExec.submitProcedure(proc, addToFront);
     return waitForProcedureToComplete(procExec, procId);
   }
 
@@ -94,6 +99,26 @@ public final class ProcedureSyncWait {
     }
   }
 
+  public static byte[] waitForProcedureToComplete(ProcedureExecutor procExec,
+      final TableName table, final Class procType) throws IOException {
+    MasterProcedureEnv env = procExec.getEnvironment();
+    // look if the procedure is the head of the queue
+    Long procId = env.getProcedureQueue().peek(table);
+    if (procId != null) {
+      Procedure proc = procExec.getProcedure(procId);
+      if (proc != null && procType.isInstance(proc)) {
+        return waitForProcedureToComplete(procExec, proc.getProcId());
+      }
+    }
+    // otherwise, look among the procedures currently being executed
+    for (Procedure proc: procExec.getActiveProcedures()) {
+      if (procType.isInstance(proc)) {
+        return waitForProcedureToComplete(procExec, proc.getProcId());
+      }
+    }
+    return null;
+  }
+
   public static <T> T waitFor(MasterProcedureEnv env, String purpose, Predicate<T> predicate)
       throws IOException {
     final Configuration conf = env.getMasterConfiguration();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestEnableTableHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestEnableTableHandler.java
deleted file mode 100644
index f5c8b90..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestEnableTableHandler.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation
(ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.handler; - -import java.util.ArrayList; -import java.util.List; -import java.io.IOException; -import java.util.concurrent.CountDownLatch; - -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.testclassification.MasterTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.JVMClusterUtil; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -@Category({ MasterTests.class, MediumTests.class }) -public class TestEnableTableHandler { - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final Log LOG = LogFactory.getLog(TestEnableTableHandler.class); - private static final byte[] FAMILYNAME = Bytes.toBytes("fam"); - - @Before - public void setUp() throws Exception { - TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, - MasterSyncObserver.class.getName()); - TEST_UTIL.startMiniCluster(1); - } - - @After - public void tearDown() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - @Test(timeout = 300000) - public void testEnableTableWithNoRegionServers() throws Exception { - final TableName tableName = TableName.valueOf("testEnableTableWithNoRegionServers"); - final MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); - final 
HMaster m = cluster.getMaster(); - final HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); - final HTableDescriptor desc = new HTableDescriptor(tableName); - desc.addFamily(new HColumnDescriptor(FAMILYNAME)); - admin.createTable(desc); - admin.disableTable(tableName); - TEST_UTIL.waitTableDisabled(tableName.getName()); - - admin.enableTable(tableName); - TEST_UTIL.waitTableEnabled(tableName); - // disable once more - admin.disableTable(tableName); - - TEST_UTIL.waitUntilNoRegionsInTransition(60000); - // now stop region servers - JVMClusterUtil.RegionServerThread rs = cluster.getRegionServerThreads().get(0); - rs.getRegionServer().stop("stop"); - cluster.waitForRegionServerToStop(rs.getRegionServer().getServerName(), 10000); - - LOG.debug("Now enabling table " + tableName); - - admin.enableTable(tableName); - assertTrue(admin.isTableEnabled(tableName)); - - JVMClusterUtil.RegionServerThread rs2 = cluster.startRegionServer(); - cluster.waitForRegionServerToStart(rs2.getRegionServer().getServerName().getHostname(), - rs2.getRegionServer().getServerName().getPort(), 60000); - - List regions = TEST_UTIL.getHBaseAdmin().getTableRegions(tableName); - assertEquals(1, regions.size()); - for (HRegionInfo region : regions) { - TEST_UTIL.getHBaseAdmin().assign(region.getEncodedNameAsBytes()); - } - LOG.debug("Waiting for table assigned " + tableName); - TEST_UTIL.waitUntilAllRegionsAssigned(tableName); - List onlineRegions = admin.getOnlineRegions( - rs2.getRegionServer().getServerName()); - ArrayList tableRegions = filterTableRegions(tableName, onlineRegions); - assertEquals(1, tableRegions.size()); - } - - private ArrayList filterTableRegions(final TableName tableName, - List onlineRegions) { - return Lists.newArrayList(Iterables.filter(onlineRegions, new Predicate() { - @Override - public boolean apply(HRegionInfo input) { - return input.getTable().equals(tableName); - } - })); - } - - /** - * We were only clearing rows that had a hregioninfo column in hbase:meta. Mangled rows that - * were missing the hregioninfo because of error were being left behind messing up any - * subsequent table made with the same name. HBASE-12980 - * @throws IOException - * @throws InterruptedException - */ - @Test(timeout=60000) - public void testDeleteForSureClearsAllTableRowsFromMeta() - throws IOException, InterruptedException { - final TableName tableName = TableName.valueOf("testDeleteForSureClearsAllTableRowsFromMeta"); - final HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); - final HTableDescriptor desc = new HTableDescriptor(tableName); - desc.addFamily(new HColumnDescriptor(FAMILYNAME)); - try { - createTable(TEST_UTIL, desc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); - } catch (Exception e) { - e.printStackTrace(); - fail("Got an exception while creating " + tableName); - } - // Now I have a nice table, mangle it by removing the HConstants.REGIONINFO_QUALIFIER_STR - // content from a few of the rows. - try (Table metaTable = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { - try (ResultScanner scanner = - metaTable.getScanner(MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) { - for (Result result : scanner) { - // Just delete one row. 
- Delete d = new Delete(result.getRow()); - d.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); - LOG.info("Mangled: " + d); - metaTable.delete(d); - break; - } - } - admin.disableTable(tableName); - TEST_UTIL.waitTableDisabled(tableName.getName()); - // Rely on the coprocessor based latch to make the operation synchronous. - try { - deleteTable(TEST_UTIL, tableName); - } catch (Exception e) { - e.printStackTrace(); - fail("Got an exception while deleting " + tableName); - } - int rowCount = 0; - try (ResultScanner scanner = - metaTable.getScanner(MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) { - for (Result result : scanner) { - LOG.info("Found when none expected: " + result); - rowCount++; - } - } - assertEquals(0, rowCount); - } - } - - public static class MasterSyncObserver extends BaseMasterObserver { - volatile CountDownLatch tableCreationLatch = null; - volatile CountDownLatch tableDeletionLatch = null; - - @Override - public void postCreateTableHandler(final ObserverContext ctx, - HTableDescriptor desc, HRegionInfo[] regions) throws IOException { - // the AccessController test, some times calls only and directly the postCreateTableHandler() - if (tableCreationLatch != null) { - tableCreationLatch.countDown(); - } - } - - @Override - public void postDeleteTableHandler(final ObserverContext ctx, - TableName tableName) - throws IOException { - // the AccessController test, some times calls only and directly the postDeleteTableHandler() - if (tableDeletionLatch != null) { - tableDeletionLatch.countDown(); - } - } - } - - public static void createTable(HBaseTestingUtility testUtil, HTableDescriptor htd, - byte [][] splitKeys) - throws Exception { - createTable(testUtil, testUtil.getHBaseAdmin(), htd, splitKeys); - } - - public static void createTable(HBaseTestingUtility testUtil, HBaseAdmin admin, - HTableDescriptor htd, byte [][] splitKeys) - throws Exception { - // NOTE: We need a latch because admin is not sync, - // so the postOp coprocessor method may be called after the admin operation returned. - MasterSyncObserver observer = (MasterSyncObserver)testUtil.getHBaseCluster().getMaster() - .getMasterCoprocessorHost().findCoprocessor(MasterSyncObserver.class.getName()); - observer.tableCreationLatch = new CountDownLatch(1); - if (splitKeys != null) { - admin.createTable(htd, splitKeys); - } else { - admin.createTable(htd); - } - observer.tableCreationLatch.await(); - observer.tableCreationLatch = null; - testUtil.waitUntilAllRegionsAssigned(htd.getTableName()); - } - - public static void deleteTable(HBaseTestingUtility testUtil, TableName tableName) - throws Exception { - deleteTable(testUtil, testUtil.getHBaseAdmin(), tableName); - } - - public static void deleteTable(HBaseTestingUtility testUtil, HBaseAdmin admin, - TableName tableName) - throws Exception { - // NOTE: We need a latch because admin is not sync, - // so the postOp coprocessor method may be called after the admin operation returned. - MasterSyncObserver observer = (MasterSyncObserver)testUtil.getHBaseCluster().getMaster() - .getMasterCoprocessorHost().findCoprocessor(MasterSyncObserver.class.getName()); - observer.tableDeletionLatch = new CountDownLatch(1); - try { - admin.disableTable(tableName); - } catch (Exception e) { - LOG.debug("Table: " + tableName + " already disabled, so just deleting it."); - } - admin.deleteTable(tableName); - observer.tableDeletionLatch.await(); - observer.tableDeletionLatch = null; - } -}
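
Note for reviewers on Executor.getActiveProcedure() above: it uses tryLock() rather than lock(), so a monitoring thread never blocks behind a worker that is holding its lock while polling the run queue; the monitor simply sees "no active procedure" for that slot. Below is a minimal, self-contained sketch of the same publish/peek pattern; the ActiveSlot class and every name in it are illustrative only, not part of this patch.

import java.util.concurrent.locks.ReentrantLock;

public class ActiveSlot<T> {
  // One worker thread owns the slot; any number of monitors may peek at it.
  private final ReentrantLock lock = new ReentrantLock();
  private T active;

  // Worker: publish the item being processed (or null once it completes).
  public void set(final T item) {
    lock.lock();
    try {
      active = item;
    } finally {
      lock.unlock();
    }
  }

  // Monitor: best-effort snapshot; returns null instead of blocking
  // when the worker currently holds the lock (i.e. is between items).
  public T peek() {
    if (lock.tryLock()) {
      try {
        return active;
      } finally {
        lock.unlock();
      }
    }
    return null;
  }
}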