diff --git a/build.xml b/build.xml
index 718b6d7..89cfa23 100644
--- a/build.xml
+++ b/build.xml
@@ -441,6 +441,7 @@
+
diff --git a/src/contrib/mdc_replication/bin/add_peer.rb b/src/contrib/mdc_replication/bin/add_peer.rb
new file mode 100644
index 0000000..352bea5
--- /dev/null
+++ b/src/contrib/mdc_replication/bin/add_peer.rb
@@ -0,0 +1,62 @@
+# Script to add a peer to a cluster
+# To see usage for this script, run:
+#
+# ${HBASE_HOME}/bin/hbase org.jruby.Main add_peer.rb
+#
+
+include Java
+import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.hbase.EmptyWatcher
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
+
+# Name of this script
+NAME = "add_peer"
+
+# Print usage for this script
+def usage
+ puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
+ exit!
+end
+
+if ARGV.size != 2
+ usage
+end
+
+LOG = LogFactory.getLog(NAME)
+
+parts1 = ARGV[0].split(":")
+LOG.info("Master cluster located at " + parts1[0] + " port " + parts1[1] + " in folder " + parts1[2])
+
+c2 = HBaseConfiguration.create()
+parts2 = ARGV[1].split(":")
+LOG.info("Slave cluster located at " + parts2[0] + " port " + parts2[1] + " in folder " + parts2[2])
+
+print "Are those info correct? [Y/n] "
+answer = $stdin.gets.chomp
+
+if answer.length != 0 && answer.downcase != "y" && answer.downcase != "yes"
+ exit!
+end
+
+c1 = HBaseConfiguration.create()
+c1.set(HConstants.ZOOKEEPER_QUORUM, parts1[0])
+c1.set("hbase.zookeeper.property.clientPort", parts1[1])
+c1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts1[2])
+
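+# Register the replication znodes on the master cluster:
+#   <znode_parent>/replication/master  -> address of the master cluster
+#   <znode_parent>/replication/state   -> "true", i.e. replication is enabled
+#   <znode_parent>/replication/peers/* -> addresses of the slave clusters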
+zkw1 = ZooKeeperWrapper.new(c1, EmptyWatcher.instance)
+zkw1.writeZNode(parts1[2], "replication", "a")
+zkw1.writeZNode(parts1[2] + "/replication", "master", ARGV[0]);
+zkw1.writeZNode(parts1[2] + "/replication", "state", "true");
+zkw1.writeZNode(parts1[2] + "/replication/peers", "test", ARGV[1]);
+
+
+c2.set(HConstants.ZOOKEEPER_QUORUM, parts2[0])
+c2.set("hbase.zookeeper.property.clientPort", parts2[1])
+c2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts2[2])
+
+zkw2 = ZooKeeperWrapper.new(c2, EmptyWatcher.instance)
+zkw2.writeZNode(parts2[2], "replication", "a")
+zkw2.writeZNode(parts2[2] + "/replication", "master", ARGV[0]);
diff --git a/src/contrib/mdc_replication/build.xml b/src/contrib/mdc_replication/build.xml
new file mode 100644
index 0000000..a9a1aaf
--- /dev/null
+++ b/src/contrib/mdc_replication/build.xml
@@ -0,0 +1,52 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/contrib/mdc_replication/ivy.xml b/src/contrib/mdc_replication/ivy.xml
new file mode 100644
index 0000000..53ea3ac
--- /dev/null
+++ b/src/contrib/mdc_replication/ivy.xml
@@ -0,0 +1,107 @@
+
+
+
+
+
+
+
+ Hadoop Core
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRPC.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRPC.java
new file mode 100644
index 0000000..921f901
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRPC.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+
+/**
+ * Helper class to add RPC-related configs for replication
+ */
+public class ReplicationRPC {
+
+ private static final byte RPC_CODE = 110;
+
+ private static boolean initialized = false;
+
+ public synchronized static void initialize() {
+ if (initialized) {
+ return;
+ }
+ HBaseRPC.addToMap(ReplicationRegionInterface.class, RPC_CODE);
+ initialized = true;
+ }
+
+ private ReplicationRPC() {
+    // Static helper class, not meant to be instantiated.
+ }
+}
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRegionInterface.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRegionInterface.java
new file mode 100644
index 0000000..5f65baa
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/ipc/ReplicationRegionInterface.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+import java.io.IOException;
+
+/**
+ * Interface that defines replication
+ */
+public interface ReplicationRegionInterface extends HRegionInterface {
+
+ /**
+ * Replicates the given entries. The guarantee is that the given entries
+ * will be durable on the slave cluster if this method returns without
+ * an exception.
+ * @param entries entries to replicate
+ * @throws IOException
+ */
+ public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
+
+}
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/package.html b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/package.html
new file mode 100755
index 0000000..ab6d005
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/package.html
@@ -0,0 +1,135 @@
+
+
+
+
+
+
+Multi Data Center Replication
+This package provides replication between HBase clusters.
+
+
+
+Table Of Contents
+
+ - Status
+ - Requirements
+ - Deployment
+
+
+
+
+Status
+
+
+This package isn't even alpha quality software and is only meant to be a base
+for future developments. The current implementation offers the following
+features:
+
+
+ - Master/Slave replication limited to 1 slave.
+ - Replication of all user tables.
+ - Start/stop replication stream.
+ - Supports clusters of different sizes.
+ - Re-replication of entries from failed region
+ servers on the master cluster.
+
+Please report bugs on the project's Jira when found.
+
+
+Requirements
+
+
+
+Before trying out replication, make sure to review the following requirements:
+
+
+ - You must manage the ZooKeeper ensemble yourself (it must not be managed
+  by HBase), and it must be available throughout the deployment.
+ - All machines from both clusters should be able to reach every
+ other machine since replication goes from any region server to any
+ other one on the slave cluster. That also includes the
+ Zookeeper clusters.
+ - Both clusters should have the same HBase and Hadoop major revision.
+ For example, having 0.21.1 on the master and 0.21.0 on the slave is
+ correct but not 0.21.1 and 0.22.0.
+ - Every table should exist with the exact same name and column
+ family names on both clusters.
+
+
+
+
+Deployment
+
+
+
+The following steps describe how to enable replication from one cluster
+to another. This must be done with both clusters offline.
+
+ - Copy the hbase-0.21.0-dev-mdc_replication.jar file from the
+ $HBASE_HOME/contrib/mdc_replication/ folder to $HBASE_HOME/lib on
+ both clusters.
+ - Edit ${HBASE_HOME}/conf/hbase-site.xml on both clusters to add
+ the following configurations:
+
+<property>
+ <name>hbase.regionserver.class</name>
+ <value>org.apache.hadoop.hbase.ipc.ReplicationRegionInterface</value>
+</property>
+<property>
+ <name>hbase.regionserver.impl</name>
+ <value>org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer</value>
+</property>
+
+ - Run the following command on any cluster:
+
+$HBASE_HOME/bin/hbase org.jruby.Main $HBASE_HOME/src/contrib/mdc_replication/bin/add_peer.rb
+ This will show you the usage for setting up the replication stream between
+ both clusters (see the example invocation after this list). If both clusters
+ use the same ZooKeeper ensemble, you have to use a different
+ zookeeper.znode.parent for each since they can't write in the same folder.
+
+ - You can now start and stop the clusters with your preferred method.
+
+
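+For example, assuming hypothetical ZooKeeper quorum hostnames and the default
+client port, the add_peer invocation above could look like this (the hostnames
+and znode parents are placeholders):
+
+$HBASE_HOME/bin/hbase org.jruby.Main \
+  $HBASE_HOME/src/contrib/mdc_replication/bin/add_peer.rb \
+  master-zk1,master-zk2,master-zk3:2181:/hbase-master \
+  slave-zk1,slave-zk2,slave-zk3:2181:/hbase-slave
+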
+You can confirm that your setup works by looking at any region server's log
+on the master cluster and looking for the following lines:
+
+
+Considering 1 rs, with ratio 0.1
+Getting 1 rs from peer cluster # 0
+Choosing peer 10.10.1.49:62020
+
+In this case it indicates that 1 region server from the slave cluster
+was chosen for replication.
+
+Should you want to stop the replication while the clusters are running, open
+the shell on the master cluster and issue this command:
+
+hbase(main):001:0> zk 'set /zookeeper.znode.parent/replication/state false'
+
+Replace the znode parent with the one configured on your master cluster.
+Edits that are already queued will still be replicated after you issue the
+command, but new entries will not. To start replication again, simply replace
+"false" with "true" in the command.
+
+
+
+
+
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java
new file mode 100644
index 0000000..c68a35e
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java
@@ -0,0 +1,103 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.replication;
+
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.io.IOException;
+
+/**
+ * Specialized version of HRegion to handle replication. In addition to
+ * replaying the reconstruction log, it enqueues the replayed user-table
+ * edits for replication when a replication source is present.
+ */
+public class ReplicationRegion extends HRegion {
+
+ static final Log LOG = LogFactory.getLog(ReplicationRegion.class);
+
+ private final ReplicationSource replicationSource;
+
+ public ReplicationRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
+ HRegionInfo regionInfo, FlushRequester flushListener,
+ ReplicationSource repSource) {
+ super(basedir, log, fs, conf, regionInfo, flushListener);
+ this.replicationSource = repSource;
+ }
+
+
+ protected void doReconstructionLog(final Path oldLogFile,
+ final long minSeqId, final long maxSeqId, final Progressable reporter)
+ throws UnsupportedEncodingException, IOException {
+ super.doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
+
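+ // Re-read the reconstruction log so that edits recovered from a region
+ // server that failed on the master cluster are enqueued for replication again.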
+ if(this.replicationSource == null) {
+ return;
+ }
+
+ if (oldLogFile == null || !getFilesystem().exists(oldLogFile)) {
+ return;
+ }
+
+ FileStatus[] stats = getFilesystem().listStatus(oldLogFile);
+ if (stats == null || stats.length == 0) {
+ LOG.warn("Passed reconstruction log " + oldLogFile
+ + " is zero-length");
+ return;
+ }
+
+ HLog.Reader reader = HLog.getReader(getFilesystem(), oldLogFile, getConf());
+ try {
+ HLog.Entry entry;
+ while ((entry = reader.next()) != null) {
+ HLogKey key = entry.getKey();
+ KeyValue val = entry.getEdit();
+ if (key.getLogSeqNum() < maxSeqId) {
+ continue;
+ }
+
+ // Don't replicate catalog entries and meta information like
+ // complete log flush.
+ if(!(Bytes.equals(key.getTablename(),ROOT_TABLE_NAME) ||
+ Bytes.equals(key.getTablename(),META_TABLE_NAME)) &&
+ !Bytes.equals(val.getFamily(), HLog.METAFAMILY)) {
+ this.replicationSource.enqueueLog(entry);
+ }
+
+ }
+ } finally {
+ reader.close();
+ }
+
+
+ }
+}
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegionServer.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegionServer.java
new file mode 100644
index 0000000..8c0584a
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegionServer.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.replication;
+
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.replication.ReplicationHLog;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperHelper;
+import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
+import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.ReplicationRPC;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ReplicationRegionServer extends HRegionServer
+ implements ReplicationRegionInterface {
+
+ static {
+ ReplicationRPC.initialize();
+ }
+
+ protected static final Log LOG =
+ LogFactory.getLog(ReplicationRegionServer.class);
+
+ private final ReplicationSource replicationSource;
+ private ReplicationSink replicationSink;
+ private final boolean isMaster;
+ private final AtomicBoolean isReplicating = new AtomicBoolean(true);
+
+ private final ReplicationZookeeperHelper zkHelper;
+
+
+ /**
+ * Starts an HRegionServer specialized in replication at the default location
+ *
+ * @param conf the configuration to use
+ * @throws java.io.IOException
+ */
+ public ReplicationRegionServer(Configuration conf) throws IOException {
+ super(conf);
+
+ this.zkHelper = new ReplicationZookeeperHelper(
+ this.getZooKeeperWrapper(), this.conf, this.isReplicating);
+ this.isMaster = zkHelper.isMaster();
+
+ this.replicationSink = null;
+ this.replicationSource = this.isMaster ? new ReplicationSource(this,
+ super.stopRequested, this.isReplicating) : null;
+ }
+
+ @Override
+ protected HLog instantiateHLog(Path logdir) throws IOException {
+ HLog newlog = new ReplicationHLog(super.getFileSystem(),
+ logdir, conf, super.getLogRoller(),
+ this.replicationSource);
+ return newlog;
+ }
+
+ @Override
+ protected void init(final MapWritable c) throws IOException {
+ super.init(c);
+ String n = Thread.currentThread().getName();
+
+ String repLogPathStr =
+ ReplicationSink.getRepLogPath(getHServerInfo().getServerName());
+ Path repLogPath = new Path(getRootDir(), repLogPathStr);
+
+
+ Thread.UncaughtExceptionHandler handler =
+ new Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(final Thread t, final Throwable e) {
+ abort();
+ LOG.fatal("Set stop flag in " + t.getName(), e);
+ }
+ };
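+ // A region server on the master cluster runs a ReplicationSource thread that
+ // ships its edits to a peer; a slave-cluster region server runs a
+ // ReplicationSink thread that applies the edits it receives.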
+ if(this.isMaster) {
+ Threads.setDaemonThreadRunning(
+ this.replicationSource, n + ".replicationSource", handler);
+ } else {
+ this.replicationSink =
+ new ReplicationSink(conf,super.stopRequested,
+ repLogPath, getFileSystem(), getThreadWakeFrequency());
+ Threads.setDaemonThreadRunning(
+ this.replicationSink, n + ".replicationSink", handler);
+ }
+ }
+
+ @Override
+ protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+ throws IOException {
+ HRegion r = new ReplicationRegion(HTableDescriptor.getTableDir(super
+ .getRootDir(), regionInfo.getTableDesc().getName()), super.hlog, super
+ .getFileSystem(), super.conf, regionInfo,
+ super.getFlushRequester(), this.replicationSource);
+
+ r.initialize(null, new Progressable() {
+ public void progress() {
+ addProcessingMessage(regionInfo);
+ }
+ });
+ return r;
+ }
+
+
+ @Override
+ public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+ this.replicationSink.replicateEntries(entries);
+ }
+
+ /**
+ * Returns the version of the RPC protocol implemented by this server
+ * @param protocol name of the protocol interface
+ * @param clientVersion version expected by the client
+ * @return the protocol version
+ * @throws IOException if the protocol is not known to this server
+ */
+ public long getProtocolVersion(final String protocol,
+ final long clientVersion)
+ throws IOException {
+ if (protocol.equals(ReplicationRegionInterface.class.getName())) {
+ return HBaseRPCProtocolVersion.versionID;
+ }
+ throw new IOException("Unknown protocol to name node: " + protocol);
+ }
+
+ /**
+ * Get the helper handling replication's ZooKeeper interactions
+ * @return the ZooKeeper helper for replication
+ */
+ public ReplicationZookeeperHelper getZkHelper() {
+ return zkHelper;
+ }
+
+ protected void join() {
+ super.join();
+ if(this.isMaster) {
+ Threads.shutdown(this.replicationSource);
+ } else {
+ Threads.shutdown(this.replicationSink);
+ }
+ }
+}
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSink.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSink.java
new file mode 100644
index 0000000..a300edb
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSink.java
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+import java.io.EOFException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.TimeUnit;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * This class is responsible for replicating the edits coming
+ * from another cluster. All edits are first put into a log that will
+ * be read later by the main thread.
+ *
+ * This replication process currently waits for the queued edits to be
+ * applied before any other entry can be appended to the log.
+ *
+ * The log is rolled but old ones aren't kept at the moment.
+ */
+public class ReplicationSink extends Thread {
+
+ public static final String REPLICATION_LOG_DIR = ".replogs";
+
+ static final Log LOG = LogFactory.getLog(ReplicationSink.class);
+ private final Configuration conf;
+
+ private final HTablePool pool;
+
+ private final AtomicBoolean stop;
+
+ private HLog.Reader reader;
+
+ private HLog.Writer writer;
+
+ private final FileSystem fs;
+
+ private Path path;
+
+ private long position = 0;
+
+ private final Lock lock = new ReentrantLock();
+
+ private final Condition newData = lock.newCondition();
+
+ private final AtomicLong editsSize = new AtomicLong(0);
+
+ private long lastEditSize = 0;
+
+ private final long logrollsize;
+
+ private final long threadWakeFrequency;
+
+ /**
+ * Create a sink for replication
+ * @param conf conf object
+ * @param stopper boolean to tell this thread to stop
+ * @param path the path to the log
+ * @param fs the filesystem to use
+ * @param threadWakeFrequency how long should the thread wait for edits
+ * @throws IOException thrown when HDFS goes bad or bad file name
+ */
+ public ReplicationSink(final Configuration conf,
+ final AtomicBoolean stopper, Path path,
+ FileSystem fs, long threadWakeFrequency)
+ throws IOException {
+ this.conf = conf;
+ this.pool = new HTablePool(this.conf, 10);
+ this.stop = stopper;
+ this.fs = fs;
+ this.path = path;
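+ // Roll the sink's log once the buffered edits grow past roughly 95%
+ // (the default multiplier) of an HDFS block.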
+ long blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
+ this.fs.getDefaultBlockSize());
+ float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
+ this.logrollsize = (long)(blocksize * multi);
+ this.threadWakeFrequency = threadWakeFrequency;
+ rollLog();
+
+ }
+
+ /**
+ * Put this array of entries into a log that will be read later
+ * @param entries
+ * @throws IOException
+ */
+ public void replicateEntries(HLog.Entry[] entries)
+ throws IOException {
+
+ this.lock.lock();
+ if(!this.stop.get()) {
+ // add to WAL and defer actual inserts
+ try {
+ for(HLog.Entry entry : entries) {
+
+ this.writer.append(entry);
+ this.editsSize.addAndGet(entry.getKey().heapSize() +
+ entry.getEdit().heapSize());
+ }
+ this.writer.sync();
+
+ this.newData.signal();
+
+ } catch (IOException ioe) {
+ LOG.error("Unable to accept edit because", ioe);
+ throw ioe;
+ } finally {
+ this.lock.unlock();
+ }
+ } else {
+ LOG.info("Won't be replicating data as we are shutting down");
+ }
+ }
+
+ public void run() {
+
+ try {
+ this.lock.lock();
+ while (!this.stop.get()) {
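+ // await() releases the lock while waiting, so replicateEntries() can keep
+ // appending to the log; the lock is reacquired before we continue.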
+ this.newData.await(this.threadWakeFrequency, TimeUnit.MILLISECONDS);
+ try {
+ if(this.lastEditSize == this.editsSize.get()) {
+ continue;
+ }
+ // There's no tailing in HDFS so we create a new reader
+ // and seek every time
+ this.reader = HLog.getReader(this.fs, this.path, this.conf);
+
+ if (position != 0) {
+ this.reader.seek(position);
+ }
+
+ byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY;
+ List<Put> puts = new ArrayList<Put>();
+
+ // Very simple optimization where we batch sequences of rows going
+ // to the same table.
+ HLog.Entry entry = new HLog.Entry();
+ while (this.reader.next(entry) != null) {
+ KeyValue kv = entry.getEdit();
+
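+ // Rebuild a client Delete or Put from each KeyValue read back from the log.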
+ if (kv.isDelete()) {
+ Delete delete = new Delete(kv.getRow(), kv.getTimestamp(), null);
+ if (kv.isDeleteFamily()) {
+ delete.deleteFamily(kv.getFamily());
+ } else if (!kv.isEmptyColumn()) {
+ delete.deleteColumn(entry.getEdit().getFamily(),
+ kv.getQualifier());
+ }
+ pool.getTable(entry.getKey().getTablename()).delete(delete);
+
+ } else {
+ Put put = new Put(kv.getRow(), kv.getTimestamp(), null);
+ put.add(entry.getEdit().getFamily(),
+ kv.getQualifier(), kv.getValue());
+
+ // Switching table, flush
+ if (!Bytes.equals(lastTable, entry.getKey().getTablename())
+ && !puts.isEmpty()) {
+ pool.getTable(lastTable).put(puts);
+ puts.clear();
+ }
+ lastTable = entry.getKey().getTablename();
+ puts.add(put);
+ }
+ }
+
+ if (!puts.isEmpty()) {
+ pool.getTable(lastTable).put(puts);
+ }
+
+ position = this.reader.getPosition();
+
+ if(this.editsSize.get() > this.logrollsize) {
+ rollLog();
+ }
+ this.lastEditSize = editsSize.get();
+
+
+ } catch (EOFException eof) {
+ LOG.warn("Got EOF while reading, will continue on next notify");
+ } catch (TableNotFoundException ex) {
+ LOG.warn("Losing edits since: " + ex);
+ } finally {
+ this.newData.signal();
+ if(this.reader != null) {
+ this.reader.close();
+ }
+ this.reader = null;
+ }
+
+ }
+ close();
+ } catch (Exception ex) {
+ // Should we log rejected edits in a file for replay?
+ LOG.error("Unable to accept edit because", ex);
+ this.stop.set(true);
+ }
+ }
+
+ private void close() throws IOException {
+ this.writer.close();
+ if(reader != null) {
+ this.reader.close();
+ }
+ this.fs.delete(this.path,true);
+ }
+
+ // Delete the current log and start a new one with the same name
+ // TODO keep the old versions so that the writing thread isn't held up
+ // by the reading thread while the latter could be reading older logs.
+ // At this point we are under the lock.
+ protected void rollLog() throws IOException {
+ if (this.editsSize.get() != 0) {
+ this.writer.close();
+ if(this.reader != null) {
+ this.reader.close();
+ }
+ this.fs.delete(this.path,true);
+ }
+ this.writer = HLog.createWriter(this.fs, this.path, this.conf);
+ this.editsSize.set(0);
+ this.position = 0;
+ LOG.debug("New replication log");
+ }
+
+ /**
+ * Get the path of the replication log for the given server
+ * @param serverName name of the server receiving the edits
+ * @return the relative path of that server's replication log
+ */
+ public static String getRepLogPath(String serverName) {
+ StringBuilder dirName = new StringBuilder(REPLICATION_LOG_DIR);
+ dirName.append("/");
+ dirName.append(serverName);
+ return dirName.toString();
+ }
+}
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java
new file mode 100644
index 0000000..f5fe5d8
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.ReplicationConnectionManager;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperHelper;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Class that handles the source of a replication stream.
+ * Currently does not handle more than 1 slave cluster.
+ * For each slave cluster it selects a random subset of peers
+ * using a replication ratio. For example, if the replication ratio is 0.1
+ * and the slave cluster has 100 region servers, 10 will be selected.
+ */
+public class ReplicationSource extends Chore implements HConstants {
+
+ static final Log LOG = LogFactory.getLog(ReplicationSource.class);
+ private final LinkedBlockingQueue<HLog.Entry> queue =
+ new LinkedBlockingQueue<HLog.Entry>();
+ private final List<HLog.Entry> tempArray = new ArrayList<HLog.Entry>();
+ private final HLog.Entry[] dummyArray = new HLog.Entry[0];
+ private final ReplicationConnectionManager conn;
+ private final ReplicationZookeeperHelper zkHelper;
+ private final Configuration conf;
+ private final float ratio;
+ private final Random random;
+ private final AtomicBoolean isReplicating;
+
+ private List<HServerAddress> currentPeers;
+
+ /**
+ * Constructor used by region servers
+ * @param server the region server specialized in replication
+ * @param stopper the atomic boolean to use to stop the cluster
+ * @param isReplicating the atomic boolean that starts/stops replication
+ * @throws IOException
+ */
+ public ReplicationSource(final ReplicationRegionServer server,
+ final AtomicBoolean stopper,
+ final AtomicBoolean isReplicating)
+ throws IOException {
+ super(server.getThreadWakeFrequency(), stopper);
+ this.conf = server.getConfiguration();
+ this.conn = new ReplicationConnectionManager(this.conf);
+ this.zkHelper = server.getZkHelper();
+ this.ratio = this.conf.getFloat("replication.ratio", 0.1f);
+ currentPeers = new ArrayList<HServerAddress>();
+ this.random = new Random();
+ this.isReplicating = isReplicating;
+ }
+
+ @Override
+ protected boolean initialChore() {
+ this.chooseSinksForPeer(0);
+ return currentPeers.size() > 0;
+ }
+
+ /**
+ * Select a number of peers at random using the ratio. Minimum 1.
+ * @param index
+ */
+ private void chooseSinksForPeer(int index) {
+ this.currentPeers.clear();
+ List<HServerAddress> addresses = this.zkHelper.getPeersAddresses(index);
+ Map<String, HServerAddress> mapOfAdr =
+ new HashMap<String, HServerAddress>();
+ LOG.info("Considering " + addresses.size() +
+ " rs, with ratio " + (addresses.size()*ratio));
+ int nbPeers = (int)(Math.ceil (addresses.size()*ratio));
+ LOG.info("Getting " + nbPeers + " rs from peer cluster # " + index);
+ for(int i = 0; i < nbPeers; i++) {
+ HServerAddress adr =
+ addresses.get(this.random.nextInt(addresses.size()));
+ while(mapOfAdr.containsKey(adr.toString())) {
+ adr = addresses.get(this.random.nextInt(addresses.size()));
+ }
+ LOG.info("Choosing peer " + adr.toString());
+ mapOfAdr.put(adr.toString(), adr);
+ }
+ this.currentPeers.addAll(mapOfAdr.values());
+ }
+
+ /**
+ * Put a log entry in a replication queue if replication is enabled
+ * @param logEntry
+ */
+ public void enqueueLog(HLog.Entry logEntry) {
+ if(this.isReplicating.get()) {
+ this.queue.add(logEntry);
+ }
+ }
+
+ @Override
+ protected void chore() {
+ while(!super.stop.get()) {
+ // Drain the edits accumulated in the queue, select a node at random
+ // and send the edits. If it fails, get a new set of nodes and chose
+ // a new one to replicate to.
+ try {
+ this.queue.drainTo(this.tempArray);
+ if(this.tempArray.size() > 0) {
+ HServerAddress adr =
+ currentPeers.get(random.nextInt(this.currentPeers.size()));
+ ReplicationRegionInterface rrs = this.conn.getHRegionConnection(adr);
+ LOG.info("Replicating " + this.tempArray.size()
+ + " to " + adr.toString());
+ rrs.replicateLogEntries(this.tempArray.toArray(dummyArray));
+ this.tempArray.clear();
+ }
+ return;
+ }
+ catch (IOException ioe) {
+ LOG.warn("Unable to replicate, retrying with a new node", ioe);
+
+ try{
+ Thread.sleep(1000);
+ } catch (InterruptedException e){
+ // continue
+ }
+
+ // Should wait in a backoff fashion?
+ // make sure we don't retry with the same node
+ chooseSinksForPeer(0);
+ }
+ }
+ }
+
+
+}
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java
new file mode 100644
index 0000000..420c2e6
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal.replication;
+
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.replication.ReplicationSource;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+
+/**
+ * HLog specialized in replication. It replicates every entry from every
+ * user table at the moment.
+ */
+public class ReplicationHLog extends HLog {
+
+ static final Log LOG = LogFactory.getLog(ReplicationHLog.class);
+
+ private ReplicationSource replicationSource;
+
+ private final boolean isReplicator;
+
+ /**
+ * New constructor used for replication
+ * @param fs filesystem to use
+ * @param dir directory to store the wal
+ * @param conf conf ot use
+ * @param listener log listener to pass to super class
+ * @param replicationSource where to put the entries
+ * @throws IOException
+ */
+ public ReplicationHLog(final FileSystem fs, final Path dir,
+ final Configuration conf,
+ final LogRollListener listener,
+ ReplicationSource replicationSource)
+ throws IOException {
+ super(fs, dir, conf, listener);
+ this.replicationSource = replicationSource;
+ this.isReplicator = this.replicationSource != null;
+ }
+
+ @Override
+ protected void doWrite(HRegionInfo info, HLogKey logKey,
+ KeyValue logEdit, long now)
+ throws IOException {
+ super.doWrite(info, logKey, logEdit, now);
+ if(this.isReplicator && ! (info.isMetaRegion() || info.isRootRegion())) {
+ this.replicationSource.enqueueLog(new Entry(logKey, logEdit));
+ }
+
+ }
+
+ public ReplicationSource getReplicationSource() {
+ return this.replicationSource;
+ }
+
+
+}
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationConnectionManager.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationConnectionManager.java
new file mode 100644
index 0000000..0ed84a2
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationConnectionManager.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
+import org.apache.hadoop.ipc.RemoteException;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Connection manager to communicate with the other clusters.
+ */
+public class ReplicationConnectionManager implements HConstants {
+
+ private final int numRetries;
+ private final int maxRPCAttempts;
+ private final long rpcTimeout;
+ private final Map<String, ReplicationRegionInterface> servers =
+ new ConcurrentHashMap<String, ReplicationRegionInterface>();
+ private final
+ Class<? extends ReplicationRegionInterface> serverInterfaceClass;
+ private final Configuration conf;
+
+ /**
+ * Constructor that sets up RPC to other clusters
+ * @param conf the configuration to use for the RPC connections
+ */
+ public ReplicationConnectionManager(Configuration conf) {
+ this.conf = conf;
+ String serverClassName =
+ conf.get(REGION_SERVER_CLASS, DEFAULT_REGION_SERVER_CLASS);
+ this.numRetries = conf.getInt("hbase.client.retries.number", 10);
+ this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1);
+ this.rpcTimeout = conf.getLong("hbase.regionserver.lease.period", 60000);
+ try {
+ this.serverInterfaceClass =
+ (Class<? extends ReplicationRegionInterface>)
+ Class.forName(serverClassName);
+ } catch (ClassNotFoundException e) {
+ throw new UnsupportedOperationException(
+ "Unable to find region server interface " + serverClassName, e);
+ }
+ }
+
+ /**
+ * Get a connection to a distant region server for replication
+ * @param regionServer the address to use
+ * @return the connection to the region server
+ * @throws IOException
+ */
+ public ReplicationRegionInterface getHRegionConnection(
+ HServerAddress regionServer)
+ throws IOException {
+ ReplicationRegionInterface server;
+ synchronized (this.servers) {
+ // See if we already have a connection
+ server = this.servers.get(regionServer.toString());
+ if (server == null) { // Get a connection
+ try {
+ server = (ReplicationRegionInterface) HBaseRPC.waitForProxy(
+ serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
+ regionServer.getInetSocketAddress(), this.conf,
+ this.maxRPCAttempts, this.rpcTimeout);
+ } catch (RemoteException e) {
+ throw RemoteExceptionHandler.decodeRemoteException(e);
+ }
+ this.servers.put(regionServer.toString(), server);
+ }
+ }
+ return server;
+ }
+}
diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java
new file mode 100644
index 0000000..1f40808
--- /dev/null
+++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.io.IOException;
+
+/**
+ * This class serves as a helper for all things related to ZooKeeper
+ * in the replication contrib.
+ */
+public class ReplicationZookeeperHelper implements HConstants, Watcher {
+
+ static final Log LOG = LogFactory.getLog(ReplicationZookeeperHelper.class);
+
+ private final ZooKeeperWrapper zookeeperWrapper;
+
+ private final List<ZooKeeperWrapper> peerClusters;
+
+ private final String replicationZNode;
+ private final String peersZNode;
+
+ private final String replicationStateNodeName;
+
+ private final boolean isMaster;
+
+ private final Configuration conf;
+
+ private final AtomicBoolean isReplicating;
+
+ /**
+ * Constructor used by region servers
+ * @param zookeeperWrapper zkw to wrap
+ * @param conf conf to use
+ * @param isReplicating atomic boolean to start/stop replication
+ * @throws IOException
+ */
+ public ReplicationZookeeperHelper(
+ ZooKeeperWrapper zookeeperWrapper, Configuration conf,
+ final AtomicBoolean isReplicating) throws IOException{
+ this.zookeeperWrapper = zookeeperWrapper;
+ this.conf = conf;
+ String replicationZNodeName =
+ conf.get("zookeeper.znode.replication", "replication");
+ String peersZNodeName =
+ conf.get("zookeeper.znode.peers", "peers");
+ String repMasterZNodeName =
+ conf.get("zookeeper.znode.master", "master");
+ this.replicationStateNodeName =
+ conf.get("zookeeper.znode.state", "state");
+
+
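+ // Expected znode layout, relative to the configured parent znode:
+ //   <parent>/replication/master       -> address of the master cluster
+ //   <parent>/replication/state        -> "true"/"false", toggles replication
+ //   <parent>/replication/peers/<peer> -> address of each slave cluster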
+ this.peerClusters = new ArrayList<ZooKeeperWrapper>();
+ this.replicationZNode = zookeeperWrapper.getZNode(
+ zookeeperWrapper.parentZNode,replicationZNodeName);
+ this.peersZNode =
+ zookeeperWrapper.getZNode(replicationZNode,peersZNodeName);
+
+ List<String> znodes =
+ this.zookeeperWrapper.listZnodes(this.peersZNode, this);
+ if(znodes != null) {
+ for(String znode : znodes) {
+ connectToPeer(znode);
+ }
+ }
+ String address = this.zookeeperWrapper.getData(this.replicationZNode,
+ repMasterZNodeName);
+
+ String thisCluster = this.conf.get(ZOOKEEPER_QUORUM)+":"+
+ this.conf.get("hbase.zookeeper.property.clientPort") +":" +
+ this.conf.get(ZOOKEEPER_ZNODE_PARENT);
+
+ this.isMaster = thisCluster.equals(address);
+
+ LOG.info("This cluster (" + thisCluster + ") is a "
+ + (this.isMaster ? "master" : "slave") + " for replication" +
+ ", compared with (" + address + ")");
+
+ this.isReplicating = isReplicating;
+
+ setIsReplicating();
+ }
+
+ /**
+ * Returns all region servers from given peer
+ * @param clusterIndex the cluster to interrogate
+ * @return addresses of all region servers
+ */
+ public List<HServerAddress> getPeersAddresses(int clusterIndex) {
+ return this.peerClusters.size() == 0 ?
+ null : this.peerClusters.get(clusterIndex).scanRSDirectory();
+ }
+
+ // This method connects this cluster to another one and registers it
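+ // Each peer znode stores "quorum:clientPort:znodeParent", as written by add_peer.rb.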
+ private void connectToPeer(String znode) throws IOException {
+ String[] quorum =
+ this.zookeeperWrapper.getData(this.peersZNode, znode).split(":");
+ if(quorum.length == 3) {
+ Configuration otherConf = new Configuration(this.conf);
+ otherConf.set(ZOOKEEPER_QUORUM, quorum[0]);
+ otherConf.set("hbase.zookeeper.property.clientPort", quorum[1]);
+ otherConf.set(ZOOKEEPER_ZNODE_PARENT, quorum[2]);
+ this.peerClusters.add(new ZooKeeperWrapper(otherConf, this));
+ LOG.info("Added new peer cluster " + StringUtils.arrayToString(quorum));
+ } else {
+ LOG.error("Wrong format of cluster address: " +
+ this.zookeeperWrapper.getData(this.peersZNode, znode));
+ }
+ }
+
+ /**
+ * Tells whether this cluster is the master of the replication stream
+ * @return true if this cluster is the replication master
+ */
+ public boolean isMaster() {
+ return isMaster;
+ }
+
+ @Override
+ public void process(WatchedEvent watchedEvent) {
+ Event.EventType type = watchedEvent.getType();
+ LOG.info(("Got event " + type + " with path " + watchedEvent.getPath()));
+ if (type.equals(Event.EventType.NodeDataChanged)) {
+ setIsReplicating();
+ }
+ }
+
+ /**
+ * This reads the state znode for replication and sets the atomic boolean
+ */
+ private void setIsReplicating() {
+ String value = this.zookeeperWrapper.getDataAndWatch(
+ this.replicationZNode, this.replicationStateNodeName, this);
+ if(value != null) {
+ isReplicating.set(value.equals("true"));
+ LOG.info("Replication is now " + (isReplicating.get() ?
+ "started" : "stopped"));
+ }
+ }
+}
diff --git a/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/regionserver/replication/TestReplicationSink.java b/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/regionserver/replication/TestReplicationSink.java
new file mode 100644
index 0000000..3e62efc
--- /dev/null
+++ b/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/regionserver/replication/TestReplicationSink.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.fs.Path;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TestReplicationSink {
+
+ protected static final Log LOG =
+ LogFactory.getLog(TestReplicationSink.class);
+
+ private static final int BATCH_SIZE = 10;
+
+ private final static Configuration conf = HBaseConfiguration.create();
+
+ private final static HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+
+ private static ReplicationSink SINK;
+
+ private static final byte[] TABLE_NAME1 =
+ Bytes.toBytes("table1");
+ private static final byte[] TABLE_NAME2 =
+ Bytes.toBytes("table2");
+
+ private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
+ private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
+
+ private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
+
+ private static HTable table1;
+
+ private static HTable table2;
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ TEST_UTIL.startMiniCluster(3);
+ Path repLogPath = new Path(TEST_UTIL.getTestDir(),
+ ReplicationSink.getRepLogPath("test_rep_sink"));
+ SINK = new ReplicationSink(conf,STOPPER,
+ repLogPath,
+ TEST_UTIL.getDFSCluster().getFileSystem(), 1000);
+ SINK.start();
+ table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
+ table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ STOPPER.set(true);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ table1 = TEST_UTIL.truncateTable(TABLE_NAME1);
+ table2 = TEST_UTIL.truncateTable(TABLE_NAME2);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @After
+ public void tearDown() throws Exception {}
+
+ //@Test
+ public void testBatchSink() throws Exception {
+ HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
+ for(int i = 0; i < BATCH_SIZE; i++) {
+ entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+ }
+ SINK.replicateEntries(entries);
+ Thread.sleep(500);
+ Scan scan = new Scan();
+ ResultScanner scanRes = table1.getScanner(scan);
+ assertEquals(scanRes.next(BATCH_SIZE).length, BATCH_SIZE);
+ }
+
+ //@Test
+ public void testMixedPutDelete() throws Exception {
+ HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2];
+ for(int i = 0; i < BATCH_SIZE; i+=2) {
+ entries[i/2] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+ }
+ SINK.replicateEntries(entries);
+ Thread.sleep(500);
+
+ entries = new HLog.Entry[BATCH_SIZE];
+ for(int i = 0; i < BATCH_SIZE; i++) {
+ entries[i] = createEntry(TABLE_NAME1, i,
+ i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn);
+ }
+
+ SINK.replicateEntries(entries);
+ Thread.sleep(500);
+ Scan scan = new Scan();
+ ResultScanner scanRes = table1.getScanner(scan);
+ assertEquals(BATCH_SIZE/2,scanRes.next(BATCH_SIZE).length);
+ }
+
+ //@Test
+ public void testMixedPutTables() throws Exception {
+ HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
+ for(int i = 0; i < BATCH_SIZE; i++) {
+ entries[i] =
+ createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
+ i, KeyValue.Type.Put);
+ }
+
+ SINK.replicateEntries(entries);
+ Thread.sleep(500);
+ Scan scan = new Scan();
+ ResultScanner scanRes = table2.getScanner(scan);
+ for(Result res : scanRes) {
+ assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
+ }
+ }
+
+ //@Test
+ public void testMixedDeletes() throws Exception {
+ HLog.Entry[] entries = new HLog.Entry[3];
+
+ for(int i = 0; i < 3; i++) {
+ entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+ }
+ SINK.replicateEntries(entries);
+ Thread.sleep(500);
+
+ entries = new HLog.Entry[3];
+
+ entries[0] = createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn);
+ entries[1] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
+ entries[2] = createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn);
+
+ SINK.replicateEntries(entries);
+ Thread.sleep(500);
+
+ Scan scan = new Scan();
+ ResultScanner scanRes = table1.getScanner(scan);
+ assertEquals(0, scanRes.next(3).length);
+ }
+
+ @Test
+ public void testRolling() throws Exception {
+ testMixedDeletes();
+ SINK.rollLog();
+ testMixedDeletes();
+ SINK.rollLog();
+ testMixedPutTables();
+ }
+
+ private HLog.Entry createEntry(byte [] table, int row, KeyValue.Type type) {
+ byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
+ byte[] rowBytes = Bytes.toBytes(row);
+ final long now = System.currentTimeMillis();
+ KeyValue kv = null;
+ if(type.getCode() == KeyValue.Type.Put.getCode()) {
+ kv = new KeyValue(rowBytes, fam, fam, now,
+ KeyValue.Type.Put, Bytes.toBytes(row));
+ } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
+ kv = new KeyValue(rowBytes, fam, fam,
+ now, KeyValue.Type.DeleteColumn);
+ } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
+ kv = new KeyValue(rowBytes, fam, null,
+ now, KeyValue.Type.DeleteFamily);
+ }
+
+ HLogKey key = new HLogKey(table, table, now, now);
+
+ return new HLog.Entry(key, kv);
+ }
+}
diff --git a/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java b/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java
new file mode 100644
index 0000000..affbf7a
--- /dev/null
+++ b/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertArrayEquals;
+
+import org.junit.*;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.List;
+
+public class TestReplication implements HConstants{
+
+ protected static final Log LOG = LogFactory.getLog(TestReplication.class);
+
+ private Configuration conf1;
+ private Configuration conf2;
+
+ private ZooKeeperWrapper zkw1;
+ private ZooKeeperWrapper zkw2;
+
+ private HBaseTestingUtility utility1;
+ private HBaseTestingUtility utility2;
+
+ private final int NB_ROWS_IN_BATCH = 100;
+ private final long SLEEP_TIME = 500;
+ private final int NB_RETRIES = 10;
+
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ try {
+ conf1 = HBaseConfiguration.create();
+ conf1.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
+ .getName());
+ conf1.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
+ .getName());
+ conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
+
+ utility1 = new HBaseTestingUtility(conf1);
+ utility1.startMiniZKCluster();
+ MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+ zkw1 = new ZooKeeperWrapper(conf1, EmptyWatcher.instance);
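+ // Bootstrap the replication znodes for cluster /1: it points to itself as
+ // master and starts with replication enabled.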
+ zkw1.writeZNode("/1", "replication", "");
+ zkw1.writeZNode("/1/replication", "master",
+ conf1.get(ZOOKEEPER_QUORUM)+":" +
+ conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+ setIsReplication("true");
+
+
+ LOG.info("Setup first Zk");
+
+ conf2 = HBaseConfiguration.create();
+ conf2.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
+ .getName());
+ conf2.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
+ .getName());
+ conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
+
+ utility2 = new HBaseTestingUtility(conf2);
+ utility2.setZkCluster(miniZK);
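+ // Cluster /2 reuses the same mini ZK ensemble and points at cluster /1 as
+ // its replication master.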
+ zkw2 = new ZooKeeperWrapper(conf2, EmptyWatcher.instance);
+ zkw2.writeZNode("/2", "replication", "");
+ zkw2.writeZNode("/2/replication", "master",
+ conf1.get(ZOOKEEPER_QUORUM)+":" +
+ conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+
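+ // Register cluster /2 as the slave peer named "test" of cluster /1.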
+ zkw1.writeZNode("/1/replication/peers", "test",
+ conf2.get(ZOOKEEPER_QUORUM)+":" +
+ conf2.get("hbase.zookeeper.property.clientPort")+":/2");
+
+ LOG.info("Setup second Zk");
+ } catch (Exception ex) { ex.printStackTrace(); throw ex; }
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @After
+ public void tearDown() throws Exception {}
+
+ @Test
+ public void testReplication() throws Exception {
+ utility1.startMiniCluster();
+ utility2.startMiniCluster();
+
+ byte[] tableName = Bytes.toBytes("test");
+ byte[] famName = Bytes.toBytes("f");
+ byte[] row = Bytes.toBytes("row");
+
+ HTableDescriptor table = new HTableDescriptor(tableName);
+ HColumnDescriptor fam = new HColumnDescriptor(famName);
+ table.addFamily(fam);
+
+ HBaseAdmin admin1 = new HBaseAdmin(conf1);
+ HBaseAdmin admin2 = new HBaseAdmin(conf2);
+ admin1.createTable(table);
+ admin2.createTable(table);
+
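+ // A single put on the master cluster should eventually show up on the slave.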
+ Put put = new Put(row);
+ put.add(famName, row, row);
+
+ HTable table1 = new HTable(conf1, tableName);
+ table1.put(put);
+
+ HTable table2 = new HTable(conf2, tableName);
+ Get get = new Get(row);
+ for(int i = 0; i < NB_RETRIES; i++) {
+ if(i==NB_RETRIES-1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = table2.get(get);
+ if(res.size() == 0) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ assertArrayEquals(row, res.value());
+ break;
+ }
+ }
+
+ Delete del = new Delete(row);
+ table1.delete(del);
+
+ table2 = new HTable(conf2, tableName);
+ get = new Get(row);
+ for(int i = 0; i < NB_RETRIES; i++) {
+ if(i==NB_RETRIES-1) {
+ fail("Waited too much time for del replication");
+ }
+ Result res = table2.get(get);
+ if(res.size() >= 1) {
+ LOG.info("Row not deleted");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+
+ // Batch test: write NB_ROWS_IN_BATCH rows with auto-flush off and expect them all on the slave
+ table1.setAutoFlush(false);
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ put = new Put(Bytes.toBytes(i));
+ put.add(famName, row, row);
+ table1.put(put);
+ }
+ table1.flushCommits();
+
+ Scan scan = new Scan();
+
+ for(int i = 0; i < NB_RETRIES; i++) {
+ if(i==NB_RETRIES-1) {
+ fail("Waited too much time for normal batch replication");
+ }
+ ResultScanner scanner = table2.getScanner(scan);
+ Result[] res = scanner.next(NB_ROWS_IN_BATCH);
+ scanner.close();
+ if(res.length != NB_ROWS_IN_BATCH) {
+ LOG.info("Only got " + res.length + " rows");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+
+ table1.setAutoFlush(true);
+
+ // Test stopping replication
+ setIsReplication("false");
+
+ // Takes some ms for ZK to fire the watcher
+ Thread.sleep(100);
+
+
+ put = new Put(Bytes.toBytes("stop start"));
+ put.add(famName, row, row);
+ table1.put(put);
+
+ get = new Get(Bytes.toBytes("stop start"));
+ for(int i = 0; i < NB_RETRIES; i++) {
+ if(i==NB_RETRIES-1) {
+ break;
+ }
+ Result res = table2.get(get);
+ if(res.size() >= 1) {
+ fail("Replication wasn't stopped");
+
+ } else {
+ LOG.info("Row not replicated, let's wait a bit more...");
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+
+ // Test restarting replication
+
+ setIsReplication("true");
+
+ Thread.sleep(100);
+
+ table1.put(put);
+
+ for(int i = 0; i < NB_RETRIES; i++) {
+ if(i==NB_RETRIES-1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = table2.get(get);
+ if(res.size() == 0) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ assertArrayEquals(row, res.value());
+ break;
+ }
+ }
+
+ }
+
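+ // Flip the /1/replication/state znode to start or stop replication on the
+ // master cluster.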
+ private void setIsReplication(String bool) throws Exception {
+ zkw1.writeZNode("/1/replication", "state", bool);
+ }
+}
diff --git a/src/java/org/apache/hadoop/hbase/HConstants.java b/src/java/org/apache/hadoop/hbase/HConstants.java
index 04340f5..7d957ae 100644
--- a/src/java/org/apache/hadoop/hbase/HConstants.java
+++ b/src/java/org/apache/hadoop/hbase/HConstants.java
@@ -93,6 +93,11 @@ public interface HConstants {
/** Default ZooKeeper pause value. In milliseconds. */
static final int DEFAULT_ZOOKEEPER_PAUSE = 2 * 1000;
+ /** Parameter name for the root dir in ZK for this cluster */
+ static final String ZOOKEEPER_ZNODE_PARENT = "zookeeper.znode.parent";
+
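+ /** Default root dir in ZK for this cluster */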
+ static final String DEFAULT_ZOOKEEPER_ZNODE_PARENT = "/hbase";
+
/** Parameter name for port region server listens on. */
static final String REGIONSERVER_PORT = "hbase.regionserver.port";
diff --git a/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
index 901ac67..379a6a0 100644
--- a/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
+++ b/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
@@ -168,11 +168,12 @@ public class HConnectionManager implements HConstants {
*/
public static synchronized ClientZKWatcher getClientZooKeeperWatcher(
Configuration conf) throws IOException {
- if (!ZK_WRAPPERS.containsKey(conf.get(HConstants.ZOOKEEPER_QUORUM))) {
- ZK_WRAPPERS.put(conf.get(HConstants.ZOOKEEPER_QUORUM),
+ if (!ZK_WRAPPERS.containsKey(
+ ZooKeeperWrapper.getZookeeperClusterKey(conf))) {
+ ZK_WRAPPERS.put(ZooKeeperWrapper.getZookeeperClusterKey(conf),
new ClientZKWatcher(conf));
}
- return ZK_WRAPPERS.get(conf.get(HConstants.ZOOKEEPER_QUORUM));
+ return ZK_WRAPPERS.get(ZooKeeperWrapper.getZookeeperClusterKey(conf));
}
/**
diff --git a/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java b/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
index 4c2998a..e8e35a7 100644
--- a/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
+++ b/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
@@ -52,6 +52,8 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -151,6 +153,9 @@ public class HbaseObjectWritable implements Writable, Configurable {
addToMap(FirstKeyOnlyFilter.class, code++);
addToMap(Delete [].class, code++);
+ addToMap(HLog.Entry.class, code++);
+ addToMap(HLog.Entry[].class, code++);
+ addToMap(HLogKey.class, code++);
}
private Class<?> declaredClass;
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 68670c1..1853577 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1177,7 +1177,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
if (writeToWAL) {
- this.log.append(regionInfo.getRegionName(),
+ this.log.append(regionInfo,
regionInfo.getTableDesc().getName(), kvs, now);
}
flush = isFlushSize(size);
@@ -1450,7 +1450,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
try {
if (writeToWAL) {
long now = System.currentTimeMillis();
- this.log.append(regionInfo.getRegionName(),
+ this.log.append(regionInfo,
regionInfo.getTableDesc().getName(), edits, now);
}
long size = 0;
@@ -2355,7 +2355,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
long now = System.currentTimeMillis();
List<KeyValue> edits = new ArrayList<KeyValue>(1);
edits.add(newKv);
- this.log.append(regionInfo.getRegionName(),
+ this.log.append(regionInfo,
regionInfo.getTableDesc().getName(), edits, now);
}
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 4f605ab..3dcbd9e 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1182,7 +1182,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
* Wait on all threads to finish.
* Presumption is that all closes and stops have already been called.
*/
- void join() {
+ protected void join() {
Threads.shutdown(this.majorCompactionChecker);
Threads.shutdown(this.workerThread);
Threads.shutdown(this.cacheFlusher);
@@ -2395,6 +2395,14 @@ public class HRegionServer implements HConstants, HRegionInterface,
}
/**
+ * Interval, in milliseconds, at which worker threads should wake up and run.
+ * @return the thread wake frequency in milliseconds
+ */
+ public int getThreadWakeFrequency() {
+ return threadWakeFrequency;
+ }
+
+ /**
* @param args
*/
public static void main(String [] args) {
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 99cf373..ddb953a 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -19,10 +19,7 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
-import java.io.EOFException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@@ -61,6 +58,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.io.Writable;
/**
* HLog stores all the edits to the HStore. Its the hbase write-ahead-log
@@ -125,6 +123,10 @@ public class HLog implements HConstants, Syncable {
Entry next(Entry reuse) throws IOException;
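+ /** Seek to the given byte offset in the underlying log file */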
+ void seek(long pos) throws IOException;
+
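+ /** @return the current byte offset in the underlying log file */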
+ long getPosition() throws IOException;
+
}
public interface Writer {
@@ -649,7 +651,7 @@ public class HLog implements HConstants, Syncable {
// region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
- doWrite(logKey, logEdit, logKey.getWriteTime());
+ doWrite(regionInfo, logKey, logEdit, logKey.getWriteTime());
this.unflushedEntries.incrementAndGet();
this.numEntries.incrementAndGet();
}
@@ -677,15 +679,16 @@ public class HLog implements HConstants, Syncable {
* synchronized prevents appends during the completion of a cache flush or for
* the duration of a log roll.
*
- * @param regionName
+ * @param info
* @param tableName
* @param edits
* @param now
* @throws IOException
*/
- public void append(byte [] regionName, byte [] tableName, List<KeyValue> edits,
+ public void append(HRegionInfo info, byte [] tableName, List<KeyValue> edits,
final long now)
throws IOException {
+ byte[] regionName = info.getRegionName();
if (this.closed) {
throw new IOException("Cannot append; log is closed");
}
@@ -700,7 +703,7 @@ public class HLog implements HConstants, Syncable {
int counter = 0;
for (KeyValue kv: edits) {
HLogKey logKey = makeKey(regionName, tableName, seqNum[counter++], now);
- doWrite(logKey, kv, now);
+ doWrite(info, logKey, kv, now);
this.numEntries.incrementAndGet();
}
@@ -847,7 +850,7 @@ public class HLog implements HConstants, Syncable {
}
}
- private void doWrite(HLogKey logKey, KeyValue logEdit, final long now)
+ protected void doWrite(HRegionInfo info, HLogKey logKey, KeyValue logEdit, final long now)
throws IOException {
if (!this.enabled) {
return;
@@ -1243,7 +1246,7 @@ public class HLog implements HConstants, Syncable {
* Utility class that lets us keep track of the edit with it's key
* Only used when splitting logs
*/
- public static class Entry {
+ public static class Entry implements Writable {
private KeyValue edit;
private HLogKey key;
@@ -1281,6 +1284,18 @@ public class HLog implements HConstants, Syncable {
public String toString() {
return this.key + "=" + this.edit;
}
+
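+ // Entry is Writable so whole WAL entries can be shipped over HBase RPC;
+ // it is also registered in HbaseObjectWritable for that purpose.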
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ this.key.write(dataOutput);
+ this.edit.write(dataOutput);
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ this.key.readFields(dataInput);
+ this.edit.readFields(dataInput);
+ }
}
/**
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
index 99a09de..b27f432 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
@@ -107,4 +107,14 @@ public class SequenceFileLogReader implements HLog.Reader {
return null;
}
+ @Override
+ public void seek(long pos) throws IOException {
+ reader.seek(pos);
+ }
+
+ @Override
+ public long getPosition() throws IOException {
+ return reader.getPosition();
+ }
+
}
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
index 5b1170f..d9ac509 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -67,7 +67,8 @@ public class SequenceFileLogWriter implements HLog.Writer {
public void sync() throws IOException {
this.writer.sync();
if (this.writer_out != null) {
- this.writer_out.sync();
+ // Use hflush rather than sync so readers of the log can see the appended edits.
+ this.writer_out.hflush();
}
}
diff --git a/src/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java b/src/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
index 57de37b..e199cb4 100644
--- a/src/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
+++ b/src/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
@@ -26,9 +26,7 @@ import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
@@ -64,10 +62,10 @@ public class ZooKeeperWrapper implements HConstants {
private final ZooKeeper zooKeeper;
- private final String parentZNode;
- private final String rootRegionZNode;
- private final String rsZNode;
- private final String masterElectionZNode;
+ public final String parentZNode;
+ public final String rootRegionZNode;
+ public final String rsZNode;
+ public final String masterElectionZNode;
public final String clusterStateZNode;
/**
@@ -93,7 +91,8 @@ public class ZooKeeperWrapper implements HConstants {
throw new IOException(e);
}
- parentZNode = conf.get("zookeeper.znode.parent", "/hbase");
+ parentZNode = conf.get(ZOOKEEPER_ZNODE_PARENT,
+ DEFAULT_ZOOKEEPER_ZNODE_PARENT);
String rootServerZNodeName = conf.get("zookeeper.znode.rootserver",
"root-region-server");
@@ -579,18 +578,7 @@ public class ZooKeeperWrapper implements HConstants {
* @return A list of server addresses
*/
public List<HServerAddress> scanRSDirectory() {
- List<HServerAddress> addresses = new ArrayList<HServerAddress>();
- try {
- List<String> nodes = zooKeeper.getChildren(rsZNode, false);
- for (String node : nodes) {
- addresses.add(readAddress(rsZNode + ZNODE_PATH_SEPARATOR + node, null));
- }
- } catch (KeeperException e) {
- LOG.warn("Failed to read " + rsZNode + " znode in ZooKeeper: " + e);
- } catch (InterruptedException e) {
- LOG.warn("Failed to read " + rsZNode + " znode in ZooKeeper: " + e);
- }
- return addresses;
+ return scanAddressDirectory(rsZNode, null);
}
/**
@@ -636,7 +624,7 @@ public class ZooKeeperWrapper implements HConstants {
}
}
- private String getZNode(String parentZNode, String znodeName) {
+ public String getZNode(String parentZNode, String znodeName) {
return znodeName.charAt(0) == ZNODE_PATH_SEPARATOR ?
znodeName : joinPath(parentZNode, znodeName);
}
@@ -652,6 +640,83 @@ public class ZooKeeperWrapper implements HConstants {
public String getMasterElectionZNode() {
return masterElectionZNode;
}
+
+ /**
+ * Scan a directory of address data.
+ * @param znode The parent node
+ * @param watcher The watcher to put on the found znodes, if not null
+ * @return The directory contents
+ */
+ public List<HServerAddress> scanAddressDirectory(String znode,
+ Watcher watcher) {
+ List<HServerAddress> list = new ArrayList<HServerAddress>();
+ List<String> nodes = this.listZnodes(znode, watcher);
+ if(nodes == null) {
+ return list;
+ }
+ for (String node : nodes) {
+ String path = joinPath(znode, node);
+ list.add(readAddress(path, watcher));
+ }
+ return list;
+ }
+
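+ /**
+ * List the children of a znode.
+ * @param znode The parent node
+ * @param watcher The watcher to put on the found znodes, if not null
+ * @return The children, or null if the znode does not exist or cannot be read
+ */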
+ public List<String> listZnodes(String znode,
+ Watcher watcher) {
+ List<String> nodes = null;
+ try {
+ if (checkExistenceOf(znode)) {
+ nodes = zooKeeper.getChildren(znode, watcher);
+ }
+ } catch (KeeperException e) {
+ LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e);
+ } catch (InterruptedException e) {
+ LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e);
+ }
+ return nodes;
+ }
+
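+ /**
+ * Get the data stored at parentZNode/znode without setting a watch.
+ * @return The data as a String, or null if the znode is missing or unreadable
+ */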
+ public String getData(String parentZNode, String znode) {
+ return getDataAndWatch(parentZNode, znode, null);
+ }
+
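+ /**
+ * Get the data stored at parentZNode/znode and set the given watcher on it.
+ * @param watcher The watcher to put on the znode, if not null
+ * @return The data as a String, or null if the znode is missing or unreadable
+ */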
+ public String getDataAndWatch(String parentZNode,
+ String znode, Watcher watcher) {
+ String data = null;
+ try {
+ String path = joinPath(parentZNode, znode);
+ if (checkExistenceOf(path)) {
+ data = Bytes.toString(zooKeeper.getData(path, watcher, null));
+ }
+ } catch (KeeperException e) {
+ LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e);
+ } catch (InterruptedException e) {
+ LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e);
+ }
+ return data;
+ }
+
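+ /**
+ * Write data to parentPath/child, ensuring the parent exists, creating the
+ * znode if it does not exist yet and overwriting it otherwise. For example,
+ * writeZNode("/1", "replication", "") creates the znode /1/replication.
+ * @throws InterruptedException
+ * @throws KeeperException
+ */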
+ public void writeZNode(String parentPath, String child, String strData)
+ throws InterruptedException, KeeperException {
+ String path = joinPath(parentPath, child);
+ if (!ensureExists(parentPath)) {
+ LOG.error("unable to ensure parent exists: " + parentPath);
+ }
+ byte[] data = Bytes.toBytes(strData);
+ try {
+ this.zooKeeper.create(path, data,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ LOG.debug("Created " + path);
+ } catch (KeeperException.NodeExistsException ex) {
+ this.zooKeeper.setData(path, data, -1);
+ LOG.debug("Updated " + path);
+ }
+ }
+
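+ /**
+ * Build the key that uniquely identifies this cluster's ZooKeeper setup: the
+ * quorum servers plus the parent znode, e.g. "host1,host2,host3:/hbase".
+ * @param conf the cluster configuration
+ * @return the ZooKeeper cluster key
+ */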
+ public static String getZookeeperClusterKey(Configuration conf) {
+ return conf.get(ZOOKEEPER_QUORUM)+":"+
+ conf.get(ZOOKEEPER_ZNODE_PARENT);
+ }
}
diff --git a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
index ad509cb..608d822 100644
--- a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
+++ b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
@@ -404,7 +404,9 @@ public abstract class HBaseTestCase extends TestCase {
/**
* A class that makes a {@link Incommon} out of a {@link HRegion}
*/
- public static class HRegionIncommon implements Incommon, FlushCache {
+ public static class HRegionIncommon implements Incommon, FlushCache {
final HRegion region;
/**
diff --git a/src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java
index d3107eb..1866220 100644
--- a/src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -62,14 +62,22 @@ import org.apache.zookeeper.ZooKeeper;
public class HBaseTestingUtility {
private final Log LOG = LogFactory.getLog(getClass());
-
- private final Configuration conf = HBaseConfiguration.create();
+
+ private final Configuration conf;
private MiniZooKeeperCluster zkCluster = null;
private MiniDFSCluster dfsCluster = null;
private MiniHBaseCluster hbaseCluster = null;
private MiniMRCluster mrCluster = null;
private File clusterTestBuildDir = null;
private HBaseAdmin hbaseAdmin = null;
+
+ public HBaseTestingUtility() {
+ this(HBaseConfiguration.create());
+ }
+
+ public HBaseTestingUtility(Configuration conf) {
+ this.conf = conf;
+ }
/** System property key to get test directory value.
*/
@@ -106,6 +114,15 @@ public class HBaseTestingUtility {
startMiniCluster(1);
}
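+ /**
+ * Start a mini ZooKeeper cluster on its own so that it can be shared by
+ * several testing utilities (and thus several mini HBase clusters).
+ * @throws Exception
+ */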
+ public void startMiniZKCluster() throws Exception {
+ // Note that this is done before we create the MiniHBaseCluster because we
+ // need to edit the config to add the ZooKeeper servers.
+ this.zkCluster = new MiniZooKeeperCluster();
+ int clientPort = this.zkCluster.startup(this.clusterTestBuildDir);
+ this.conf.set("hbase.zookeeper.property.clientPort",
+ Integer.toString(clientPort));
+ }
+
/**
* Start up a minicluster of hbase, optinally dfs, and zookeeper.
* Modifies Configuration. Homes the cluster data directory under a random
@@ -156,12 +173,10 @@ public class HBaseTestingUtility {
this.conf.set("fs.defaultFS", fs.getUri().toString());
this.dfsCluster.waitClusterUp();
- // Note that this is done before we create the MiniHBaseCluster because we
- // need to edit the config to add the ZooKeeper servers.
- this.zkCluster = new MiniZooKeeperCluster();
- int clientPort = this.zkCluster.startup(this.clusterTestBuildDir);
- this.conf.set("hbase.zookeeper.property.clientPort",
- Integer.toString(clientPort));
+ // It could be created before the cluster
+ if(this.zkCluster == null) {
+ startMiniZKCluster();
+ }
// Now do the mini hbase cluster. Set the hbase.rootdir in config.
Path hbaseRootdir = fs.makeQualified(fs.getHomeDirectory());
@@ -305,6 +320,21 @@ public class HBaseTestingUtility {
}
/**
+ * Truncate an existing table: disable it, delete it and recreate it with the
+ * same descriptor, leaving it empty.
+ * @param tableName name of the existing table
+ * @return an HTable instance for the freshly recreated (empty) table
+ * @throws IOException
+ */
+ public HTable truncateTable(byte [] tableName) throws IOException {
+ HBaseAdmin admin = new HBaseAdmin(getConfiguration());
+ HTableDescriptor desc = admin.getTableDescriptor(tableName);
+ admin.disableTable(tableName);
+ admin.deleteTable(tableName);
+ admin.createTable(desc);
+ return new HTable(getConfiguration(), tableName);
+ }
+
+ /**
* Load table with rows from 'aaa' to 'zzz'.
* @param t Table
* @param f Family
@@ -575,4 +605,16 @@ public class HBaseTestingUtility {
HRegionLocation hrl = table.getRegionLocation(row);
closeRegion(hrl.getRegionInfo().getRegionName());
}
+
+ public MiniZooKeeperCluster getZkCluster() {
+ return zkCluster;
+ }
+
+ public void setZkCluster(MiniZooKeeperCluster zkCluster) {
+ this.zkCluster = zkCluster;
+ }
+
+ public MiniDFSCluster getDFSCluster() {
+ return dfsCluster;
+ }
}
\ No newline at end of file
diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java b/src/test/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
index 7803f1e..a192051 100644
--- a/src/test/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
+++ b/src/test/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
@@ -115,7 +115,7 @@ public class TestStoreReconstruction {
byte[] column = Bytes.toBytes("column:" + Integer.toString(j));
edit.add(new KeyValue(rowName, family, qualifier,
System.currentTimeMillis(), column));
- log.append(regionName, tableName, edit,
+ log.append(info, tableName, edit,
System.currentTimeMillis());
edit.clear();
}
@@ -126,7 +126,7 @@ public class TestStoreReconstruction {
// Add an edit to another family, should be skipped.
edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
System.currentTimeMillis(), rowName));
- log.append(regionName, tableName, edit,
+ log.append(info, tableName, edit,
System.currentTimeMillis());
log.sync();
diff --git a/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
index 21f5ed7..e96370e 100644
--- a/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ b/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -28,11 +28,7 @@ import java.util.Map;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestCase;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -43,8 +39,6 @@ public class TestHLog extends HBaseTestCase implements HConstants {
@Override
public void setUp() throws Exception {
- // Enable append for these tests.
- this.conf.setBoolean("dfs.support.append", true);
// Make block sizes small.
this.conf.setInt("dfs.blocksize", 1024 * 1024);
this.conf.setInt("hbase.regionserver.flushlogentries", 1);
@@ -74,14 +68,21 @@ public class TestHLog extends HBaseTestCase implements HConstants {
* @throws IOException
*/
public void testSplit() throws IOException {
+
final byte [] tableName = Bytes.toBytes(getName());
final byte [] rowName = tableName;
HLog log = new HLog(this.fs, this.dir, this.conf, null);
final int howmany = 3;
+ HRegionInfo[] infos = new HRegionInfo[howmany];
+ for(int i = 0; i < howmany; i++) {
+ infos[i] = new HRegionInfo(new HTableDescriptor(tableName),
+ Bytes.toBytes("" + i), Bytes.toBytes("" + (i+1)), false);
+ }
// Add edits for three regions.
try {
for (int ii = 0; ii < howmany; ii++) {
for (int i = 0; i < howmany; i++) {
+
for (int j = 0; j < howmany; j++) {
List<KeyValue> edit = new ArrayList<KeyValue>();
byte [] family = Bytes.toBytes("column");
@@ -90,10 +91,11 @@ public class TestHLog extends HBaseTestCase implements HConstants {
edit.add(new KeyValue(rowName, family, qualifier,
System.currentTimeMillis(), column));
System.out.println("Region " + i + ": " + edit);
- log.append(Bytes.toBytes("" + i), tableName, edit,
+ log.append(infos[i], tableName, edit,
System.currentTimeMillis());
}
}
+ log.hflush();
log.rollWriter();
}
List<Path> splits =
@@ -128,10 +130,14 @@ public class TestHLog extends HBaseTestCase implements HConstants {
Path subdir = new Path(this.dir, "hlogdir");
HLog wal = new HLog(this.fs, subdir, this.conf, null);
final int total = 20;
+
+ HRegionInfo info = new HRegionInfo(new HTableDescriptor(bytes),
+ null, null, false);
+
for (int i = 0; i < total; i++) {
List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
- wal.append(bytes, bytes, kvs, System.currentTimeMillis());
+ wal.append(info, bytes, kvs, System.currentTimeMillis());
}
// Now call sync and try reading. Opening a Reader before you sync just
// gives you EOFE.
@@ -149,7 +155,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
for (int i = 0; i < total; i++) {
List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
- wal.append(bytes, bytes, kvs, System.currentTimeMillis());
+ wal.append(info, bytes, kvs, System.currentTimeMillis());
}
reader = HLog.getReader(fs, walPath, conf);
count = 0;
@@ -168,7 +174,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
for (int i = 0; i < total; i++) {
List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(Bytes.toBytes(i), bytes, value));
- wal.append(bytes, bytes, kvs, System.currentTimeMillis());
+ wal.append(info, bytes, kvs, System.currentTimeMillis());
}
// Now I should have written out lots of blocks. Sync then read.
wal.sync();
@@ -248,7 +254,6 @@ public class TestHLog extends HBaseTestCase implements HConstants {
*/
public void testEditAdd() throws IOException {
final int COL_COUNT = 10;
- final byte [] regionName = Bytes.toBytes("regionname");
final byte [] tableName = Bytes.toBytes("tablename");
final byte [] row = Bytes.toBytes("row");
HLog.Reader reader = null;
@@ -263,7 +268,10 @@ public class TestHLog extends HBaseTestCase implements HConstants {
Bytes.toBytes(Integer.toString(i)),
timestamp, new byte[] { (byte)(i + '0') }));
}
- log.append(regionName, tableName, cols, System.currentTimeMillis());
+ HRegionInfo info = new HRegionInfo(new HTableDescriptor(tableName),
+ row, Bytes.toBytes(Bytes.toString(row) + "1"), false);
+ final byte [] regionName = info.getRegionName();
+ log.append(info, tableName, cols, System.currentTimeMillis());
long logSeqId = log.startCacheFlush();
log.completeCacheFlush(regionName, tableName, logSeqId);
log.close();