diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 8af9555..67c5265 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -105,6 +105,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { public String tableLockZNode; // znode containing the state of recovering regions public String recoveringRegionsZNode; + // znode containing region open sequence id when a region opens(now only used for + // recovering regions) + public String regionOpenSeqIdZNode; // znode containing namespace descriptors public static String namespaceZNode = "namespace"; @@ -175,6 +178,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode); ZKUtil.createAndFailSilent(this, tableLockZNode); ZKUtil.createAndFailSilent(this, recoveringRegionsZNode); + ZKUtil.createAndFailSilent(this, regionOpenSeqIdZNode); } catch (KeeperException e) { throw new ZooKeeperConnectionException( prefix("Unexpected KeeperException creating base node"), e); @@ -226,6 +230,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { conf.get("zookeeper.znode.tableLock", "table-lock")); recoveringRegionsZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.recovering.regions", "recovering-regions")); + regionOpenSeqIdZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.regions.open.seqid", "region-open-seqids")); namespaceZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.namespace", "namespace")); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index bbf372c..34cbe0a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -144,8 +144,12 @@ import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.KeeperException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -760,7 +764,12 @@ public class HRegion implements HeapSize { // , Writable{ // In distributedLogReplay mode, we don't know the last change sequence number because region // is opened before recovery completes. So we add a safety bumper to avoid new sequence number // overlaps used sequence numbers - nextSeqid += this.flushPerChanges + 10000000; // add another extra 10million + try { + nextSeqid = ZKSplitLog.writeRegionOpenSequenceIdInZK(this.rsServices.getZooKeeper(), + this.getRegionInfo().getEncodedName(), nextSeqid, (this.flushPerChanges + 10000000)); + } catch (KeeperException e) { + throw new IOException("Failed to write region open sequence id", e); + } } LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() + @@ -1225,6 +1234,17 @@ public class HRegion implements HeapSize { // , Writable{ throw ioe; } } + + // delete region open sequence id after flush because store files have the latest SeqId + if(!this.isRecovering && this.rsServices != null) { + try { + ZooKeeperWatcher zkw = this.rsServices.getZooKeeper(); + ZKUtil.deleteNodeRecursively(zkw, ZKUtil.joinZNode(zkw.regionOpenSeqIdZNode, + this.getRegionInfo().getEncodedName())); + } catch (KeeperException e) { + LOG.warn("Failed to delete region open sequence id znode", e); + } + } } Map> result = diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java index ac6042f..59983d7 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java @@ -27,13 +27,19 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.regionserver.SplitLogWorker; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; /** @@ -143,6 +149,7 @@ public class ZKSplitLog { boolean result = false; String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName); + zkw.sync(nodePath); byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath); if (node != null) { result = true; @@ -221,4 +228,51 @@ public class ZKSplitLog { } return result; } + + /** + * Create a znode under regionOpenSeqIdZNode// with name as + * region open sequence id and remove old ones + * @param zkw + * @param encodedRegionName + * @param newSeqId + * @param saftyBumper + * @return long new sequence Id value + * @throws IOException + * @throws KeeperException + */ + public static long writeRegionOpenSequenceIdInZK(ZooKeeperWatcher zkw, String encodedRegionName, + long newSeqId, long saftyBumper) throws KeeperException { + + String znode = ZKUtil.joinZNode(zkw.regionOpenSeqIdZNode, encodedRegionName); + List ids = ZKUtil.listChildrenNoWatch(zkw, znode); + long maxSeqId = 0; + if (ids != null) { + for (String id : ids) { + try { + Long tmpSeqId = Long.parseLong(id); + maxSeqId = Math.max(tmpSeqId, maxSeqId); + } catch (NumberFormatException ex) { + LOG.warn("Invalid SeqId ZNode=" + id); + } + } + } + + if (maxSeqId > newSeqId) { + newSeqId = maxSeqId; + } + newSeqId += saftyBumper; // bump up SeqId + + // create a new seqId znode + String newSeqIdZNode = ZKUtil.joinZNode(znode, String.valueOf(newSeqId)); + ZKUtil.createWithParents(zkw, newSeqIdZNode); + + // remove old ones + if (ids != null) { + for (String id : ids) { + String nodePath = ZKUtil.joinZNode(znode, id); + ZKUtil.deleteNodeFailSilent(zkw, nodePath); + } + } + return newSeqId; + } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index cf9f726..d601896 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -53,6 +53,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -98,6 +99,7 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -1363,6 +1365,31 @@ public class TestDistributedLogSplitting { ht.close(); } + @Test(timeout = 300000) + public void testReadWriteSeqIdZNodes() throws Exception { + LOG.info("testReadWriteSeqIdFiles"); + startCluster(2); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + HTable ht = installTable(zkw, "table", "family", 10); + FileSystem fs = master.getMasterFileSystem().getFileSystem(); + Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("table")); + List regionDirs = FSUtils.getRegionDirs(fs, tableDir); + ZKSplitLog.writeRegionOpenSequenceIdInZK(zkw, regionDirs.get(0).getName() , 1L, 1000L); + // current SeqId file has seqid=1001 + ZKSplitLog.writeRegionOpenSequenceIdInZK(zkw, regionDirs.get(0).getName() , 1L, 1000L); + // current SeqId file has seqid=2001 + assertEquals(3001, ZKSplitLog.writeRegionOpenSequenceIdInZK(zkw, regionDirs.get(0).getName(), + 3L, 1000L)); + + List ids = ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(zkw.regionOpenSeqIdZNode, + regionDirs.get(0).getName())); + assertTrue(ids != null); + + // only one seqid file should exist + assertEquals(1, ids.size()); + ht.close(); + } + HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception { return installTable(zkw, tname, fname, nrs, 0); }