serverTimestampMap = table.readRegionServerLastLogRollResult();
+ String host = rss.getServerName().getHostname();
+ String sts = serverTimestampMap.get(host);
+ if (sts != null && Long.parseLong(sts) > filenum) {
+ LOG.warn("Won't update server's last roll log result: current=" + sts + " new=" + filenum);
+ return null;
+ }
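+ // Record the new last-roll result for this server (the WAL file number doubles as a timestamp)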
+ table.writeRegionServerLastLogRollResult(host, Long.toString(filenum));
+ //TODO: potential leak of HBase connection
+ //BackupSystemTable.close();
+ return null;
+ }
+
+ }
+
+ private void rolllog() throws ForeignException {
+
+ monitor.rethrowException();
+
+ taskManager.submitTask(new RSRollLogTask());
+ monitor.rethrowException();
+
+ // wait for everything to complete.
+ taskManager.waitForOutstandingTasks();
+ monitor.rethrowException();
+
+ }
+
+ @Override
+ public void acquireBarrier() throws ForeignException {
+ // Do nothing; all the work happens in the inside-barrier step.
+ }
+
+ /**
+ * Perform the log roll on this region server.
+ * @return always null for now (see the FIXME below)
+ */
+ @Override
+ public byte[] insideBarrier() throws ForeignException {
+ rolllog();
+ // FIXME
+ return null;
+ }
+
+ /**
+ * Cancel threads if they haven't finished.
+ */
+ @Override
+ public void cleanup(Exception e) {
+ taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e);
+ }
+
+ /**
+ * Nothing to do here; releasing the barrier is a no-op for the log roll procedure.
+ */
+ public void releaseBarrier() {
+ // NO OP
+ }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/RegionServerBackupManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/RegionServerBackupManager.java
new file mode 100644
index 0000000..26abe0e
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/RegionServerBackupManager.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+
+import java.io.IOException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.procedure.ProcedureMember;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
+import org.apache.hadoop.hbase.procedure.Subprocedure;
+import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * This manager class handles the backup work for an {@link HRegionServer}.
+ *
+ * This provides the mechanism necessary to kick off a backup-specific {@link Subprocedure} that
+ * this region server is responsible for. If any failures occur with the subprocedure, the
+ * manager's procedure member notifies the procedure coordinator to abort all others.
+ *
+ * On startup, requires {@link #start()} to be called.
+ *
+ * On shutdown, requires {@link org.apache.hadoop.hbase.procedure.ProcedureMember#close()} to be
+ * called.
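+ *
+ * The manager is wired in through the standard procedure-manager configuration keys, as used by
+ * the backup tests, for example:
+ * <pre>
+ * hbase.procedure.regionserver.classes=org.apache.hadoop.hbase.backup.regionserver.RegionServerBackupManager
+ * hbase.procedure.master.classes=org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager
+ * </pre>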
+ */
+public class RegionServerBackupManager extends RegionServerProcedureManager {
+
+ private static final Log LOG = LogFactory.getLog(RegionServerBackupManager.class);
+
+ /** Conf key for number of request threads to start backup on regionservers */
+ public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads";
+ /** # of threads for backup work on the rs. */
+ public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10;
+
+ public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout";
+ public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000;
+
+ /** Conf key for millis between checks to see if backup work completed or if there are errors */
+ public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency";
+ /** Default amount of time to check for errors while regions finish backup work */
+ private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500;
+
+ private RegionServerServices rss;
+ private ProcedureMemberRpcs memberRpcs;
+ private ProcedureMember member;
+
+ /**
+ * Create a default backup procedure manager
+ */
+ public RegionServerBackupManager() {
+ }
+
+ /**
+ * Start accepting backup procedure requests.
+ */
+ @Override
+ public void start() {
+ this.memberRpcs.start(rss.getServerName().toString(), member);
+ LOG.info("Started region server backup manager.");
+ }
+
+ /**
+ * Close this and all running backup procedure tasks
+ * @param force forcefully stop all running tasks
+ * @throws IOException
+ */
+ @Override
+ public void stop(boolean force) throws IOException {
+ String mode = force ? "abruptly" : "gracefully";
+ LOG.info("Stopping RegionServerBackupManager " + mode + ".");
+
+ try {
+ this.member.close();
+ } finally {
+ this.memberRpcs.close();
+ }
+ }
+
+ /**
+ * If in a running state, creates the subprocedure that handles the log roll backup procedure.
+ * @return Subprocedure to submit to the ProcedureMember.
+ */
+ public Subprocedure buildSubprocedure() {
+
+ // don't run a backup if the parent is stop(ping)
+ if (rss.isStopping() || rss.isStopped()) {
+ throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName()
+ + ", because stopping/stopped!");
+ }
+
+ LOG.info("Attempting to run a roll log procedure for backup.");
+ ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
+ Configuration conf = rss.getConfiguration();
+ long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
+ long wakeMillis =
+ conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT);
+
+ BackupSubprocedurePool taskManager =
+ new BackupSubprocedurePool(rss.getServerName().toString(), conf);
+ return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis,
+ taskManager);
+
+ }
+
+ /**
+ * Build the actual backup procedure runner that will do all the 'hard' work
+ */
+ public class BackupSubprocedureBuilder implements SubprocedureFactory {
+
+ @Override
+ public Subprocedure buildSubprocedure(String name, byte[] data) {
+ return RegionServerBackupManager.this.buildSubprocedure();
+ }
+ }
+
+ @Override
+ public void initialize(RegionServerServices rss) throws IOException {
+ this.rss = rss;
+ //ZooKeeperWatcher zkw = rss.getZooKeeper();
+ BaseCoordinatedStateManager coordManager =
+ (BaseCoordinatedStateManager)
+ CoordinatedStateManagerFactory.getCoordinatedStateManager(rss.getConfiguration());
+ coordManager.initialize(rss);
+ this.memberRpcs =
+ coordManager.getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
+ //new ZKProcedureMemberRpcs(zkw, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
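+ // The member RPCs now come from the coordination abstraction rather than being built directly
+ // against ZooKeeper (see the commented-out line above).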
+
+ // read in the backup handler configuration properties
+ Configuration conf = rss.getConfiguration();
+ long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
+ int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT);
+ // create the actual cohort member
+ ThreadPoolExecutor pool =
+ ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive);
+ this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder());
+ }
+
+ @Override
+ public String getProcedureSignature() {
+ return "backup-proc";
+ }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
index ae36f08..ef10010 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
@@ -17,7 +17,11 @@
*/
package org.apache.hadoop.hbase.coordination;
+import java.io.IOException;
+
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.Server;
@@ -55,4 +59,17 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
* Method to retrieve coordination for split log manager
*/
public abstract SplitLogManagerCoordination getSplitLogManagerCoordination();
+ /**
+ * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
+ */
+ public abstract ProcedureCoordinatorRpcs
+ getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException;
+
+ /**
+ * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs}
+ */
+ public abstract ProcedureMemberRpcs
+ getProcedureMemberRpcs(String procType) throws IOException;
+
+
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index 3e89be7..3b48e11 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -17,9 +17,15 @@
*/
package org.apache.hadoop.hbase.coordination;
+import java.io.IOException;
+
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
@@ -54,4 +60,15 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
public SplitLogManagerCoordination getSplitLogManagerCoordination() {
return splitLogManagerCoordination;
}
+
+ @Override
+ public ProcedureCoordinatorRpcs
+ getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException {
+ return new ZKProcedureCoordinatorRpcs(watcher, procType, coordNode);
+ }
+
+ @Override
+ public ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws IOException {
+ return new ZKProcedureMemberRpcs(watcher, procType);
+ }
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index c067fc3..23b5851 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -85,6 +85,9 @@ public class WALPlayer extends Configured implements Tool {
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+ public WALPlayer() {
+ }
+
protected WALPlayer(final Configuration c) {
super(c);
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index ff5cb64..d2c76a3 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.backup.BackupManager;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
@@ -359,7 +360,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
Replication.decorateMasterConfiguration(this.conf);
-
+ BackupManager.decorateMasterConfiguration(this.conf);
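+ // Mirrors the Replication call above: lets BackupManager decorate the master configuration
+ // with backup-specific settings.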
+
// Hack! Maps DFSClient => Master for logs. HDFS made this
// config param for task trackers, but we can piggyback off of it.
if (this.conf.get("mapreduce.task.attempt.id") == null) {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java
index 95c3ffe..b6e11ea 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java
@@ -37,7 +37,7 @@ public abstract class RegionServerProcedureManager extends ProcedureManager {
* @param rss Region Server service interface
* @throws KeeperException
*/
- public abstract void initialize(RegionServerServices rss) throws KeeperException;
+ public abstract void initialize(RegionServerServices rss) throws IOException;
/**
* Start accepting procedure requests.
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java
index 0f4ea64..ae111c5 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java
@@ -39,7 +39,7 @@ public class RegionServerProcedureManagerHost extends
private static final Log LOG = LogFactory
.getLog(RegionServerProcedureManagerHost.class);
- public void initialize(RegionServerServices rss) throws KeeperException {
+ public void initialize(RegionServerServices rss) throws IOException {
for (RegionServerProcedureManager proc : procedures) {
LOG.debug("Procedure " + proc.getProcedureSignature() + " is initializing");
proc.initialize(rss);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
index fedd595..078943b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
@@ -54,7 +54,7 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
* @throws KeeperException if an unexpected zk error occurs
*/
public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher,
- String procedureClass, String coordName) throws KeeperException {
+ String procedureClass, String coordName) throws IOException {
this.watcher = watcher;
this.procedureType = procedureClass;
this.coordName = coordName;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
index 2e03a60..fff75a7 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
@@ -68,49 +68,54 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
* @throws KeeperException if we can't reach zookeeper
*/
public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
- throws KeeperException {
- this.zkController = new ZKProcedureUtil(watcher, procType) {
- @Override
- public void nodeCreated(String path) {
- if (!isInProcedurePath(path)) {
- return;
- }
+ throws IOException {
+ try {
+ this.zkController = new ZKProcedureUtil(watcher, procType) {
+ @Override
+ public void nodeCreated(String path) {
+ if (!isInProcedurePath(path)) {
+ return;
+ }
- LOG.info("Received created event:" + path);
- // if it is a simple start/end/abort then we just rewatch the node
- if (isAcquiredNode(path)) {
- waitForNewProcedures();
- return;
- } else if (isAbortNode(path)) {
- watchForAbortedProcedures();
- return;
+ LOG.info("Received created event:" + path);
+ // if it is a simple start/end/abort then we just rewatch the node
+ if (isAcquiredNode(path)) {
+ waitForNewProcedures();
+ return;
+ } else if (isAbortNode(path)) {
+ watchForAbortedProcedures();
+ return;
+ }
+ String parent = ZKUtil.getParent(path);
+ // if its the end barrier, the procedure can be completed
+ if (isReachedNode(parent)) {
+ receivedReachedGlobalBarrier(path);
+ return;
+ } else if (isAbortNode(parent)) {
+ abort(path);
+ return;
+ } else if (isAcquiredNode(parent)) {
+ startNewSubprocedure(path);
+ } else {
+ LOG.debug("Ignoring created notification for node:" + path);
+ }
}
- String parent = ZKUtil.getParent(path);
- // if its the end barrier, the procedure can be completed
- if (isReachedNode(parent)) {
- receivedReachedGlobalBarrier(path);
- return;
- } else if (isAbortNode(parent)) {
- abort(path);
- return;
- } else if (isAcquiredNode(parent)) {
- startNewSubprocedure(path);
- } else {
- LOG.debug("Ignoring created notification for node:" + path);
- }
- }
- @Override
- public void nodeChildrenChanged(String path) {
- if (path.equals(this.acquiredZnode)) {
- LOG.info("Received procedure start children changed event: " + path);
- waitForNewProcedures();
- } else if (path.equals(this.abortZnode)) {
- LOG.info("Received procedure abort children changed event: " + path);
- watchForAbortedProcedures();
+ @Override
+ public void nodeChildrenChanged(String path) {
+ if (path.equals(this.acquiredZnode)) {
+ LOG.info("Received procedure start children changed event: " + path);
+ waitForNewProcedures();
+ } else if (path.equals(this.abortZnode)) {
+ LOG.info("Received procedure abort children changed event: " + path);
+ watchForAbortedProcedures();
+ }
}
- }
- };
+ };
+ } catch (KeeperException e) {
+ // Wrap ZooKeeper failures as IOException so callers stay coordination-agnostic
+ throw new IOException(e);
+ }
}
public ZKProcedureUtil getZkController() {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
index a441a6b..e1711ec 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
@@ -316,7 +316,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
* @throws KeeperException if the zookeeper cannot be reached
*/
@Override
- public void initialize(RegionServerServices rss) throws KeeperException {
+ public void initialize(RegionServerServices rss) throws IOException {
this.rss = rss;
ZooKeeperWatcher zkw = rss.getZooKeeper();
this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 7666057..b81e0a6 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -791,8 +791,8 @@ public class HRegionServer extends HasThread implements
rspmHost = new RegionServerProcedureManagerHost();
rspmHost.loadProcedures(conf);
rspmHost.initialize(this);
- } catch (KeeperException e) {
- this.abort("Failed to reach zk cluster when creating procedure handler.", e);
+ } catch (IOException e) {
+ this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
}
// register watcher for recovering regions
this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
index f04feb1..259abee 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
@@ -389,7 +389,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
* @throws KeeperException if the zookeeper cluster cannot be reached
*/
@Override
- public void initialize(RegionServerServices rss) throws KeeperException {
+ public void initialize(RegionServerServices rss) throws IOException {
this.rss = rss;
ZooKeeperWatcher zkw = rss.getZooKeeper();
this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index b118ecd..adbb7cd 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -922,6 +922,16 @@ public class FSHLog implements WAL {
return computeFilename(this.filenum.get());
}
+
+ /**
+ * Retained for compatibility with the old API.
+ * @return current file number (timestamp)
+ */
+ public long getFilenum() {
+ return filenum.get();
+ }
+
@Override
public String toString() {
return "FSHLog " + logFilePrefix + ":" + logFileSuffix + "(num " + filenum + ")";
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotCopy.java hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotCopy.java
new file mode 100644
index 0000000..0360000
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotCopy.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot;
+
+import org.apache.hadoop.hbase.backup.BackupHandler;
+
+/* This class will be extended in a future JIRA to support progress reporting. */
+public class SnapshotCopy extends ExportSnapshot {
+ private BackupHandler backupHandler;
+ private String table;
+
+ public SnapshotCopy(BackupHandler backupHandler, String table) {
+ super();
+ this.backupHandler = backupHandler;
+ this.table = table;
+ }
+
+ public BackupHandler getBackupHandler() {
+ return this.backupHandler;
+ }
+
+ public String getTable() {
+ return this.table;
+ }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
index b41bbfb..4f3db1e 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
@@ -192,13 +192,18 @@ public class DefaultWALProvider implements WALProvider {
@VisibleForTesting
public static long extractFileNumFromWAL(final WAL wal) {
final Path walName = ((FSHLog)wal).getCurrentFileName();
+ return extractFileNumFromWAL(walName);
+ }
+
+ @VisibleForTesting
+ public static long extractFileNumFromWAL(final Path walName) {
if (walName == null) {
throw new IllegalArgumentException("The WAL path couldn't be null");
}
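+ // The file number is the last delimiter-separated token of the WAL name
+ // (second-to-last for meta WALs).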
final String[] walPathStrs = walName.toString().split("\\" + WAL_FILE_NAME_DELIMITER);
return Long.parseLong(walPathStrs[walPathStrs.length - (isMetaFile(walName) ? 2:1)]);
}
-
+
/**
* Pattern used to validate a WAL file name
* see {@link #validateWALFilename(String)} for description.
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
new file mode 100644
index 0000000..efc42f9
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -0,0 +1,197 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupHandler.BACKUPSTATUS;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * This class is only a base for other integration-level backup tests.
+ * Do not add tests here.
+ * TestBackupSmallTests is where tests that don't require bringing machines up/down should go.
+ * All other tests should have their own classes and extend this one.
+ */
+public class TestBackupBase {
+
+ private static final Log LOG = LogFactory.getLog(TestBackupBase.class);
+
+ protected static Configuration conf1;
+ protected static Configuration conf2;
+
+ protected static HBaseTestingUtility TEST_UTIL;
+ protected static HBaseTestingUtility TEST_UTIL2;
+
+ protected static TableName table1;
+ protected static TableName table2;
+ protected static TableName table3;
+ protected static TableName table4;
+
+ protected static String table1_restore = "table1_restore";
+ protected static String table2_restore = "table2_restore";
+ protected static String table3_restore = "table3_restore";
+ protected static String table4_restore = "table4_restore";
+
+ protected static final int NB_ROWS_IN_BATCH = 100;
+ protected static final byte[] qualName = Bytes.toBytes("q1");
+ protected static final byte[] famName = Bytes.toBytes("f");
+
+ protected static String BACKUP_ROOT_DIR = "/backupUT";
+ protected static String BACKUP_REMOTE_ROOT_DIR = "/backupUT";
+
+ protected static final String BACKUP_ZNODE = "/backup/hbase";
+ protected static final String BACKUP_SUCCEED_NODE = "complete";
+ protected static final String BACKUP_FAILED_NODE = "failed";
+
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ TEST_UTIL = new HBaseTestingUtility();
+ TEST_UTIL.getConfiguration().set("hbase.procedure.regionserver.classes",
+ "org.apache.hadoop.hbase.backup.regionserver.RegionServerBackupManager");
+ TEST_UTIL.getConfiguration().set("hbase.procedure.master.classes",
+ "org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager");
+ TEST_UTIL.getConfiguration().set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+ TEST_UTIL.startMiniZKCluster();
+ MiniZooKeeperCluster miniZK = TEST_UTIL.getZkCluster();
+
+ conf1 = TEST_UTIL.getConfiguration();
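+ // TEST_UTIL2 acts as the "remote" cluster: it shares the mini ZK quorum but uses a separate
+ // znode parent, and provides the remote backup root directory.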
+ conf2 = HBaseConfiguration.create(conf1);
+ conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+ TEST_UTIL2 = new HBaseTestingUtility(conf2);
+ TEST_UTIL2.setZkCluster(miniZK);
+ TEST_UTIL.startMiniCluster();
+ TEST_UTIL2.startMiniCluster();
+ conf1 = TEST_UTIL.getConfiguration();
+
+ TEST_UTIL.startMiniMapReduceCluster();
+ BACKUP_ROOT_DIR = TEST_UTIL.getConfiguration().get("fs.defaultFS") + "/backupUT";
+ LOG.info("ROOTDIR " + BACKUP_ROOT_DIR);
+ BACKUP_REMOTE_ROOT_DIR = TEST_UTIL2.getConfiguration().get("fs.defaultFS") + "/backupUT";
+ LOG.info("REMOTE ROOTDIR " + BACKUP_REMOTE_ROOT_DIR);
+
+ BackupClient.setConf(conf1);
+ RestoreClient.setConf(conf1);
+ createTables();
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getHBaseAdmin());
+ SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
+ //zkw1.close();
+ TEST_UTIL2.shutdownMiniCluster();
+ TEST_UTIL.shutdownMiniCluster();
+ TEST_UTIL.shutdownMiniMapReduceCluster();
+ }
+
+ protected static void loadTable(HTable table) throws Exception {
+
+ Put p; // load NB_ROWS_IN_BATCH rows into the table
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ p = new Put(Bytes.toBytes("row" + i));
+ p.add(famName, qualName, Bytes.toBytes("val" + i));
+ table.put(p);
+ }
+ }
+
+ protected static void createTables() throws Exception {
+
+ long tid = System.currentTimeMillis();
+ table1 = TableName.valueOf("test-" + tid);
+
+ HBaseAdmin ha = TEST_UTIL.getHBaseAdmin();
+ HTableDescriptor desc = new HTableDescriptor(table1);
+ HColumnDescriptor fam = new HColumnDescriptor(famName);
+ desc.addFamily(fam);
+ ha.createTable(desc);
+ Connection conn = ConnectionFactory.createConnection(conf1);
+
+ HTable table = (HTable) conn.getTable(table1);
+ loadTable(table);
+ table.close();
+
+ table2 = TableName.valueOf("test-" + tid + 1);
+ desc = new HTableDescriptor(table2);
+ desc.addFamily(fam);
+ ha.createTable(desc);
+ table = (HTable) conn.getTable(table2);
+ loadTable(table);
+ table.close();
+
+ table3 = TableName.valueOf("test-" + tid + 2);
+ table = TEST_UTIL.createTable(table3.getName(), famName);
+ table.close();
+
+ table4 = TableName.valueOf("test-" + tid + 3);
+ table = TEST_UTIL.createTable(table4.getName(), famName);
+ table.close();
+ ha.close();
+ conn.close();
+ }
+
+ protected boolean checkSucceeded(String backupId) throws IOException {
+ BackupContext status = getBackupContext(backupId);
+ if (status == null) return false;
+ return status.getFlag() == BACKUPSTATUS.COMPLETE;
+ }
+
+ protected boolean checkFailed(String backupId) throws IOException {
+ BackupContext status = getBackupContext(backupId);
+ if (status == null) return false;
+ return status.getFlag() == BACKUPSTATUS.FAILED;
+ }
+
+ private BackupContext getBackupContext(String backupId) throws IOException {
+ Configuration conf = BackupClient.getConf();
+ BackupSystemTable table = BackupSystemTable.getTable(conf);
+ BackupContext status = table.readBackupStatus(backupId);
+ return status;
+ }
+}
+
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBoundaryTests.java hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBoundaryTests.java
new file mode 100644
index 0000000..7f5031e
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBoundaryTests.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestBackupBoundaryTests extends TestBackupBase {
+
+ private static final Log LOG = LogFactory.getLog(TestBackupBoundaryTests.class);
+
+ /**
+ * Verify that full backup is created on a single empty table correctly.
+ * @throws Exception
+ */
+ @Test
+ public void testFullBackupSingleEmpty() throws Exception {
+
+ LOG.info("create full backup image on single table");
+
+ String backupId =
+ BackupClient.create("full", BACKUP_ROOT_DIR, table3.getNameAsString(), null);
+ LOG.info("Finished Backup");
+ assertTrue(checkSucceeded(backupId));
+
+ }
+
+ /**
+ * Verify that full backup is created on multiple empty tables correctly.
+ * @throws Exception
+ */
+ @Test
+ public void testFullBackupMultipleEmpty() throws Exception {
+ LOG.info("create full backup image on mulitple empty tables");
+ String tableset =
+ table3.getNameAsString() + BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND
+ + table4.getNameAsString();
+ String backupId = BackupClient.create("full", BACKUP_ROOT_DIR, tableset, null);
+ assertTrue(checkSucceeded(backupId));
+
+ }
+
+ /**
+ * Verify that full backup fails on a single table that does not exist.
+ * @throws Exception
+ */
+ @Test(expected = RuntimeException.class)
+ public void testFullBackupSingleDNE() throws Exception {
+
+ LOG.info("test full backup fails on a single table that does not exist");
+ BackupClient.create("full", BACKUP_ROOT_DIR, "tabledne", null);
+ }
+
+ /**
+ * Verify that full backup fails on multiple tables that do not exist.
+ * @throws Exception
+ */
+ @Test(expected = RuntimeException.class)
+ public void testFullBackupMultipleDNE() throws Exception {
+
+ LOG.info("test full backup fails on multiple tables that do not exist");
+ BackupClient.create("full", BACKUP_ROOT_DIR, "table1dne,table2dne", null);
+ }
+
+ /**
+ * Verify that full backup fails on tableset containing real and fake tables.
+ * @throws Exception
+ */
+ @Test(expected = RuntimeException.class)
+ public void testFullBackupMixExistAndDNE() throws Exception {
+ LOG.info("create full backup fails on tableset containing real and fake table");
+ String tableset =
+ table1.getNameAsString() + BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND
+ + "tabledne";
+ BackupClient.create("full", BACKUP_ROOT_DIR, tableset, null);
+ }
+
+}
\ No newline at end of file
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupLogCleaner.java hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupLogCleaner.java
new file mode 100644
index 0000000..158479b
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupLogCleaner.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.backup.master.BackupLogCleaner;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Iterables;
+
+@Category(LargeTests.class)
+public class TestBackupLogCleaner extends TestBackupBase {
+ private static final Log LOG = LogFactory.getLog(TestBackupLogCleaner.class);
+
+ // All cases are implemented in a single test, since an incremental backup
+ // depends on a preceding full backup.
+ @Test
+ public void testBackupLogCleaner() throws Exception {
+
+ // #1 - create full backup for all tables
+ LOG.info("create full backup image for all tables");
+ String tablesetFull =
+ table1.getNameAsString() + BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND
+ + table2.getNameAsString() + BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND
+ + table3.getNameAsString() + BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND
+ + table4.getNameAsString();
+
+ BackupSystemTable systemTable = BackupSystemTable.getTable(TEST_UTIL.getConfiguration());
+ // Verify that we have no backup sessions yet
+ assertFalse(systemTable.hasBackupSessions());
+
+ List<FileStatus> walFiles = getListOfWALFiles(TEST_UTIL.getConfiguration());
+ List<String> swalFiles = convert(walFiles);
+ BackupLogCleaner cleaner = new BackupLogCleaner();
+ cleaner.setConf(TEST_UTIL.getConfiguration());
+
+ Iterable<FileStatus> deletable = cleaner.getDeletableFiles(walFiles);
+ // We can delete all files because we have not yet recorded any backup sessions
+ assertTrue(Iterables.size(deletable) == walFiles.size());
+
+ systemTable.addWALFiles(swalFiles, "backup");
+ String backupIdFull = BackupClient.create("full", BACKUP_ROOT_DIR, tablesetFull, null);
+ assertTrue(checkSucceeded(backupIdFull));
+ // Check one more time
+ deletable = cleaner.getDeletableFiles(walFiles);
+ // We can delete the WAL files because they were saved into the hbase:backup table
+ int size = Iterables.size(deletable);
+ assertTrue(size == walFiles.size());
+
+ List<FileStatus> newWalFiles = getListOfWALFiles(TEST_UTIL.getConfiguration());
+ LOG.debug("WAL list after full backup");
+ convert(newWalFiles);
+
+ // The new list of WAL files is larger than the previous one,
+ // because a new WAL per RS has been opened after the full backup
+ assertTrue(walFiles.size() < newWalFiles.size());
+ // TODO: verify that the result files are not in the walFiles collection
+ Connection conn = ConnectionFactory.createConnection(conf1);
+ // #2 - insert some data to table
+ HTable t1 = (HTable) conn.getTable(table1);
+ Put p1;
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ p1 = new Put(Bytes.toBytes("row-t1" + i));
+ p1.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+ t1.put(p1);
+ }
+
+ t1.close();
+
+ HTable t2 = (HTable) conn.getTable(table2);
+ Put p2;
+ for (int i = 0; i < 5; i++) {
+ p2 = new Put(Bytes.toBytes("row-t2" + i));
+ p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+ t2.put(p2);
+ }
+
+ t2.close();
+
+ // #3 - incremental backup for multiple tables
+ String tablesetIncMultiple =
+ table1.getNameAsString() + BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND
+ + table2.getNameAsString() + BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND
+ + table3.getNameAsString();
+
+ String backupIdIncMultiple =
+ BackupClient.create("incremental", BACKUP_ROOT_DIR, tablesetIncMultiple, null);
+ assertTrue(checkSucceeded(backupIdIncMultiple));
+ deletable = cleaner.getDeletableFiles(newWalFiles);
+
+ assertTrue(Iterables.size(deletable) == newWalFiles.size());
+
+ conn.close();
+
+ }
+
+ private List<String> convert(List<FileStatus> walFiles) {
+ List<String> result = new ArrayList