From 151ddb3d994ef32678d5a2d630172ca430426965 Mon Sep 17 00:00:00 2001 From: Ashu Pachauri Date: Mon, 7 Aug 2017 18:10:33 -0700 Subject: [PATCH] HBASE-18398: Snapshot operation fails with FileNotFoundException --- .../apache/hadoop/hbase/regionserver/HRegion.java | 21 ++- .../apache/hadoop/hbase/regionserver/HStore.java | 10 ++ .../apache/hadoop/hbase/regionserver/Region.java | 9 +- .../apache/hadoop/hbase/regionserver/Store.java | 12 ++ .../snapshot/FlushSnapshotSubprocedure.java | 31 ++-- .../hadoop/hbase/snapshot/SnapshotManifest.java | 30 +++- .../hbase/snapshot/TestRegionSnapshotTask.java | 199 +++++++++++++++++++++ 7 files changed, 290 insertions(+), 22 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 31357adac4..39d859d419 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -7922,7 +7922,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi case DELETE: case BATCH_MUTATE: case COMPACT_REGION: - // when a region is in recovering state, no read, split or merge is allowed + case SNAPSHOT: + // when a region is in recovering state, no read, split, merge or snapshot is allowed if (isRecovering() && (this.disallowWritesInRecovering || (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) { throw new RegionInRecoveryException(getRegionInfo().getRegionNameAsString() + @@ -7946,6 +7947,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi lock.readLock().unlock(); throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed"); } + // The unit for snapshot is a region. So, all stores for this region must be + // prepared for snapshot operation before proceeding. + if (op == Operation.SNAPSHOT) { + for (Store store : stores.values()) { + store.preSnapshotOperation(); + } + } try { if (coprocessorHost != null) { coprocessorHost.postStartRegionOperation(op); @@ -7961,12 +7969,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi closeRegionOperation(Operation.ANY); } - /** - * Closes the lock. This needs to be called in the finally block corresponding - * to the try block of {@link #startRegionOperation(Operation)} - * @throws IOException - */ + @Override public void closeRegionOperation(Operation operation) throws IOException { + if (operation == Operation.SNAPSHOT) { + for (Store store: stores.values()) { + store.postSnapshotOperation(); + } + } lock.readLock().unlock(); if (coprocessorHost != null) { coprocessorHost.postCloseRegionOperation(operation); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 2d9e0f21fa..bd62ae6185 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2487,6 +2487,16 @@ public class HStore implements Store { } @Override + public void preSnapshotOperation() { + archiveLock.lock(); + } + + @Override + public void postSnapshotOperation() { + archiveLock.unlock(); + } + + @Override public synchronized void closeAndArchiveCompactedFiles() throws IOException { // ensure other threads do not attempt to archive the same files on close() archiveLock.lock(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 48672a30a5..fe17cb29f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -247,7 +247,7 @@ public interface Region extends ConfigurationObserver { */ enum Operation { ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE, - REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT + REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT } /** @@ -277,6 +277,13 @@ public interface Region extends ConfigurationObserver { */ void closeRegionOperation() throws IOException; + /** + * Closes the region operation lock. This needs to be called in the finally block corresponding + * to the try block of {@link #startRegionOperation(Operation)} + * @throws IOException + */ + void closeRegionOperation(Operation op) throws IOException; + // Row write locks /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index fd9de9b794..c4387f0583 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -577,4 +577,16 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * @return true if the memstore may need some extra memory space */ boolean isSloppyMemstore(); + + /** + * Sets the store up for a region level snapshot operation. + * @see #postSnapshotOperation() + */ + void preSnapshotOperation(); + + /** + * Perform tasks needed after the completion of snapshot operation. + * @see #preSnapshotOperation() + */ + void postSnapshotOperation(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java index b30d622eaa..281de186ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.procedure.Subprocedure; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region.FlushResult; +import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; @@ -74,10 +75,18 @@ public class FlushSnapshotSubprocedure extends Subprocedure { /** * Callable for adding files to snapshot manifest working dir. Ready for multithreading. */ - private class RegionSnapshotTask implements Callable { - Region region; - RegionSnapshotTask(Region region) { + public static class RegionSnapshotTask implements Callable { + private Region region; + private boolean skipFlush; + private ForeignExceptionDispatcher monitor; + private SnapshotDescription snapshotDesc; + + public RegionSnapshotTask(Region region, SnapshotDescription snapshotDesc, + boolean skipFlush, ForeignExceptionDispatcher monitor) { this.region = region; + this.skipFlush = skipFlush; + this.monitor = monitor; + this.snapshotDesc = snapshotDesc; } @Override @@ -87,10 +96,10 @@ public class FlushSnapshotSubprocedure extends Subprocedure { // snapshots that involve multiple regions and regionservers. It is still possible to have // an interleaving such that globally regions are missing, so we still need the verification // step. - LOG.debug("Starting region operation on " + region); - region.startRegionOperation(); + LOG.debug("Starting snapshot operation on " + region); + region.startRegionOperation(Operation.SNAPSHOT); try { - if (snapshotSkipFlush) { + if (skipFlush) { /* * This is to take an online-snapshot without force a coordinated flush to prevent pause * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure @@ -123,15 +132,15 @@ public class FlushSnapshotSubprocedure extends Subprocedure { throw new IOException("Unable to complete flush after " + MAX_RETRIES + " attempts"); } } - ((HRegion)region).addRegionToSnapshot(snapshot, monitor); - if (snapshotSkipFlush) { + ((HRegion)region).addRegionToSnapshot(snapshotDesc, monitor); + if (skipFlush) { LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed."); } else { LOG.debug("... Flush Snapshotting region " + region.toString() + " completed."); } } finally { - LOG.debug("Closing region operation on " + region); - region.closeRegionOperation(); + LOG.debug("Closing snapshot operation on " + region); + region.closeRegionOperation(Operation.SNAPSHOT); } return null; } @@ -155,7 +164,7 @@ public class FlushSnapshotSubprocedure extends Subprocedure { // Add all hfiles already existing in region. for (Region region : regions) { // submit one task per region for parallelize by region. - taskManager.submitTask(new RegionSnapshotTask(region)); + taskManager.submitTask(new RegionSnapshotTask(region, snapshot, snapshotSkipFlush, monitor)); monitor.rethrowException(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java index 86687d9ccb..f70fe9e0aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -112,6 +113,7 @@ public final class SnapshotManifest { final Path workingDir, final SnapshotDescription desc, final ForeignExceptionSnare monitor) { return new SnapshotManifest(conf, fs, workingDir, desc, monitor); + } /** @@ -162,9 +164,15 @@ public final class SnapshotManifest { } public void addMobRegion(HRegionInfo regionInfo) throws IOException { - // 0. Get the ManifestBuilder/RegionVisitor + // Get the ManifestBuilder/RegionVisitor RegionVisitor visitor = createRegionVisitor(desc); + // Visit the region and add it to the manifest + addMobRegion(regionInfo, visitor); + } + + @VisibleForTesting + protected void addMobRegion(HRegionInfo regionInfo, RegionVisitor visitor) throws IOException { // 1. dump region meta info into the snapshot directory LOG.debug("Storing mob region '" + regionInfo + "' region-info for snapshot."); Object regionData = visitor.regionOpen(regionInfo); @@ -203,9 +211,15 @@ public final class SnapshotManifest { * This is used by the "online snapshot" when the table is enabled. */ public void addRegion(final HRegion region) throws IOException { - // 0. Get the ManifestBuilder/RegionVisitor + // Get the ManifestBuilder/RegionVisitor RegionVisitor visitor = createRegionVisitor(desc); + // Visit the region and add it to the manifest + addRegion(region, visitor); + } + + @VisibleForTesting + protected void addRegion(final HRegion region, RegionVisitor visitor) throws IOException { // 1. dump region meta info into the snapshot directory LOG.debug("Storing '" + region + "' region-info for snapshot."); Object regionData = visitor.regionOpen(region.getRegionInfo()); @@ -216,7 +230,8 @@ public final class SnapshotManifest { for (Store store : region.getStores()) { // 2.1. build the snapshot reference for the store - Object familyData = visitor.familyOpen(regionData, store.getColumnFamilyDescriptor().getName()); + Object familyData = visitor.familyOpen(regionData, + store.getColumnFamilyDescriptor().getName()); monitor.rethrowException(); List storeFiles = new ArrayList<>(store.getStorefiles()); @@ -243,9 +258,16 @@ public final class SnapshotManifest { * This is used by the "offline snapshot" when the table is disabled. */ public void addRegion(final Path tableDir, final HRegionInfo regionInfo) throws IOException { - // 0. Get the ManifestBuilder/RegionVisitor + // Get the ManifestBuilder/RegionVisitor RegionVisitor visitor = createRegionVisitor(desc); + // Visit the region and add it to the manifest + addRegion(tableDir, regionInfo, visitor); + } + + @VisibleForTesting + protected void addRegion(final Path tableDir, final HRegionInfo regionInfo, RegionVisitor visitor) + throws IOException { boolean isMobRegion = MobUtils.isMobRegionInfo(regionInfo); try { Path baseDir = tableDir; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java new file mode 100644 index 0000000000..6c61f373cd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRegionSnapshotTask.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.snapshot; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.regionserver.snapshot.FlushSnapshotSubprocedure; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + +/** + * Testing the region snapshot task on a cluster. + * @see org.apache.hadoop.hbase.regionserver.snapshot.FlushSnapshotSubprocedure.RegionSnapshotTask + */ +@Category({MediumTests.class, RegionServerTests.class}) +public class TestRegionSnapshotTask { + private final Log LOG = LogFactory.getLog(getClass()); + + private static HBaseTestingUtility TEST_UTIL; + private static Configuration conf; + private static FileSystem fs; + private static Path rootDir; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + + conf = TEST_UTIL.getConfiguration(); + + // Try to frequently clean up compacted files + conf.setInt("hbase.hfile.compaction.discharger.interval", 1000); + conf.setInt("hbase.master.hfilecleaner.ttl", 1000); + + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); + TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME); + + rootDir = FSUtils.getRootDir(conf); + fs = TEST_UTIL.getTestFileSystem(); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Tests adding a region to the snapshot manifest while compactions are running on the region. + * The idea is to slow down the process of adding a store file to the manifest while + * triggering compactions on the region, allowing the store files to be marked for archival while + * snapshot operation is running. + * This test checks for the correct behavior in such a case that the compacted files should + * not be moved around if a snapshot operation is in progress. + * See HBASE-18398 + */ + @Test(timeout = 30000) + public void testAddRegionWithCompactions() throws Exception { + final TableName tableName = TableName.valueOf("test_table"); + Table table = setupTable(tableName); + + List hRegions = TEST_UTIL.getHBaseCluster().getRegions(tableName); + + final SnapshotProtos.SnapshotDescription snapshot = + SnapshotProtos.SnapshotDescription.newBuilder() + .setTable(tableName.getNameAsString()) + .setType(SnapshotProtos.SnapshotDescription.Type.FLUSH) + .setName("test_table_snapshot") + .setVersion(SnapshotManifestV2.DESCRIPTOR_VERSION) + .build(); + ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName()); + + final HRegion region = spy(hRegions.get(0)); + + Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir); + final SnapshotManifest manifest = + SnapshotManifest.create(conf, fs, workingDir, snapshot, monitor); + manifest.addTableDescriptor(table.getTableDescriptor()); + + if (!fs.exists(workingDir)) { + fs.mkdirs(workingDir); + } + assertTrue(fs.exists(workingDir)); + SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, fs); + + doAnswer(__ -> { + addRegionToSnapshot(snapshot, region, manifest); + return null; + }).when(region).addRegionToSnapshot(snapshot, monitor); + + FlushSnapshotSubprocedure.RegionSnapshotTask snapshotTask = + new FlushSnapshotSubprocedure.RegionSnapshotTask(region, snapshot, true, monitor); + ExecutorService executor = Executors.newFixedThreadPool(1); + Future f = executor.submit(snapshotTask); + + // Trigger major compaction and wait for snaphot operation to finish + LOG.info("Starting major compaction"); + region.compact(true); + LOG.info("Finished major compaction"); + f.get(); + + // Consolidate region manifests into a single snapshot manifest + manifest.consolidate(); + + // Make sure that the region manifest exists, which means the snapshot operation succeeded + assertNotNull(manifest.getRegionManifests()); + // Sanity check, there should be only one region + assertEquals(1, manifest.getRegionManifests().size()); + + // Make sure that no files went missing after the snapshot operation + SnapshotReferenceUtil.verifySnapshot(conf, fs, manifest); + } + + private void addRegionToSnapshot(SnapshotProtos.SnapshotDescription snapshot, + HRegion region, SnapshotManifest manifest) throws Exception { + LOG.info("Adding region to snapshot: " + region.getRegionInfo().getRegionNameAsString()); + Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir); + SnapshotManifest.RegionVisitor visitor = createRegionVisitorWithDelay(snapshot, workingDir); + manifest.addRegion(region, visitor); + LOG.info("Added the region to snapshot: " + region.getRegionInfo().getRegionNameAsString()); + } + + private SnapshotManifest.RegionVisitor createRegionVisitorWithDelay( + SnapshotProtos.SnapshotDescription desc, Path workingDir) { + return new SnapshotManifestV2.ManifestBuilder(conf, fs, workingDir) { + @Override + public void storeFile(final SnapshotProtos.SnapshotRegionManifest.Builder region, + final SnapshotProtos.SnapshotRegionManifest.FamilyFiles.Builder family, + final StoreFileInfo storeFile) throws IOException { + try { + LOG.debug("Introducing delay before adding store file to manifest"); + Thread.sleep(2000); + } catch (InterruptedException ex) { + LOG.error("Interrupted due to error: " + ex); + } + super.storeFile(region, family, storeFile); + } + }; + } + + private Table setupTable(TableName tableName) throws Exception { + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); + // Flush many files, but do not compact immediately + builder + .setMemStoreFlushSize(5000) + .setConfiguration("hbase.hstore.compactionThreshold", "250"); + TableDescriptor td = builder.build(); + byte[] fam = Bytes.toBytes("fam"); + Table table = TEST_UTIL.createTable(td, new byte[][] {fam}, + TEST_UTIL.getConfiguration()); + TEST_UTIL.loadTable(table, fam); + return table; + } +} -- 2.13.0