Index: src/test/java/org/apache/hadoop/hbase/zookeeper/TestRegionSplitsTracker.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/zookeeper/TestRegionSplitsTracker.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/zookeeper/TestRegionSplitsTracker.java (revision 0)
@@ -0,0 +1,114 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Exercises {@link RegionSplitsTracker} against a mini ZooKeeper cluster.
+ */
+public class TestRegionSplitsTracker {
+  private static final Log LOG = LogFactory.getLog(TestRegionSplitsTracker.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniZKCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  /**
+   * Starts a tracker, posts one split message and then waits for the
+   * tracker's <code>nodeDeleted</code> callback to fire.
+   */
+  @Test(timeout = 60000)
+  public void testBasicFunctionality()
+  throws ZooKeeperConnectionException, IOException, KeeperException, InterruptedException {
+    Abortable abortable = new StubAbortable();
+    ZooKeeperWatcher watcher =
+      new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
+        "testBasicFunctionality", abortable);
+    RegionSplitsTracker.SplitHandler handler =
+        new RegionSplitsTracker.SplitHandler() {
+      @Override
+      public void handleSplit(HServerInfo hsi, HRegionInfo parent,
+          HRegionInfo a, HRegionInfo b) {
+        LOG.info("Handled split of " + parent.getRegionNameAsString() +
+          " into a=" + a.getRegionNameAsString() + " and b=" +
+          b.getRegionNameAsString());
+      }
+    };
+    final AtomicBoolean cleared = new AtomicBoolean(false);
+    RegionSplitsTracker rst = new RegionSplitsTracker(watcher,
+        TEST_UTIL.getConfiguration(), handler) {
+      @Override
+      public void nodeDeleted(String path) {
+        LOG.info("nodeDeleted called on " + path);
+        cleared.set(true);
+      }
+    };
+    rst.start();
+    Thread.sleep(1000);
+    HTableDescriptor htd = new HTableDescriptor("test");
+    RegionSplitsTracker.messageSplitHappened(watcher, TEST_UTIL.getConfiguration(),
+      new HServerInfo(new HServerAddress("example.org:1234"), 5678, "wahwah"),
+      new HRegionInfo(htd, Bytes.toBytes("a"), Bytes.toBytes("c")),
+      new HRegionInfo(htd, Bytes.toBytes("a"), Bytes.toBytes("b")),
+      new HRegionInfo(htd, Bytes.toBytes("b"), Bytes.toBytes("c")));
+    // Poll until nodeDeleted fires; the @Test timeout bounds the wait so a
+    // missing callback fails the test rather than hanging the build.
+    while (!cleared.get()) {
+      Thread.sleep(10);
+    }
+    rst.stop();
+  }
+
+  public static class StubAbortable implements Abortable {
+    @Override
+    public void abort(final String msg, final Throwable t) {}
+  }
+
+  public static class StubWatcher implements Watcher {
+    @Override
+    public void process(WatchedEvent event) {}
+  }
+}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 1066564)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (working copy)
@@ -177,23 +177,39 @@
    * Set the local variable node names using the specified configuration.
    */
   private void setNodeNames(Configuration conf) {
-    baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
-        HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
-    rootServerZNode = ZKUtil.joinZNode(baseZNode,
-        conf.get("zookeeper.znode.rootserver", "root-region-server"));
-    rsZNode = ZKUtil.joinZNode(baseZNode,
-        conf.get("zookeeper.znode.rs", "rs"));
-    masterAddressZNode = ZKUtil.joinZNode(baseZNode,
-        conf.get("zookeeper.znode.master", "master"));
-    clusterStateZNode = ZKUtil.joinZNode(baseZNode,
-        conf.get("zookeeper.znode.state", "shutdown"));
-    assignmentZNode = ZKUtil.joinZNode(baseZNode,
-        conf.get("zookeeper.znode.unassigned", "unassigned"));
-    tableZNode = ZKUtil.joinZNode(baseZNode,
-        conf.get("zookeeper.znode.tableEnableDisable", "table"));
+    this.baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+        HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+    this.rootServerZNode = getZNodeRelativeToBase(conf,
+        "zookeeper.znode.rootserver", "root-region-server");
+    this.rsZNode = getZNodeRelativeToBase(conf,
+        "zookeeper.znode.rs", "rs");
+    this.masterAddressZNode = getZNodeRelativeToBase(conf,
+        "zookeeper.znode.master", "master");
+    this.clusterStateZNode = getZNodeRelativeToBase(conf,
+        "zookeeper.znode.state", "shutdown");
+    this.assignmentZNode = getZNodeRelativeToBase(conf,
+        "zookeeper.znode.unassigned", "unassigned");
+    this.tableZNode = getZNodeRelativeToBase(conf,
+        "zookeeper.znode.tableEnableDisable", "table");
   }
 
   /**
+   * Creates a znode path that is under this watcher's baseZNode.
+   * Use to obtain a znode path that is a subdirectory of this watcher's
+   * base znode.
+   * @param c Configuration in which <code>configurationKey</code> is looked up.
+   * @param configurationKey Configuration key that names the sub-znode.
+   * @param subZnodeDefault Sub-znode name passed to c.get as the default value.
+   * @return Returns result of ZKUtil.joinZNode(this.baseZNode,
+   * c.get(configurationKey, subZnodeDefault));
+   */
+  public String getZNodeRelativeToBase(final Configuration c,
+      final String configurationKey, final String subZnodeDefault) {
+    return ZKUtil.joinZNode(this.baseZNode,
+        c.get(configurationKey, subZnodeDefault));
+  }
+
+  /**
    * Register the specified listener to receive ZooKeeper events.
    * @param listener
    */
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/RegionSplitsTracker.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/RegionSplitsTracker.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/RegionSplitsTracker.java (revision 0)
@@ -0,0 +1,168 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracks activity in the region splits directory node up in ZK.
+ * Regionservers tell others about their splits by putting a znode up in
+ * a 'splits' directory in zk. The master is recipient and will clean the
+ * message from zk after reading it. After creating an instance, call
+ * {@link #start()} to get it going.
+ * @see #messageSplitHappened(ZooKeeperWatcher, Configuration, HServerInfo, HRegionInfo, HRegionInfo, HRegionInfo)
+ */
+public class RegionSplitsTracker extends ZooKeeperListener {
+  private static final Log LOG = LogFactory.getLog(RegionSplitsTracker.class);
+  private final String regionSplitsZNodeDir;
+  private final SplitHandler handler;
+
+  public RegionSplitsTracker(final ZooKeeperWatcher watcher,
+      final Configuration c, final SplitHandler handler) {
+    super(watcher);
+    this.handler = handler;
+    this.regionSplitsZNodeDir = getRegionSplitsZNodeDir(watcher, c);
+  }
+
+  /**
+   * @param watcher Watcher whose base znode the returned path is relative to.
+   * @param c Configuration checked for <code>zookeeper.znode.regionSplits</code>.
+   * @return Path to the region splits dir, usually /hbase/splits.
+   */
+  public static String getRegionSplitsZNodeDir(final ZooKeeperWatcher watcher,
+      final Configuration c) {
+    return watcher.getZNodeRelativeToBase(c,
+      "zookeeper.znode.regionSplits", "splits");
+  }
+
+  /**
+   * Starts the tracking of region splits directory up in zk.
+   * <p>Splits tracking will begin after this method is called.
+   * @throws KeeperException
+   * @throws IOException
+   */
+  public synchronized void start() throws KeeperException, IOException {
+    this.watcher.registerListener(this);
+    ZKUtil.createAndFailSilent(this.watcher, this.regionSplitsZNodeDir);
+    List<NodeAndData> datas =
+      ZKUtil.watchAndGetNewChildren(this.watcher, this.regionSplitsZNodeDir);
+    // If any were returned, process them (they are not deleted here).
+    if (datas != null) {
+      for (NodeAndData data: datas) {
+        LOG.debug("Processing found " + data.getNode());
+        processNode(data.getData());
+      }
+    }
+  }
+
+  private void processNode(final byte [] data) throws IOException {
+    DataInputBuffer in = new DataInputBuffer();
+    in.reset(data, data == null ? 0 : data.length); // 2nd arg is a length, not an offset
+    HServerInfo hsi = new HServerInfo();
+    hsi.readFields(in);
+    HRegionInfo parent = getHRegionInfo(in);
+    HRegionInfo a = getHRegionInfo(in);
+    HRegionInfo b = getHRegionInfo(in);
+    this.handler.handleSplit(hsi, parent, a, b);
+  }
+
+  private synchronized HRegionInfo getHRegionInfo(final DataInput in)
+  throws IOException {
+    HRegionInfo hri = new HRegionInfo();
+    hri.readFields(in);
+    return hri;
+  }
+
+  public synchronized void stop() {
+    // Noop
+  }
+
+  @Override
+  public synchronized void nodeCreated(String path) {
+    if (!path.startsWith(this.regionSplitsZNodeDir)) return;
+    try {
+      byte [] data =
+        ZKUtil.getDataAndWatch(this.watcher, path);
+      // Handle the message read from the znode that was just created.
+      processNode(data);
+      // Done, remove it.
+      ZKUtil.deleteNode(this.watcher, path);
+    } catch (KeeperException e) {
+      this.watcher.abort("Failed process nodeCreated", e);
+    } catch (IOException e) {
+      this.watcher.abort("Failed process nodeCreated", e);
+    }
+  }
+
+  /**
+   * Implementation is called to handle report of split to zk.
+   */
+  public interface SplitHandler {
+    /**
+     * Update in-memory structures.
+     * @param hsi Server that reported the split
+     * @param parent Parent region that was split
+     * @param a Daughter region A
+     * @param b Daughter region B
+     */
+    public void handleSplit(final HServerInfo hsi,
+      final HRegionInfo parent, final HRegionInfo a, final HRegionInfo b);
+  }
+
+  /**
+   * Utility method for writing split message up into zookeeper. The created
+   * znode will be noticed by this {@link RegionSplitsTracker} class.
+   * @param watcher Watcher to use communicating with zk.
+   * @param c Configuration
+   * @param hsi ServerInfo that this message originated on.
+   * @param p Parent region, the region that was split.
+   * @param a First daughter of split.
+   * @param b Other daughter of split.
+   * @throws IOException
+   * @throws KeeperException
+   */
+  public static synchronized void messageSplitHappened(final ZooKeeperWatcher watcher,
+      final Configuration c, final HServerInfo hsi,
+      final HRegionInfo p, final HRegionInfo a, final HRegionInfo b)
+  throws IOException, KeeperException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(baos);
+    hsi.write(out);
+    p.write(out);
+    a.write(out);
+    b.write(out);
+    out.close();
+    String znode = getRegionSplitsZNodeDir(watcher, c); // NOTE(review): the dir znode itself, not a child -- confirm
+    ZKUtil.createSetData(watcher, znode, baos.toByteArray());
+  }
+}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (revision 1066564)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (working copy)
@@ -431,7 +431,7 @@
     }
 
     // Look for any exception
-    for (Future future : futures) {
+    for (Future future : futures) {
       try {
         future.get();
       } catch (InterruptedException e) {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (revision 1066564)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (working copy)
@@ -169,7 +169,7 @@
         }
         return;
       }
-
+      // TODO(review): a dangling "this.server.getZooKeeper()." stood here; presumably a split message to zk was intended -- confirm.
       // Now tell the master about the new regions. If we fail here, its OK.
       // Basescanner will do fix up. And reporting split to master is going away.
       // TODO: Verify this still holds in new master rewrite.