// up in the path of the passed in f -- parentdir is family,
// then the directory above is the region name.
- String parentRegionName = regionInfo.getEncodedName();
+ String parentRegionName = regionInfoForFs.getEncodedName();
// Write reference with same file id only with the other region name as
// suffix and into the new region location (under same family).
Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
@@ -636,12 +644,12 @@ public class HRegionFileSystem {
Path referenceDir = new Path(new Path(mergedDir,
mergedRegion.getEncodedName()), familyName);
// A whole reference to the store file.
- Reference r = Reference.createTopReference(regionInfo.getStartKey());
+ Reference r = Reference.createTopReference(regionInfoForFs.getStartKey());
// Add the referred-to regions name as a dot separated suffix.
// See REF_NAME_REGEX regex above. The referred-to regions name is
// up in the path of the passed in f -- parentdir is family,
// then the directory above is the region name.
- String mergingRegionName = regionInfo.getEncodedName();
+ String mergingRegionName = regionInfoForFs.getEncodedName();
// Write reference with same file id only with the other region name as
// suffix and into the new region location (under same family).
Path p = new Path(referenceDir, f.getPath().getName() + "."
@@ -653,7 +661,7 @@ public class HRegionFileSystem {
* Commit a merged region, moving it from the merges temporary directory to
* the proper location in the filesystem.
* @param mergedRegionInfo merged region {@link HRegionInfo}
- * @throws IOException
+ * @throws IOException
*/
void commitMergedRegion(final HRegionInfo mergedRegionInfo) throws IOException {
Path regionDir = new Path(this.tableDir, mergedRegionInfo.getEncodedName());
@@ -731,7 +739,7 @@ public class HRegionFileSystem {
// pb version is much shorter -- we write now w/o the toString version -- so checking length
// only should be sufficient. I don't want to read the file every time to check if it pb
// serialized.
- byte[] content = getRegionInfoFileContent(regionInfo);
+ byte[] content = getRegionInfoFileContent(regionInfoForFs);
try {
Path regionInfoFile = new Path(getRegionDir(), REGION_INFO_FILE);
@@ -747,7 +755,7 @@ public class HRegionFileSystem {
throw new IOException("Unable to remove existing " + regionInfoFile);
}
} catch (FileNotFoundException e) {
- LOG.warn(REGION_INFO_FILE + " file not found for region: " + regionInfo.getEncodedName());
+ LOG.warn(REGION_INFO_FILE + " file not found for region: " + regionInfoForFs.getEncodedName());
}
// Write HRI to a file in case we need to recover hbase:meta
@@ -759,7 +767,7 @@ public class HRegionFileSystem {
* @param useTempDir indicate whether or not using the region .tmp dir for a safer file creation.
*/
private void writeRegionInfoOnFilesystem(boolean useTempDir) throws IOException {
- byte[] content = getRegionInfoFileContent(regionInfo);
+ byte[] content = getRegionInfoFileContent(regionInfoForFs);
writeRegionInfoOnFilesystem(content, useTempDir);
}
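The regionInfo → regionInfoForFs substitutions above all serve one idea: a secondary replica shares the primary's directory on the filesystem, so reference files, region-info files, and log messages must all be keyed by the replicaId-0 region info rather than the replica's own. A minimal sketch of how such a filesystem-facing HRegionInfo could be derived — the helper itself and the getReplicaId() accessor are assumptions based on the companion HRegionInfo changes, which this patch does not show; the six-argument constructor is the one the new test below uses:

import org.apache.hadoop.hbase.HRegionInfo;

public final class RegionInfoForFsSketch {
  private RegionInfoForFsSketch() {}

  // Returns the replicaId-0 variant of the given region info, so encoded
  // names and directory paths always resolve to the primary's on-disk layout.
  public static HRegionInfo getRegionInfoForFs(HRegionInfo regionInfo) {
    if (regionInfo.getReplicaId() == 0) {
      return regionInfo; // already the primary; paths are correct as-is
    }
    // Clone with replicaId 0: same table, keys, and regionId as the original.
    return new HRegionInfo(regionInfo.getTable(), regionInfo.getStartKey(),
        regionInfo.getEndKey(), regionInfo.isSplit(), regionInfo.getRegionId(), 0);
  }
}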
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 039ced7..465d691 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -50,7 +50,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.ObjectName;
-import com.google.protobuf.HBaseZeroCopyByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -235,6 +234,7 @@ import org.cliffc.high_scale_lib.Counter;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -482,6 +482,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
*/
private final int scannerLeaseTimeoutPeriod;
+ // chore for refreshing store files for secondary regions
+ private StorefileRefresherChore storefileRefresher;
+
/**
* The reference to the priority extraction function
*/
@@ -821,6 +824,12 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
this.isa.getAddress(), 0));
this.pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor.start();
+
+ int storefileRefreshPeriod = conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
+ , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
+ if (storefileRefreshPeriod > 0) {
+ this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, this, this);
+ }
}
/**
@@ -946,6 +955,12 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
if (this.nonceManagerChore != null) {
this.nonceManagerChore.interrupt();
}
+ if (this.healthCheckChore != null) {
+ this.healthCheckChore.interrupt();
+ }
+ if (this.storefileRefresher != null) {
+ this.storefileRefresher.interrupt();
+ }
// Stop the snapshot and other procedure handlers, forcefully killing all running tasks
rspmHost.stop(this.abortRequested || this.killed);
@@ -1608,6 +1623,10 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), n + ".nonceCleaner",
uncaughtExceptionHandler);
}
+ if (this.storefileRefresher != null) {
+ Threads.setDaemonThreadRunning(this.storefileRefresher.getThread(), n + ".storefileRefresher",
+ uncaughtExceptionHandler);
+ }
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.
@@ -1894,6 +1913,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
this.replicationSinkHandler.stopReplicationService();
}
}
+ if (this.storefileRefresher != null) {
+ Threads.shutdown(this.storefileRefresher.getThread());
+ }
}
/**
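The HRegionServer wiring above follows the usual chore lifecycle: construct the chore only when the configured period is positive, run it as a daemon thread next to the other chores, interrupt it on stop, and join its thread on shutdown. A rough sketch of what the chore itself plausibly does each period — assuming the 0.98-era Chore base class, a getReplicaId() accessor from the companion HRegionInfo changes, and the Store.refreshStoreFiles() method implied by the HStore hunks below; the configuration key string is likewise an assumption:

import java.io.IOException;

import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Store;

public class StorefileRefresherChoreSketch extends Chore {
  public static final String REGIONSERVER_STOREFILE_REFRESH_PERIOD =
      "hbase.regionserver.storefile.refresh.period"; // assumed key name
  public static final int DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD = 0; // 0 = disabled

  private final HRegionServer regionServer;

  public StorefileRefresherChoreSketch(int period, HRegionServer regionServer,
      Stoppable stopper) {
    super("StorefileRefresherChore", period, stopper);
    this.regionServer = regionServer;
  }

  @Override
  protected void chore() {
    for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
      if (r.getRegionInfo().getReplicaId() == 0) {
        continue; // primaries flush their own files; nothing to refresh
      }
      for (Store store : r.getStores().values()) {
        try {
          store.refreshStoreFiles();
        } catch (IOException e) {
          // A failed refresh is simply retried on the next period.
        }
      }
    }
  }
}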
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index f7dfb17..4df3df4 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -26,6 +26,8 @@ import java.security.KeyException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
@@ -90,6 +92,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
/**
* A Store holds a column family in a Region. Its a memstore and a set of zero
@@ -474,10 +477,13 @@ public class HStore implements Store {
*/
  private List<StoreFile> loadStoreFiles() throws IOException {
@@ ... @@ public class HStore implements Store {
+  /**
+   * <p>It works by processing a compaction that's been written to disk.
+   *
+   * <p>It is usually invoked at the end of a compaction, but might also be
+   * invoked at HStore startup, if the prior execution died midway through.
+   *
+   * <p>Moving the compacted TreeMap into place means:
+   * <pre>
+   * 1) Unload all replaced StoreFile, close and collect list to delete.
+   * 2) Compute new store size
+   * </pre>
+   *
+   * @param compactedFiles list of files that were compacted
+   * @param newFile StoreFile that is the result of the compaction
+   */
+  @VisibleForTesting
+  protected void completeCompaction(final Collection<StoreFile> compactedFiles,
+      final StoreFile newFile) throws IOException {
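The new HashMap/HashSet/Sets imports point at how the refresh plausibly works at the store level: diff the set of files the store currently has open against what sits in the filesystem, open the newcomers, and drop whatever a compaction removed. A hedged sketch of such a method inside HStore — the getFileInfo() accessor and value-based StoreFileInfo equality are assumptions, and the final reader swap is elided:

  public void refreshStoreFiles() throws IOException {
    Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
    Set<StoreFileInfo> currentFiles = new HashSet<StoreFileInfo>();
    for (StoreFile sf : getStorefiles()) {
      currentFiles.add(sf.getFileInfo());
    }
    Set<StoreFileInfo> latestFiles = newFiles == null
        ? new HashSet<StoreFileInfo>() : new HashSet<StoreFileInfo>(newFiles);

    // Files the primary flushed or compacted in since the last refresh.
    Set<StoreFileInfo> toBeAdded = Sets.difference(latestFiles, currentFiles);
    // Files the primary compacted away since the last refresh.
    Set<StoreFileInfo> toBeRemoved = Sets.difference(currentFiles, latestFiles);

    if (toBeAdded.isEmpty() && toBeRemoved.isEmpty()) {
      return; // nothing changed on disk; keep the current readers
    }
    // Open readers for the new files first, then swap the store's file list
    // in one step so concurrent reads never observe a partial view.
  }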
 * Assert first value in the passed region is <code>firstValue</code>.
- *
+ *
* @param r
- *
+ *
* @param fs
- *
+ *
* @param firstValue
- *
+ *
* @throws IOException
*/
private void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
new file mode 100644
index 0000000..0b9bf2b
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.catalog.TestMetaReaderEditor;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
+ * cluster. See {@link TestRegionServerNoMaster}.
+ */
+@Category(MediumTests.class)
+public class TestRegionReplicas {
+ private static final int NB_SERVERS = 1;
+ private static HTable table;
+ private static final byte[] row = "TestRegionReplicas".getBytes();
+
+ private static HRegionInfo hriPrimary;
+ private static HRegionInfo hriSecondary;
+
+ private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+ private static final byte[] f = HConstants.CATALOG_FAMILY;
+
+ @BeforeClass
+ public static void before() throws Exception {
+ HTU.startMiniCluster(NB_SERVERS);
+ final byte[] tableName = Bytes.toBytes(TestRegionReplicas.class.getSimpleName());
+
+ // Create table then get the single region for our new table.
+ table = HTU.createTable(tableName, f);
+
+ hriPrimary = table.getRegionLocation(row, false).getRegionInfo();
+
+ // mock a secondary region info to open
+ hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
+ hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
+
+ // No master
+ HTU.getHBaseCluster().getMaster().stopMaster();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ table.close();
+ HTU.shutdownMiniCluster();
+ }
+
+ @After
+ public void after() throws Exception {
+ // Clean the state if the test failed before cleaning the znode
+ // It does not manage all bad failures, so if there are multiple failures, only
+ // the first one should be looked at.
+ ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriPrimary);
+ }
+
+ private HRegionServer getRS() {
+ return HTU.getMiniHBaseCluster().getRegionServer(0);
+ }
+
+ private void openRegion(HRegionInfo hri) throws Exception {
+ ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
+ // first version is '0'
+ AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(hri, 0, null);
+ AdminProtos.OpenRegionResponse responseOpen = getRS().openRegion(null, orr);
+ Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
+ Assert.assertTrue(responseOpen.getOpeningState(0).
+ equals(AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED));
+ checkRegionIsOpened(hri.getEncodedName());
+ }
+
+ private void closeRegion(HRegionInfo hri) throws Exception {
+ ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
+
+ AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(
+ hri.getEncodedName(), true);
+ AdminProtos.CloseRegionResponse responseClose = getRS().closeRegion(null, crr);
+ Assert.assertTrue(responseClose.getClosed());
+
+ checkRegionIsClosed(hri.getEncodedName());
+
+ ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), getRS().getServerName());
+ }
+
+ private void checkRegionIsOpened(String encodedRegionName) throws Exception {
+
+ while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
+ Thread.sleep(1);
+ }
+
+ Assert.assertTrue(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
+
+ Assert.assertTrue(
+ ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), encodedRegionName, getRS().getServerName()));
+ }
+
+
+ private void checkRegionIsClosed(String encodedRegionName) throws Exception {
+
+ while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
+ Thread.sleep(1);
+ }
+
+ try {
+ Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
+ } catch (NotServingRegionException expected) {
+ // That's how it works: if the region is closed we have an exception.
+ }
+
+ // We don't delete the znode here, because there is not always a znode.
+ }
+
+ @Test(timeout = 60000)
+ public void testOpenRegionReplica() throws Exception {
+ openRegion(hriSecondary);
+ try {
+ //load some data to primary
+ HTU.loadNumericRows(table, f, 0, 1000);
+
+ // assert that we can read back from primary
+ Assert.assertEquals(1000, HTU.countRows(table));
+ } finally {
+ HTU.deleteNumericRows(table, f, 0, 1000);
+ closeRegion(hriSecondary);
+ }
+ }
+
+ /** Tests that the meta location is saved for secondary regions */
+ @Test(timeout = 60000)
+ public void testRegionReplicaUpdatesMetaLocation() throws Exception {
+ openRegion(hriSecondary);
+ HTable meta = null;
+ try {
+ meta = new HTable(HTU.getConfiguration(), TableName.META_TABLE_NAME);
+ TestMetaReaderEditor.assertMetaLocation(meta, hriPrimary.getRegionName()
+ , getRS().getServerName(), -1, 1, false);
+ } finally {
+ if (meta != null) meta.close();
+ closeRegion(hriSecondary);
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testRegionReplicaGets() throws Exception {
+ try {
+ //load some data to primary
+ HTU.loadNumericRows(table, f, 0, 1000);
+ // assert that we can read back from primary
+ Assert.assertEquals(1000, HTU.countRows(table));
+ // flush so that region replica can read
+ HTU.getHBaseAdmin().flush(table.getTableName());
+
+ openRegion(hriSecondary);
+
+ // first try directly against region
+ HRegion region = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
+ assertGet(region, 42, true);
+
+ assertGetRpc(hriSecondary, 42, true);
+
+ } finally {
+ HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
+ closeRegion(hriSecondary);
+ }
+ }
+
+ private void assertGet(HRegion region, int value, boolean expect) throws IOException {
+ byte[] row = Bytes.toBytes(String.valueOf(value));
+ Get get = new Get(row);
+ Result result = region.get(get);
+ if (expect) {
+ Assert.assertArrayEquals(row, result.getValue(f, null));
+ } else {
+ Assert.assertTrue(result.isEmpty());
+ }
+ }
+
+ // build a mock rpc
+ private void assertGetRpc(HRegionInfo info, int value, boolean expect) throws IOException, ServiceException {
+ byte[] row = Bytes.toBytes(String.valueOf(value));
+ Get get = new Get(row);
+ ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
+ ClientProtos.GetResponse getResp = getRS().get(null, getReq);
+ Result result = ProtobufUtil.toResult(getResp.getResult());
+ if (expect) {
+ Assert.assertArrayEquals(row, result.getValue(f, null));
+ } else {
+ Assert.assertTrue(result.isEmpty());
+ }
+ }
+
+ private void restartRegionServer() throws Exception {
+ afterClass();
+ before();
+ }
+
+ @Test(timeout = 300000)
+ public void testRefreshStoreFiles() throws Exception {
+ // enable store file refreshing
+ final int refreshPeriod = 2000; // 2 sec
+ HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
+ HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, refreshPeriod);
+ // restart the region server so that it starts the refresher chore
+ restartRegionServer();
+
+ try {
+ openRegion(hriSecondary);
+
+ //load some data to primary
+ HTU.loadNumericRows(table, f, 0, 1000);
+ // assert that we can read back from primary
+ Assert.assertEquals(1000, HTU.countRows(table));
+ // flush so that region replica can read
+ HTU.getHBaseAdmin().flush(table.getTableName());
+
+ // ensure that chore is run
+ Threads.sleep(4 * refreshPeriod);
+
+ assertGetRpc(hriSecondary, 42, true);
+ assertGetRpc(hriSecondary, 1042, false);
+
+ //load some data to primary
+ HTU.loadNumericRows(table, f, 1000, 1100);
+ HTU.getHBaseAdmin().flush(table.getTableName());
+
+ HTU.loadNumericRows(table, f, 2000, 2100);
+ HTU.getHBaseAdmin().flush(table.getTableName());
+
+ // ensure that chore is run
+ Threads.sleep(4 * refreshPeriod);
+
+ assertGetRpc(hriSecondary, 42, true);
+ assertGetRpc(hriSecondary, 1042, true);
+ assertGetRpc(hriSecondary, 2042, true);
+
+ // ensure that we see the 3 store files
+ HRegion secondaryRegion = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
+ Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());
+
+ // force compaction
+ HTU.compact(table.getName(), true);
+
+ long wakeUpTime = System.currentTimeMillis() + 4 * refreshPeriod;
+ while (System.currentTimeMillis() < wakeUpTime) {
+ assertGetRpc(hriSecondary, 42, true);
+ assertGetRpc(hriSecondary, 1042, true);
+ assertGetRpc(hriSecondary, 2042, true);
+ Threads.sleep(10);
+ }
+
+ // ensure that we see the compacted file only
+ Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());
+
+ } finally {
+ HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
+ closeRegion(hriSecondary);
+ }
+ }
+}
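One note on the test above: the fixed Threads.sleep(4 * refreshPeriod) waits tie correctness to chore timing. A bounded poll would keep the same intent while tolerating a slow chore; a hypothetical helper for this test class, not part of the patch:

  // Polls until the secondary serves the expected row or the deadline passes.
  private void waitUntilGetSucceeds(final HRegionInfo info, final int value,
      final long timeoutMs) throws Exception {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (true) {
      try {
        assertGetRpc(info, value, true); // throws AssertionError until refreshed
        return;
      } catch (AssertionError e) {
        if (System.currentTimeMillis() > deadline) {
          throw e; // chore never caught up; surface the original failure
        }
        Threads.sleep(100);
      }
    }
  }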
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 793b839..3494b4f 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -19,6 +19,11 @@
package org.apache.hadoop.hbase.regionserver;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
@@ -78,6 +83,8 @@ import org.apache.hadoop.util.Progressable;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
+import com.google.common.collect.Lists;
+
/**
* Test class for the Store
*/
@@ -130,7 +137,7 @@ public class TestStore extends TestCase {
}
private void init(String methodName) throws IOException {
- init(methodName, HBaseConfiguration.create());
+ init(methodName, TEST_UTIL.getConfiguration());
}
private void init(String methodName, Configuration conf)
@@ -203,7 +210,7 @@ public class TestStore extends TestCase {
int ttl = 4;
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
EnvironmentEdgeManagerTestHelper.injectEdge(edge);
-
+
Configuration conf = HBaseConfiguration.create();
// Enable the expired store file deletion
conf.setBoolean("hbase.store.delete.expired.storefile", true);
@@ -258,7 +265,7 @@ public class TestStore extends TestCase {
FileSystem fs = FileSystem.get(conf);
// Initialize region
init(getName(), conf);
-
+
int storeFileNum = 4;
for (int i = 1; i <= storeFileNum; i++) {
LOG.info("Adding some data for the store file #"+i);
@@ -278,12 +285,12 @@ public class TestStore extends TestCase {
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
}
-
- private static long getLowestTimeStampFromFS(FileSystem fs,
+
+ private static long getLowestTimeStampFromFS(FileSystem fs,
final Collection