Index: src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java (revision 997548) +++ src/test/java/org/apache/hadoop/hbase/master/TestZKBasedReopenRegion.java (working copy) @@ -1,268 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master; - - -import java.io.IOException; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HServerInfo; -import org.apache.hadoop.hbase.HMsg; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.ProcessRegionClose; -import org.apache.hadoop.hbase.master.ProcessRegionOpen; -import org.apache.hadoop.hbase.master.RegionServerOperation; -import org.apache.hadoop.hbase.master.RegionServerOperationListener; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.util.Writables; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -public class TestZKBasedReopenRegion { - private static final Log LOG = LogFactory.getLog(TestZKBasedReopenRegion.class); - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final String TABLENAME = "master_transitions"; - private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"), - Bytes.toBytes("b"), Bytes.toBytes("c")}; - - @BeforeClass public static void beforeAllTests() throws Exception { - Configuration c = TEST_UTIL.getConfiguration(); - c.setBoolean("dfs.support.append", true); - c.setInt("hbase.regionserver.info.port", 0); - c.setInt("hbase.master.meta.thread.rescanfrequency", 5*1000); - TEST_UTIL.startMiniCluster(2); - TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES); - HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME); - int countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily()); - waitUntilAllRegionsAssigned(countOfRegions); - addToEachStartKey(countOfRegions); - } - - @AfterClass public static void afterAllTests() throws IOException { - TEST_UTIL.shutdownMiniCluster(); - } - - @Before public void setup() throws IOException { - if (TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size() < 2) { - // Need at least two servers. - LOG.info("Started new server=" + - TEST_UTIL.getHBaseCluster().startRegionServer()); - - } - } - - @Test (timeout=300000) public void testOpenRegion() - throws Exception { - MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); - LOG.info("Number of region servers = " + cluster.getLiveRegionServerThreads().size()); - - int rsIdx = 0; - HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx); - Collection regions = regionServer.getOnlineRegions(); - HRegion region = regions.iterator().next(); - LOG.debug("Asking RS to close region " + region.getRegionNameAsString()); - - AtomicBoolean closeEventProcessed = new AtomicBoolean(false); - AtomicBoolean reopenEventProcessed = new AtomicBoolean(false); - RegionServerOperationListener listener = - new ReopenRegionEventListener(region.getRegionNameAsString(), - closeEventProcessed, - reopenEventProcessed); - HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); - master.getRegionServerOperationQueue().registerRegionServerOperationListener(listener); - HMsg closeRegionMsg = new HMsg(HMsg.Type.MSG_REGION_CLOSE, - region.getRegionInfo(), - Bytes.toBytes("Forcing close in test") - ); - TEST_UTIL.getHBaseCluster().addMessageToSendRegionServer(rsIdx, closeRegionMsg); - - synchronized(closeEventProcessed) { - closeEventProcessed.wait(3*60*1000); - } - if(!closeEventProcessed.get()) { - throw new Exception("Timed out, close event not called on master."); - } - - synchronized(reopenEventProcessed) { - reopenEventProcessed.wait(3*60*1000); - } - if(!reopenEventProcessed.get()) { - throw new Exception("Timed out, open event not called on master after region close."); - } - - LOG.info("Done with test, RS informed master successfully."); - } - - public static class ReopenRegionEventListener implements RegionServerOperationListener { - - private static final Log LOG = LogFactory.getLog(ReopenRegionEventListener.class); - String regionToClose; - AtomicBoolean closeEventProcessed; - AtomicBoolean reopenEventProcessed; - - public ReopenRegionEventListener(String regionToClose, - AtomicBoolean closeEventProcessed, - AtomicBoolean reopenEventProcessed) { - this.regionToClose = regionToClose; - this.closeEventProcessed = closeEventProcessed; - this.reopenEventProcessed = reopenEventProcessed; - } - - @Override - public boolean process(HServerInfo serverInfo, HMsg incomingMsg) { - return true; - } - - @Override - public boolean process(RegionServerOperation op) throws IOException { - return true; - } - - @Override - public void processed(RegionServerOperation op) { - LOG.debug("Master processing object: " + op.getClass().getCanonicalName()); - if(op instanceof ProcessRegionClose) { - ProcessRegionClose regionCloseOp = (ProcessRegionClose)op; - String region = regionCloseOp.getRegionInfo().getRegionNameAsString(); - LOG.debug("Finished closing region " + region + ", expected to close region " + regionToClose); - if(regionToClose.equals(region)) { - closeEventProcessed.set(true); - } - synchronized(closeEventProcessed) { - closeEventProcessed.notifyAll(); - } - } - // Wait for open event AFTER we have closed the region - if(closeEventProcessed.get()) { - if(op instanceof ProcessRegionOpen) { - ProcessRegionOpen regionOpenOp = (ProcessRegionOpen)op; - String region = regionOpenOp.getRegionInfo().getRegionNameAsString(); - LOG.debug("Finished closing region " + region + ", expected to close region " + regionToClose); - if(regionToClose.equals(region)) { - reopenEventProcessed.set(true); - } - synchronized(reopenEventProcessed) { - reopenEventProcessed.notifyAll(); - } - } - } - - } - - } - - - private static void waitUntilAllRegionsAssigned(final int countOfRegions) - throws IOException { - HTable meta = new HTable(TEST_UTIL.getConfiguration(), - HConstants.META_TABLE_NAME); - while (true) { - int rows = 0; - Scan scan = new Scan(); - scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); - ResultScanner s = meta.getScanner(scan); - for (Result r = null; (r = s.next()) != null;) { - byte [] b = - r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); - if (b == null || b.length <= 0) break; - rows++; - } - s.close(); - // If I get to here and all rows have a Server, then all have been assigned. - if (rows == countOfRegions) break; - LOG.info("Found=" + rows); - Threads.sleep(1000); - } - } - - /* - * Add to each of the regions in .META. a value. Key is the startrow of the - * region (except its 'aaa' for first region). Actual value is the row name. - * @param expected - * @return - * @throws IOException - */ - private static int addToEachStartKey(final int expected) throws IOException { - HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME); - HTable meta = new HTable(TEST_UTIL.getConfiguration(), - HConstants.META_TABLE_NAME); - int rows = 0; - Scan scan = new Scan(); - scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); - ResultScanner s = meta.getScanner(scan); - for (Result r = null; (r = s.next()) != null;) { - byte [] b = - r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); - if (b == null || b.length <= 0) break; - HRegionInfo hri = Writables.getHRegionInfo(b); - // If start key, add 'aaa'. - byte [] row = getStartKey(hri); - Put p = new Put(row); - p.add(getTestFamily(), getTestQualifier(), row); - t.put(p); - rows++; - } - s.close(); - Assert.assertEquals(expected, rows); - return rows; - } - - private static byte [] getStartKey(final HRegionInfo hri) { - return Bytes.equals(HConstants.EMPTY_START_ROW, hri.getStartKey())? - Bytes.toBytes("aaa"): hri.getStartKey(); - } - - private static byte [] getTestFamily() { - return FAMILIES[0]; - } - - private static byte [] getTestQualifier() { - return getTestFamily(); - } - - public static void main(String args[]) throws Exception { - TestZKBasedReopenRegion.beforeAllTests(); - - TestZKBasedReopenRegion test = new TestZKBasedReopenRegion(); - test.setup(); - test.testOpenRegion(); - - TestZKBasedReopenRegion.afterAllTests(); - } -} Index: src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java (revision 997548) +++ src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java (working copy) @@ -1,90 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.executor.RegionTransitionEventData; -import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -public class TestRestartCluster { - private static final Log LOG = LogFactory.getLog(TestRestartCluster.class); - private static Configuration conf; - private static HBaseTestingUtility utility; - private static ZooKeeperWrapper zkWrapper; - private static final byte[] TABLENAME = Bytes.toBytes("master_transitions"); - private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a")}; - - @BeforeClass public static void beforeAllTests() throws Exception { - conf = HBaseConfiguration.create(); - utility = new HBaseTestingUtility(conf); - } - - @AfterClass public static void afterAllTests() throws IOException { - utility.shutdownMiniCluster(); - } - - @Before public void setup() throws IOException { - } - - @Test (timeout=300000) public void testRestartClusterAfterKill()throws Exception { - utility.startMiniZKCluster(); - zkWrapper = ZooKeeperWrapper.createInstance(conf, "cluster1"); - - // create the unassigned region, throw up a region opened state for META - String unassignedZNode = zkWrapper.getRegionInTransitionZNode(); - zkWrapper.createZNodeIfNotExists(unassignedZNode); - byte[] data = null; - HBaseEventType hbEventType = HBaseEventType.RS2ZK_REGION_OPENED; - try { - data = Writables.getBytes(new RegionTransitionEventData(hbEventType, HMaster.MASTER)); - } catch (IOException e) { - LOG.error("Error creating event data for " + hbEventType, e); - } - zkWrapper.createOrUpdateUnassignedRegion( - HRegionInfo.ROOT_REGIONINFO.getEncodedName(), data); - zkWrapper.createOrUpdateUnassignedRegion( - HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), data); - LOG.debug("Created UNASSIGNED zNode for ROOT and META regions in state " + HBaseEventType.M2ZK_REGION_OFFLINE); - - // start the HB cluster - LOG.info("Starting HBase cluster..."); - utility.startMiniCluster(2); - - utility.createTable(TABLENAME, FAMILIES); - LOG.info("Created a table, waiting for table to be available..."); - utility.waitTableAvailable(TABLENAME, 60*1000); - - LOG.info("Master deleted unassgined region and started up successfully."); - } -} Index: src/test/java/org/apache/hadoop/hbase/master/TestMaster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestMaster.java (revision 997548) +++ src/test/java/org/apache/hadoop/hbase/master/TestMaster.java (working copy) @@ -30,9 +30,6 @@ import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.executor.HBaseEventHandler; -import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventHandlerListener; -import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -89,7 +86,7 @@ CountDownLatch aboutToOpen = new CountDownLatch(1); CountDownLatch proceed = new CountDownLatch(1); RegionOpenListener list = new RegionOpenListener(aboutToOpen, proceed); - HBaseEventHandler.registerListener(list); + m.getRegionServerOperationQueue().registerRegionServerOperationListener(list); LOG.info("Splitting table"); admin.split(TABLENAME); @@ -112,7 +109,7 @@ } } - static class RegionOpenListener implements HBaseEventHandlerListener { + static class RegionOpenListener implements RegionServerOperationListener { CountDownLatch aboutToOpen, proceed; public RegionOpenListener(CountDownLatch aboutToOpen, CountDownLatch proceed) @@ -122,9 +119,9 @@ } @Override - public void afterProcess(HBaseEventHandler event) { - if (event.getHBEvent() != HBaseEventType.RS2ZK_REGION_OPENED) { - return; + public boolean process(HServerInfo serverInfo, HMsg incomingMsg) { + if (!incomingMsg.isType(HMsg.Type.MSG_REPORT_OPEN)) { + return true; } try { aboutToOpen.countDown(); @@ -132,12 +129,17 @@ } catch (InterruptedException ie) { throw new RuntimeException(ie); } - return; + return true; } @Override - public void beforeProcess(HBaseEventHandler event) { + public boolean process(RegionServerOperation op) throws IOException { + return true; } + + @Override + public void processed(RegionServerOperation op) { + } } } Index: src/test/java/org/apache/hadoop/hbase/master/TestZKBasedCloseRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/master/TestZKBasedCloseRegion.java (revision 997548) +++ src/test/java/org/apache/hadoop/hbase/master/TestZKBasedCloseRegion.java (working copy) @@ -1,241 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master; - - -import java.io.IOException; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HServerInfo; -import org.apache.hadoop.hbase.HMsg; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.ProcessRegionClose; -import org.apache.hadoop.hbase.master.RegionServerOperation; -import org.apache.hadoop.hbase.master.RegionServerOperationListener; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.util.Writables; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -public class TestZKBasedCloseRegion { - private static final Log LOG = LogFactory.getLog(TestZKBasedCloseRegion.class); - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final String TABLENAME = "master_transitions"; - private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"), - Bytes.toBytes("b"), Bytes.toBytes("c")}; - - @BeforeClass public static void beforeAllTests() throws Exception { - Configuration c = TEST_UTIL.getConfiguration(); - c.setBoolean("dfs.support.append", true); - c.setInt("hbase.regionserver.info.port", 0); - c.setInt("hbase.master.meta.thread.rescanfrequency", 5*1000); - TEST_UTIL.startMiniCluster(2); - TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES); - HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME); - int countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily()); - waitUntilAllRegionsAssigned(countOfRegions); - addToEachStartKey(countOfRegions); - } - - @AfterClass public static void afterAllTests() throws IOException { - TEST_UTIL.shutdownMiniCluster(); - } - - @Before public void setup() throws IOException { - if (TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size() < 2) { - // Need at least two servers. - LOG.info("Started new server=" + - TEST_UTIL.getHBaseCluster().startRegionServer()); - - } - } - - @Test (timeout=300000) public void testCloseRegion() - throws Exception { - LOG.info("Running testCloseRegion"); - MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); - LOG.info("Number of region servers = " + cluster.getLiveRegionServerThreads().size()); - - int rsIdx = 0; - HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx); - Collection regions = regionServer.getOnlineRegions(); - HRegion region = regions.iterator().next(); - LOG.debug("Asking RS to close region " + region.getRegionNameAsString()); - - AtomicBoolean closeEventProcessed = new AtomicBoolean(false); - RegionServerOperationListener listener = - new CloseRegionEventListener(region.getRegionNameAsString(), closeEventProcessed); - HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); - master.getRegionServerOperationQueue().registerRegionServerOperationListener(listener); - HMsg closeRegionMsg = new HMsg(HMsg.Type.MSG_REGION_CLOSE, - region.getRegionInfo(), - Bytes.toBytes("Forcing close in test") - ); - TEST_UTIL.getHBaseCluster().addMessageToSendRegionServer(rsIdx, closeRegionMsg); - - synchronized(closeEventProcessed) { - // wait for 3 minutes - closeEventProcessed.wait(3*60*1000); - } - if(!closeEventProcessed.get()) { - throw new Exception("Timed out, close event not called on master."); - } - else { - LOG.info("Done with test, RS informed master successfully."); - } - } - - public static class CloseRegionEventListener implements RegionServerOperationListener { - - private static final Log LOG = LogFactory.getLog(CloseRegionEventListener.class); - String regionToClose; - AtomicBoolean closeEventProcessed; - - public CloseRegionEventListener(String regionToClose, AtomicBoolean closeEventProcessed) { - this.regionToClose = regionToClose; - this.closeEventProcessed = closeEventProcessed; - } - - @Override - public boolean process(HServerInfo serverInfo, HMsg incomingMsg) { - return true; - } - - @Override - public boolean process(RegionServerOperation op) throws IOException { - return true; - } - - @Override - public void processed(RegionServerOperation op) { - LOG.debug("Master processing object: " + op.getClass().getCanonicalName()); - if(op instanceof ProcessRegionClose) { - ProcessRegionClose regionCloseOp = (ProcessRegionClose)op; - String region = regionCloseOp.getRegionInfo().getRegionNameAsString(); - LOG.debug("Finished closing region " + region + ", expected to close region " + regionToClose); - if(regionToClose.equals(region)) { - closeEventProcessed.set(true); - } - synchronized(closeEventProcessed) { - closeEventProcessed.notifyAll(); - } - } - } - - } - - - private static void waitUntilAllRegionsAssigned(final int countOfRegions) - throws IOException { - HTable meta = new HTable(TEST_UTIL.getConfiguration(), - HConstants.META_TABLE_NAME); - while (true) { - int rows = 0; - Scan scan = new Scan(); - scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); - ResultScanner s = meta.getScanner(scan); - for (Result r = null; (r = s.next()) != null;) { - byte [] b = - r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); - if (b == null || b.length <= 0) break; - rows++; - } - s.close(); - // If I get to here and all rows have a Server, then all have been assigned. - if (rows == countOfRegions) break; - LOG.info("Found=" + rows); - Threads.sleep(1000); - } - } - - /* - * Add to each of the regions in .META. a value. Key is the startrow of the - * region (except its 'aaa' for first region). Actual value is the row name. - * @param expected - * @return - * @throws IOException - */ - private static int addToEachStartKey(final int expected) throws IOException { - HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME); - HTable meta = new HTable(TEST_UTIL.getConfiguration(), - HConstants.META_TABLE_NAME); - int rows = 0; - Scan scan = new Scan(); - scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); - ResultScanner s = meta.getScanner(scan); - for (Result r = null; (r = s.next()) != null;) { - byte [] b = - r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); - if (b == null || b.length <= 0) break; - HRegionInfo hri = Writables.getHRegionInfo(b); - // If start key, add 'aaa'. - byte [] row = getStartKey(hri); - Put p = new Put(row); - p.add(getTestFamily(), getTestQualifier(), row); - t.put(p); - rows++; - } - s.close(); - Assert.assertEquals(expected, rows); - return rows; - } - - private static byte [] getStartKey(final HRegionInfo hri) { - return Bytes.equals(HConstants.EMPTY_START_ROW, hri.getStartKey())? - Bytes.toBytes("aaa"): hri.getStartKey(); - } - - private static byte [] getTestFamily() { - return FAMILIES[0]; - } - - private static byte [] getTestQualifier() { - return getTestFamily(); - } - - public static void main(String args[]) throws Exception { - TestZKBasedCloseRegion.beforeAllTests(); - - TestZKBasedCloseRegion test = new TestZKBasedCloseRegion(); - test.setup(); - test.testCloseRegion(); - - TestZKBasedCloseRegion.afterAllTests(); - } -} Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (revision 997548) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (working copy) @@ -25,7 +25,6 @@ import java.io.PrintWriter; import java.net.Socket; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -41,7 +40,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HServerInfo; -import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; import org.apache.hadoop.hbase.util.Bytes; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -125,14 +123,6 @@ * State of the cluster - if up and running or shutting down */ public final String clusterStateZNode; - /* - * Regions that are in transition - */ - private final String rgnsInTransitZNode; - /* - * List of ZNodes in the unassgined region that are already being watched - */ - private Set unassignedZNodesWatched = new HashSet(); private List listeners = new ArrayList(); @@ -196,7 +186,6 @@ rootRegionZNode = getZNode(parentZNode, rootServerZNodeName); rsZNode = getZNode(parentZNode, rsZNodeName); - rgnsInTransitZNode = getZNode(parentZNode, regionsInTransitZNodeName); masterElectionZNode = getZNode(parentZNode, masterAddressZNodeName); clusterStateZNode = getZNode(parentZNode, stateZNodeName); } @@ -402,9 +391,9 @@ * Watch the state of the cluster, up or down * @param watcher Watcher to set on cluster state node */ - public void setClusterStateWatch() { + public void setClusterStateWatch(Watcher watcher) { try { - zooKeeper.exists(clusterStateZNode, this); + zooKeeper.exists(clusterStateZNode, watcher == null ? this : watcher); } catch (InterruptedException e) { LOG.warn("<" + instanceName + ">" + "Failed to check on ZNode " + clusterStateZNode, e); } catch (KeeperException e) { @@ -975,14 +964,6 @@ } /** - * Get the znode that has all the regions in transition. - * @return path to znode - */ - public String getRegionInTransitionZNode() { - return this.rgnsInTransitZNode; - } - - /** * Get the path of this region server's znode * @return path to znode */ @@ -1080,192 +1061,7 @@ } } - /** - * Given a region name and some data, this method creates a new the region - * znode data under the UNASSGINED znode with the data passed in. This method - * will not update data for existing znodes. - * - * @param regionName - encoded name of the region - * @param data - new serialized data to update the region znode - */ - private void createUnassignedRegion(String regionName, byte[] data) { - String znode = getZNode(getRegionInTransitionZNode(), regionName); - if(LOG.isDebugEnabled()) { - // check if this node already exists - - // - it should not exist - // - if it does, it should be in the CLOSED state - if(exists(znode, true)) { - Stat stat = new Stat(); - byte[] oldData = null; - try { - oldData = readZNode(znode, stat); - } catch (IOException e) { - LOG.error("Error reading data for " + znode); - } - if(oldData == null) { - LOG.debug("While creating UNASSIGNED region " + regionName + " exists with no data" ); - } - else { - LOG.debug("While creating UNASSIGNED region " + regionName + " exists, state = " + (HBaseEventType.fromByte(oldData[0]))); - } - } - else { - if(data == null) { - LOG.debug("Creating UNASSIGNED region " + regionName + " with no data" ); - } - else { - LOG.debug("Creating UNASSIGNED region " + regionName + " in state = " + (HBaseEventType.fromByte(data[0]))); - } - } - } - synchronized(unassignedZNodesWatched) { - unassignedZNodesWatched.add(znode); - createZNodeIfNotExists(znode, data, CreateMode.PERSISTENT, true); - } - } - /** - * Given a region name and some data, this method updates the region znode - * data under the UNASSGINED znode with the latest data. This method will - * update the znode data only if it already exists. - * - * @param regionName - encoded name of the region - * @param data - new serialized data to update the region znode - */ - public void updateUnassignedRegion(String regionName, byte[] data) { - String znode = getZNode(getRegionInTransitionZNode(), regionName); - // this is an update - make sure the node already exists - if(!exists(znode, true)) { - LOG.error("Cannot update " + znode + " - node does not exist" ); - return; - } - - Stat stat = new Stat(); - byte[] oldData = null; - try { - oldData = readZNode(znode, stat); - } catch (IOException e) { - LOG.error("Error reading data for " + znode); - } - // If there is no data in the ZNode, then update it - if(oldData == null) { - LOG.debug("While updating UNASSIGNED region " + regionName + " - node exists with no data" ); - } - // If there is data in the ZNode, do not update if it is already correct - else { - HBaseEventType curState = HBaseEventType.fromByte(oldData[0]); - HBaseEventType newState = HBaseEventType.fromByte(data[0]); - // If the znode has the right state already, do not update it. Updating - // the znode again and again will bump up the zk version. This may cause - // the region server to fail. The RS expects that the znode is never - // updated by anyone else while it is opening/closing a region. - if(curState == newState) { - LOG.debug("No need to update UNASSIGNED region " + regionName + - " as it already exists in state = " + curState); - return; - } - - // If the ZNode is in another state, then update it - LOG.debug("UNASSIGNED region " + regionName + " is currently in state = " + - curState + ", updating it to " + newState); - } - // Update the ZNode - synchronized(unassignedZNodesWatched) { - unassignedZNodesWatched.add(znode); - try { - writeZNode(znode, data, -1, true); - } catch (IOException e) { - LOG.error("Error writing data for " + znode + ", could not update state to " + (HBaseEventType.fromByte(data[0]))); - } - } - } - - /** - * This method will create a new region in transition entry in ZK with the - * speficied data if none exists. If one already exists, it will update the - * data with whatever is passed in. - * - * @param regionName - encoded name of the region - * @param data - serialized data for the region znode - */ - public void createOrUpdateUnassignedRegion(String regionName, byte[] data) { - String znode = getZNode(getRegionInTransitionZNode(), regionName); - if(exists(znode, true)) { - updateUnassignedRegion(regionName, data); - } - else { - createUnassignedRegion(regionName, data); - } - } - - public void deleteUnassignedRegion(String regionName) { - String znode = getZNode(getRegionInTransitionZNode(), regionName); - try { - LOG.debug("Deleting ZNode " + znode + " in ZooKeeper as region is open..."); - synchronized(unassignedZNodesWatched) { - unassignedZNodesWatched.remove(znode); - deleteZNode(znode); - } - } catch (KeeperException.SessionExpiredException e) { - LOG.error("Zookeeper session has expired", e); - // if the session has expired try to reconnect to ZK, then perform query - try { - // TODO: ZK-REFACTOR: should just quit on reconnect?? - reconnectToZk(); - synchronized(unassignedZNodesWatched) { - unassignedZNodesWatched.remove(znode); - deleteZNode(znode); - } - } catch (IOException e1) { - LOG.error("Error reconnecting to zookeeper", e1); - throw new RuntimeException("Error reconnecting to zookeeper", e1); - } catch (KeeperException.SessionExpiredException e1) { - LOG.error("Error reading after reconnecting to zookeeper", e1); - throw new RuntimeException("Error reading after reconnecting to zookeeper", e1); - } catch (KeeperException e1) { - LOG.error("Error reading after reconnecting to zookeeper", e1); - } catch (InterruptedException e1) { - LOG.error("Error reading after reconnecting to zookeeper", e1); - } - } catch (KeeperException e) { - LOG.error("Error deleting region " + regionName, e); - } catch (InterruptedException e) { - LOG.error("Error deleting region " + regionName, e); - } - } - - /** - * Atomically adds a watch and reads data from the unwatched znodes in the - * UNASSGINED region. This works because the master is the only person - * deleting nodes. - * @param znode - * @return - */ - public List watchAndGetNewChildren(String znode) { - List nodes = null; - List newNodes = new ArrayList(); - try { - if (checkExistenceOf(znode)) { - synchronized(unassignedZNodesWatched) { - nodes = zooKeeper.getChildren(znode, this); - for (String node : nodes) { - String znodePath = joinPath(znode, node); - if(!unassignedZNodesWatched.contains(znodePath)) { - byte[] data = getDataAndWatch(znode, node, this); - newNodes.add(new ZNodePathAndData(znodePath, data)); - unassignedZNodesWatched.add(znodePath); - } - } - } - } - } catch (KeeperException e) { - LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e); - } catch (InterruptedException e) { - LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e); - } - return newNodes; - } - public static class ZNodePathAndData { private String zNodePath; private byte[] data; Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 997548) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -1259,7 +1259,14 @@ if (LOG.isDebugEnabled()) LOG.debug("sending initial server load: " + hsl); lastMsg = System.currentTimeMillis(); - zooKeeperWrapper.writeRSLocation(this.serverInfo); + boolean startCodeOk = false; + while(!startCodeOk) { + this.serverInfo = createServerInfoWithNewStartCode(this.serverInfo); + startCodeOk = zooKeeperWrapper.writeRSLocation(this.serverInfo); + if(!startCodeOk) { + LOG.debug("Start code already taken, trying another one"); + } + } result = this.hbaseMaster.regionServerStartup(this.serverInfo); break; } catch (IOException e) { @@ -1270,6 +1277,26 @@ return result; } + private HServerInfo createServerInfoWithNewStartCode(final HServerInfo hsi) { + return new HServerInfo(hsi.getServerAddress(), hsi.getInfoPort(), + hsi.getHostname()); + } + + /* Add to the outbound message buffer */ + private void reportOpen(HRegionInfo region) { + this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region)); + } + + /* Add to the outbound message buffer */ + private void reportClose(HRegionInfo region) { + reportClose(region, null); + } + + /* Add to the outbound message buffer */ + private void reportClose(final HRegionInfo region, final byte[] message) { + this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message)); + } + /** * Add to the outbound message buffer * @@ -1434,12 +1461,8 @@ void openRegion(final HRegionInfo regionInfo) { Integer mapKey = Bytes.mapKey(regionInfo.getRegionName()); HRegion region = this.onlineRegions.get(mapKey); - RSZookeeperUpdater zkUpdater = - new RSZookeeperUpdater(conf, serverInfo.getServerName(), - regionInfo.getEncodedName()); if (region == null) { try { - zkUpdater.startRegionOpenEvent(null, true); region = instantiateRegion(regionInfo, this.hlog); // Startup a compaction early if one is needed, if region has references // or if a store has too many store files @@ -1454,25 +1477,12 @@ // TODO: add an extra field in HRegionInfo to indicate that there is // an error. We can't do that now because that would be an incompatible // change that would require a migration - try { - HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, - regionInfo, - StringUtils.stringifyException(t).getBytes()); - zkUpdater.abortOpenRegion(hmsg); - } catch (IOException e1) { - // TODO: Can we recover? Should be throw RTE? - LOG.error("Failed to abort open region " + regionInfo.getRegionNameAsString(), e1); - } + reportClose(regionInfo, StringUtils.stringifyException(t).getBytes()); return; } addToOnlineRegions(region); } - try { - HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_OPEN, regionInfo); - zkUpdater.finishRegionOpenEvent(hmsg); - } catch (IOException e) { - LOG.error("Failed to mark region " + regionInfo.getRegionNameAsString() + " as opened", e); - } + reportOpen(regionInfo); } /* @@ -1511,20 +1521,11 @@ protected void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted) throws IOException { - RSZookeeperUpdater zkUpdater = null; - if(reportWhenCompleted) { - zkUpdater = new RSZookeeperUpdater(conf, - serverInfo.getServerName(), hri.getEncodedName()); - zkUpdater.startRegionCloseEvent(null, false); - } HRegion region = this.removeFromOnlineRegions(hri); if (region != null) { region.close(); if(reportWhenCompleted) { - if(zkUpdater != null) { - HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, hri, null); - zkUpdater.finishRegionCloseEvent(hmsg); - } + reportClose(hri); } } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java (revision 997548) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java (working copy) @@ -1,164 +0,0 @@ -package org.apache.hadoop.hbase.regionserver; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HMsg; -import org.apache.hadoop.hbase.executor.RegionTransitionEventData; -import org.apache.hadoop.hbase.executor.HBaseEventHandler; -import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; -import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.data.Stat; - -/** - * This is a helper class for region servers to update various states in - * Zookeeper. The various updates are abstracted out here. - * - * The "startRegionXXX" methods are to be called first, followed by the - * "finishRegionXXX" methods. Supports updating zookeeper periodically as a - * part of the "startRegionXXX". Currently handles the following state updates: - * - Close region - * - Open region - */ -// TODO: make this thread local, in which case it is re-usable per thread -public class RSZookeeperUpdater { - private static final Log LOG = LogFactory.getLog(RSZookeeperUpdater.class); - private final String regionServerName; - private String regionName = null; - private String regionZNode = null; - private ZooKeeperWrapper zkWrapper = null; - private int zkVersion = 0; - HBaseEventType lastUpdatedState; - - public RSZookeeperUpdater(Configuration conf, - String regionServerName, String regionName) { - this(conf, regionServerName, regionName, 0); - } - - public RSZookeeperUpdater(Configuration conf, String regionServerName, - String regionName, int zkVersion) { - this.zkWrapper = ZooKeeperWrapper.getInstance(conf, regionServerName); - this.regionServerName = regionServerName; - this.regionName = regionName; - // get the region ZNode we have to create - this.regionZNode = zkWrapper.getZNode(zkWrapper.getRegionInTransitionZNode(), regionName); - this.zkVersion = zkVersion; - } - - /** - * This method updates the various states in ZK to inform the master that the - * region server has started closing the region. - * @param updatePeriodically - if true, periodically updates the state in ZK - */ - public void startRegionCloseEvent(HMsg hmsg, boolean updatePeriodically) throws IOException { - // if this ZNode already exists, something is wrong - if(zkWrapper.exists(regionZNode, true)) { - String msg = "ZNode " + regionZNode + " already exists in ZooKeeper, will NOT close region."; - LOG.error(msg); - throw new IOException(msg); - } - - // create the region node in the unassigned directory first - zkWrapper.createZNodeIfNotExists(regionZNode, null, CreateMode.PERSISTENT, true); - - // update the data for "regionName" ZNode in unassigned to CLOSING - updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSING, hmsg); - - // TODO: implement the updatePeriodically logic here - } - - /** - * This method updates the states in ZK to signal that the region has been - * closed. This will stop the periodic updater thread if one was started. - * @throws IOException - */ - public void finishRegionCloseEvent(HMsg hmsg) throws IOException { - // TODO: stop the updatePeriodically here - - // update the data for "regionName" ZNode in unassigned to CLOSED - updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSED, hmsg); - } - - /** - * This method updates the various states in ZK to inform the master that the - * region server has started opening the region. - * @param updatePeriodically - if true, periodically updates the state in ZK - */ - public void startRegionOpenEvent(HMsg hmsg, boolean updatePeriodically) throws IOException { - Stat stat = new Stat(); - byte[] data = zkWrapper.readZNode(regionZNode, stat); - // if there is no ZNode for this region, something is wrong - if(data == null) { - String msg = "ZNode " + regionZNode + " does not exist in ZooKeeper, will NOT open region."; - LOG.error(msg); - throw new IOException(msg); - } - // if the ZNode is not in the closed state, something is wrong - HBaseEventType rsEvent = HBaseEventType.fromByte(data[0]); - if(rsEvent != HBaseEventType.RS2ZK_REGION_CLOSED && rsEvent != HBaseEventType.M2ZK_REGION_OFFLINE) { - String msg = "ZNode " + regionZNode + " is not in CLOSED/OFFLINE state (state = " + rsEvent + "), will NOT open region."; - LOG.error(msg); - throw new IOException(msg); - } - - // get the version to update from ZK - zkVersion = stat.getVersion(); - - // update the data for "regionName" ZNode in unassigned to CLOSING - updateZKWithEventData(HBaseEventType.RS2ZK_REGION_OPENING, hmsg); - - // TODO: implement the updatePeriodically logic here - } - - /** - * This method updates the states in ZK to signal that the region has been - * opened. This will stop the periodic updater thread if one was started. - * @throws IOException - */ - public void finishRegionOpenEvent(HMsg hmsg) throws IOException { - // TODO: stop the updatePeriodically here - - // update the data for "regionName" ZNode in unassigned to CLOSED - updateZKWithEventData(HBaseEventType.RS2ZK_REGION_OPENED, hmsg); - } - - public boolean isClosingRegion() { - return (lastUpdatedState == HBaseEventType.RS2ZK_REGION_CLOSING); - } - - public boolean isOpeningRegion() { - return (lastUpdatedState == HBaseEventType.RS2ZK_REGION_OPENING); - } - - public void abortOpenRegion(HMsg hmsg) throws IOException { - LOG.error("Aborting open of region " + regionName); - - // TODO: stop the updatePeriodically for start open region here - - // update the data for "regionName" ZNode in unassigned to CLOSED - updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSED, hmsg); - } - - private void updateZKWithEventData(HBaseEventType hbEventType, HMsg hmsg) throws IOException { - // update the data for "regionName" ZNode in unassigned to "hbEventType" - byte[] data = null; - try { - data = Writables.getBytes(new RegionTransitionEventData(hbEventType, regionServerName, hmsg)); - } catch (IOException e) { - LOG.error("Error creating event data for " + hbEventType, e); - } - LOG.debug("Updating ZNode " + regionZNode + - " with [" + hbEventType + "]" + - " expected version = " + zkVersion); - lastUpdatedState = hbEventType; - zkWrapper.writeZNode(regionZNode, data, zkVersion, true); - zkVersion++; - } -} Index: src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (revision 997548) +++ src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (working copy) @@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; import java.io.IOException; @@ -115,10 +114,6 @@ } else { master.getRegionManager().removeRegion(regionInfo); } - ZooKeeperWrapper zkWrapper = - ZooKeeperWrapper.getInstance(master.getConfiguration(), - HMaster.class.getName()); - zkWrapper.deleteUnassignedRegion(regionInfo.getEncodedName()); return true; } } Index: src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java (revision 997548) +++ src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java (working copy) @@ -1,185 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master; - -import java.io.IOException; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; -import org.apache.hadoop.hbase.master.handler.MasterCloseRegionHandler; -import org.apache.hadoop.hbase.master.handler.MasterOpenRegionHandler; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper.ZNodePathAndData; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; - -/** - * Watches the UNASSIGNED znode in ZK for the master, and handles all events - * relating to region transitions. - */ -public class ZKUnassignedWatcher implements Watcher { - private static final Log LOG = LogFactory.getLog(ZKUnassignedWatcher.class); - - private ZooKeeperWrapper zkWrapper; - String serverName; - ServerManager serverManager; - - public static void start(Configuration conf, HMaster master) - throws IOException { - new ZKUnassignedWatcher(conf, master); - LOG.debug("Started ZKUnassigned watcher"); - } - - public ZKUnassignedWatcher(Configuration conf, HMaster master) - throws IOException { - this.serverName = master.getHServerAddress().toString(); - this.serverManager = master.getServerManager(); - zkWrapper = ZooKeeperWrapper.getInstance(conf, HMaster.class.getName()); - String unassignedZNode = zkWrapper.getRegionInTransitionZNode(); - - // If the UNASSIGNED ZNode exists and this is a fresh cluster start, then - // delete it. - if(master.isClusterStartup() && zkWrapper.exists(unassignedZNode, false)) { - LOG.info("Cluster start, but found " + unassignedZNode + ", deleting it."); - try { - zkWrapper.deleteZNode(unassignedZNode, true); - } catch (KeeperException e) { - LOG.error("Could not delete znode " + unassignedZNode, e); - throw new IOException(e); - } catch (InterruptedException e) { - LOG.error("Could not delete znode " + unassignedZNode, e); - throw new IOException(e); - } - } - - // If the UNASSIGNED ZNode does not exist, create it. - zkWrapper.createZNodeIfNotExists(unassignedZNode); - - // TODO: get the outstanding changes in UNASSIGNED - - // Set a watch on Zookeeper's UNASSIGNED node if it exists. - zkWrapper.registerListener(this); - } - - /** - * This is the processing loop that gets triggered from the ZooKeeperWrapper. - * This zookeeper events process function dies the following: - * - WATCHES the following events: NodeCreated, NodeDataChanged, NodeChildrenChanged - * - IGNORES the following events: None, NodeDeleted - */ - @Override - public synchronized void process(WatchedEvent event) { - EventType type = event.getType(); - LOG.debug("ZK-EVENT-PROCESS: Got zkEvent " + type + - " state:" + event.getState() + - " path:" + event.getPath()); - - // Handle the ignored events - if(type.equals(EventType.None) || - type.equals(EventType.NodeDeleted)) { - return; - } - - // check if the path is for the UNASSIGNED directory we care about - if(event.getPath() == null || - !event.getPath().startsWith(zkWrapper.getZNodePathForHBase( - zkWrapper.getRegionInTransitionZNode()))) { - return; - } - - try - { - /* - * If a node is created in the UNASSIGNED directory in zookeeper, then: - * 1. watch its updates (this is an unassigned region). - * 2. read to see what its state is and handle as needed (state may have - * changed before we started watching it) - */ - if(type.equals(EventType.NodeCreated)) { - zkWrapper.watchZNode(event.getPath()); - handleRegionStateInZK(event.getPath()); - } - /* - * Data on some node has changed. Read to see what the state is and handle - * as needed. - */ - else if(type.equals(EventType.NodeDataChanged)) { - handleRegionStateInZK(event.getPath()); - } - /* - * If there were some nodes created then watch those nodes - */ - else if(type.equals(EventType.NodeChildrenChanged)) { - List newZNodes = - zkWrapper.watchAndGetNewChildren(event.getPath()); - for(ZNodePathAndData zNodePathAndData : newZNodes) { - LOG.debug("Handling updates for znode: " + zNodePathAndData.getzNodePath()); - handleRegionStateInZK(zNodePathAndData.getzNodePath(), - zNodePathAndData.getData()); - } - } - } - catch (IOException e) - { - LOG.error("Could not process event from ZooKeeper", e); - } - } - - /** - * Read the state of a node in ZK, and do the needful. We want to do the - * following: - * 1. If region's state is updated as CLOSED, invoke the ClosedRegionHandler. - * 2. If region's state is updated as OPENED, invoke the OpenRegionHandler. - * @param zNodePath - * @throws IOException - */ - private void handleRegionStateInZK(String zNodePath) throws IOException { - byte[] data = zkWrapper.readZNode(zNodePath, null); - handleRegionStateInZK(zNodePath, data); - } - - private void handleRegionStateInZK(String zNodePath, byte[] data) { - // a null value is set when a node is created, we don't need to handle this - if(data == null) { - return; - } - String rgnInTransitNode = zkWrapper.getRegionInTransitionZNode(); - String region = zNodePath.substring( - zNodePath.indexOf(rgnInTransitNode) + rgnInTransitNode.length() + 1); - HBaseEventType rsEvent = HBaseEventType.fromByte(data[0]); - LOG.debug("Got event type [ " + rsEvent + " ] for region " + region); - - // if the node was CLOSED then handle it - if(rsEvent == HBaseEventType.RS2ZK_REGION_CLOSED) { - new MasterCloseRegionHandler(rsEvent, serverManager, serverName, region, data).submit(); - } - // if the region was OPENED then handle that - else if(rsEvent == HBaseEventType.RS2ZK_REGION_OPENED || - rsEvent == HBaseEventType.RS2ZK_REGION_OPENING) { - new MasterOpenRegionHandler(rsEvent, serverManager, serverName, region, data).submit(); - } - } -} - Index: src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java (revision 997548) +++ src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java (working copy) @@ -1,112 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.handler; - -import java.io.IOException; -import java.util.ArrayList; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HMsg; -import org.apache.hadoop.hbase.HServerInfo; -import org.apache.hadoop.hbase.executor.RegionTransitionEventData; -import org.apache.hadoop.hbase.executor.HBaseEventHandler; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.ServerManager; -import org.apache.hadoop.hbase.util.Writables; - -/** - * This is the event handler for all events relating to opening regions on the - * HMaster. This could be one of the following: - * - notification that a region server is "OPENING" a region - * - notification that a region server has "OPENED" a region - * The following event types map to this handler: - * - RS_REGION_OPENING - * - RS_REGION_OPENED - */ -public class MasterOpenRegionHandler extends HBaseEventHandler { - private static final Log LOG = LogFactory.getLog(MasterOpenRegionHandler.class); - // other args passed in a byte array form - protected byte[] serializedData; - private String regionName; - private RegionTransitionEventData hbEventData; - ServerManager serverManager; - - public MasterOpenRegionHandler(HBaseEventType eventType, - ServerManager serverManager, - String serverName, - String regionName, - byte[] serData) { - super(false, serverName, eventType); - this.regionName = regionName; - this.serializedData = serData; - this.serverManager = serverManager; - } - - /** - * Handle the various events relating to opening regions. We can get the - * following events here: - * - RS_REGION_OPENING : Keep track to see how long the region open takes. - * If the RS is taking too long, then revert the - * region back to closed state so that it can be - * re-assigned. - * - RS_REGION_OPENED : The region is opened. Add an entry into META for - * the RS having opened this region. Then delete this - * entry in ZK. - */ - @Override - public void process() - { - LOG.debug("Event = " + getHBEvent() + ", region = " + regionName); - if(this.getHBEvent() == HBaseEventType.RS2ZK_REGION_OPENING) { - handleRegionOpeningEvent(); - } - else if(this.getHBEvent() == HBaseEventType.RS2ZK_REGION_OPENED) { - handleRegionOpenedEvent(); - } - } - - private void handleRegionOpeningEvent() { - // TODO: not implemented. - LOG.debug("NO-OP call to handling region opening event"); - // Keep track to see how long the region open takes. If the RS is taking too - // long, then revert the region back to closed state so that it can be - // re-assigned. - } - - private void handleRegionOpenedEvent() { - try { - if(hbEventData == null) { - hbEventData = new RegionTransitionEventData(); - Writables.getWritable(serializedData, hbEventData); - } - } catch (IOException e) { - LOG.error("Could not deserialize additional args for Open region", e); - } - LOG.debug("RS " + hbEventData.getRsName() + " has opened region " + regionName); - HServerInfo serverInfo = serverManager.getServerInfo(hbEventData.getRsName()); - ArrayList returnMsgs = new ArrayList(); - serverManager.processRegionOpen(serverInfo, hbEventData.getHmsg().getRegionInfo(), returnMsgs); - if(returnMsgs.size() > 0) { - LOG.error("Open region tried to send message: " + returnMsgs.get(0).getType() + - " about " + returnMsgs.get(0).getRegionInfo().getRegionNameAsString()); - } - } -} Index: src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java (revision 997548) +++ src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java (working copy) @@ -1,94 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.handler; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.executor.RegionTransitionEventData; -import org.apache.hadoop.hbase.executor.HBaseEventHandler; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.ServerManager; -import org.apache.hadoop.hbase.util.Writables; - -/** - * This is the event handler for all events relating to closing regions on the - * HMaster. The following event types map to this handler: - * - RS_REGION_CLOSING - * - RS_REGION_CLOSED - */ -public class MasterCloseRegionHandler extends HBaseEventHandler -{ - private static final Log LOG = LogFactory.getLog(MasterCloseRegionHandler.class); - - private String regionName; - protected byte[] serializedData; - RegionTransitionEventData hbEventData; - ServerManager serverManager; - - public MasterCloseRegionHandler(HBaseEventType eventType, - ServerManager serverManager, - String serverName, - String regionName, - byte[] serializedData) { - super(false, serverName, eventType); - this.regionName = regionName; - this.serializedData = serializedData; - this.serverManager = serverManager; - } - - /** - * Handle the various events relating to closing regions. We can get the - * following events here: - * - RS_REGION_CLOSING : No-op - * - RS_REGION_CLOSED : The region is closed. If we are not in a shutdown - * state, find the RS to open this region. This could - * be a part of a region move, or just that the RS has - * died. Should result in a M_REQUEST_OPENREGION event - * getting created. - */ - @Override - public void process() - { - LOG.debug("Event = " + getHBEvent() + ", region = " + regionName); - // handle RS_REGION_CLOSED events - handleRegionClosedEvent(); - } - - private void handleRegionClosedEvent() { - try { - if(hbEventData == null) { - hbEventData = new RegionTransitionEventData(); - Writables.getWritable(serializedData, hbEventData); - } - } catch (IOException e) { - LOG.error("Could not deserialize additional args for Close region", e); - } - // process the region close - this will cause the reopening of the - // region as a part of the heartbeat of some RS - serverManager.processRegionClose(hbEventData.getHmsg().getRegionInfo()); - LOG.info("Processed close of region " + hbEventData.getHmsg().getRegionInfo().getRegionNameAsString()); - } - - public String getRegionName() { - return regionName; - } -} Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 997548) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -72,8 +72,6 @@ import org.apache.hadoop.hbase.client.ServerConnection; import org.apache.hadoop.hbase.client.ServerConnectionManager; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; -import org.apache.hadoop.hbase.executor.HBaseExecutorService; -import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; @@ -252,18 +250,6 @@ new RegionServerOperationQueue(this.conf, this.closed); serverManager = new ServerManager(this); - - - // Start the unassigned watcher - which will create the unassigned region - // in ZK. This is needed before RegionManager() constructor tries to assign - // the root region. - ZKUnassignedWatcher.start(this.conf, this); - // start the "close region" executor service - HBaseEventType.RS2ZK_REGION_CLOSED.startMasterExecutorService(address.toString()); - // start the "open region" executor service - HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(address.toString()); - - // start the region manager regionManager = new RegionManager(this); @@ -565,7 +551,6 @@ this.rpcServer.stop(); this.regionManager.stop(); this.zooKeeperWrapper.close(); - HBaseExecutorService.shutdown(); LOG.info("HMaster main thread exiting"); } Index: src/main/java/org/apache/hadoop/hbase/master/RegionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (revision 997548) +++ src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (working copy) @@ -48,8 +48,6 @@ import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.executor.RegionTransitionEventData; -import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; @@ -96,9 +94,6 @@ */ final SortedMap regionsInTransition = Collections.synchronizedSortedMap(new TreeMap()); - - // regions in transition are also recorded in ZK using the zk wrapper - final ZooKeeperWrapper zkWrapper; // How many regions to assign a server at a time. private final int maxAssignInOneGo; @@ -129,12 +124,10 @@ private final int zooKeeperNumRetries; private final int zooKeeperPause; - RegionManager(HMaster master) throws IOException { + RegionManager(HMaster master) { Configuration conf = master.getConfiguration(); this.master = master; - this.zkWrapper = - ZooKeeperWrapper.getInstance(conf, HMaster.class.getName()); this.maxAssignInOneGo = conf.getInt("hbase.regions.percheckin", 10); this.loadBalancer = new LoadBalancer(conf); @@ -172,18 +165,10 @@ unsetRootRegion(); if (!master.getShutdownRequested().get()) { synchronized (regionsInTransition) { - String regionName = HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString(); - byte[] data = null; - try { - data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER)); - } catch (IOException e) { - LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e); - } - zkWrapper.createOrUpdateUnassignedRegion( - HRegionInfo.ROOT_REGIONINFO.getEncodedName(), data); - LOG.debug("Created UNASSIGNED zNode " + regionName + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE); - RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO, RegionState.State.UNASSIGNED); - regionsInTransition.put(regionName, s); + RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO, + RegionState.State.UNASSIGNED); + regionsInTransition.put( + HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString(), s); LOG.info("ROOT inserted into regionsInTransition"); } } @@ -338,15 +323,6 @@ LOG.info("Assigning region " + regionName + " to " + sinfo.getServerName()); rs.setPendingOpen(sinfo.getServerName()); synchronized (this.regionsInTransition) { - byte[] data = null; - try { - data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER)); - } catch (IOException e) { - LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e); - } - zkWrapper.createOrUpdateUnassignedRegion( - rs.getRegionInfo().getEncodedName(), data); - LOG.debug("Created UNASSIGNED zNode " + regionName + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE); this.regionsInTransition.put(regionName, rs); } @@ -987,17 +963,6 @@ synchronized(this.regionsInTransition) { s = regionsInTransition.get(info.getRegionNameAsString()); if (s == null) { - byte[] data = null; - try { - data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER)); - } catch (IOException e) { - // TODO: Review what we should do here. If Writables work this - // should never happen - LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e); - } - zkWrapper.createOrUpdateUnassignedRegion(info.getEncodedName(), data); - LOG.debug("Created/updated UNASSIGNED zNode " + info.getRegionNameAsString() + - " in state " + HBaseEventType.M2ZK_REGION_OFFLINE); s = new RegionState(info, RegionState.State.UNASSIGNED); regionsInTransition.put(info.getRegionNameAsString(), s); } @@ -1243,8 +1208,6 @@ public void setRootRegionLocation(HServerAddress address) { writeRootRegionLocationToZooKeeper(address); synchronized (rootRegionLocation) { - // the root region has been assigned, remove it from transition in ZK - zkWrapper.deleteUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName()); rootRegionLocation.set(new HServerAddress(address)); rootRegionLocation.notifyAll(); } Index: src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java (revision 997548) +++ src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java (working copy) @@ -74,7 +74,7 @@ } else if(type.equals(EventType.NodeCreated) && event.getPath().equals(this.zookeeper.clusterStateZNode)) { LOG.debug("Resetting watch on cluster state node."); - this.zookeeper.setClusterStateWatch(); + this.zookeeper.setClusterStateWatch(this); } } @@ -87,7 +87,7 @@ try { LOG.debug("Waiting for master address ZNode to be deleted " + "(Also watching cluster state node)"); - this.zookeeper.setClusterStateWatch(); + this.zookeeper.setClusterStateWatch(this); wait(); } catch (InterruptedException e) { } @@ -110,7 +110,7 @@ } if(this.zookeeper.writeMasterAddress(address)) { this.zookeeper.setClusterState(true); - this.zookeeper.setClusterStateWatch(); + this.zookeeper.setClusterStateWatch(this); // Watch our own node this.zookeeper.readMasterAddress(this); return true; Index: src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java (revision 997548) +++ src/main/java/org/apache/hadoop/hbase/executor/HBaseEventHandler.java (working copy) @@ -1,278 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.executor; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.executor.HBaseExecutorService.HBaseExecutorServiceType; -import org.apache.hadoop.hbase.master.ServerManager; - - -/** - * Abstract base class for all HBase event handlers. Subclasses should - * implement the process() method where the actual handling of the event - * happens. - * - * HBaseEventType is a list of ALL events (which also corresponds to messages - - * either internal to one component or between components). The event type - * names specify the component from which the event originated, and the - * component which is supposed to handle it. - * - * Listeners can listen to all the events by implementing the interface - * HBaseEventHandlerListener, and by registering themselves as a listener. They - * will be called back before and after the process of every event. - * - * TODO: Rename HBaseEvent and HBaseEventType to EventHandler and EventType - * after ZK refactor as it currently would clash with EventType from ZK and - * make the code very confusing. - */ -public abstract class HBaseEventHandler implements Runnable -{ - private static final Log LOG = LogFactory.getLog(HBaseEventHandler.class); - // type of event this object represents - protected HBaseEventType eventType = HBaseEventType.NONE; - // is this a region server or master? - protected boolean isRegionServer; - // name of the server - this is needed for naming executors in case of tests - // where region servers may be co-located. - protected String serverName; - // listeners that are called before and after an event is processed - protected static List eventHandlerListeners = - Collections.synchronizedList(new ArrayList()); - - /** - * This interface provides hooks to listen to various events received by the - * queue. A class implementing this can listen to the updates by calling - * registerListener and stop receiving updates by calling unregisterListener - */ - public interface HBaseEventHandlerListener { - /** - * Called before any event is processed - */ - public void beforeProcess(HBaseEventHandler event); - /** - * Called after any event is processed - */ - public void afterProcess(HBaseEventHandler event); - } - - /** - * These are a list of HBase events that can be handled by the various - * HBaseExecutorService's. All the events are serialized as byte values. - */ - public enum HBaseEventType { - NONE (-1), - // Messages originating from RS (NOTE: there is NO direct communication from - // RS to Master). These are a result of RS updates into ZK. - RS2ZK_REGION_CLOSING (1), // RS is in process of closing a region - RS2ZK_REGION_CLOSED (2), // RS has finished closing a region - RS2ZK_REGION_OPENING (3), // RS is in process of opening a region - RS2ZK_REGION_OPENED (4), // RS has finished opening a region - - // Updates from master to ZK. This is done by the master and there is - // nothing to process by either Master or RS - M2ZK_REGION_OFFLINE (50); // Master adds this region as offline in ZK - - private final byte value; - - /** - * Called by the HMaster. Returns a name of the executor service given an - * event type. Every event type has en entry - if the event should not be - * handled just add the NONE executor. - * @return name of the executor service - */ - public HBaseExecutorServiceType getMasterExecutorForEvent() { - HBaseExecutorServiceType executorServiceType = null; - switch(this) { - - case RS2ZK_REGION_CLOSING: - case RS2ZK_REGION_CLOSED: - executorServiceType = HBaseExecutorServiceType.MASTER_CLOSEREGION; - break; - - case RS2ZK_REGION_OPENING: - case RS2ZK_REGION_OPENED: - executorServiceType = HBaseExecutorServiceType.MASTER_OPENREGION; - break; - - case M2ZK_REGION_OFFLINE: - executorServiceType = HBaseExecutorServiceType.NONE; - break; - - default: - throw new RuntimeException("Unhandled event type in the master."); - } - - return executorServiceType; - } - - /** - * Called by the RegionServer. Returns a name of the executor service given an - * event type. Every event type has en entry - if the event should not be - * handled just return a null executor name. - * @return name of the event service - */ - public static String getRSExecutorForEvent(String serverName) { - throw new RuntimeException("Unsupported operation."); - } - - /** - * Start the executor service that handles the passed in event type. The - * server that starts these event executor services wants to handle these - * event types. - */ - public void startMasterExecutorService(String serverName) { - HBaseExecutorServiceType serviceType = getMasterExecutorForEvent(); - if(serviceType == HBaseExecutorServiceType.NONE) { - throw new RuntimeException("Event type " + toString() + " not handled on master."); - } - serviceType.startExecutorService(serverName); - } - - public static void startRSExecutorService() { - - } - - HBaseEventType(int intValue) { - this.value = (byte)intValue; - } - - public byte getByteValue() { - return value; - } - - public static HBaseEventType fromByte(byte value) { - switch(value) { - case -1: return HBaseEventType.NONE; - case 1 : return HBaseEventType.RS2ZK_REGION_CLOSING; - case 2 : return HBaseEventType.RS2ZK_REGION_CLOSED; - case 3 : return HBaseEventType.RS2ZK_REGION_OPENING; - case 4 : return HBaseEventType.RS2ZK_REGION_OPENED; - case 50: return HBaseEventType.M2ZK_REGION_OFFLINE; - - default: - throw new RuntimeException("Invalid byte value for conversion to HBaseEventType"); - } - } - } - - /** - * Default base class constructor. - * - * TODO: isRegionServer and serverName will go away once we do the HMaster - * refactor. We will end up passing a ServerStatus which should tell us both - * the name and if it is a RS or master. - */ - public HBaseEventHandler(boolean isRegionServer, String serverName, HBaseEventType eventType) { - this.isRegionServer = isRegionServer; - this.eventType = eventType; - this.serverName = serverName; - } - - /** - * This is a wrapper around process, used to update listeners before and after - * events are processed. - */ - public void run() { - // fire all beforeProcess listeners - for(HBaseEventHandlerListener listener : eventHandlerListeners) { - listener.beforeProcess(this); - } - - // call the main process function - try { - process(); - } catch(Throwable t) { - LOG.error("Caught throwable while processing event " + eventType, t); - } - - // fire all afterProcess listeners - for(HBaseEventHandlerListener listener : eventHandlerListeners) { - LOG.debug("Firing " + listener.getClass().getName() + - ".afterProcess event listener for event " + eventType); - listener.afterProcess(this); - } - } - - /** - * This method is the main processing loop to be implemented by the various - * subclasses. - */ - public abstract void process(); - - /** - * Subscribe to updates before and after processing events - */ - public static void registerListener(HBaseEventHandlerListener listener) { - eventHandlerListeners.add(listener); - } - - /** - * Stop receiving updates before and after processing events - */ - public static void unregisterListener(HBaseEventHandlerListener listener) { - eventHandlerListeners.remove(listener); - } - - public boolean isRegionServer() { - return isRegionServer; - } - - /** - * Return the name for this event type. - * @return - */ - public HBaseExecutorServiceType getEventHandlerName() { - // TODO: check for isRegionServer here - return eventType.getMasterExecutorForEvent(); - } - - /** - * Return the event type - * @return - */ - public HBaseEventType getHBEvent() { - return eventType; - } - - /** - * Submits this event object to the correct executor service. This is causes - * this object to get executed by the correct ExecutorService. - */ - public void submit() { - HBaseExecutorServiceType serviceType = getEventHandlerName(); - if(serviceType == null) { - throw new RuntimeException("Event " + eventType + " not handled on this server " + serverName); - } - serviceType.getExecutor(serverName).submit(this); - } - - /** - * Executes this event object in the caller's thread. This is a synchronous - * way of executing the event. - */ - public void execute() { - this.run(); - } -} Index: src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java (revision 997548) +++ src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java (working copy) @@ -1,171 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.executor; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * This is a generic HBase executor service. This component abstract a - * threadpool, a queue to which jobs can be submitted and a Runnable that - * handles the object that is added to the queue. - * - * In order to create a new HBExecutorService, you need to do: - * HBExecutorService.startExecutorService("myService"); - * - * In order to use the service created above, you need to override the - * HBEventHandler class and create an event type that submits to this service. - * - */ -public class HBaseExecutorService -{ - private static final Log LOG = LogFactory.getLog(HBaseExecutorService.class); - // default number of threads in the pool - private int corePoolSize = 1; - // max number of threads - maximum concurrency - private int maximumPoolSize = 5; - // how long to retain excess threads - private long keepAliveTimeInMillis = 1000; - // the thread pool executor that services the requests - ThreadPoolExecutor threadPoolExecutor; - // work queue to use - unbounded queue - BlockingQueue workQueue = new LinkedBlockingQueue(); - // name for this executor service - String name; - // hold the all the executors created in a map addressable by their names - static Map executorServicesMap = - Collections.synchronizedMap(new HashMap()); - - - /** - * The following is a list of names for the various executor services in both - * the master and the region server. - */ - public enum HBaseExecutorServiceType { - NONE (-1), - MASTER_CLOSEREGION (1), - MASTER_OPENREGION (2); - - private final int value; - - HBaseExecutorServiceType(int intValue) { - this.value = intValue; - } - - public void startExecutorService(String serverName) { - // if this is NONE then there is no executor to start - if(value == NONE.value) { - throw new RuntimeException("Cannot start NONE executor type."); - } - String name = getExecutorName(serverName); - if(HBaseExecutorService.isExecutorServiceRunning(name)) { - LOG.debug("Executor service " + toString() + " already running on " + serverName); - return; - } - HBaseExecutorService.startExecutorService(name); - } - - public HBaseExecutorService getExecutor(String serverName) { - // if this is NONE then there is no executor - if(value == NONE.value) { - return null; - } - return HBaseExecutorService.getExecutorService(getExecutorName(serverName)); - } - - public String getExecutorName(String serverName) { - // if this is NONE then there is no executor - if(value == NONE.value) { - return null; - } - return (this.toString() + "-" + serverName); - } - } - - - - /** - * Start an executor service with a given name. If there was a service already - * started with the same name, this throws a RuntimeException. - * @param name Name of the service to start. - */ - public static void startExecutorService(String name) { - if(executorServicesMap.get(name) != null) { - throw new RuntimeException("An executor service with the name " + name + " is already running!"); - } - HBaseExecutorService hbes = new HBaseExecutorService(name); - executorServicesMap.put(name, hbes); - LOG.debug("Starting executor service: " + name); - } - - public static boolean isExecutorServiceRunning(String name) { - return (executorServicesMap.containsKey(name)); - } - - /** - * This method is an accessor for all the HBExecutorServices running so far - * addressable by name. If there is no such service, then it returns null. - */ - public static HBaseExecutorService getExecutorService(String name) { - HBaseExecutorService executor = executorServicesMap.get(name); - if(executor == null) { - LOG.debug("Executor service [" + name + "] not found."); - } - return executor; - } - - public static void shutdown() { - for(Entry entry : executorServicesMap.entrySet()) { - entry.getValue().threadPoolExecutor.shutdown(); - } - executorServicesMap.clear(); - } - - protected HBaseExecutorService(String name) { - this.name = name; - // create the thread pool executor - threadPoolExecutor = new ThreadPoolExecutor( - corePoolSize, - maximumPoolSize, - keepAliveTimeInMillis, - TimeUnit.MILLISECONDS, - workQueue - ); - // name the threads for this threadpool - threadPoolExecutor.setThreadFactory(new NamedThreadFactory(name)); - } - - /** - * Submit the event to the queue for handling. - * @param event - */ - public void submit(Runnable event) { - threadPoolExecutor.execute(event); - } -} Index: src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java (revision 997548) +++ src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionEventData.java (working copy) @@ -1,92 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.executor; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.hbase.HMsg; -import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; -import org.apache.hadoop.io.Writable; - -public class RegionTransitionEventData implements Writable { - private HBaseEventType hbEvent; - private String rsName; - private long timeStamp; - private HMsg hmsg; - - public RegionTransitionEventData() { - } - - public RegionTransitionEventData(HBaseEventType hbEvent, String rsName) { - this(hbEvent, rsName, null); - } - - public RegionTransitionEventData(HBaseEventType hbEvent, String rsName, HMsg hmsg) { - this.hbEvent = hbEvent; - this.rsName = rsName; - this.timeStamp = System.currentTimeMillis(); - this.hmsg = hmsg; - } - - public HBaseEventType getHbEvent() { - return hbEvent; - } - - public String getRsName() { - return rsName; - } - - public long getTimeStamp() { - return timeStamp; - } - - public HMsg getHmsg() { - return hmsg; - } - - @Override - public void readFields(DataInput in) throws IOException { - // the event type byte - hbEvent = HBaseEventType.fromByte(in.readByte()); - // the hostname of the RS sending the data - rsName = in.readUTF(); - // the timestamp - timeStamp = in.readLong(); - if(in.readBoolean()) { - // deserialized the HMsg from ZK - hmsg = new HMsg(); - hmsg.readFields(in); - } - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeByte(hbEvent.getByteValue()); - out.writeUTF(rsName); - out.writeLong(System.currentTimeMillis()); - out.writeBoolean((hmsg != null)); - if(hmsg != null) { - hmsg.write(out); - } - } - -} Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (revision 997548) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (working copy) @@ -72,7 +72,7 @@ conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false); if (replication) { this.zkHelper = new ReplicationZookeeperWrapper( - ZooKeeperWrapper.getInstance(conf, hsi.getServerName()), conf, + ZooKeeperWrapper.createInstance(conf, hsi.getServerName()), conf, this.replicating, hsi.getServerName()); this.replicationMaster = zkHelper.isReplicationMaster(); this.replicationManager = this.replicationMaster ?