Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 995849)
+++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy)
@@ -609,6 +609,18 @@
 return createMultiRegions(getConfiguration(), table, columnFamily);
 }

+ public static final byte[][] KEYS = {
+ HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
+ Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
+ Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
+ Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
+ Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
+ Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
+ Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
+ Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
+ Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
+ };
+
 /**
 * Creates many regions named "aaa" to "zzz".
 * @param c Configuration to use.
@@ -620,17 +632,6 @@
 public int createMultiRegions(final Configuration c, final HTable table,
 final byte[] columnFamily)
 throws IOException {
- byte[][] KEYS = {
- HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
- Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
- Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
- Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
- Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
- Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
- Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
- Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
- Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
- };
 return createMultiRegions(c, table, columnFamily, KEYS);
 }
Index: src/test/java/org/apache/hadoop/hbase/TestMultiParallel.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/TestMultiParallel.java (revision 995849)
+++ src/test/java/org/apache/hadoop/hbase/TestMultiParallel.java (working copy)
@@ -1,419 +0,0 @@
-/*
- * Copyright 2009 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.hadoop.hbase; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.util.Bytes; - -import java.io.IOException; -import java.util.List; -import java.util.ArrayList; - -public class TestMultiParallel extends MultiRegionTable { - // This test needs to be rewritten to use HBaseTestingUtility -- St.Ack 20100910 - private static final Log LOG = LogFactory.getLog(TestMultiParallel.class); - private static final byte[] VALUE = Bytes.toBytes("value"); - private static final byte[] QUALIFIER = Bytes.toBytes("qual"); - private static final String FAMILY = "family"; - private static final String TEST_TABLE = "multi_test_table"; - private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); - private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); - - List keys = new ArrayList(); - - public TestMultiParallel() { - super(2, FAMILY); - desc = new HTableDescriptor(TEST_TABLE); - desc.addFamily(new HColumnDescriptor(FAMILY)); - makeKeys(); - } - - private void makeKeys() { - // Create a "non-uniform" test set with the following characteristics: - // a) Unequal number of keys per region - - // Don't use integer as a multiple, so that we have a number of keys that is - // not a multiple of the number of regions - int numKeys = (int) ((float) KEYS.length * 10.33F); - - for (int i = 0; i < numKeys; i++) { - int kIdx = i % KEYS.length; - byte[] k = KEYS[kIdx]; - byte[] cp = new byte[k.length + 1]; - System.arraycopy(k, 0, cp, 0, k.length); - cp[k.length] = new Integer(i % 256).byteValue(); - keys.add(cp); - } - - // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which - // should work) - // c) keys are not in sorted order (within a region), to ensure that the - // sorting code and index mapping doesn't break the functionality - for (int i = 0; i < 100; i++) { - int kIdx = i % KEYS.length; - byte[] k = KEYS[kIdx]; - byte[] cp = new byte[k.length + 1]; - System.arraycopy(k, 0, cp, 0, k.length); - cp[k.length] = new Integer(i % 256).byteValue(); - keys.add(cp); - } - } - - public void testBatchWithGet() throws Exception { - LOG.info("test=testBatchWithGet"); - HTable table = new HTable(conf, TEST_TABLE); - - // load test data - List puts = constructPutRequests(); - table.batch(puts); - - // create a list of gets and run it - List gets = new ArrayList(); - for (byte[] k : keys) { - Get get = new Get(k); - get.addColumn(BYTES_FAMILY, QUALIFIER); - gets.add(get); - } - Result[] multiRes = new Result[gets.size()]; - table.batch(gets, multiRes); - - // Same gets using individual call API - List singleRes = new ArrayList(); - for (Row get : gets) { - singleRes.add(table.get((Get) get)); - } - - // Compare results - assertEquals(singleRes.size(), multiRes.length); - for (int i = 0; i < singleRes.size(); i++) { - assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER)); - KeyValue[] singleKvs = singleRes.get(i).raw(); - KeyValue[] multiKvs = multiRes[i].raw(); - for (int j = 0; j < singleKvs.length; j++) { - assertEquals(singleKvs[j], multiKvs[j]); - assertEquals(0, Bytes.compareTo(singleKvs[j].getValue(), multiKvs[j] - 
.getValue())); - } - } - } - - /** - * Only run one Multi test with a forced RegionServer abort. Otherwise, the - * unit tests will take an unnecessarily long time to run. - * - * @throws Exception - */ - public void testFlushCommitsWithAbort() throws Exception { - LOG.info("test=testFlushCommitsWithAbort"); - doTestFlushCommits(true); - } - - public void testFlushCommitsNoAbort() throws Exception { - doTestFlushCommits(false); - } - - public void doTestFlushCommits(boolean doAbort) throws Exception { - LOG.info("test=doTestFlushCommits"); - // Load the data - Configuration newconf = new Configuration(conf); - newconf.setInt("hbase.client.retries.number", 10); - HTable table = new HTable(newconf, TEST_TABLE); - table.setAutoFlush(false); - table.setWriteBufferSize(10 * 1024 * 1024); - - List puts = constructPutRequests(); - for (Row put : puts) { - table.put((Put) put); - } - table.flushCommits(); - - if (doAbort) { - cluster.abortRegionServer(0); - - // try putting more keys after the abort. same key/qual... just validating - // no exceptions thrown - puts = constructPutRequests(); - for (Row put : puts) { - table.put((Put) put); - } - - table.flushCommits(); - } - - validateLoadedData(table); - - // Validate server and region count - HBaseAdmin admin = new HBaseAdmin(conf); - ClusterStatus cs = admin.getClusterStatus(); - assertEquals((doAbort ? 1 : 2), cs.getServers()); - for (HServerInfo info : cs.getServerInfo()) { - System.out.println(info); - assertTrue(info.getLoad().getNumberOfRegions() > 10); - } - } - - public void testBatchWithPut() throws Exception { - LOG.info("test=testBatchWithPut"); - Configuration newconf = new Configuration(conf); - newconf.setInt("hbase.client.retries.number", 10); - HTable table = new HTable(newconf, TEST_TABLE); - - // put multiple rows using a batch - List puts = constructPutRequests(); - - Result[] results = table.batch(puts); - validateSizeAndEmpty(results, keys.size()); - - if (true) { - cluster.abortRegionServer(0); - - puts = constructPutRequests(); - results = table.batch(puts); - validateSizeAndEmpty(results, keys.size()); - } - - validateLoadedData(table); - } - - public void testBatchWithDelete() throws Exception { - LOG.info("test=testBatchWithDelete"); - HTable table = new HTable(conf, TEST_TABLE); - - // Load some data - List puts = constructPutRequests(); - Result[] results = table.batch(puts); - validateSizeAndEmpty(results, keys.size()); - - // Deletes - List deletes = new ArrayList(); - for (int i = 0; i < keys.size(); i++) { - Delete delete = new Delete(keys.get(i)); - delete.deleteFamily(BYTES_FAMILY); - deletes.add(delete); - } - results = table.batch(deletes); - validateSizeAndEmpty(results, keys.size()); - - // Get to make sure ... - for (byte[] k : keys) { - Get get = new Get(k); - get.addColumn(BYTES_FAMILY, QUALIFIER); - assertFalse(table.exists(get)); - } - - } - - public void testHTableDeleteWithList() throws Exception { - LOG.info("test=testHTableDeleteWithList"); - HTable table = new HTable(conf, TEST_TABLE); - - // Load some data - List puts = constructPutRequests(); - Result[] results = table.batch(puts); - validateSizeAndEmpty(results, keys.size()); - - // Deletes - ArrayList deletes = new ArrayList(); - for (int i = 0; i < keys.size(); i++) { - Delete delete = new Delete(keys.get(i)); - delete.deleteFamily(BYTES_FAMILY); - deletes.add(delete); - } - table.delete(deletes); - assertTrue(deletes.isEmpty()); - - // Get to make sure ... 
- for (byte[] k : keys) { - Get get = new Get(k); - get.addColumn(BYTES_FAMILY, QUALIFIER); - assertFalse(table.exists(get)); - } - - } - - public void testBatchWithManyColsInOneRowGetAndPut() throws Exception { - LOG.info("test=testBatchWithManyColsInOneRowGetAndPut"); - HTable table = new HTable(conf, TEST_TABLE); - - List puts = new ArrayList(); - for (int i = 0; i < 100; i++) { - Put put = new Put(ONE_ROW); - byte[] qual = Bytes.toBytes("column" + i); - put.add(BYTES_FAMILY, qual, VALUE); - puts.add(put); - } - Result[] results = table.batch(puts); - - // validate - validateSizeAndEmpty(results, 100); - - // get the data back and validate that it is correct - List gets = new ArrayList(); - for (int i = 0; i < 100; i++) { - Get get = new Get(ONE_ROW); - byte[] qual = Bytes.toBytes("column" + i); - get.addColumn(BYTES_FAMILY, qual); - gets.add(get); - } - - Result[] multiRes = table.batch(gets); - - int idx = 0; - for (Result r : multiRes) { - byte[] qual = Bytes.toBytes("column" + idx); - validateResult(r, qual, VALUE); - idx++; - } - - } - - public void testBatchWithMixedActions() throws Exception { - LOG.info("test=testBatchWithMixedActions"); - HTable table = new HTable(conf, TEST_TABLE); - - // Load some data to start - Result[] results = table.batch(constructPutRequests()); - validateSizeAndEmpty(results, keys.size()); - - // Batch: get, get, put(new col), delete, get, get of put, get of deleted, - // put - List actions = new ArrayList(); - - byte[] qual2 = Bytes.toBytes("qual2"); - byte[] val2 = Bytes.toBytes("putvalue2"); - - // 0 get - Get get = new Get(keys.get(10)); - get.addColumn(BYTES_FAMILY, QUALIFIER); - actions.add(get); - - // 1 get - get = new Get(keys.get(11)); - get.addColumn(BYTES_FAMILY, QUALIFIER); - actions.add(get); - - // 2 put of new column - Put put = new Put(keys.get(10)); - put.add(BYTES_FAMILY, qual2, val2); - actions.add(put); - - // 3 delete - Delete delete = new Delete(keys.get(20)); - delete.deleteFamily(BYTES_FAMILY); - actions.add(delete); - - // 4 get - get = new Get(keys.get(30)); - get.addColumn(BYTES_FAMILY, QUALIFIER); - actions.add(get); - - // 5 get of the put in #2 (entire family) - get = new Get(keys.get(10)); - get.addFamily(BYTES_FAMILY); - actions.add(get); - - // 6 get of the delete from #3 - get = new Get(keys.get(20)); - get.addColumn(BYTES_FAMILY, QUALIFIER); - actions.add(get); - - // 7 put of new column - put = new Put(keys.get(40)); - put.add(BYTES_FAMILY, qual2, val2); - actions.add(put); - - results = table.batch(actions); - - // Validation - - validateResult(results[0]); - validateResult(results[1]); - validateEmpty(results[2]); - validateEmpty(results[3]); - validateResult(results[4]); - validateResult(results[5]); - validateResult(results[5], qual2, val2); // testing second column in #5 - validateEmpty(results[6]); // deleted - validateEmpty(results[7]); - - // validate last put, externally from the batch - get = new Get(keys.get(40)); - get.addColumn(BYTES_FAMILY, qual2); - Result r = table.get(get); - validateResult(r, qual2, val2); - } - - // // Helper methods //// - - private void validateResult(Result r) { - validateResult(r, QUALIFIER, VALUE); - } - - private void validateResult(Result r, byte[] qual, byte[] val) { - assertTrue(r.containsColumn(BYTES_FAMILY, qual)); - assertEquals(0, Bytes.compareTo(val, r.getValue(BYTES_FAMILY, qual))); - } - - private List constructPutRequests() { - List puts = new ArrayList(); - for (byte[] k : keys) { - Put put = new Put(k); - put.add(BYTES_FAMILY, QUALIFIER, VALUE); - puts.add(put); 
- } - return puts; - } - - private void validateLoadedData(HTable table) throws IOException { - // get the data back and validate that it is correct - for (byte[] k : keys) { - Get get = new Get(k); - get.addColumn(BYTES_FAMILY, QUALIFIER); - Result r = table.get(get); - assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER)); - assertEquals(0, Bytes.compareTo(VALUE, r - .getValue(BYTES_FAMILY, QUALIFIER))); - } - } - - private void validateEmpty(Result result) { - assertTrue(result != null); - assertTrue(result.getRow() == null); - assertEquals(0, result.raw().length); - } - - private void validateSizeAndEmpty(Result[] results, int expectedSize) { - // Validate got back the same number of Result objects, all empty - assertEquals(expectedSize, results.length); - for (Result result : results) { - validateEmpty(result); - } - } - -} Index: src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 995849) +++ src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -223,6 +223,8 @@ try { LOG.info("Hook closing fs=" + this.fs); this.fs.close(); + } catch (NullPointerException npe) { + LOG.debug("Need to fix these: " + npe.toString()); } catch (IOException e) { LOG.warn("Running hook", e); } Index: src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java (revision 0) @@ -0,0 +1,479 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.catalog; + +import java.io.IOException; +import java.util.List; +import java.util.NavigableSet; +import java.util.concurrent.atomic.AtomicInteger; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.MultiAction; +import org.apache.hadoop.hbase.client.MultiPut; +import org.apache.hadoop.hbase.client.MultiPutResponse; +import org.apache.hadoop.hbase.client.MultiResponse; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.Progressable; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Test {@link CatalogTracker} + */ +public class TestCatalogTracker { + private static final Log LOG = LogFactory.getLog(TestCatalogTracker.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final HServerAddress HSA = + new HServerAddress("example.org:1234"); + private ZooKeeperWatcher watcher; + private Abortable abortable; + + @BeforeClass public static void beforeClass() throws Exception { + UTIL.startMiniZKCluster(); + } + + @AfterClass public static void afterClass() throws IOException { + UTIL.getZkCluster().shutdown(); + } + + @Before public void before() throws IOException { + this.abortable = new Abortable() { + @Override + public void abort(String why, Throwable e) { + LOG.info(why, e); + } + }; + this.watcher = new ZooKeeperWatcher(UTIL.getConfiguration(), + this.getClass().getSimpleName(), this.abortable); + } + + @After public void after() { + this.watcher.close(); + } + + private CatalogTracker constructAndStartCatalogTracker() + throws IOException, InterruptedException { + return constructAndStartCatalogTracker(null); + } + + private CatalogTracker constructAndStartCatalogTracker(final HConnection c) + throws IOException, InterruptedException { + CatalogTracker ct = new CatalogTracker(this.watcher, c, this.abortable); + ct.start(); + return ct; + } + + @Test (expected = NotAllMetaRegionsOnlineException.class) + public void testTimeoutWaitForRoot() + throws IOException, InterruptedException { + final CatalogTracker ct = constructAndStartCatalogTracker(); + ct.waitForRoot(100); + } + + @Test (expected = NotAllMetaRegionsOnlineException.class) + public void testTimeoutWaitForMeta() + throws IOException, InterruptedException { + final CatalogTracker ct = constructAndStartCatalogTracker(); + ct.waitForMeta(100); + } + + /** + * Test waiting on root w/ no timeout specified. 
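+ * Expects the call to block until a -ROOT- location is published in ZooKeeper.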
+ * @throws IOException + * @throws InterruptedException + * @throws KeeperException + */ + @Test public void testNoTimeoutWaitForRoot() + throws IOException, InterruptedException, KeeperException { + final CatalogTracker ct = constructAndStartCatalogTracker(); + HServerAddress hsa = ct.getRootLocation(); + Assert.assertNull(hsa); + + // Now test waiting on root location getting set. + + Thread t = new WaitOnMetaThread(ct); + startWaitAliveThenWaitItLives(t, 1000); + // Set a root location. + hsa = setRootLocation(); + // Join the thread... should exit shortly. + t.join(); + // Now root is available. + Assert.assertTrue(ct.getRootLocation().equals(hsa)); + + // Now test Progressable + RootLocationEditor.deleteRootLocation(this.watcher); + CountingProgressable p = new CountingProgressable(); + t = new WaitOnMetaThread(ct, p); + startWaitAliveThenWaitItLives(t, 1000); + // Set a root location. + RootLocationEditor.setRootLocation(this.watcher, new HServerAddress("example.org:1234")); + // Join the thread... should exit shortly. + t.join(); + LOG.info("Counter=" + p.counter.get()); + Assert.assertTrue(p.counter.get() > 0); + } + + private HServerAddress setRootLocation() throws KeeperException { + RootLocationEditor.setRootLocation(this.watcher, HSA); + return HSA; + } + + /** + * A do-nothing {@link HRegionInterface} implementation + */ + class HRegionInterfaceImplementation implements HRegionInterface { + + @Override + public long getProtocolVersion(String arg0, long arg1) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void stop(String why) { + // TODO Auto-generated method stub + + } + + @Override + public boolean isStopped() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void abort(String why, Throwable e) { + // TODO Auto-generated method stub + + } + + @Override + public HRegionInfo getRegionInfo(byte[] regionName) + throws NotServingRegionException { + // We always return the meta region. 
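+ // That is enough for the CatalogTracker to treat this stub as the .META. host.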
+ return HRegionInfo.FIRST_META_REGIONINFO; + } + + @Override + public Result getClosestRowBefore(byte[] regionName, byte[] row, + byte[] family) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Result get(byte[] regionName, Get get) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean exists(byte[] regionName, Get get) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void put(byte[] regionName, Put put) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public int put(byte[] regionName, List puts) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void delete(byte[] regionName, Delete delete) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public int delete(byte[] regionName, List deletes) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public boolean checkAndPut(byte[] regionName, byte[] row, byte[] family, + byte[] qualifier, byte[] value, Put put) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean checkAndDelete(byte[] regionName, byte[] row, byte[] family, + byte[] qualifier, byte[] value, Delete delete) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public long incrementColumnValue(byte[] regionName, byte[] row, + byte[] family, byte[] qualifier, long amount, boolean writeToWAL) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long openScanner(byte[] regionName, Scan scan) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Result next(long scannerId) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Result[] next(long scannerId, int numberOfRows) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void close(long scannerId) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public long lockRow(byte[] regionName, byte[] row) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void unlockRow(byte[] regionName, long lockId) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public NavigableSet getOnlineRegions() { + // TODO Auto-generated method stub + return null; + } + + @Override + public HServerInfo getHServerInfo() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public MultiResponse multi(MultiAction multi) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public MultiPutResponse multiPut(MultiPut puts) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void bulkLoadHFile(String hfilePath, byte[] regionName, + byte[] familyName) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void openRegion(HRegionInfo region) { + // TODO Auto-generated method stub + + } + + @Override + public boolean closeRegion(HRegionInfo region) + throws NotServingRegionException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void flushRegion(HRegionInfo regionInfo) + throws NotServingRegionException, 
IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void splitRegion(HRegionInfo regionInfo)
+ throws NotServingRegionException, IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void compactRegion(HRegionInfo regionInfo, boolean major)
+ throws NotServingRegionException, IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void replicateLogEntries(Entry[] entries) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+
+ /**
+ * Test waiting on meta w/ no timeout specified.
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ @Test public void testNoTimeoutWaitForMeta()
+ throws IOException, InterruptedException, KeeperException {
+ HConnection connection = Mockito.mock(HConnection.class);
+ Mockito.when(connection.getHRegionConnection((HServerAddress)Mockito.anyObject(), Mockito.anyBoolean())).
+ thenReturn(new HRegionInterfaceImplementation());
+ final CatalogTracker ct = constructAndStartCatalogTracker(connection);
+ HServerAddress hsa = ct.getMetaLocation();
+ Assert.assertNull(hsa);
+
+ // Now test waiting on meta location getting set.
+
+ Thread t = new WaitOnMetaThread(ct) {
+ @Override
+ void doWaiting() throws InterruptedException {
+ this.ct.waitForMeta();
+ }
+ };
+ startWaitAliveThenWaitItLives(t, 1000);
+ // Set a root location else setting meta will fail w/ NPE.
+ hsa = setRootLocation();
+ HServerInfo hsi = new HServerInfo(hsa, System.currentTimeMillis(), 1, "example.org");
+ MetaEditor.updateMetaLocation(ct, HRegionInfo.FIRST_META_REGIONINFO, hsi);
+ // Join the thread... should exit shortly.
+ t.join();
+ // Now root is available.
+ Assert.assertTrue(ct.getRootLocation().equals(hsa));
+
+ // Now test Progressable
+ RootLocationEditor.deleteRootLocation(this.watcher);
+ CountingProgressable p = new CountingProgressable();
+ t = new WaitOnMetaThread(ct, p) {
+ @Override
+ void doWaiting() throws InterruptedException {
+ this.ct.waitForMeta(this.p);
+ }
+ };
+ startWaitAliveThenWaitItLives(t, 1000);
+ // Set a root location.
+ RootLocationEditor.setRootLocation(this.watcher, new HServerAddress("example.org:1234"));
+ // Join the thread... should exit shortly.
+ t.join();
+ LOG.info("Counter=" + p.counter.get());
+ Assert.assertTrue(p.counter.get() > 0);
+ }
+
+ private void startWaitAliveThenWaitItLives(final Thread t, final int ms) {
+ t.start();
+ while(!t.isAlive()) {
+ // Wait
+ }
+ // Wait the passed number of milliseconds.
+ Threads.sleep(ms);
+ Assert.assertTrue("Assert " + t.getName() + " still waiting", t.isAlive());
+ }
+
+ class CountingProgressable implements Progressable {
+ final AtomicInteger counter = new AtomicInteger(0);
+ @Override
+ public void progress() {
+ this.counter.incrementAndGet();
+ }
+ }
+
+ /**
+ * Wait on META.
+ * Default is wait on -ROOT-.
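+ * Subclasses override doWaiting() to wait on .META. instead.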
+ */ + class WaitOnMetaThread extends Thread { + final CatalogTracker ct; + Progressable p; + + WaitOnMetaThread(final CatalogTracker ct) { + this(ct, null); + } + + WaitOnMetaThread(final CatalogTracker ct, final Progressable p) { + super("WaitOnMeta"); + this.ct = ct; + this.p = p; + } + + @Override + public void run() { + try { + doWaiting(); + } catch (InterruptedException e) { + throw new RuntimeException("Failed wait on root", e); + } + LOG.info("Exiting " + getName()); + } + + void doWaiting() throws InterruptedException { + if (this.p == null) { + this.ct.waitForRoot(); + } else { + this.ct.waitForRoot(this.p); + } + } + } +} \ No newline at end of file Index: src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java (revision 0) @@ -0,0 +1,421 @@ +/* + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestMultiParallel { + private static final Log LOG = LogFactory.getLog(TestMultiParallel.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final byte[] VALUE = Bytes.toBytes("value"); + private static final byte[] QUALIFIER = Bytes.toBytes("qual"); + private static final String FAMILY = "family"; + private static final String TEST_TABLE = "multi_test_table"; + private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); + private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); + private static final byte [][] KEYS = makeKeys(); + + @BeforeClass public static void beforeClass() throws Exception { + UTIL.startMiniCluster(2); + HTable t = UTIL.createTable(Bytes.toBytes(TEST_TABLE), Bytes.toBytes(FAMILY)); + UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY)); + } + + @AfterClass public static void afterClass() throws IOException { + UTIL.getMiniHBaseCluster().shutdown(); + } + + private static byte[][] makeKeys() { + byte [][] starterKeys = HBaseTestingUtility.KEYS; + // Create a "non-uniform" test set with the following characteristics: + // a) Unequal number of keys per region + + // Don't use integer as a multiple, so that we have a number of keys that is + // not a multiple of the number of regions + int numKeys = (int) ((float) starterKeys.length * 10.33F); + + List keys = new ArrayList(); + for (int i = 0; i < numKeys; i++) { + int kIdx = i % starterKeys.length; + byte[] k = starterKeys[kIdx]; + byte[] cp = new byte[k.length + 1]; + System.arraycopy(k, 0, cp, 0, k.length); + cp[k.length] = new Integer(i % 256).byteValue(); + keys.add(cp); + } + + // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which + // should work) + // c) keys are not in sorted order (within a region), to ensure that the + // sorting code and index mapping doesn't break the functionality + for (int i = 0; i < 100; i++) { + int kIdx = i % starterKeys.length; + byte[] k = starterKeys[kIdx]; + byte[] cp = new byte[k.length + 1]; + System.arraycopy(k, 0, cp, 0, k.length); + cp[k.length] = new Integer(i % 256).byteValue(); + keys.add(cp); + } + return keys.toArray(new byte [][] {new byte [] {}}); + } + + @Test public void testBatchWithGet() throws Exception { + LOG.info("test=testBatchWithGet"); + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); + + // load test data + List puts = constructPutRequests(); + table.batch(puts); + + // create a list of gets and run it + List gets = new ArrayList(); + for (byte[] k : KEYS) { + Get get = new Get(k); + get.addColumn(BYTES_FAMILY, QUALIFIER); + gets.add(get); + } + Result[] multiRes = new Result[gets.size()]; + table.batch(gets, multiRes); + + // Same gets using individual call API + List singleRes = new ArrayList(); + for (Row get : gets) { + singleRes.add(table.get((Get) get)); + } + + // Compare results + 
Assert.assertEquals(singleRes.size(), multiRes.length); + for (int i = 0; i < singleRes.size(); i++) { + Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER)); + KeyValue[] singleKvs = singleRes.get(i).raw(); + KeyValue[] multiKvs = multiRes[i].raw(); + for (int j = 0; j < singleKvs.length; j++) { + Assert.assertEquals(singleKvs[j], multiKvs[j]); + Assert.assertEquals(0, Bytes.compareTo(singleKvs[j].getValue(), multiKvs[j] + .getValue())); + } + } + } + + /** + * Only run one Multi test with a forced RegionServer abort. Otherwise, the + * unit tests will take an unnecessarily long time to run. + * + * @throws Exception + */ + @Test public void testFlushCommitsWithAbort() throws Exception { + LOG.info("test=testFlushCommitsWithAbort"); + doTestFlushCommits(true); + } + + @Test public void testFlushCommitsNoAbort() throws Exception { + doTestFlushCommits(false); + } + + private void doTestFlushCommits(boolean doAbort) throws Exception { + LOG.info("test=doTestFlushCommits"); + // Load the data + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); + table.setAutoFlush(false); + table.setWriteBufferSize(10 * 1024 * 1024); + + List puts = constructPutRequests(); + for (Row put : puts) { + table.put((Put) put); + } + table.flushCommits(); + + if (doAbort) { + UTIL.getMiniHBaseCluster().abortRegionServer(0); + + // try putting more keys after the abort. same key/qual... just validating + // no exceptions thrown + puts = constructPutRequests(); + for (Row put : puts) { + table.put((Put) put); + } + + table.flushCommits(); + } + + validateLoadedData(table); + + // Validate server and region count + HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration()); + ClusterStatus cs = admin.getClusterStatus(); + Assert.assertEquals((doAbort ? 1 : 2), cs.getServers()); + for (HServerInfo info : cs.getServerInfo()) { + System.out.println(info); + Assert.assertTrue(info.getLoad().getNumberOfRegions() > 10); + } + } + + @Test public void testBatchWithPut() throws Exception { + LOG.info("test=testBatchWithPut"); + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); + + // put multiple rows using a batch + List puts = constructPutRequests(); + + Result[] results = table.batch(puts); + validateSizeAndEmpty(results, KEYS.length); + + if (true) { + UTIL.getMiniHBaseCluster().abortRegionServer(0); + + puts = constructPutRequests(); + results = table.batch(puts); + validateSizeAndEmpty(results, KEYS.length); + } + + validateLoadedData(table); + } + + @Test public void testBatchWithDelete() throws Exception { + LOG.info("test=testBatchWithDelete"); + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); + + // Load some data + List puts = constructPutRequests(); + Result[] results = table.batch(puts); + validateSizeAndEmpty(results, KEYS.length); + + // Deletes + List deletes = new ArrayList(); + for (int i = 0; i < KEYS.length; i++) { + Delete delete = new Delete(KEYS[i]); + delete.deleteFamily(BYTES_FAMILY); + deletes.add(delete); + } + results = table.batch(deletes); + validateSizeAndEmpty(results, KEYS.length); + + // Get to make sure ... 
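+ // the deletes actually removed every row.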
+ for (byte[] k : KEYS) { + Get get = new Get(k); + get.addColumn(BYTES_FAMILY, QUALIFIER); + Assert.assertFalse(table.exists(get)); + } + + } + + @Test public void testHTableDeleteWithList() throws Exception { + LOG.info("test=testHTableDeleteWithList"); + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); + + // Load some data + List puts = constructPutRequests(); + Result[] results = table.batch(puts); + validateSizeAndEmpty(results, KEYS.length); + + // Deletes + ArrayList deletes = new ArrayList(); + for (int i = 0; i < KEYS.length; i++) { + Delete delete = new Delete(KEYS[i]); + delete.deleteFamily(BYTES_FAMILY); + deletes.add(delete); + } + table.delete(deletes); + Assert.assertTrue(deletes.isEmpty()); + + // Get to make sure ... + for (byte[] k : KEYS) { + Get get = new Get(k); + get.addColumn(BYTES_FAMILY, QUALIFIER); + Assert.assertFalse(table.exists(get)); + } + + } + + @Test public void testBatchWithManyColsInOneRowGetAndPut() throws Exception { + LOG.info("test=testBatchWithManyColsInOneRowGetAndPut"); + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); + + List puts = new ArrayList(); + for (int i = 0; i < 100; i++) { + Put put = new Put(ONE_ROW); + byte[] qual = Bytes.toBytes("column" + i); + put.add(BYTES_FAMILY, qual, VALUE); + puts.add(put); + } + Result[] results = table.batch(puts); + + // validate + validateSizeAndEmpty(results, 100); + + // get the data back and validate that it is correct + List gets = new ArrayList(); + for (int i = 0; i < 100; i++) { + Get get = new Get(ONE_ROW); + byte[] qual = Bytes.toBytes("column" + i); + get.addColumn(BYTES_FAMILY, qual); + gets.add(get); + } + + Result[] multiRes = table.batch(gets); + + int idx = 0; + for (Result r : multiRes) { + byte[] qual = Bytes.toBytes("column" + idx); + validateResult(r, qual, VALUE); + idx++; + } + + } + + @Test public void testBatchWithMixedActions() throws Exception { + LOG.info("test=testBatchWithMixedActions"); + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); + + // Load some data to start + Result[] results = table.batch(constructPutRequests()); + validateSizeAndEmpty(results, KEYS.length); + + // Batch: get, get, put(new col), delete, get, get of put, get of deleted, + // put + List actions = new ArrayList(); + + byte[] qual2 = Bytes.toBytes("qual2"); + byte[] val2 = Bytes.toBytes("putvalue2"); + + // 0 get + Get get = new Get(KEYS[10]); + get.addColumn(BYTES_FAMILY, QUALIFIER); + actions.add(get); + + // 1 get + get = new Get(KEYS[11]); + get.addColumn(BYTES_FAMILY, QUALIFIER); + actions.add(get); + + // 2 put of new column + Put put = new Put(KEYS[10]); + put.add(BYTES_FAMILY, qual2, val2); + actions.add(put); + + // 3 delete + Delete delete = new Delete(KEYS[20]); + delete.deleteFamily(BYTES_FAMILY); + actions.add(delete); + + // 4 get + get = new Get(KEYS[30]); + get.addColumn(BYTES_FAMILY, QUALIFIER); + actions.add(get); + + // 5 get of the put in #2 (entire family) + get = new Get(KEYS[10]); + get.addFamily(BYTES_FAMILY); + actions.add(get); + + // 6 get of the delete from #3 + get = new Get(KEYS[20]); + get.addColumn(BYTES_FAMILY, QUALIFIER); + actions.add(get); + + // 7 put of new column + put = new Put(KEYS[40]); + put.add(BYTES_FAMILY, qual2, val2); + actions.add(put); + + results = table.batch(actions); + + // Validation + + validateResult(results[0]); + validateResult(results[1]); + validateEmpty(results[2]); + validateEmpty(results[3]); + validateResult(results[4]); + validateResult(results[5]); + validateResult(results[5], qual2, val2); 
// testing second column in #5 + validateEmpty(results[6]); // deleted + validateEmpty(results[7]); + + // validate last put, externally from the batch + get = new Get(KEYS[40]); + get.addColumn(BYTES_FAMILY, qual2); + Result r = table.get(get); + validateResult(r, qual2, val2); + } + + // // Helper methods //// + + private void validateResult(Result r) { + validateResult(r, QUALIFIER, VALUE); + } + + private void validateResult(Result r, byte[] qual, byte[] val) { + Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual)); + Assert.assertEquals(0, Bytes.compareTo(val, r.getValue(BYTES_FAMILY, qual))); + } + + private List constructPutRequests() { + List puts = new ArrayList(); + for (byte[] k : KEYS) { + Put put = new Put(k); + put.add(BYTES_FAMILY, QUALIFIER, VALUE); + puts.add(put); + } + return puts; + } + + private void validateLoadedData(HTable table) throws IOException { + // get the data back and validate that it is correct + for (byte[] k : KEYS) { + Get get = new Get(k); + get.addColumn(BYTES_FAMILY, QUALIFIER); + Result r = table.get(get); + Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER)); + Assert.assertEquals(0, Bytes.compareTo(VALUE, r + .getValue(BYTES_FAMILY, QUALIFIER))); + } + } + + private void validateEmpty(Result result) { + Assert.assertTrue(result != null); + Assert.assertTrue(result.getRow() == null); + Assert.assertEquals(0, result.raw().length); + } + + private void validateSizeAndEmpty(Result[] results, int expectedSize) { + // Validate got back the same number of Result objects, all empty + Assert.assertEquals(expectedSize, results.length); + for (Result result : results) { + validateEmpty(result); + } + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 995849) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -356,7 +356,7 @@ // create the catalog tracker and start it this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, - this, this.conf.getInt("hbase.regionserver.catalog.timeout", -1)); + this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); catalogTracker.start(); this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this); @@ -1147,7 +1147,6 @@ RootLocationEditor.setRootLocation(getZooKeeper(), getServerInfo().getServerAddress()); } else if (r.getRegionInfo().isMetaRegion()) { - // TODO: doh, this has weird naming between RootEditor/MetaEditor MetaEditor.updateMetaLocation(ct, r.getRegionInfo(), getServerInfo()); } else { if (daughter) { Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (revision 995849) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (working copy) @@ -155,7 +155,7 @@ // Finally, Transition ZK node to OPENED try { - if(ZKAssign.transitionNodeOpened(server.getZooKeeper(), regionInfo, + if (ZKAssign.transitionNodeOpened(server.getZooKeeper(), regionInfo, server.getServerName(), openingVersion) == -1) { LOG.warn("Completed the OPEN of a region but when transitioning from " + " OPENING to OPENED got a version mismatch, someone else clashed " + Index: 
src/main/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 995849)
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy)
@@ -245,7 +245,7 @@
 this.serverManager = new ServerManager(this, this);
 this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
- this, conf.getInt("hbase.master.catalog.timeout", -1));
+ this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE));
 this.catalogTracker.start();
 this.assignmentManager = new AssignmentManager(this, serverManager,
Index: src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (revision 995849)
+++ src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (working copy)
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hbase.zookeeper.MetaNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.Progressable;
 import org.apache.zookeeper.KeeperException;

 /**
@@ -53,6 +54,9 @@
 public class CatalogTracker {
 private static final Log LOG = LogFactory.getLog(CatalogTracker.class);

+ // How long, in milliseconds, we sleep between checks of a wait flag in a loop.
+ static final int SLEEP_PERIOD = 100;
+
 private final HConnection connection;
 private final ZooKeeperWatcher zookeeper;

@@ -74,7 +78,7 @@
 /**
 * Constructs the catalog tracker. Find current state of catalog tables and
- * begin active tracking by executing {@link #start()}.
+ * begin active tracking by executing {@link #start()}. Times out immediately.
 * @param zk
 * @param connection server connection
 * @param abortable if fatal exception
@@ -92,7 +96,8 @@
 * @param zk
 * @param connection server connection
 * @param abortable if fatal exception
- * @param defaultTimeout Timeout to use.
+ * @param defaultTimeout Timeout to use. Set to {@link Integer#MAX_VALUE} for
+ * no timeout.
 * @throws IOException
 */
 public CatalogTracker(final ZooKeeperWatcher zk, final HConnection connection,
@@ -148,10 +153,24 @@
 */
 public void waitForRoot()
 throws InterruptedException {
- rootRegionTracker.getRootRegionLocation();
+ waitForRoot(null);
 }

 /**
+ * Waits indefinitely for availability of -ROOT-. Used during
+ * cluster startup.
+ * @param p We'll call {@link Progressable#progress()} while waiting.
+ * @throws InterruptedException if interrupted while waiting
+ */
+ public void waitForRoot(final Progressable p)
+ throws InterruptedException {
+ while (!this.rootRegionTracker.isLocationAvailable()) {
+ Thread.sleep(SLEEP_PERIOD);
+ if (p != null) p.progress();
+ }
+ }
+
+ /**
 * Gets the current location for -ROOT- if available and waits
 * for up to the specified timeout if not immediately available. Returns null
 * if the timeout elapses before root is available.
@@ -235,27 +254,27 @@ */ private HRegionInterface getMetaServerConnection(boolean refresh) throws IOException, InterruptedException { - synchronized(metaAvailable) { - if(metaAvailable.get()) { + synchronized (metaAvailable) { + if (metaAvailable.get()) { HRegionInterface current = getCachedConnection(metaLocation); - if(!refresh) { + if (!refresh) { return current; } - if(verifyRegionLocation(current, META_REGION)) { + if (verifyRegionLocation(current, META_REGION)) { return current; } resetMetaLocation(); } HRegionInterface rootConnection = getRootServerConnection(); - if(rootConnection == null) { + if (rootConnection == null) { return null; } HServerAddress newLocation = MetaReader.readMetaLocation(rootConnection); - if(newLocation == null) { + if (newLocation == null) { return null; } HRegionInterface newConnection = getCachedConnection(newLocation); - if(verifyRegionLocation(newConnection, META_REGION)) { + if (verifyRegionLocation(newConnection, META_REGION)) { setMetaLocation(newLocation); return newConnection; } @@ -269,9 +288,23 @@ * @throws InterruptedException if interrupted while waiting */ public void waitForMeta() throws InterruptedException { + waitForMeta(null); + } + + /** + * Waits indefinitely for availability of .META.. Used during + * cluster startup. + * @param p We'll call {@link Progressable#progress()} while waiting. + * @throws InterruptedException if interrupted while waiting + */ + public void waitForMeta(final Progressable p) throws InterruptedException { synchronized(metaAvailable) { - while(!metaAvailable.get()) { - metaAvailable.wait(); + while (!metaAvailable.get()) { + if (p == null) this.metaAvailable.wait(); + else { + this.metaAvailable.wait(SLEEP_PERIOD); + p.progress(); + } } } } @@ -290,15 +323,15 @@ public HServerAddress waitForMeta(long timeout) throws InterruptedException, IOException, NotAllMetaRegionsOnlineException { long stop = System.currentTimeMillis() + timeout; - synchronized(metaAvailable) { - if(getMetaServerConnection(true) != null) { + synchronized (metaAvailable) { + if (getMetaServerConnection(true) != null) { return metaLocation; } while(!metaAvailable.get() && (timeout == 0 || System.currentTimeMillis() < stop)) { metaAvailable.wait(timeout); } - if(getMetaServerConnection(true) == null) { + if (getMetaServerConnection(true) == null) { throw new NotAllMetaRegionsOnlineException( "Timed out (" + timeout + "ms)"); } Index: src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java (revision 995849) +++ src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java (working copy) @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.catalog; import java.io.IOException; +import java.net.ConnectException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -115,11 +116,15 @@ * @param regionInfo region to update location of * @param serverInfo server the region is located on * @throws IOException + * @throws ConnectException Usually because the regionserver carrying .META. + * is down. 
+ * @throws NullPointerException If there is no -ROOT- server connection
+ */
 public static void updateMetaLocation(CatalogTracker catalogTracker,
 HRegionInfo regionInfo, HServerInfo serverInfo)
- throws IOException {
+ throws IOException, ConnectException {
 HRegionInterface server = catalogTracker.waitForRootServerConnectionDefault();
+ if (server == null) throw new NullPointerException("No server for -ROOT-");
 updateLocation(server, CatalogTracker.ROOT_REGION, regionInfo, serverInfo);
 }
@@ -152,9 +157,10 @@
 * @param catalogRegionName name of catalog region being updated
 * @param regionInfo region to update location of
 * @param serverInfo server the region is located on
- * @throws IOException
+ * @throws IOException In particular could throw {@link java.net.ConnectException}
+ * if the server is down on the other end.
 */
- public static void updateLocation(HRegionInterface server,
+ private static void updateLocation(HRegionInterface server,
 byte [] catalogRegionName, HRegionInfo regionInfo, HServerInfo serverInfo)
 throws IOException {
 Put put = new Put(regionInfo.getRegionName());
Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 995849)
+++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy)
@@ -1151,7 +1151,7 @@
 if (e.getCause() instanceof DoNotRetryIOException) {
 throw (DoNotRetryIOException) e.getCause();
 }
-
+
 if (singletonList) {
 // be richer for reporting in a 1 row case.
 singleRowCause = e.getCause();
Index: src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (revision 995849)
+++ src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (working copy)
@@ -273,7 +273,9 @@
 byte [] lastKey = null;
 for(byte [] splitKey : splitKeys) {
 if(lastKey != null && Bytes.equals(splitKey, lastKey)) {
- throw new IllegalArgumentException("All split keys must be unique, found duplicate");
+ throw new IllegalArgumentException("All split keys must be unique, " +
+ "found duplicate: " + Bytes.toStringBinary(splitKey) +
+ ", " + Bytes.toStringBinary(lastKey));
 }
 lastKey = splitKey;
 }
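
For reviewers, a minimal sketch (not part of the patch) of how the new Progressable-aware waits are meant to be used by a server during startup. It assumes an already-constructed ZooKeeperWatcher, HConnection, and Abortable; the reportAlive() heartbeat callback is hypothetical and stands in for whatever liveness reporting the caller has.

    import java.io.IOException;
    import org.apache.hadoop.hbase.Abortable;
    import org.apache.hadoop.hbase.catalog.CatalogTracker;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
    import org.apache.hadoop.util.Progressable;

    // Sketch only: zk, connection, and abortable are assumed to exist.
    void waitOnCatalog(ZooKeeperWatcher zk, HConnection connection, Abortable abortable)
    throws IOException, InterruptedException {
      // Integer.MAX_VALUE mirrors the new HMaster/HRegionServer default:
      // catalog waits never time out on their own.
      CatalogTracker tracker = new CatalogTracker(zk, connection, abortable,
        Integer.MAX_VALUE);
      tracker.start();
      // Block until -ROOT- is deployed. progress() fires roughly every
      // SLEEP_PERIOD (100ms), so a watchdog can tell "waiting" from "hung".
      tracker.waitForRoot(new Progressable() {
        public void progress() {
          reportAlive(); // hypothetical heartbeat, not in the patch
        }
      });
      // Same pattern for .META.
      tracker.waitForMeta(new Progressable() {
        public void progress() {
          reportAlive(); // hypothetical heartbeat, not in the patch
        }
      });
    }

Passing null falls back to plain blocking, which is how the no-argument waitForRoot() and waitForMeta() now delegate to these variants.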