diff --git a/hbase-protocol-shaded/src/main/protobuf/HBase.proto b/hbase-protocol-shaded/src/main/protobuf/HBase.proto
index 29067f1..0af2ffd 100644
--- a/hbase-protocol-shaded/src/main/protobuf/HBase.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/HBase.proto
@@ -252,4 +252,19 @@ message CacheEvictionStats {
   optional int64 bytes_evicted = 2;
   optional int64 max_cache_size = 3;
   repeated RegionExceptionMessage exception = 4;
-}
\ No newline at end of file
+}
+
+message FlushedStoreSequenceId {
+  required bytes family = 1;
+  required uint64 seqId = 2;
+}
+
+message FlushedRegionSequenceId {
+  required bytes regionEncodedName = 1;
+  required uint64 seqId = 2;
+  repeated FlushedStoreSequenceId stores = 3;
+}
+
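+// Top-level container: the master writes a single size-delimited
+// FlushedSequenceId message to the .lastflushedseqids file under the HBase
+// root directory (see ServerManager#persistRegionLastFlushedSequenceIds).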
+message FlushedSequenceId {
+  repeated FlushedRegionSequenceId regionSequenceId = 1;
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index f20cc61..f4c17ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -857,6 +857,13 @@ public class HMaster extends HRegionServer implements MasterServices {
     status.setStatus("Initializing ZK system trackers");
     initializeZKBasedSystemTrackers();
 
+    status.setStatus("Loading last flushed sequence id of regions");
+    try {
+      this.serverManager.loadLastFlushedSequenceIds();
+    } catch (IOException e) {
+      LOG.warn("Failed to load last flushed sequence id of regions"
+          + " from file system", e);
+    }
     // Set ourselves as active Master now our claim has succeeded up in zk.
     this.activeMaster = true;
 
@@ -941,6 +948,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     getChoreService().scheduleChore(normalizerChore);
     this.catalogJanitorChore = new CatalogJanitor(this);
     getChoreService().scheduleChore(catalogJanitorChore);
+    this.serverManager.startChore();
 
     status.setStatus("Starting cluster schema service");
     initClusterSchemaService();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 86e5530..9f65ad6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -38,13 +38,19 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ClockOutOfSyncException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerMetricsBuilder;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.ClusterConnection;
@@ -52,11 +58,14 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -66,6 +75,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
@@ -107,6 +117,17 @@ public class ServerManager {
   public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
       "hbase.master.wait.on.regionservers.interval";
 
+  public static final String PERSIST_FLUSHEDSEQUENCEID =
+      "hbase.master.persist.flushedsequenceid.enabled";
+
+  public static final boolean PERSIST_FLUSHEDSEQUENCEID_DEFAULT = true;
+
+  public static final String FLUSHEDSEQUENCEID_FLUSHER_INTERVAL =
+      "hbase.master.flushedsequenceid.flusher.interval";
+
+  public static final int FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT =
+      3 * 60 * 60 * 1000; // 3 hours
+
   private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);
 
   // Set if we are to shutdown the cluster.
@@ -118,6 +139,13 @@ public class ServerManager {
   private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
     new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
 
+  private boolean persistFlushedSequenceId = true;
+  private volatile boolean isFlushSeqIdPersistInProgress = false;
+  /** File on hdfs to store last flushed sequence id of regions */
+  private static final String LAST_FLUSHED_SEQ_ID_FILE = ".lastflushedseqids";
+  private FlushedSequenceIdFlusher flushedSeqIdFlusher;
+
   /**
    * The last flushed sequence id for a store in a region.
    */
@@ -201,6 +229,7 @@ public class ServerManager {
     warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
     this.connection = connect? master.getClusterConnection(): null;
     this.rpcControllerFactory = this.connection == null? null: connection.getRpcControllerFactory();
+    persistFlushedSequenceId = c.getBoolean(PERSIST_FLUSHEDSEQUENCEID, PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
   }
 
   /**
@@ -432,6 +461,11 @@ public class ServerManager {
     this.rsAdmins.remove(serverName);
   }
 
+  @VisibleForTesting
+  public ConcurrentNavigableMap<byte[], Long> getFlushedSequenceIdByRegion() {
+    return flushedSequenceIdByRegion;
+  }
+
   public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
     RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
     Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
@@ -609,6 +643,10 @@ public class ServerManager {
         listener.serverRemoved(serverName);
      }
     }
+    // trigger a persist of flushedSeqId
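+    // (triggering eagerly here, rather than waiting for the periodic chore,
+    // keeps the on-disk snapshot current with the sequence ids last reported
+    // by the dead server, even if the master restarts before the next flush)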
+    if (flushedSeqIdFlusher != null) {
+      flushedSeqIdFlusher.triggerNow();
+    }
     return true;
   }
 
@@ -976,10 +1014,36 @@ public class ServerManager {
   }
 
   /**
+   * Start the chore in ServerManager.
+   */
+  public void startChore() {
+    Configuration c = master.getConfiguration();
+    if (persistFlushedSequenceId) {
+      // when we reach here, RegionStates should have been loaded, so first
+      // remove the deleted regions from the loaded flushed sequence ids
+      removeDeletedRegionFromLoadedFlushedSequenceIds();
+      int flushPeriod = c.getInt(FLUSHEDSEQUENCEID_FLUSHER_INTERVAL,
+          FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT);
+      flushedSeqIdFlusher = new FlushedSequenceIdFlusher(
+          "FlushedSequenceIdFlusher", master, flushPeriod);
+      master.getChoreService().scheduleChore(flushedSeqIdFlusher);
+    }
+  }
+
+  /**
    * Stop the ServerManager.
    */
   public void stop() {
-    // Nothing to do.
+    if (flushedSeqIdFlusher != null) {
+      flushedSeqIdFlusher.cancel();
+    }
+    if (persistFlushedSequenceId) {
+      try {
+        persistRegionLastFlushedSequenceIds();
+      } catch (IOException e) {
+        LOG.warn("Failed to persist last flushed sequence id of regions"
+            + " to file system", e);
+      }
+    }
   }
 
   /**
@@ -1068,4 +1132,144 @@ public class ServerManager {
     ServerMetrics serverMetrics = onlineServers.get(serverName);
     return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
   }
+
+  /**
+   * Persist the last flushed sequence id of each region to HDFS.
+   * @throws IOException if the file cannot be written
+   */
+  private void persistRegionLastFlushedSequenceIds() throws IOException {
+    if (isFlushSeqIdPersistInProgress) {
+      return;
+    }
+    isFlushSeqIdPersistInProgress = true;
+    try {
+      Configuration conf = master.getConfiguration();
+      Path rootDir = FSUtils.getRootDir(conf);
+      Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
+      FileSystem fs = FileSystem.get(conf);
+      if (fs.exists(lastFlushedSeqIdPath)) {
+        LOG.info("Rewriting .lastflushedseqids file at: "
+            + lastFlushedSeqIdPath);
+        if (!fs.delete(lastFlushedSeqIdPath, false)) {
+          throw new IOException("Unable to remove existing "
+              + lastFlushedSeqIdPath);
+        }
+      } else {
+        LOG.info("Writing .lastflushedseqids file at: " + lastFlushedSeqIdPath);
+      }
+      FSDataOutputStream out = fs.create(lastFlushedSeqIdPath);
+      HBaseProtos.FlushedSequenceId.Builder flushedSequenceIdBuilder =
+          HBaseProtos.FlushedSequenceId.newBuilder();
+      try {
+        for (Entry<byte[], Long> entry : flushedSequenceIdByRegion.entrySet()) {
+          HBaseProtos.FlushedRegionSequenceId.Builder flushedRegionSequenceIdBuilder =
+              HBaseProtos.FlushedRegionSequenceId.newBuilder();
+          flushedRegionSequenceIdBuilder.setRegionEncodedName(
+              ByteString.copyFrom(entry.getKey()));
+          flushedRegionSequenceIdBuilder.setSeqId(entry.getValue());
+          ConcurrentNavigableMap<byte[], Long> storeSeqIds =
+              storeFlushedSequenceIdsByRegion.get(entry.getKey());
+          if (storeSeqIds != null) {
+            for (Entry<byte[], Long> store : storeSeqIds.entrySet()) {
+              HBaseProtos.FlushedStoreSequenceId.Builder flushedStoreSequenceIdBuilder =
+                  HBaseProtos.FlushedStoreSequenceId.newBuilder();
+              flushedStoreSequenceIdBuilder.setFamily(ByteString.copyFrom(store.getKey()));
+              flushedStoreSequenceIdBuilder.setSeqId(store.getValue());
+              flushedRegionSequenceIdBuilder.addStores(flushedStoreSequenceIdBuilder);
+            }
+          }
+          flushedSequenceIdBuilder.addRegionSequenceId(flushedRegionSequenceIdBuilder);
+        }
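+        // writeDelimitedTo prefixes the message with its varint-encoded size,
+        // which is what parseDelimitedFrom in loadLastFlushedSequenceIds expects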
+        flushedSequenceIdBuilder.build().writeDelimitedTo(out);
+      } finally {
+        if (out != null) out.close();
+      }
+    } finally {
+      isFlushSeqIdPersistInProgress = false;
+    }
+  }
+
+  /**
+   * Load the last flushed sequence id of each region from HDFS, if persisted.
+   */
+  public void loadLastFlushedSequenceIds() throws IOException {
+    if (!persistFlushedSequenceId) {
+      return;
+    }
+    Configuration conf = master.getConfiguration();
+    Path rootDir = FSUtils.getRootDir(conf);
+    Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
+    FileSystem fs = FileSystem.get(conf);
+    if (!fs.exists(lastFlushedSeqIdPath)) {
+      LOG.info("No .lastflushedseqids found at " + lastFlushedSeqIdPath + "; will record last"
+          + " flushed sequence ids of regions from regionserver reports all over again");
+      return;
+    } else {
+      LOG.info("Begin to load .lastflushedseqids at " + lastFlushedSeqIdPath);
+    }
+    FSDataInputStream in = fs.open(lastFlushedSeqIdPath);
+    try {
+      HBaseProtos.FlushedSequenceId flushedSequenceId =
+          HBaseProtos.FlushedSequenceId.parseDelimitedFrom(in);
+      for (HBaseProtos.FlushedRegionSequenceId flushedRegionSequenceId : flushedSequenceId
+          .getRegionSequenceIdList()) {
+        byte[] encodedRegionName = flushedRegionSequenceId
+            .getRegionEncodedName().toByteArray();
+        flushedSequenceIdByRegion
+            .putIfAbsent(encodedRegionName, flushedRegionSequenceId.getSeqId());
+        if (flushedRegionSequenceId.getStoresList() != null
+            && flushedRegionSequenceId.getStoresList().size() != 0) {
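+          // reuse the existing computeIfAbsent helper to lazily create the
+          // per-region store map on first access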
+          ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
+              computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
+                () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
+          for (HBaseProtos.FlushedStoreSequenceId flushedStoreSequenceId : flushedRegionSequenceId
+              .getStoresList()) {
+            storeFlushedSequenceId
+                .put(flushedStoreSequenceId.getFamily().toByteArray(),
+                    flushedStoreSequenceId.getSeqId());
+          }
+        }
+      }
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Regions may have been removed between the latest persist of FlushedSequenceIds
+   * and a master abort. So after loading FlushedSequenceIds from file, and after
+   * meta has been loaded, we need to remove the deleted regions according to
+   * RegionStates.
+   */
+  public void removeDeletedRegionFromLoadedFlushedSequenceIds() {
+    List<byte[]> regionsToDel = new ArrayList<>();
+    RegionStates regionStates = master.getAssignmentManager().getRegionStates();
+    for (Entry<byte[], Long> entry : flushedSequenceIdByRegion.entrySet()) {
+      if (regionStates.getRegionState(Bytes.toStringBinary(entry.getKey())) == null) {
+        regionsToDel.add(entry.getKey());
+      }
+    }
+    for (byte[] regionEncodedName : regionsToDel) {
+      flushedSequenceIdByRegion.remove(regionEncodedName);
+      storeFlushedSequenceIdsByRegion.remove(regionEncodedName);
+    }
+  }
+
+  class FlushedSequenceIdFlusher extends ScheduledChore {
+
+    public FlushedSequenceIdFlusher(String name,
+        Stoppable stopper, int p) {
+      super(name, stopper, p, 60 * 1000); // delay one minute before the first execution
+    }
+
+    @Override
+    protected void chore() {
+      try {
+        persistRegionLastFlushedSequenceIds();
+      } catch (IOException e) {
+        LOG.warn("Failed to persist last flushed sequence id of regions"
+            + " to file system", e);
+      }
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
index 11df313..9b7efdd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
@@ -24,6 +24,10 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -36,6 +40,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.Table;
@@ -45,6 +50,7 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -192,5 +198,35 @@ public class TestMaster {
       TEST_UTIL.deleteTable(tableName);
     }
   }
+
+  @Test
+  public void testFlushedSequenceIdPersistLoad() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    int msgInterval = conf.getInt("hbase.regionserver.msginterval", 100);
+    // insert some data into META
+    TableName tableName = TableName.valueOf("testFlushSeqId");
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor(Bytes.toBytes("cf")));
+    Table table = TEST_UTIL.createTable(desc, null);
+    // flush META region
+    TEST_UTIL.flush(TableName.META_TABLE_NAME);
+    // wait for regionserver report
+    Threads.sleep(msgInterval * 2);
+    // record the flushed sequence ids before cluster shutdown
+    Map<byte[], Long> regionMapBefore =
+        TEST_UTIL.getHBaseCluster().getMaster().getServerManager()
+            .getFlushedSequenceIdByRegion();
+    // restart hbase cluster, which will cause the flushed sequence ids to be
+    // persisted and then reloaded
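+    // (the persist happens in ServerManager.stop() during shutdown; the reload
+    // happens via loadLastFlushedSequenceIds() during master initialization)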
+    TEST_UTIL.getMiniHBaseCluster().shutdown();
+    TEST_UTIL.restartHBaseCluster(2);
+    TEST_UTIL.waitUntilNoRegionsInTransition();
+    // check that the reloaded flushed sequence id map equals the one recorded
+    // before the restart
+    Map<byte[], Long> regionMapAfter =
+        TEST_UTIL.getHBaseCluster().getMaster().getServerManager()
+            .getFlushedSequenceIdByRegion();
+    assertTrue(regionMapBefore.equals(regionMapAfter));
+  }
 }